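Kafka Bridge
Since version 3.1, EMQX ships a powerful rule engine intended to replace plugins. We recommend using the rule engine's Kafka bridge instead.
The EMQX bridge forwards MQTT messages to a Kafka cluster. Apache Kafka is a fast, highly scalable, high-throughput distributed log system. Combined with Kafka Streams, it is widely used in stream data processing.
Configuring Kafka bridge rules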
## ${topic}: the kafka topics to which the messages will be published.
## ${filter}: the mqtt topic (may contain wildcard) on which the action will be performed.
## Client Connected Record Hook
bridge.kafka.hook.client.connected.1 = {"topic": "client_connected"}
## Client Disconnected Record Hook
bridge.kafka.hook.client.disconnected.1 = {"topic": "client_disconnected"}
## Session Subscribed Record Hook
bridge.kafka.hook.session.subscribed.1 = {"filter": "#", "topic": "session_subscribed"}
## Session Unsubscribed Record Hook
bridge.kafka.hook.session.unsubscribed.1 = {"filter": "#", "topic": "session_unsubscribed"}
## Message Publish Record Hook
bridge.kafka.hook.message.publish.1 = {"filter": "#", "topic": "message_publish"}
## Message Delivered Record Hook
bridge.kafka.hook.message.delivered.1 = {"filter": "#", "topic": "message_delivered"}
## Message Acked Record Hook
bridge.kafka.hook.message.acked.1 = {"filter": "#", "topic": "message_acked"}
## More configuration options
## partitioner strategy:
## Option: random | roundrobin | first_key_dispatch
## Example: bridge.kafka.hook.message.publish.1 = {"filter":"#", "topic":"message_publish", "strategy":"random"}
## key:
## Option: ${clientid} | ${username}
## Example: bridge.kafka.hook.message.publish.1 = {"filter":"#", "topic":"message_publish", "key":"${clientid}"}
## format:
## Option: json
## Example: bridge.kafka.hook.message.publish.1 = {"filter":"#", "topic":"message_publish", "format":"json"}
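To make the rule format concrete, here is a minimal Python sketch that parses one rule line into its hook name, index, and JSON options, and checks an MQTT topic against a `filter` value. The helper names `parse_rule` and `filter_matches` are hypothetical, and the matching follows the standard MQTT wildcard rules (`+` for one level, `#` for all remaining levels). It is an assumption that the plugin applies exactly these semantics, and the special handling of topics that start with `$` is left out.

```python
import json
import re

# Matches lines such as: bridge.kafka.hook.message.publish.1 = {...}
RULE_RE = re.compile(r"^bridge\.kafka\.hook\.([a-z.]+)\.(\d+)\s*=\s*(\{.*\})\s*$")


def parse_rule(line):
    """Return (hook, index, options) for a rule line, or None for any other line."""
    match = RULE_RE.match(line.strip())
    if match is None:
        return None
    hook, index, options = match.groups()
    return hook, int(index), json.loads(options)


def filter_matches(topic_filter, topic):
    """Standard MQTT wildcard matching, ignoring the special case of '$' topics."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # '#' matches the parent level and everything below it.
            return True
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    return len(filter_levels) == len(topic_levels)


rule = parse_rule('bridge.kafka.hook.message.publish.1 = {"filter": "#", "topic": "message_publish"}')
hook, index, options = rule
print(hook, index, options["topic"], filter_matches(options["filter"], "sensor/1/temp"))
```

With the filter `#` used throughout the configuration above, every MQTT topic matches, so each hook forwards all of its events to the configured Kafka topic.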
Kafka bridge rule descriptions
When a device comes online, EMQX forwards the connect event message to Kafka:
topic = "client_connected",
value = {
"client_id": ${clientid},
"username": ${username},
"node": ${node},
"ts": ${ts}
}
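As an illustration of how a consumer might handle this record, the sketch below decodes one `client_connected` value into a small dataclass. The class name `ClientConnected`, the field types, and the sample record are assumptions made for the example; the template above does not state the types or the unit of `ts`.

```python
import json
from dataclasses import dataclass


@dataclass
class ClientConnected:
    client_id: str
    username: str
    node: str
    ts: int  # assumed to be an integer timestamp; the unit is not stated in the source


def parse_client_connected(raw: bytes) -> ClientConnected:
    """Decode one Kafka record value, assuming it is UTF-8 encoded JSON."""
    data = json.loads(raw.decode("utf-8"))
    return ClientConnected(
        client_id=data["client_id"],
        username=data["username"],
        node=data["node"],
        ts=data["ts"],
    )


# Made-up sample record, for illustration only.
sample = b'{"client_id": "dev-001", "username": "alice", "node": "emqx@127.0.0.1", "ts": 1700000000}'
event = parse_client_connected(sample)
print(event.client_id, event.node)  # prints: dev-001 emqx@127.0.0.1
```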
When a device goes offline, EMQX forwards the disconnect event message to Kafka:
topic = "client_disconnected",
value = {
"client_id": ${clientid},
"username": ${username},
"reason": ${reason},
"node": ${node},
"ts": ${ts}
}
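One possible use of the connect and disconnect records together is to keep a set of clients that are currently online. The sketch below assumes JSON values with a `client_id` field as in the templates above; the function name `apply_event` and the sample records are hypothetical. Kafka does not guarantee ordering across two topics, so a real consumer would likely also compare the `ts` fields.

```python
import json


def apply_event(online, topic, raw):
    """Update the set of online client IDs from one Kafka record."""
    data = json.loads(raw)
    if topic == "client_connected":
        online.add(data["client_id"])
    elif topic == "client_disconnected":
        online.discard(data["client_id"])
    return online


# Made-up records, for illustration only.
records = [
    ("client_connected", '{"client_id": "dev-001", "username": "alice", "node": "emqx@127.0.0.1", "ts": 1}'),
    ("client_connected", '{"client_id": "dev-002", "username": "bob", "node": "emqx@127.0.0.1", "ts": 2}'),
    ("client_disconnected", '{"client_id": "dev-001", "username": "alice", "reason": "normal", "node": "emqx@127.0.0.1", "ts": 3}'),
]

online = set()
for topic, raw in records:
    apply_event(online, topic, raw)
print(sorted(online))  # prints: ['dev-002']
```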
Client topic subscribe events forwarded to Kafka
Client topic unsubscribe events forwarded to Kafka:
topic = session_unsubscribed
value = {
"client_id": ${clientid},
"qos": ${qos},
"node": ${node},
"ts": ${timestamp}
}
MQTT message publish events forwarded to Kafka:
topic = message_publish
value = {
"client_id": ${clientid},
"username": ${username},
"topic": ${topic},
"payload": ${payload},
"qos": ${qos},
"node": ${node},
"ts": ${timestamp}
}
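To show how the template above might map onto a Kafka record, the following sketch builds a key and a JSON value for one `message_publish` event. It is only an illustration, not the plugin's implementation: the function name `build_publish_record`, the input dictionary layout, and the sample message are hypothetical. Python's `string.Template` happens to use the same `${name}` placeholder syntax as the `key` option, so it is used here to resolve the key.

```python
import json
from string import Template


def build_publish_record(msg, key_template="${clientid}"):
    """Return (key, value) bytes for a message_publish record, following the template above."""
    key = Template(key_template).substitute(
        clientid=msg["clientid"],
        username=msg["username"],
    )
    value = {
        "client_id": msg["clientid"],
        "username": msg["username"],
        "topic": msg["topic"],
        "payload": msg["payload"],
        "qos": msg["qos"],
        "node": msg["node"],
        "ts": msg["timestamp"],
    }
    return key.encode("utf-8"), json.dumps(value).encode("utf-8")


# Made-up MQTT message, for illustration only.
message = {
    "clientid": "dev-001",
    "username": "alice",
    "topic": "sensor/1/temp",
    "payload": "21.5",
    "qos": 1,
    "node": "emqx@127.0.0.1",
    "timestamp": 1700000000,
}
key, value = build_publish_record(message)
print(key, json.loads(value)["topic"])
```

Passing `key_template="${username}"` would key the record by user name instead, which keeps all messages from one user in the same partition when the partitioner dispatches by key.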
MQTT message delivery (Deliver) events forwarded to Kafka:
topic = message_delivered
value = {
"client_id": ${clientid},
"username": ${username},
"from": ${fromClientId},
"topic": ${topic},
"payload": ${payload},
"qos": ${qos},
"node": ${node},
"ts": ${timestamp}
}
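The delivery record carries both the sender (`from`) and a `client_id`, so a consumer could, for example, count deliveries per sender and receiver pair. The sketch below assumes that `client_id` is the receiving client and `from` is the publishing client, which is an interpretation of the template rather than something the source states. The function name and sample records are hypothetical.

```python
import json
from collections import Counter


def count_deliveries(raw_values):
    """Count delivered messages per (publisher, receiver) pair."""
    pairs = Counter()
    for raw in raw_values:
        data = json.loads(raw)
        pairs[(data["from"], data["client_id"])] += 1
    return pairs


# Made-up record values, for illustration only.
values = [
    '{"client_id": "dev-002", "username": "bob", "from": "dev-001", "topic": "t/1", "payload": "a", "qos": 0, "node": "emqx@127.0.0.1", "ts": 1}',
    '{"client_id": "dev-002", "username": "bob", "from": "dev-001", "topic": "t/1", "payload": "b", "qos": 0, "node": "emqx@127.0.0.1", "ts": 2}',
    '{"client_id": "dev-003", "username": "eve", "from": "dev-001", "topic": "t/1", "payload": "b", "qos": 0, "node": "emqx@127.0.0.1", "ts": 2}',
]
counts = count_deliveries(values)
print(counts[("dev-001", "dev-002")])  # prints: 2
```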
MQTT message acknowledgment (Ack) events forwarded to Kafka
Read MQTT client connect and disconnect event messages from Kafka:
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic client_connected --from-beginning
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic client_disconnected --from-beginning
Read MQTT topic subscribe and unsubscribe event messages from Kafka:
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic session_subscribed --from-beginning
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic session_unsubscribed --from-beginning
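The same events can also be read programmatically. The sketch below is one possible equivalent of the console commands above, using the third-party `kafka-python` package (an assumption; it is not part of EMQX or Kafka and must be installed separately). It assumes the record values are UTF-8 JSON, as the `format` option suggests. The helper names `decode_value` and `read_events` are hypothetical.

```python
import json


def decode_value(raw: bytes) -> dict:
    """Decode a record value, assuming UTF-8 encoded JSON."""
    return json.loads(raw.decode("utf-8"))


def read_events(*topics, bootstrap_servers="127.0.0.1:9092"):
    """Yield (topic, value) pairs from the given Kafka topics, starting at the earliest offset."""
    # Imported here so the helper above stays usable without the package installed.
    from kafka import KafkaConsumer  # third-party: pip install kafka-python

    consumer = KafkaConsumer(
        *topics,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset="earliest",  # similar in effect to --from-beginning
        enable_auto_commit=False,
        value_deserializer=decode_value,
    )
    for record in consumer:
        yield record.topic, record.value
```

A possible use, given a broker listening on 127.0.0.1:9092, is `for topic, event in read_events("session_subscribed", "session_unsubscribed"): print(topic, event)`, which blocks and prints events as they arrive.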
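Read MQTT published messages from Kafka:
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic message_publish --from-beginning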
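Read MQTT message delivery (Deliver) and acknowledgment (Ack) events from Kafka:
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic message_delivered --from-beginning
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic message_acked --from-beginning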
Tip
Enable the Kafka bridge plugin:
./bin/emqx_ctl plugins load emqx_bridge_kafka