MongoDB Backend

    After EMQX version 3.1, a powerful rule engine is introduced to replace plug-ins. It is recommended that you use it. See Save data to MongoDB to setup Save data to MongoDB in rule engine.

    Config file: emqx_backend_mongo.conf

    Connection pool of multiple MongoDB servers is supported:

    Configure MongoDB Persistence Hooks

    1. backend.mongo.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
    2. ## Subscribe Lookup Record
    3. backend.mongo.hook.client.connected.2 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
    4. ## Client DisConnected Record
    5. backend.mongo.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
    6. ## Lookup Unread Message QOS > 0
    7. backend.mongo.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}
    8. ## Lookup Retain Message
    9. backend.mongo.hook.session.subscribed.2 = {"topic": "#", "action": {"function": "on_retain_lookup"}, "pool": "pool1"}
    10. ## Store Publish Message QOS > 0, payload_format options mongo_json | plain_text
    11. backend.mongo.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1", "payload_format": "mongo_json"}
    12. ## Store Retain Message, payload_format options mongo_json | plain_text
    13. backend.mongo.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1", "payload_format": "mongo_json"}
    14. ## Delete Retain Message
    15. backend.mongo.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
    16. ## Store Ack
    17. backend.mongo.hook.message.acked.1 = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}
    18. ## Get offline messages
    19. ### "offline_opts": Get configuration for offline messages
    20. ### max_returned_count: Maximum number of offline messages get at a time
    21. ## backend.mongo.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "offline_opts": {"max_returned_count": 500, "time_range": "2h"}, "pool": "pool1"}
    22. ## If you need to store Qos0 messages, you can enable the following configuration
    23. ## Tip: When the following configuration is enabled, 'on_message_fetch' needs to be disabled, otherwise qos1, qos2 messages will be stored twice
    24. ## backend.mongo.hook.message.publish.4 = {"topic": "#", "action": {"function": "on_message_store"}, "pool": "pool1"}

    Description of MongoDB Persistence Hooks

    1. use mqtt
    2. db.createCollection("mqtt_sub")
    3. db.createCollection("mqtt_msg")
    4. db.createCollection("mqtt_retain")
    5. db.createCollection("mqtt_acked")
    6. db.mqtt_client.ensureIndex({clientid:1, node:2})
    7. db.mqtt_sub.ensureIndex({clientid:1})
    8. db.mqtt_msg.ensureIndex({sender:1, topic:2})
    9. db.mqtt_retain.ensureIndex({topic:1})

    TIP

    DB name is free of choice

    MongoDB MQTT Client Collection

    1. {
    2. clientid: string,
    3. state: 0,1, //0 disconnected 1 connected
    4. node: string,
    5. online_at: timestamp,
    6. offline_at: timestamp
    7. }

    Query client’s connection state:

    1. db.mqtt_client.findOne({clientid: ${clientid}})

    E.g., if client ‘test’ is online:

    1. db.mqtt_client.findOne({clientid: "test"})
    2. {
    3. "_id" : ObjectId("58646c9bdde89a9fb9f7fb73"),
    4. "clientid" : "test",
    5. "state" : 1,
    6. "node" : "emqx@127.0.0.1",
    7. "online_at" : 1482976411,
    8. "offline_at" : null
    9. }

    Client ‘test’ is offline:

    MongoDB Subscription Collection

    mqtt_sub stores subscriptions of clients:

    1. {
    2. clientid: string,
    3. topic: string,
    4. qos: 0,1,2
    5. }

    E.g., client ‘test’ subscribes to topic ‘test_topic1’ and ‘test_topic2’:

    1. db.mqtt_sub.insert({clientid: "test", topic: "test_topic1", qos: 1})
    1. db.mqtt_sub.find({clientid: "test"})
    2. { "_id" : ObjectId("58646d96c65dff6ac9668ca2"), "clientid" : "test", "topic" : "test_topic2", "qos" : 2 }

    mqtt_msg stores MQTT messages:

    1. {
    2. _id: int,
    3. topic: string,
    4. msgid: string,
    5. sender: string,
    6. qos: 0,1,2,
    7. retain: boolean (true, false),
    8. payload: string,
    9. arrived: timestamp
    10. }

    Query messages published by a client:

    1. db.mqtt_msg.find({sender: ${clientid}})

    Query messages published by client ‘test’:

    MongoDB Retained Message Collection

    mqtt_retain stores retained messages:

    1. {
    2. topic: string,
    3. msgid: string,
    4. sender: string,
    5. qos: 0,1,2,
    6. payload: string,
    7. arrived: timestamp
    8. }

    Query retained messages:

    1. db.mqtt_retain.findOne({topic: ${topic}})
    1. db.mqtt_retain.findOne({topic: "/World"})
    2. {
    3. "_id" : ObjectId("58646dd9dde89a9fb9f7fb75"),
    4. "topic" : "/World",
    5. "msgid" : "AAVEwm0la4RufgAABeIAAQ==",
    6. "sender" : "c1",
    7. "qos" : 1,
    8. "payload" : "Hello world!",
    9. "arrived" : 1482976729
    10. }

    MongoDB Acknowledgement Collection

    mqtt_acked stores acknowledgements from the clients:

    1. {
    2. clientid: string,
    3. topic: string,
    4. mongo_id: int
    5. }