Kafka Bridge

    Since EMQX 3.1, a rule engine is available that replaces plug-ins, and using it is recommended. See "Bridge data to Kafka" to set up Kafka bridges in the rule engine.

    EMQX bridges and forwards MQTT messages to a Kafka cluster.

    Configure Kafka Bridge Hooks

    ## ${topic}: the Kafka topic to which the messages will be published.
    ## ${filter}: the MQTT topic (may contain wildcards) on which the action will be performed.
    bridge.kafka.hook.client.connected.1 = {"topic": "client_connected"}
    bridge.kafka.hook.client.disconnected.1 = {"topic": "client_disconnected"}
    bridge.kafka.hook.session.subscribed.1 = {"filter": "#", "topic": "session_subscribed"}
    bridge.kafka.hook.session.unsubscribed.1 = {"filter": "#", "topic": "session_unsubscribed"}
    bridge.kafka.hook.message.publish.1 = {"filter": "#", "topic": "message_publish"}
    bridge.kafka.hook.message.delivered.1 = {"filter": "#", "topic": "message_delivered"}
    bridge.kafka.hook.message.acked.1 = {"filter": "#", "topic": "message_acked"}
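
    To illustrate how a `filter` containing wildcards selects MQTT topics, the following is a minimal sketch of standard MQTT wildcard matching, where `+` matches exactly one level and `#` matches all remaining levels. The function name `topic_matches` is hypothetical and is not part of EMQX; the broker performs this matching internally. The sketch also ignores the special handling of topics that start with `$`.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` matches an MQTT filter using + and # wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches this level and everything below it.
            return True
        if index >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[index]:
            # A literal level must match exactly; "+" accepts any single level.
            return False
    # Without "#", filter and topic must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


print(topic_matches("#", "sensors/room1/temp"))               # True
print(topic_matches("sensors/+/temp", "sensors/room1/temp"))  # True
print(topic_matches("sensors/+/temp", "sensors/room1/hum"))   # False
```

    With the filter `#` used in the configuration above, every MQTT topic matches, so all events of the given hook are forwarded to the configured Kafka topic.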

    Description of Kafka Bridge Hooks

    When a client goes online, EMQX forwards a ‘client_connected’ event message to Kafka:

    topic = "client_connected",
    value = {
        "node": ${node},
        "ts": ${ts}
    }

    When a client goes offline, EMQX forwards a ‘client_disconnected’ event message to Kafka:

    topic = "client_disconnected",
    value = {
        "client_id": ${clientid},
        "reason": ${reason},
        "ts": ${ts}
    }

    Forward Subscription Event to Kafka

    Forward Unsubscription Event to Kafka

    topic = session_unsubscribed
    value = {
        "client_id": ${clientid},
        "topic": ${topic},
        "qos": ${qos},
        "node": ${node},
        "ts": ${timestamp}
    }

    Forward MQTT Messages to Kafka

    topic = message_publish
    value = {
        "client_id": ${clientid},
        "username": ${username},
        "payload": ${payload},
        "qos": ${qos},
        "node": ${node},
    }

    Forwarding MQTT Message Deliver Event to Kafka

    topic = message_delivered
    value = {
        "client_id": ${clientid},
        "username": ${username},
        "from": ${fromClientId},
        "topic": ${topic},
        "payload": ${payload},
        "qos": ${qos},
        "node": ${node},
        "ts": ${timestamp}
    }
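
    On the consuming side, the value templates above can be checked with a small helper. The following is a minimal sketch that assumes the record values arrive as UTF-8 encoded JSON, which the templates suggest but this document does not state. The names `EXPECTED_FIELDS` and `parse_event` are hypothetical, and the sample record contains made-up values for illustration only.

```python
import json

# Fields listed for each Kafka topic in the value templates of this document.
EXPECTED_FIELDS = {
    "client_connected": ("node", "ts"),
    "client_disconnected": ("client_id", "reason", "ts"),
    "session_unsubscribed": ("client_id", "topic", "qos", "node", "ts"),
    "message_publish": ("client_id", "username", "payload", "qos", "node"),
    "message_delivered": (
        "client_id", "username", "from", "topic", "payload", "qos", "node", "ts",
    ),
}


def parse_event(kafka_topic: str, raw_value: bytes) -> dict:
    """Decode one record value and check that the documented fields are present."""
    # Assumption: the value is UTF-8 encoded JSON.
    event = json.loads(raw_value.decode("utf-8"))
    missing = [name for name in EXPECTED_FIELDS[kafka_topic] if name not in event]
    if missing:
        raise ValueError(f"{kafka_topic} event is missing fields: {missing}")
    return event


# Made-up sample record, for illustration only.
sample = b'{"client_id": "c1", "reason": "normal", "ts": 1700000000}'
event = parse_event("client_disconnected", sample)
print(event["client_id"], event["reason"])
```

    The helper only checks that the listed fields are present, so a record that carries additional fields still passes.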

    Forwarding MQTT Message Ack Event to Kafka

    Consume the MQTT client connected / disconnected event messages from Kafka:

    sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic client_connected --from-beginning
    sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic client_disconnected --from-beginning

    Consume the MQTT subscription / unsubscription event messages from Kafka:

    sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic session_subscribed --from-beginning
    sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic session_unsubscribed --from-beginning

    Consume the MQTT published messages from Kafka:

    sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic message_publish --from-beginning

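    Consume the MQTT message deliver and ack event messages from Kafka:

    sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic message_delivered --from-beginning
    sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic message_acked --from-beginning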

    Enable Kafka Bridge

    ./bin/emqx_ctl plugins load emqx_bridge_kafka