grpc 数据流转

    我们知道协议是一款 rpc 框架的基础。协议里面定义了一次客户端需要携带的信息,包括请求的后端服务名 ServiceName,方法名 Method、超时时间 Timeout、编码 Encoding、认证信息 Authority 等等。

    前面我们已经说到了,grpc 是基于 http2 协议的,我们来看看 grpc 协议里面的一些关键信息:

    数据承载体

    为了回答上面的问题,我们需要一个数据承载体结构,来保存协议里面的一些需要透传的一些重要信息,比如 Method 等。在 grpc 中,这个结构就是 Stream, 我们来看一下 Stream 的定义。

    接下来我们来看看 server 端 Stream 的构造。前面的内容已经说过 server 的处理流程了。我们直接进入 serveStreams 这个方法。路径为:s.Serve(lis) ——> s.handleRawConn(rawConn) ——> s.serveStreams(st)

    1. defer st.Close()
    2. var wg sync.WaitGroup
    3. st.HandleStreams(func(stream *transport.Stream) {
    4. wg.Add(1)
    5. go func() {
    6. defer wg.Done()
    7. s.handleStream(st, stream, s.traceInfo(st, stream))
    8. }()
    9. }, func(ctx context.Context, method string) context.Context {
    10. if !EnableTracing {
    11. return ctx
    12. }
    13. tr := trace.New("grpc.Recv."+methodFamily(method), method)
    14. return trace.NewContext(ctx, tr)
    15. })
    16. wg.Wait()
    17. }

    最上层 HandleStreams 是对 http2 数据帧的处理。grpc 一共处理了 MetaHeadersFrame 、DataFrame、RSTStreamFrame、SettingsFrame、PingFrame、WindowUpdateFrame、GoAwayFrame 等 7 种帧。

    1. // A MetaHeadersFrame is the representation of one HEADERS frame and
    2. // zero or more contiguous CONTINUATION frames and the decoding of
    3. //
    4. // This type of frame does not appear on the wire and is only returned
    5. // by the Framer when Framer.ReadMetaHeaders is set.
    6. *HeadersFrame
    7. Fields []hpack.HeaderField
    8. Truncated bool
    9. }

    所以是在 MetaHeadersFrame 这个帧里去处理包头数据。所以会去执行 operateHeaders 这个方法,在这个方法里面会去构造一个 stream ,这个 stream 里面包含了传输层请求上下文的数据。包括方法名等。

    构造完 stream 后,接下来 tranport 对数据的处理都会将 stream 层层透传下去。所以整个请求内所需要的数据都从 stream 中可以得到,这样就实现了 server 端的数据流转。

    client 端数据流转

    与 server 相对应,client 端也有一个 clientStream 结构,定义如下:

    1. // clientStream implements a client side Stream.
    2. type clientStream struct {
    3. callHdr *transport.CallHdr
    4. opts []CallOption
    5. callInfo *callInfo
    6. cc *ClientConn
    7. desc *StreamDesc
    8. codec baseCodec
    9. cp Compressor
    10. comp encoding.Compressor
    11. cancel context.CancelFunc // cancels all attempts
    12. sentLast bool // sent an end stream
    13. beginTime time.Time
    14. methodConfig *MethodConfig
    15. retryThrottler *retryThrottler // The throttler active when the RPC began.
    16. binlog *binarylog.MethodLogger // Binary logger, can be nil.
    17. // serverHeaderBinlogged is a boolean for whether server header has been
    18. // logged. Server header will be logged when the first time one of those
    19. // happens: stream.Header(), stream.Recv().
    20. //
    21. // It's only read and used by Recv() and Header(), so it doesn't need to be
    22. // synchronized.
    23. serverHeaderBinlogged bool
    24. mu sync.Mutex
    25. firstAttempt bool // if true, transparent retry is valid
    26. numRetries int // exclusive of transparent retry attempt(s)
    27. numRetriesSincePushback int // retries since pushback; to reset backoff
    28. finished bool // TODO: replace with atomic cmpxchg or sync.Once?
    29. attempt *csAttempt // the active client stream attempt
    30. // TODO(hedging): hedging will have multiple attempts simultaneously.
    31. committed bool // active attempt committed for retry?
    32. buffer []func(a *csAttempt) error // operations to replay on retry
    33. }

    stream 这个结构承载了数据流转之外,同时 grpc 流式传输的实现也是基于 stream 去实现的。