多语言扩展 - 钩子

    • 客户端接入的认证授权
    • 发布/订阅的 ACL 鉴权
    • 消息的持久化,桥接
    • 发布/订阅,或者客户端上下线事件的通知处理

    多语言扩展钩子 功能由 插件提供。它使用 作为 RPC 的通信框架。

    架构如下图:

    它表明:EMQX 中的 ExHook 模块作为一个 gRPC 客户端,将系统中所有的钩子事件发送到用户的 gRPC 服务端。

    和 EMQX 原生的钩子一致,它也支持链式的方式计算和返回:

    chain_of_responsiblity

    作为事件的处理端,即 gRPC 的服务端。它需要用户自定义实现需要挂载的钩子列表,和每个钩子事件到达后如何去处理的回调函数。这些接口在 多语言扩展钩子 中被定义为一个名为 HookProvider的 gRPC 服务,其需要实现的接口的列表包含:

    1. syntax = "proto3";
    2. package emqx.exhook.v1;
    3. service HookProvider {
    4. rpc OnProviderLoaded(ProviderLoadedRequest) returns (LoadedResponse) {};
    5. rpc OnProviderUnloaded(ProviderUnloadedRequest) returns (EmptySuccess) {};
    6. rpc OnClientConnect(ClientConnectRequest) returns (EmptySuccess) {};
    7. rpc OnClientConnected(ClientConnectedRequest) returns (EmptySuccess) {};
    8. rpc OnClientDisconnected(ClientDisconnectedRequest) returns (EmptySuccess) {};
    9. rpc OnClientAuthenticate(ClientAuthenticateRequest) returns (ValuedResponse) {};
    10. rpc OnClientSubscribe(ClientSubscribeRequest) returns (EmptySuccess) {};
    11. rpc OnClientUnsubscribe(ClientUnsubscribeRequest) returns (EmptySuccess) {};
    12. rpc OnSessionCreated(SessionCreatedRequest) returns (EmptySuccess) {};
    13. rpc OnSessionSubscribed(SessionSubscribedRequest) returns (EmptySuccess) {};
    14. rpc OnSessionUnsubscribed(SessionUnsubscribedRequest) returns (EmptySuccess) {};
    15. rpc OnSessionResumed(SessionResumedRequest) returns (EmptySuccess) {};
    16. rpc OnSessionDiscarded(SessionDiscardedRequest) returns (EmptySuccess) {};
    17. rpc OnSessionTakeovered(SessionTakeoveredRequest) returns (EmptySuccess) {};
    18. rpc OnSessionTerminated(SessionTerminatedRequest) returns (EmptySuccess) {};
    19. rpc OnMessagePublish(MessagePublishRequest) returns (ValuedResponse) {};
    20. rpc OnMessageDropped(MessageDroppedRequest) returns (EmptySuccess) {};
    21. rpc OnMessageAcked(MessageAckedRequest) returns (EmptySuccess) {};
    22. }

    其中 HookProvider 部分:

    • OnProviderLoaded:定义 HookProvider 如何被加载,返回需要挂载的钩子列表。仅在该列表中的钩子会被回调到 HookProivder 服务。
    • OnProviderUnloaded:定义 HookProvider 如何被卸载,仅用作通知。
    • OnClient*OnSession*OnMessage* 为前缀的方法与 的当中的方法一一对应。它们有着相同的调用时机和相似的参数列表。
    • OnClientAuthenticateOnClientCheckAclOnMessagePublish 允许携带返回值到 EMQX 系统,其它回调则不支持。
    • 特定的,对于 message 类型的钩子:message.publish, message.delivered, message.acked, message.dropped,可以在返回钩子列表时,为这些钩子携带主题过滤器列表。 那么在触发消息事件时,仅有主题能够匹配主题过滤器列表中任意过滤器的消息,会被发送到用户的 gRPC 服务端。
      通过这样的方式,gRPC 服务端可以仅处理自己关心的主题下的消息,以避免多余的 gRPC 请求消耗。

    其中接口和参数数据结构的详情参考:exhook.proto (opens new window)

    提示

    需要注意的是,EMQX 仅在 HookProvider 被加载时确认一次需要加载的钩子以及 message 钩子所配置的主题过滤器列表,之后的 gRPC 请求都以将以加载时的为准。
    如果需要挂载的钩子列表有变更,或 message 类型钩子所关心的主题过滤器列表有变更,都需要重新加载。即需要在 EMQX 中重启 ExHook 插件/模块。

    用户在使用多语言扩展钩子的功能时,需要先实现 HookProvider 的 gRPC 服务来接收 ExHook 发送出来的回调事件。

    其步骤如下:

    1. 拷贝出当前版本的 lib/emqx_exhook-<x.y.z>/priv/protos/exhook.proto 文件。
    2. 按需实现 exhook.proto 当中定义的接口。

    开发完成后,需将该服务部署到与 EMQX 能够通信的服务器上,并保证端口的开放。

    其中各个语言的 gRPC 框架可参考:grpc-ecosystem/awesome-grpc多语言扩展钩子 - 图4 (opens new window)

    打开 EMQX Dashboard (opens new window),点击左侧的 “模块” 选项卡,选择添加:

    Modules

    选择 “多语言扩展钩子”:

    配置 服务相关参数:

    Configure ExHook gRPC Server

    点击添加后,模块添加完成

    至此,多语言扩展钩子的配置已经完成。