基础概念
- 通信层:TCP/TPC_SSL/HTTP/HTTPS/HTTP2
- 协议层:Thrift-binary/BRPC-std/SogouRPC-std
- 压缩层:不压缩/gzip/zlib/lz4/snappy
- 数据层:PB binary/Thrift binary/Json string
- IDL序列化层:PB/Thrift serialization
- RPC调用层:Service/Client IMPL
RPC Global
- 获取srpc版本号
srpc::SRPCGlobal::get_instance()->get_srpc_version()
RPC IDL
- 描述文件
- 前后兼容
- Protobuf/Thrift
- 我们拿pb举例,定义一个ServiceName为
Example
的example.proto
文件 - rpc接口名为
Echo
,输入参数为EchoRequest
,输出参数为EchoResponse
EchoRequest
包括两个string:message
和name
EchoResponse
包括一个string:message
RPC Service
- 组成sogouRPC服务的基本单元
- 每一个Service一定由某一种IDL生成
- Service只与IDL有关,与网络通信具体协议无关
示例
下面我们通过一个具体例子来呈现
- 沿用上面的
example.proto
IDL描述文件 - 执行官方的
protoc example.proto --cpp_out=./ --proto_path=./
获得example.pb.h
和example.pb.cpp
两个文件 - 执行SogouRPC的
srpc_generator protobuf ./example.proto ./
获得example.srpc.h
文件 - 我们派生
Example::Service
来实现具体的rpc业务逻辑,这就是一个RPC Service
class ExampleServiceImpl : public Example::Service
{
public:
void Echo(EchoRequest *request, EchoResponse *response, RPCContext *ctx) override
{
response->set_message("Hi, " + request->name());
printf("get_req:\n%s\nset_resp:\n%s\n",
request->DebugString().c_str(),
response->DebugString().c_str());
}
};
- 每一个Server对应一个端口
- 每一个Server对应一个确定的网络通信协议
- 每一个Service可以添加到任意的Server里
- 每一个Server可以拥有任意的Service,但在当前Server里ServiceName必须唯一
- 不同IDL的Service是可以放进同一个Server中的
示例
下面我们通过一个具体例子来呈现
- 沿用上面的
ExampleServiceImpl
Service - 首先,我们创建1个RPC Server、需要确定协议
- 然后,我们可以创建任意个数的Service实例、任意不同proto形成的Service,把这些Service通过
add_service()
接口添加到Server里 - 最后,通过Server的或者
serve
开启服务,处理即将到来的rpc请求 - 想像一下,我们也可以从
Example::Service
派生更多的功能的rpcEcho
不同实现的Service - 想像一下,我们可以在N个不同的端口创建N个不同的RPC Server、代表着不同的协议
- 想像一下,我们可以把同一个ServiceIMPL实例
add_service
到不同的Server上,我们也可以把不同的ServiceIMPL实例add_service
到同一个Server上 - 想像一下,我们可以用同一个
ExampleServiceImpl
,在三个不同端口、同时服务于BPRC-STD、SogouRPC-STD、SogouRPC-Http - 甚至,我们可以将1个PB的
ExampleServiceImpl
和1个Thrift的AnotherThriftServiceImpl
,add_service
到同一个SogouRPC-STD Server,两种IDL在同一个端口上完美工作!
RPC Client
- 每一个Client对应着一个确定的目标/一个确定的集群
- 每一个Client对应着一个确定的网络通信协议
- 每一个Client对应着一个确定的IDL
示例
下面我们通过一个具体例子来呈现
- 沿用上面的例子,client相对简单,直接调用即可
- 通过
Example::XXXClient
创建某种RPC的client实例,需要目标的ip+port或url - 利用client实例直接调用rpc函数
Echo
即可,这是一次异步请求,请求完成后会进入回调函数 - 具体的RPC Context用法请看下一个段落
#include <stdio.h>
#include "example.srpc.h"
using namespace srpc;
int main()
{
Example::SRPCClient client("127.0.0.1", 1412);
EchoRequest req;
req.set_message("Hello, sogou rpc!");
req.set_name("Li Yingxin");
client.Echo(&req, [](EchoResponse *response, RPCContext *ctx) {
if (ctx->success())
printf("%s\n", response->DebugString().c_str());
else
printf("status[%d] error[%d] errmsg:%s\n",
ctx->get_status_code(), ctx->get_error(), ctx->get_errmsg());
});
pause();
return 0;
}
RPC Context
- RPCContext专门用来辅助异步接口,Service和Client通用
- 每一个异步接口都会提供Context,用来给用户提供更高级的功能,比如获取对方ip、获取连接seqid等
- Context上一些功能是Server或Client独有的,比如Server可以设置回复数据的压缩方式,Client可以获取请求成功或失败
- Context上可以通过get_series获得所在的series,与workflow的异步模式无缝结合
long long get_seqid() const;
请求+回复视为1次完整通信,获得当前socket连接上的通信sequence id,seqid=0代表第1次
std::string get_remote_ip() const;
获得对方IP地址,支持ipv4/ipv6
int get_peer_addr(struct sockaddr *addr, socklen_t *addrlen) const;
获得对方地址,in/out参数为更底层的数据结构sockaddr
const std::string& get_service_name() const;
获取RPC Service Name
const std::string& get_method_name() const;
SeriesWork *get_series() const;
获取当前ServerTask/ClientTask所在series
RPCContext API - Only for client done
bool success() const;
client专用。这次请求是否成功
int get_status_code() const;
client专用。这次请求的rpc status code
const char *get_errmsg() const;
client专用。这次请求的错误信息
int get_error() const;
client专用。这次请求的错误码
void *get_user_data() const;
client专用。获取ClientTask的user_data。如果用户通过create_xxx_task接口产生task,则可以通过user_data域记录上下文,在创建task时设置,在回调函数中拿回。
RPCContext API - Only for server process
void set_data_type(RPCDataType type);
Server专用。设置数据打包类型
- RPCDataProtobuf
- RPCDataJson
void set_compress_type(RPCCompressType type);
- RPCCompressNone
- RPCCompressSnappy
- RPCCompressGzip
- RPCCompressZlib
- RPCCompressLz4
void set_attachment_nocopy(const char *attachment, size_t len);
Server专用。设置attachment附件。
bool get_attachment(const char **attachment, size_t *len) const;
Server专用。获取attachment附件。
void set_reply_callback(std::function<void (RPCContext *ctx)> cb);
Server专用。设置reply callback,操作系统写入socket缓冲区成功后被调用。
void set_send_timeout(int timeout);
Server专用。设置发送超时,单位毫秒。-1代表无限。
Server专用。设置连接保活时间,单位毫秒。-1代表无限。
Server Params
Task Params
与workflow异步框架的结合
Server
下面我们通过一个具体例子来呈现
- Echo RPC在接收到请求时,向下游发起一次http请求
- 对下游请求完成后,我们将http response的body信息填充到response的message里,回复给客户端
- 我们不希望阻塞/占据着Handler的线程,所以对下游的请求一定是一次异步请求
- 首先,我们通过Workflow框架的工厂
WFTaskFactory::create_http_task
创建一个异步任务http_task - 然后,我们利用RPCContext的
ctx->get_series()
获取到ServerTask所在的SeriesWork - 最后,我们使用SeriesWork的
push_back
接口将http_task放到SeriesWork的后面
Client
下面我们通过一个具体例子来呈现
- 我们并行发出两个请求,1个是rpc请求,1个是http请求
- 两个请求都结束后,我们再发起一次计算任务,计算两个数的平方和
- 首先,我们通过RPC Client的
create_Echo_task
创建一个rpc异步请求的网络任务rpc_task - 然后,我们通过Workflow框架的工厂
WFTaskFactory::create_http_task
和WFTaskFactory::create_go_task
分别创建异步网络任务http_task,和异步计算任务calc_task - 最后,我们利用串并联流程图,乘号代表并行、大于号代表串行,将3个异步任务组合起来执行start
void calc(int x, int y)
{
int z = x * x + y * y;
printf("calc result: %d\n", z);
}
int main()
{
Example::SRPCClient client("127.0.0.1", 1412);
auto *rpc_task = client.create_Echo_task([](EchoResponse *response, RPCContext *ctx) {
if (ctx->success())
printf("%s\n", response->DebugString().c_str());
else
printf("status[%d] error[%d] errmsg:%s\n",
ctx->get_status_code(), ctx->get_error(), ctx->get_errmsg());
});
auto *http_task = WFTaskFactory::create_http_task("https://www.sogou.com", 0, 0, [](WFHttpTask *task) {
if (task->get_state() == WFT_STATE_SUCCESS)
{
std::string body;
const void *data;
size_t len;
task->get_resp()->get_parsed_body(&data, &len);
body.assign((const char *)data, len);
printf("%s\n\n", body.c_str());
}
else
printf("Http request fail\n\n");
});
auto *calc_task = WFTaskFactory::create_go_task(calc, 3, 4);
EchoRequest req;
req.set_message("Hello, sogou rpc!");
req.set_name("1412");
rpc_task->serialize_input(&req);
((*http_task * rpc_task) > calc_task).start();
pause();