我们知道 grpc 传输层是基于 http2 协议规范,所以我们先要了解 http2 协议帧的格式。http 2 协议帧格式如下:

对于一个网络包而言,首先要知道这个包的格式,然后才能按照约定的格式解析出这个包。那么 grpc 的包是什么样的格式呢? 看了源码后,先直接揭晓出来,它其实是这样的

http 帧格式为:length (3 byte) + type(1 byte) + flag (1 byte) + R (1 bit) + stream identifier (31 bit) + paypoad,payload 是消息具体内容

前 9 个字节是 http 包头,length 表示消息长度,type 表示 http 帧的类型,http 一共规定了 10 种帧类型:

  • HEADERS帧 头信息,对应于HTTP HEADER
  • DATA帧 对应于HTTP Response Body
  • PRIORITY帧 用于调整流的优先级
  • RST_STREAM帧 流终止帧,用于中断资源的传输
  • SETTINGS帧 用户客户服务器交流连接配置信息
  • PUSH_PROMISE帧 服务器向客户端主动推送资源
  • GOAWAY帧 通知对方断开连接
  • PING帧 心跳帧,检测往返时间和连接可用性
  • WINDOW_UPDATE帧 调整帧大小
  • CONTINUATION帧 HEADERS太大时的续帧

flag 表示标志位,http 一共三种标志位:

  • END_STREAM 流结束标志,表示当前帧是流的最后一个帧
  • END_HEADERS 头结束表示,表示当前帧是头信息的最后一个帧
  • PADDED 填充标志,在数据Payload里填充无用信息,用于干扰信道监听

具体可以参考:

回到 examples 目录的 helloworld 目录, 在 server main 函数中跟踪 s.Serve 方法: s.Serve() ——> s.handleRawConn(rawConn) ——> s.serveStreams(st)

来看一下 serveStreams 这个方法

  1. defer st.Close()
  2. var wg sync.WaitGroup
  3. st.HandleStreams(func(stream *transport.Stream) {
  4. wg.Add(1)
  5. go func() {
  6. defer wg.Done()
  7. s.handleStream(st, stream, s.traceInfo(st, stream))
  8. }()
  9. }, func(ctx context.Context, method string) context.Context {
  10. if !EnableTracing {
  11. return ctx
  12. }
  13. tr := trace.New("grpc.Recv."+methodFamily(method), method)
  14. return trace.NewContext(ctx, tr)
  15. })
  16. wg.Wait()
  17. }

这里调用了 transport 的 HandleStreams 方法, 这个方法就是 http 帧的处理的具体实现。它的底层直接调用的 http2 包的 ReadFrame 方法去读取一个 http 帧数据。

  1. type framer struct {
  2. writer *bufWriter
  3. fr *http2.Framer
  4. }
  5. func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
  6. defer close(t.readerDone)
  7. for {
  8. frame, err := t.framer.fr.ReadFrame()
  9. atomic.StoreUint32(&t.activity, 1)
  10. ...
  11. switch frame := frame.(type) {
  12. case *http2.MetaHeadersFrame:
  13. t.Close()
  14. break
  15. }
  16. case *http2.DataFrame:
  17. t.handleData(frame)
  18. case *http2.RSTStreamFrame:
  19. t.handleRSTStream(frame)
  20. case *http2.SettingsFrame:
  21. t.handleSettings(frame)
  22. case *http2.PingFrame:
  23. t.handlePing(frame)
  24. case *http2.WindowUpdateFrame:
  25. t.handleWindowUpdate(frame)
  26. case *http2.GoAwayFrame:
  27. // TODO: Handle GoAway from the client appropriately.
  28. default:
  29. errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
  30. }
  31. }
  32. }

通过 http2 包的 ReadFrame 直接读取出一个帧的数据。

fh, err := readFrameHeader(fr.headerBuf[:], fr.r) 这一行代码读取了 http 的包头数据,我们来看一下 headerBuf 的长度,发现果然是 9 个字节。

  1. const frameHeaderLen = 9
  2. func readFrameHeader(buf []byte, r io.Reader) (FrameHeader, error) {
  3. _, err := io.ReadFull(r, buf[:frameHeaderLen])
  4. if err != nil {
  5. return FrameHeader{}, err
  6. return FrameHeader{
  7. Length: (uint32(buf[0])<<16 | uint32(buf[1])<<8 | uint32(buf[2])),
  8. Type: FrameType(buf[3]),
  9. Flags: Flags(buf[4]),
  10. StreamID: binary.BigEndian.Uint32(buf[5:]) & (1<<31 - 1),
  11. }, nil
  12. }

回到 http2 读帧的部分,当发现帧的格式是 MetaHeadersFrame,也就是第一个帧时,会调用 operateHeaders 方法

  1. case *http2.MetaHeadersFrame:
  2. if t.operateHeaders(frame, handle, traceCtx) {
  3. t.Close()
  4. break
  5. }

看一下 operateHeaders ,里面会去调用 handle(s) , 这个handle 其实是 之前 s.Serve() ——> s.handleRawConn(rawConn) ——> s.serveStreams(st) 这个路径下的 HandleStreams 方法传入的,也就是会去调用 handleStream 这个方法

s.handleStream(st, stream, s.traceInfo(st, stream)) ——> s.processUnaryRPC(t, stream, srv, md, trInfo) ———> d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp) ,进入 recvAndDecompress 这个函数,里面调用了

  1. pf, d, err := p.recvMsg(maxReceiveMessageSize)

进入 recvMsg,发现它就是解析 grpc 协议 的函数,先把协议头读出来,用了 5 个字节。从协议头中得知协议体消息的长度,然后用一个相应长度的 byte 数组把协议体读出来

  1. type parser struct {
  2. r io.Reader
  3. header [5]byte
  4. }
  5. func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byte, err error) {
  6. if _, err := p.r.Read(p.header[:]); err != nil {
  7. return 0, nil, err
  8. }
  9. pf = payloadFormat(p.header[0])
  10. length := binary.BigEndian.Uint32(p.header[1:])
  11. ...
  12. msg = make([]byte, int(length))
  13. if _, err := p.r.Read(msg); err != nil {
  14. if err == io.EOF {
  15. err = io.ErrUnexpectedEOF
  16. }
  17. return 0, nil, err
  18. }

继续回到这个图,前面说到了 grpc 协议头是 5个字节。 11. http2 协议帧格式 - 图1 compressed-flag 表示是否压缩,值为 1 是压缩消息体数据,0 不压缩。 length 表示消息体数据长度。现在终于知道了这个数据结构的由来!

读取出来的数据是二进制的,读出来原数据之后呢,我们就可以针对相应的数据做解包操作了。这里可以参考我的另一篇文章 :10-grpc 协议编解码器