每个 rpc 框架基本都有自己的编解码器,下面我们就来说说 grpc 的编解码过程。

我们还是从我们的 examples 目录下的 helloworld demo 中 server 的 main 函数入手

在 s.Serve(lis) ——> s.handleRawConn(rawConn) —— > s.serveStreams(st) ——> s.handleStream(st, stream, s.traceInfo(st, stream)) ——> s.processUnaryRPC(t, stream, srv, md, trInfo) 方法中有一段代码:

  1. ...
  2. df := func(v interface{}) error {
  3. if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
  4. return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
  5. }
  6. if sh != nil {
  7. sh.HandleRPC(stream.Context(), &stats.InPayload{
  8. RecvTime: time.Now(),
  9. Payload: v,
  10. WireLength: payInfo.wireLength,
  11. Data: d,
  12. Length: len(d),
  13. })
  14. }
  15. if binlog != nil {
  16. binlog.Log(&binarylog.ClientMessage{
  17. Message: d,
  18. })
  19. }
  20. if trInfo != nil {
  21. trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
  22. }
  23. return nil
  24. }

这段代码的逻辑先调 getCodec 获取解包类,然后调用这个类的 Unmarshal 方法进行解包。将业务数据取出来,然后调用 handler 进行处理。

  1. func (s *Server) getCodec(contentSubtype string) baseCodec {
  2. if s.opts.codec != nil {
  3. return s.opts.codec
  4. }
  5. if contentSubtype == "" {
  6. return encoding.GetCodec(proto.Name)
  7. }
  8. codec := encoding.GetCodec(contentSubtype)
  9. if codec == nil {
  10. return encoding.GetCodec(proto.Name)
  11. }
  12. return codec
  13. }

我们来看 getCodec 这个方法,它是通过 contentSubtype 这个字段来获取解包类的。假如不设置 contentSubtype ,那么默认会用名字为 proto 的解码器。

我们来看看 contentSubtype 是如何设置的。之前说到了 grpc 的底层默认是基于 http2 的。在 serveHttp 时调用了 NewServerHandlerTransport 这个方法来创建一个 ServerTransport,然后我们发现,其实就是根据 content-type 这个字段去生成的。

  1. func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats stats.Handler) (ServerTransport, error) {
  2. ...
  3. contentType := r.Header.Get("Content-Type")
  4. // TODO: do we assume contentType is lowercase? we did before
  5. contentSubtype, validContentType := contentSubtype(contentType)
  6. if !validContentType {
  7. return nil, errors.New("invalid gRPC request content-type")
  8. }
  9. if _, ok := w.(http.Flusher); !ok {
  10. return nil, errors.New("gRPC requires a ResponseWriter supporting http.Flusher")
  11. rw: w,
  12. req: r,
  13. closedCh: make(chan struct{}),
  14. writes: make(chan func()),
  15. contentType: contentType,
  16. contentSubtype: contentSubtype,
  17. stats: stats,
  18. }
  19. }
  1. ...
  2. baseContentType = "application/grpc"
  3. ...
  4. func contentSubtype(contentType string) (string, bool) {
  5. if contentType == baseContentType {
  6. return "", true
  7. }
  8. if !strings.HasPrefix(contentType, baseContentType) {
  9. return "", false
  10. }
  11. // guaranteed since != baseContentType and has baseContentType prefix
  12. switch contentType[len(baseContentType)] {
  13. case '+', ';':
  14. // this will return true for "application/grpc+" or "application/grpc;"
  15. // which the previous validContentType function tested to be valid, so we
  16. // just say that no content-subtype is specified in this case
  17. return contentType[len(baseContentType)+1:], true
  18. default:
  19. return "", false
  20. }
  21. }

可以看到 grpc 协议默认以 application/grpc 开头,假如不一这个开头会返回错误,假如我们想使用 json 的解码器,应该设置 content-type = application/grpc+json 。下面是一个基于 grpc 协议的请求 request :

详细可参考

怎么拿的呢,再看一下 encoding.getCodec 方法

  1. func GetCodec(contentSubtype string) Codec {
  2. return registeredCodecs[contentSubtype]
  3. }

它其实取得是 registeredCodecs 这个 map 中的 codec,这个 map 是 RegisterCodec 方法注册进去的。

  1. var registeredCodecs = make(map[string]Codec)
  2. func RegisterCodec(codec Codec) {
  3. if codec == nil {
  4. panic("cannot register a nil Codec")
  5. }
  6. if codec.Name() == "" {
  7. panic("cannot register Codec with empty string result for Name()")
  8. }
  9. contentSubtype := strings.ToLower(codec.Name())
  10. registeredCodecs[contentSubtype] = codec
  11. }

毫无疑问, encoding 目录的 proto 包下肯定在初始化时调用注册方法了。果然

  1. func init() {
  2. encoding.RegisterCodec(codec{})
  3. }

绕了一圈,调用的其实是 proto 的 Unmarshal 方法,如下:

  1. func (codec) Unmarshal(data []byte, v interface{}) error {
  2. if pu, ok := protoMsg.(proto.Unmarshaler); ok {
  3. // object can unmarshal itself, no need for buffer
  4. return pu.Unmarshal(data)
  5. }
  6. cb := protoBufferPool.Get().(*cachedProtoBuffer)
  7. cb.SetBuf(data)
  8. err := cb.Unmarshal(protoMsg)
  9. cb.SetBuf(nil)
  10. protoBufferPool.Put(cb)
  11. return err
  12. }

于是我们很快就找到了调用路径,也是这个路径:

s.Serve(lis) ——> s.handleRawConn(rawConn) —— > s.serveStreams(st) ——> s.handleStream(st, stream, s.traceInfo(st, stream)) ——> s.processUnaryRPC(t, stream, srv, md, trInfo)

processUnaryRPC 方法中有一段 server 发送响应数据的代码。其实也就是这一行:

我们其实也能猜到,发送数据给 client 之前肯定要编码。果然调用了 encode 方法

  1. func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
  2. data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
  3. if err != nil {
  4. grpclog.Errorln("grpc: server failed to encode response: ", err)
  5. return err
  6. }
  7. ...
  8. }

来看一下 encode

  1. func encode(c baseCodec, msg interface{}) ([]byte, error) {
  2. if msg == nil { // NOTE: typed nils will not be caught by this check
  3. return nil, nil
  4. }
  5. b, err := c.Marshal(msg)
  6. if err != nil {
  7. return nil, status.Errorf(codes.Internal, "grpc: error while marshaling: %v", err.Error())
  8. }
  9. if uint(len(b)) > math.MaxUint32 {
  10. return nil, status.Errorf(codes.ResourceExhausted, "grpc: message too large (%d bytes)", len(b))
  11. }
  12. return b, nil
  13. }

它调用了 c.Marshal 方法, Marshal 方法其实是 baseCodec 定义的一个通用抽象方法

  1. type baseCodec interface {
  2. Marshal(v interface{}) ([]byte, error)
  3. Unmarshal(data []byte, v interface{}) error
  4. }
  1. func (codec) Marshal(v interface{}) ([]byte, error) {
  2. if pm, ok := v.(proto.Marshaler); ok {
  3. // object can marshal itself, no need for buffer
  4. return pm.Marshal()
  5. }
  6. cb := protoBufferPool.Get().(*cachedProtoBuffer)
  7. out, err := marshal(v, cb)
  8. // put back buffer and lose the ref to the slice
  9. cb.SetBuf(nil)
  10. protoBufferPool.Put(cb)
  11. }

ok,那么至此,grpc 的整个编解码的流程我们就已经剖析完了