基础概念

  • 通信层:TCP/TPC_SSL/HTTP/HTTPS/HTTP2
  • 协议层:Thrift-binary/BRPC-std/SogouRPC-std
  • 压缩层:不压缩/gzip/zlib/lz4/snappy
  • 数据层:PB binary/Thrift binary/Json string
  • IDL序列化层:PB/Thrift serialization
  • RPC调用层:Service/Client IMPL

RPC Global

  • 获取srpc版本号srpc::SRPCGlobal::get_instance()->get_srpc_version()

RPC IDL

  • 描述文件
  • 前后兼容
  • Protobuf/Thrift
  • 我们拿pb举例,定义一个ServiceName为Exampleexample.proto文件
  • rpc接口名为Echo,输入参数为EchoRequest,输出参数为EchoResponse
  • EchoRequest包括两个string:messagename
  • EchoResponse包括一个string:message

RPC Service

  • 组成sogouRPC服务的基本单元
  • 每一个Service一定由某一种IDL生成
  • Service只与IDL有关,与网络通信具体协议无关

示例

下面我们通过一个具体例子来呈现

  • 沿用上面的example.protoIDL描述文件
  • 执行官方的protoc example.proto --cpp_out=./ --proto_path=./获得example.pb.hexample.pb.cpp两个文件
  • 执行SogouRPC的srpc_generator protobuf ./example.proto ./获得example.srpc.h文件
  • 我们派生Example::Service来实现具体的rpc业务逻辑,这就是一个RPC Service
  1. class ExampleServiceImpl : public Example::Service
  2. {
  3. public:
  4. void Echo(EchoRequest *request, EchoResponse *response, RPCContext *ctx) override
  5. {
  6. response->set_message("Hi, " + request->name());
  7. printf("get_req:\n%s\nset_resp:\n%s\n",
  8. request->DebugString().c_str(),
  9. response->DebugString().c_str());
  10. }
  11. };
  • 每一个Server对应一个端口
  • 每一个Server对应一个确定的网络通信协议
  • 每一个Service可以添加到任意的Server里
  • 每一个Server可以拥有任意的Service,但在当前Server里ServiceName必须唯一
  • 不同IDL的Service是可以放进同一个Server中的

示例

下面我们通过一个具体例子来呈现

  • 沿用上面的ExampleServiceImplService
  • 首先,我们创建1个RPC Server、需要确定协议
  • 然后,我们可以创建任意个数的Service实例、任意不同proto形成的Service,把这些Service通过add_service()接口添加到Server里
  • 最后,通过Server的或者serve开启服务,处理即将到来的rpc请求
  • 想像一下,我们也可以从Example::Service派生更多的功能的rpcEcho不同实现的Service
  • 想像一下,我们可以在N个不同的端口创建N个不同的RPC Server、代表着不同的协议
  • 想像一下,我们可以把同一个ServiceIMPL实例add_service到不同的Server上,我们也可以把不同的ServiceIMPL实例add_service到同一个Server上
  • 想像一下,我们可以用同一个ExampleServiceImpl,在三个不同端口、同时服务于BPRC-STD、SogouRPC-STD、SogouRPC-Http
  • 甚至,我们可以将1个PB的ExampleServiceImpl和1个Thrift的AnotherThriftServiceImpladd_service到同一个SogouRPC-STD Server,两种IDL在同一个端口上完美工作!

RPC Client

  • 每一个Client对应着一个确定的目标/一个确定的集群
  • 每一个Client对应着一个确定的网络通信协议
  • 每一个Client对应着一个确定的IDL

示例

下面我们通过一个具体例子来呈现

  • 沿用上面的例子,client相对简单,直接调用即可
  • 通过Example::XXXClient创建某种RPC的client实例,需要目标的ip+port或url
  • 利用client实例直接调用rpc函数Echo即可,这是一次异步请求,请求完成后会进入回调函数
  • 具体的RPC Context用法请看下一个段落
  1. #include <stdio.h>
  2. #include "example.srpc.h"
  3. using namespace srpc;
  4. int main()
  5. {
  6. Example::SRPCClient client("127.0.0.1", 1412);
  7. EchoRequest req;
  8. req.set_message("Hello, sogou rpc!");
  9. req.set_name("Li Yingxin");
  10. client.Echo(&req, [](EchoResponse *response, RPCContext *ctx) {
  11. if (ctx->success())
  12. printf("%s\n", response->DebugString().c_str());
  13. else
  14. printf("status[%d] error[%d] errmsg:%s\n",
  15. ctx->get_status_code(), ctx->get_error(), ctx->get_errmsg());
  16. });
  17. pause();
  18. return 0;
  19. }

RPC Context

  • RPCContext专门用来辅助异步接口,Service和Client通用
  • 每一个异步接口都会提供Context,用来给用户提供更高级的功能,比如获取对方ip、获取连接seqid等
  • Context上一些功能是Server或Client独有的,比如Server可以设置回复数据的压缩方式,Client可以获取请求成功或失败
  • Context上可以通过get_series获得所在的series,与workflow的异步模式无缝结合

long long get_seqid() const;

请求+回复视为1次完整通信,获得当前socket连接上的通信sequence id,seqid=0代表第1次

std::string get_remote_ip() const;

获得对方IP地址,支持ipv4/ipv6

int get_peer_addr(struct sockaddr *addr, socklen_t *addrlen) const;

获得对方地址,in/out参数为更底层的数据结构sockaddr

const std::string& get_service_name() const;

获取RPC Service Name

const std::string& get_method_name() const;

SeriesWork *get_series() const;

获取当前ServerTask/ClientTask所在series

RPCContext API - Only for client done

bool success() const;

client专用。这次请求是否成功

int get_status_code() const;

client专用。这次请求的rpc status code

const char *get_errmsg() const;

client专用。这次请求的错误信息

int get_error() const;

client专用。这次请求的错误码

void *get_user_data() const;

client专用。获取ClientTask的user_data。如果用户通过create_xxx_task接口产生task,则可以通过user_data域记录上下文,在创建task时设置,在回调函数中拿回。

RPCContext API - Only for server process

void set_data_type(RPCDataType type);

Server专用。设置数据打包类型

  • RPCDataProtobuf
  • RPCDataJson

void set_compress_type(RPCCompressType type);

  • RPCCompressNone
  • RPCCompressSnappy
  • RPCCompressGzip
  • RPCCompressZlib
  • RPCCompressLz4

void set_attachment_nocopy(const char *attachment, size_t len);

Server专用。设置attachment附件。

bool get_attachment(const char **attachment, size_t *len) const;

Server专用。获取attachment附件。

void set_reply_callback(std::function<void (RPCContext *ctx)> cb);

Server专用。设置reply callback,操作系统写入socket缓冲区成功后被调用。

void set_send_timeout(int timeout);

Server专用。设置发送超时,单位毫秒。-1代表无限。

Server专用。设置连接保活时间,单位毫秒。-1代表无限。

Server Params

Task Params

与workflow异步框架的结合

Server

下面我们通过一个具体例子来呈现

  • Echo RPC在接收到请求时,向下游发起一次http请求
  • 对下游请求完成后,我们将http response的body信息填充到response的message里,回复给客户端
  • 我们不希望阻塞/占据着Handler的线程,所以对下游的请求一定是一次异步请求
  • 首先,我们通过Workflow框架的工厂WFTaskFactory::create_http_task创建一个异步任务http_task
  • 然后,我们利用RPCContext的ctx->get_series()获取到ServerTask所在的SeriesWork
  • 最后,我们使用SeriesWork的push_back接口将http_task放到SeriesWork的后面

Client

下面我们通过一个具体例子来呈现

  • 我们并行发出两个请求,1个是rpc请求,1个是http请求
  • 两个请求都结束后,我们再发起一次计算任务,计算两个数的平方和
  • 首先,我们通过RPC Client的create_Echo_task创建一个rpc异步请求的网络任务rpc_task
  • 然后,我们通过Workflow框架的工厂WFTaskFactory::create_http_taskWFTaskFactory::create_go_task分别创建异步网络任务http_task,和异步计算任务calc_task
  • 最后,我们利用串并联流程图,乘号代表并行、大于号代表串行,将3个异步任务组合起来执行start
  1. void calc(int x, int y)
  2. {
  3. int z = x * x + y * y;
  4. printf("calc result: %d\n", z);
  5. }
  6. int main()
  7. {
  8. Example::SRPCClient client("127.0.0.1", 1412);
  9. auto *rpc_task = client.create_Echo_task([](EchoResponse *response, RPCContext *ctx) {
  10. if (ctx->success())
  11. printf("%s\n", response->DebugString().c_str());
  12. else
  13. printf("status[%d] error[%d] errmsg:%s\n",
  14. ctx->get_status_code(), ctx->get_error(), ctx->get_errmsg());
  15. });
  16. auto *http_task = WFTaskFactory::create_http_task("https://www.sogou.com", 0, 0, [](WFHttpTask *task) {
  17. if (task->get_state() == WFT_STATE_SUCCESS)
  18. {
  19. std::string body;
  20. const void *data;
  21. size_t len;
  22. task->get_resp()->get_parsed_body(&data, &len);
  23. body.assign((const char *)data, len);
  24. printf("%s\n\n", body.c_str());
  25. }
  26. else
  27. printf("Http request fail\n\n");
  28. });
  29. auto *calc_task = WFTaskFactory::create_go_task(calc, 3, 4);
  30. EchoRequest req;
  31. req.set_message("Hello, sogou rpc!");
  32. req.set_name("1412");
  33. rpc_task->serialize_input(&req);
  34. ((*http_task * rpc_task) > calc_task).start();
  35. pause();