gRPC
Network()
配置服务端的 network 协议,如 tcp
Address()
配置服务端监听的地址
Timeout()
配置服务端的超时设置
Logger()
配置服务端使用日志
Middleware()
UnaryInterceptor()
配置服务端使用的 grpc 拦截器
Options()
配置一些额外的 grpc.ServerOption
主要的实现细节
NewServer()
unaryServerInterceptor()
func (s *Server) unaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// 把两个 ctx 合并成一个
ctx, cancel := ic.Merge(ctx, s.ctx)
defer cancel()
// 从 ctx 中取出 metadata
md, _ := grpcmd.FromIncomingContext(ctx)
ctx = transport.NewServerContext(ctx, &Transport{
endpoint: s.endpoint.String(),
operation: info.FullMethod,
header: headerCarrier(md),
// ctx 超时设置
if s.timeout > 0 {
ctx, cancel = context.WithTimeout(ctx, s.timeout)
defer cancel()
}
// 中间件处理
h := func(ctx context.Context, req interface{}) (interface{}, error) {
return handler(ctx, req)
}
if len(s.middleware) > 0 {
h = middleware.Chain(s.middleware...)(h)
}
return h(ctx, req)
}
}
简单列举了一些 kratos 中 grpc 的用法,其他 grpc 用法可以到 grpc 仓库中查看。
注册 grpc server
gs := grpc.NewServer()
app := kratos.New(
kratos.Name("kratos"),
kratos.Version("v1.0.0"),
kratos.Server(gs),
)
grpc server 中使用 kratos middleware
middleware 中处理 grpc 请求
if info, ok := transport.FromServerContext(ctx); ok {
kind = info.Kind().String()
operation = info.Operation()
}
配置
WithEndpoint()
配置客户端使用的对端连接地址,如果不使用服务发现则为ip:port,如果使用服务发现则格式为discovery://\<authority>/\<serviceName>
WithTimeout()
WithMiddleware()
配置客户端使用的 kratos 中间件
WithDiscovery()
配置客户端使用的服务发现
WithUnaryInterceptor()
配置客户端使用的 grpc 原生拦截器
WithOptions()
配置一些额外的 grpc.ClientOption
dial()
func dial(ctx context.Context, insecure bool, opts ...ClientOption) (*grpc.ClientConn, error) {
// 默认配置
options := clientOptions{
timeout: 500 * time.Millisecond,
}
// 遍历 opts
o(&options)
// 将 kratos 中间件转化成 grpc 拦截器
var ints = []grpc.UnaryClientInterceptor{
unaryClientInterceptor(options.middleware, options.timeout),
}
if len(options.ints) > 0 {
ints = append(ints, options.ints...)
}
var grpcOpts = []grpc.DialOption{
// 负载均衡
grpc.WithBalancerName(roundrobin.Name),
grpc.WithChainUnaryInterceptor(ints...),
}
if options.discovery != nil {
// 如果存在服务发现配置,就配置 grpc 的 Resolvers
grpcOpts = append(grpcOpts, grpc.WithResolvers(discovery.NewBuilder(options.discovery)))
}
if insecure {
// 跳过证书验证
grpcOpts = append(grpcOpts, grpc.WithInsecure())
}
if len(options.grpcOpts) > 0 {
grpcOpts = append(grpcOpts, options.grpcOpts...)
}
return grpc.DialContext(ctx, options.endpoint, grpcOpts...)
}
使用方式
创建客户端连接
conn, err := grpc.DialInsecure(
context.Background(),
grpc.WithEndpoint("127.0.0.1:9000"),
)
使用中间件
conn, err := grpc.DialInsecure(
context.Background(),
transport.WithEndpoint("127.0.0.1:9000"),
transport.WithMiddleware(
recovery.Recovery(),
),
)