Kafka Bridge
After EMQX version 3.1, a powerful rule engine is introduced to replace plug-ins. It is recommended that you use it. See Bridge data to Kafka to setup kafka bridges in rule engine.
EMQX bridges and forwards MQTT messages to Kafka cluster:
Configure Kafka Bridge Hooks
## ${topic}: the kafka topics to which the messages will be published.
## ${filter}: the mqtt topic (may contain wildcard) on which the action will be performed .
bridge.kafka.hook.client.connected.1 = {"topic": "client_connected"}
bridge.kafka.hook.client.disconnected.1 = {"topic": "client_disconnected"}
bridge.kafka.hook.session.subscribed.1 = {"filter": "#", "topic": "session_subscribed"}
bridge.kafka.hook.session.unsubscribed.1 = {"filter": "#", "topic": "session_unsubscribed"}
bridge.kafka.hook.message.publish.1 = {"filter": "#", "topic": "message_publish"}
bridge.kafka.hook.message.delivered.1 = {"filter": "#", "topic": "message_delivered"}
bridge.kafka.hook.message.acked.1 = {"filter": "#", "topic": "message_acked"}
Description of Kafka Bridge Hooks
Client goes online, EMQX forwards ‘client_connected’ event message to Kafka:
topic = "client_connected",
value = {
"node": ${node},
"ts": ${ts}
}
Client goes offline, EMQX forwards ‘client_disconnected’ event message to Kafka:
topic = "client_disconnected",
value = {
"client_id": ${clientid},
"reason": ${reason},
"ts": ${ts}
}
Forward Subscription Event to Kafka
Forward Unsubscription Event to Kafka
topic = session_unsubscribed
value = {
"client_id": ${clientid},
"topic": ${topic},
"qos": ${qos},
"node": ${node},
"ts": ${timestamp}
}
topic = message_publish
value = {
"client_id": ${clientid},
"username": ${username},
"payload": ${payload},
"qos": ${qos},
"node": ${node},
}
Forwarding MQTT Message Deliver Event to Kafka
topic = message_delivered
value = {"client_id": ${clientid},
"username": ${username},
"from": ${fromClientId},
"topic": ${topic},
"payload": ${payload},
"qos": ${qos},
"node": ${node},
"ts": ${timestamp}
}
Forwarding MQTT Message Ack Event to Kafka
Kafka consumes MQTT clients connected / disconnected event messages:
sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic client_connected --from-beginning
sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic client_disconnected --from-beginning
sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic session_subscribed --from-beginning
sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic session_unsubscribed --from-beginning
Kafka consumes MQTT published messages:
sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic message_publish --from-beginning
Kafka consumes MQTT message Deliver and Ack event messages:
TIP
Enable Kafka Bridge
./bin/emqx_ctl plugins load emqx_bridge_kafka