gRPC

    Network()

    配置服务端的 network 协议,如 tcp

    Address()

    配置服务端监听的地址

    Timeout()

    配置服务端的超时设置

    Logger()

    配置服务端使用日志

    Middleware()

    UnaryInterceptor()

    配置服务端使用的 grpc 拦截器

    Options()

    配置一些额外的 grpc.ServerOption

    主要的实现细节

    NewServer()

    unaryServerInterceptor()

    1. func (s *Server) unaryServerInterceptor() grpc.UnaryServerInterceptor {
    2. return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    3. // 把两个 ctx 合并成一个
    4. ctx, cancel := ic.Merge(ctx, s.ctx)
    5. defer cancel()
    6. // 从 ctx 中取出 metadata
    7. md, _ := grpcmd.FromIncomingContext(ctx)
    8. ctx = transport.NewServerContext(ctx, &Transport{
    9. endpoint: s.endpoint.String(),
    10. operation: info.FullMethod,
    11. header: headerCarrier(md),
    12. // ctx 超时设置
    13. if s.timeout > 0 {
    14. ctx, cancel = context.WithTimeout(ctx, s.timeout)
    15. defer cancel()
    16. }
    17. // 中间件处理
    18. h := func(ctx context.Context, req interface{}) (interface{}, error) {
    19. return handler(ctx, req)
    20. }
    21. if len(s.middleware) > 0 {
    22. h = middleware.Chain(s.middleware...)(h)
    23. }
    24. return h(ctx, req)
    25. }
    26. }

    简单列举了一些 kratos 中 grpc 的用法,其他 grpc 用法可以到 grpc 仓库中查看。

    注册 grpc server

    1. gs := grpc.NewServer()
    2. app := kratos.New(
    3. kratos.Name("kratos"),
    4. kratos.Version("v1.0.0"),
    5. kratos.Server(gs),
    6. )

    grpc server 中使用 kratos middleware

    middleware 中处理 grpc 请求

    1. if info, ok := transport.FromServerContext(ctx); ok {
    2. kind = info.Kind().String()
    3. operation = info.Operation()
    4. }

    配置

    WithEndpoint()

    配置客户端使用的对端连接地址,如果不使用服务发现则为ip:port,如果使用服务发现则格式为discovery://\<authority>/\<serviceName>

    WithTimeout()

    WithMiddleware()

    配置客户端使用的 kratos 中间件

    WithDiscovery()

    配置客户端使用的服务发现

    WithUnaryInterceptor()

    配置客户端使用的 grpc 原生拦截器

    WithOptions()

    配置一些额外的 grpc.ClientOption

    dial()

    1. func dial(ctx context.Context, insecure bool, opts ...ClientOption) (*grpc.ClientConn, error) {
    2. // 默认配置
    3. options := clientOptions{
    4. timeout: 500 * time.Millisecond,
    5. }
    6. // 遍历 opts
    7. o(&options)
    8. // 将 kratos 中间件转化成 grpc 拦截器
    9. var ints = []grpc.UnaryClientInterceptor{
    10. unaryClientInterceptor(options.middleware, options.timeout),
    11. }
    12. if len(options.ints) > 0 {
    13. ints = append(ints, options.ints...)
    14. }
    15. var grpcOpts = []grpc.DialOption{
    16. // 负载均衡
    17. grpc.WithBalancerName(roundrobin.Name),
    18. grpc.WithChainUnaryInterceptor(ints...),
    19. }
    20. if options.discovery != nil {
    21. // 如果存在服务发现配置,就配置 grpc 的 Resolvers
    22. grpcOpts = append(grpcOpts, grpc.WithResolvers(discovery.NewBuilder(options.discovery)))
    23. }
    24. if insecure {
    25. // 跳过证书验证
    26. grpcOpts = append(grpcOpts, grpc.WithInsecure())
    27. }
    28. if len(options.grpcOpts) > 0 {
    29. grpcOpts = append(grpcOpts, options.grpcOpts...)
    30. }
    31. return grpc.DialContext(ctx, options.endpoint, grpcOpts...)
    32. }

    使用方式

    创建客户端连接

    1. conn, err := grpc.DialInsecure(
    2. context.Background(),
    3. grpc.WithEndpoint("127.0.0.1:9000"),
    4. )

    使用中间件

    1. conn, err := grpc.DialInsecure(
    2. context.Background(),
    3. transport.WithEndpoint("127.0.0.1:9000"),
    4. transport.WithMiddleware(
    5. recovery.Recovery(),
    6. ),
    7. )

    使用服务发现