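Kafka Bridge

    Starting with version 3.1, EMQX provides a powerful rule engine that replaces plugins. We recommend using the rule engine's Kafka bridge instead.

    EMQX bridges and forwards MQTT messages to a Kafka cluster. Apache Kafka is a fast, highly scalable, high-throughput distributed log system. Combined with Kafka Streams, it is widely used in stream data processing.

    Configuring Kafka bridge rules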

        ## ${topic}: the Kafka topic to which the messages will be published.
        ## ${filter}: the MQTT topic (may contain wildcards) on which the action will be performed.
        ## Client Connected Record Hook
        bridge.kafka.hook.client.connected.1 = {"topic": "client_connected"}
        ## Client Disconnected Record Hook
        bridge.kafka.hook.client.disconnected.1 = {"topic": "client_disconnected"}
        ## Session Subscribed Record Hook
        bridge.kafka.hook.session.subscribed.1 = {"filter": "#", "topic": "session_subscribed"}
        ## Session Unsubscribed Record Hook
        bridge.kafka.hook.session.unsubscribed.1 = {"filter": "#", "topic": "session_unsubscribed"}
        ## Message Publish Record Hook
        bridge.kafka.hook.message.publish.1 = {"filter": "#", "topic": "message_publish"}
        ## Message Delivered Record Hook
        bridge.kafka.hook.message.delivered.1 = {"filter": "#", "topic": "message_delivered"}
        ## Message Acked Record Hook
        bridge.kafka.hook.message.acked.1 = {"filter": "#", "topic": "message_acked"}
        ## More Configures
        ## partitioner strategy:
        ## Option: random | roundrobin | first_key_dispatch
        ## Example: bridge.kafka.hook.message.publish.1 = {"filter":"#", "topic":"message_publish", "strategy":"random"}
        ## key:
        ## Option: ${clientid} | ${username}
        ## Example: bridge.kafka.hook.message.publish.1 = {"filter":"#", "topic":"message_publish", "key":"${clientid}"}
        ## format:
        ## Option: json
        ## Example: bridge.kafka.hook.message.publish.1 = {"filter":"#", "topic":"message_publish", "format":"json"}
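
    Each hook rule above is a key of the form bridge.kafka.hook.<hook>.<n> whose value is a JSON object, and the filter field follows MQTT wildcard rules. The sketch below illustrates both ideas in Python. The function names parse_hook_line and filter_matches are hypothetical helpers, not part of EMQX, and the matcher ignores the special handling of topics that begin with "$".

```python
import json
import re

# Matches lines such as: bridge.kafka.hook.message.publish.1 = {...}
HOOK_LINE = re.compile(r"^bridge\.kafka\.hook\.([a-z_.]+)\.(\d+)\s*=\s*(\{.*\})\s*$")


def parse_hook_line(line):
    """Return (hook_name, index, options) for a hook rule line, or None."""
    match = HOOK_LINE.match(line.strip())
    if match is None:
        return None  # comment, blank line, or unrelated setting
    hook_name, index, raw_json = match.groups()
    return hook_name, int(index), json.loads(raw_json)


def filter_matches(topic_filter, topic):
    """Check an MQTT topic against a filter that may contain '+' or '#'."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            return True  # multi-level wildcard covers all remaining levels
        if i >= len(topic_levels):
            return False  # filter is longer than the topic
        if level != "+" and level != topic_levels[i]:
            return False  # literal level differs
    return len(filter_levels) == len(topic_levels)


rule = parse_hook_line(
    'bridge.kafka.hook.message.publish.1 = {"filter": "#", "topic": "message_publish"}'
)
hook_name, index, options = rule
print(hook_name, index, options["topic"])
```

    With a filter of "#", as in the configuration above, every MQTT topic matches, so all events of that hook are forwarded to the configured Kafka topic.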

    Kafka bridge rule description

    When a device comes online, EMQX forwards the connect event message to Kafka:

        topic = "client_connected",
        value = {
            "client_id": ${clientid},
            "username": ${username},
            "node": ${node},
            "ts": ${ts}
        }

    When a device goes offline, EMQX forwards the disconnect event message to Kafka:

        topic = "client_disconnected",
        value = {
            "client_id": ${clientid},
            "username": ${username},
            "reason": ${reason},
            "node": ${node},
            "ts": ${ts}
        }
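
    A consumer can combine the two event streams above to track which clients are online. The following is a minimal sketch, assuming each record value is valid JSON with the fields shown above and that ts is a number that increases over time. Because Kafka only orders records within a partition, the sketch drops events older than the last one seen for a client. The helper names and sample values are hypothetical.

```python
import json


def apply_event(state, kafka_topic, value):
    """Fold one bridged record into state: client_id -> (ts, is_online)."""
    record = json.loads(value)
    client_id, ts = record["client_id"], record["ts"]
    last_ts, _ = state.get(client_id, (None, False))
    if last_ts is not None and ts < last_ts:
        return state  # stale event that arrived out of order; ignore it
    state[client_id] = (ts, kafka_topic == "client_connected")
    return state


def online_clients(state):
    """Return the sorted IDs of clients whose latest event is a connect."""
    return sorted(cid for cid, (_, is_online) in state.items() if is_online)


# Hypothetical sample records; all field values are made up for illustration.
events = [
    ("client_connected", '{"client_id": "c1", "username": "u1", "node": "emqx@127.0.0.1", "ts": 10}'),
    ("client_connected", '{"client_id": "c2", "username": "u2", "node": "emqx@127.0.0.1", "ts": 11}'),
    ("client_disconnected", '{"client_id": "c1", "username": "u1", "reason": "normal", "node": "emqx@127.0.0.1", "ts": 12}'),
    ("client_disconnected", '{"client_id": "c2", "username": "u2", "reason": "normal", "node": "emqx@127.0.0.1", "ts": 5}'),
]

state = {}
for kafka_topic, value in events:
    apply_event(state, kafka_topic, value)
print(online_clients(state))
```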

    Client subscribe events forwarded to Kafka

    Client unsubscribe events forwarded to Kafka

        topic = session_unsubscribed
        value = {
            "client_id": ${clientid},
            "qos": ${qos},
            "node": ${node},
            "ts": ${timestamp}
        }

    MQTT message publish events forwarded to Kafka

        topic = message_publish
        value = {
            "client_id": ${clientid},
            "username": ${username},
            "topic": ${topic},
            "payload": ${payload},
            "qos": ${qos},
            "node": ${node},
            "ts": ${timestamp}
        }
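
    On the consuming side, a message_publish record can be decoded into a typed object before further processing. This is a sketch under assumptions: the record value is taken to be valid JSON carrying exactly the fields listed above, and the payload is treated as an opaque string because its encoding is not stated here. PublishRecord and parse_publish are hypothetical names, and the sample values are made up.

```python
import json
from dataclasses import dataclass

# Field names taken from the message_publish template above.
REQUIRED = ("client_id", "username", "topic", "payload", "qos", "node", "ts")


@dataclass
class PublishRecord:
    client_id: str
    username: str
    topic: str      # the MQTT topic, not the Kafka topic
    payload: str    # kept as-is; decoding depends on the bridge settings
    qos: int
    node: str
    ts: int


def parse_publish(value):
    """Decode one record value and fail loudly if a field is missing."""
    data = json.loads(value)
    missing = [name for name in REQUIRED if name not in data]
    if missing:
        raise ValueError(f"missing fields: {missing}")
    return PublishRecord(**{name: data[name] for name in REQUIRED})


sample = (
    '{"client_id": "c1", "username": "u1", "topic": "sensors/1/temp", '
    '"payload": "21.5", "qos": 1, "node": "emqx@127.0.0.1", "ts": 1700000000}'
)
record = parse_publish(sample)
print(record.topic, record.qos)
```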

    MQTT message delivered (Deliver) events forwarded to Kafka

        topic = message_delivered
        value = {
            "client_id": ${clientid},
            "username": ${username},
            "from": ${fromClientId},
            "topic": ${topic},
            "payload": ${payload},
            "qos": ${qos},
            "node": ${node},
            "ts": ${timestamp}
        }
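
    In a message_delivered record, from identifies the publishing client and client_id the receiving client, so the stream can be used to reconstruct who received what. The sketch below groups receivers by publisher and MQTT topic. It assumes JSON record values with the fields listed above. The function name fan_out and the sample values are hypothetical.

```python
import json
from collections import defaultdict


def fan_out(values):
    """Map (publisher, mqtt_topic) to the sorted list of receiving clients."""
    receivers = defaultdict(set)
    for value in values:
        record = json.loads(value)
        receivers[(record["from"], record["topic"])].add(record["client_id"])
    return {key: sorted(ids) for key, ids in receivers.items()}


# Hypothetical delivered records, trimmed to the fields this sketch reads.
delivered = [
    '{"client_id": "sub1", "from": "pub1", "topic": "sensors/1/temp"}',
    '{"client_id": "sub2", "from": "pub1", "topic": "sensors/1/temp"}',
    '{"client_id": "sub1", "from": "pub2", "topic": "alerts"}',
]
print(fan_out(delivered))
```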

    MQTT message acknowledged (Ack) events forwarded to Kafka

    Reading MQTT client connect and disconnect event messages from Kafka:

        kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic client_connected --from-beginning
        kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic client_disconnected --from-beginning

    Reading MQTT subscribe and unsubscribe event messages from Kafka:

        kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic session_subscribed --from-beginning
        kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic session_unsubscribed --from-beginning
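
    The console consumer prints one record value per line, so its output can be piped into a small script for inspection. The following sketch assumes the bridged values are JSON objects, skips anything else, and uses the hypothetical helper name iter_records.

```python
import json


def iter_records(lines):
    """Yield decoded JSON objects, skipping blank or non-JSON lines."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # e.g. status text mixed into the consumer output
        if isinstance(record, dict):
            yield record


# Hypothetical captured output; in practice, pass sys.stdin instead.
captured = [
    '{"client_id": "c1", "username": "u1", "node": "emqx@127.0.0.1", "ts": 10}',
    "",
    "Processed a total of 1 messages",
]
for record in iter_records(captured):
    print(record["client_id"], record["ts"])
```

    To use it on live data, save the function in a script that iterates over iter_records(sys.stdin) and pipe one of the kafka-console-consumer.sh commands above into it.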

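    Reading published MQTT messages from Kafka:

        kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic message_publish --from-beginning

    Reading MQTT message delivered (Deliver) and acknowledged (Ack) events from Kafka:

        kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic message_delivered --from-beginning
        kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic message_acked --from-beginning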

    Enabling the Kafka bridge plugin

        ./bin/emqx_ctl plugins load emqx_bridge_kafka