消息桥接

    • RPC 桥接: 使用 Erlang RPC 协议的桥接方式,只能在 EMQX 间使用

    • MQTT 桥接: 使用 MQTT 协议、作为客户端连接到远程 Broker 的桥接方式,可桥接到其他 MQTT Broker 以及 EMQX Broker

    其概念如下图所示:

    发布者可通过桥接将消息发布到远程的 Broker:

    image

    EMQX 根据不同的 name 来区分不同的 bridge。可在 中添加 Bridge:

    该项配置声明了两个 bridge,一个名为 aws,另一个名为 huawei,并分别指向响应的服务地址,使用 MQTT 方式桥接。

    如果该配置的值是另一个 EMQX 的节点名,则使用 RPC 方式桥接:

    1. bridge.mqtt.emqx2.address = emqx2@57.122.76.34

    使用桥接功能需要启动 emqx_bridge_mqtt 插件:

    1. $ emqx_ctl plugins load emqx_bridge_mqtt

    RPC 桥接的优点在于其不涉及 MQTT 协议编解码,效率高于 MQTT 桥接。

    RPC 桥接的缺点:

    • RPC 桥接只能将本地的消息转发到远程桥接节点上,无法将远程桥接节点的消息同步到本地节点上

    假设有两个 emqx 节点:

    现在我们要将 emqx1 桥接到 emqx2。首先需要在 emqx1etc/plugins/emqx_bridge_mqtt.conf 配置文件里添加 Bridge 配置并指向emqx2:

    1. bridge.mqtt.emqx2.address = emqx2@192.168.1.2

    接下来定义 forwards 规则,这样本节点上发到 sensor1/#、 上的消息都会被转发到 emqx2:

    1. bridge.mqtt.emqx2.forwards = sensor1/#,sensor2/#

    如果想要在消息转发前给 emqx2 之前,给主题加上特定前缀,可以设置挂载点:

    1. bridge.mqtt.emqx2.mountpoint = bridge/emqx2/${node}/

    挂载点利于 emqx2 区分桥接消息和本地消息。例如,以上配置中,原主题为 sensor1/hello 的消息,转发到 emqx2 后主题会变为 bridge/emqx2/emqx1@192.168.1.1/sensor1/hello

    MQTT 桥接是让 EMQX 作为 MQTT 客户端连接到远程的 MQTT Broker。

    首先需要配置 MQTT 客户端参数:

    远程 Broker 地址:

    1. bridge.mqtt.aws.address = 211.182.34.1:1883

    MQTT 协议版本,可以为 mqttv3mqttv4mqttv5 其中之一:

    MQTT 客户端的 clientid:

    1. bridge.mqtt.aws.clientid = bridge_emq

    MQTT 客户端的 username 字段:

    1. bridge.mqtt.aws.username = user
    1. bridge.mqtt.aws.password = passwd

    Keepalive 设置:

      然后是客户端的 clean_start 字段,有些 IoT Hub 要求 clean_start(或 clean_session) 字段必须为 true:

      1. bridge.mqtt.aws.clean_start = true

      可设置桥接断线重连间隔:

      1. bridge.mqtt.aws.reconnect_interval = 30s

      如需使用 TLS 连接,可以设置 bridge.mqtt.aws.ssl = on 并设置 TLS 证书:

      接下来定义 forwards 规则,这样本节点上发到 sensor1/#sensor2/# 上的消息都会被转发到远程 Broker:

      1. bridge.mqtt.aws.forwards = sensor1/#,sensor2/#

      还可指定 QoS1 与 QoS2 消息的重传间隔以及批量发送报文数:

      1. bridge.mqtt.aws.retry_interval = 20s
      2. bridge.mqtt.aws.max_inflight_batches = 32

      如果想要在消息转发前给 aws 之前,给主题加上特定前缀,可以设置挂载点,详见 章节:

      1. bridge.mqtt.aws.mountpoint = bridge/aws/${node}/

      如果想让本地 Broker “拉取” 远程 Broker 的消息,可以向远程 Broker 订阅某些主题:

      1. bridge.mqtt.aws.subscription.1.topic = cmd/topic1
      2. bridge.mqtt.aws.subscription.1.qos = 1

      EMQX 的 Bridge 拥有消息缓存机制,当 Bridge 连接断开时会将 forwards 主题的消息缓存,等到桥接恢复时,再把消息重新转发到远程节点上。缓存机制同时适用于 RPC 桥接和 MQTT 桥接。

      设置缓存队列总大小:

      1. bridge.mqtt.aws.queue.max_total_size = 5GB

      将消息缓存到磁盘的某个路径(如不设置,则仅缓存到内存):

        设置单个缓存文件的大小,如超过则会创建新的文件来存储消息队列: