MongoDB Backend
Since EMQX 3.1, a powerful rule engine has been available as a replacement for plug-ins, and it is the recommended approach. See Save data to MongoDB for how to set up MongoDB persistence in the rule engine.
Config file: emqx_backend_mongo.conf
Connection pools for multiple MongoDB servers are supported.
Configure MongoDB Persistence Hooks
## Client Connected Record
backend.mongo.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
## Subscribe Lookup Record
backend.mongo.hook.client.connected.2 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
## Client Disconnected Record
backend.mongo.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
## Lookup Unread Messages (QoS > 0)
backend.mongo.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}
## Lookup Retain Message
backend.mongo.hook.session.subscribed.2 = {"topic": "#", "action": {"function": "on_retain_lookup"}, "pool": "pool1"}
## Store Published Message (QoS > 0); payload_format options: mongo_json | plain_text
backend.mongo.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1", "payload_format": "mongo_json"}
## Store Retained Message; payload_format options: mongo_json | plain_text
backend.mongo.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1", "payload_format": "mongo_json"}
## Delete Retain Message
backend.mongo.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
## Store Ack
backend.mongo.hook.message.acked.1 = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}
## Get offline messages
### "offline_opts": Configuration for fetching offline messages
### max_returned_count: Maximum number of offline messages fetched at a time
## backend.mongo.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "offline_opts": {"max_returned_count": 500, "time_range": "2h"}, "pool": "pool1"}
## To store QoS 0 messages as well, enable the following configuration
## Tip: When the following configuration is enabled, 'on_message_fetch' needs to be disabled; otherwise QoS 1 and QoS 2 messages will be stored twice
## backend.mongo.hook.message.publish.4 = {"topic": "#", "action": {"function": "on_message_store"}, "pool": "pool1"}
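Each hook entry above is a dotted key followed by a JSON value. As a rough illustration of that layout (not part of EMQX itself), the sketch below splits one line into its hook point, sequence number, and action. The helper name parseHookLine and the shape of the returned object are assumptions made for this example.

```javascript
// Hypothetical helper, not an EMQX API: parse one line of emqx_backend_mongo.conf.
function parseHookLine(line) {
  const trimmed = line.trim();
  // Blank lines and lines starting with '#' (comments, disabled hooks) are skipped.
  if (trimmed === "" || trimmed.startsWith("#")) return null;

  const eq = trimmed.indexOf("=");
  if (eq === -1) throw new Error("missing '=' in: " + line);

  const key = trimmed.slice(0, eq).trim();
  const value = JSON.parse(trimmed.slice(eq + 1).trim());

  // Key layout: backend.mongo.hook.<hook point>.<sequence number>
  const m = /^backend\.mongo\.hook\.(.+)\.(\d+)$/.exec(key);
  if (!m) throw new Error("not a hook key: " + key);

  return {
    hook: m[1],                 // e.g. "message.publish"
    seq: Number(m[2]),          // e.g. 1
    fn: value.action.function,  // e.g. "on_message_publish"
    topic: value.topic === undefined ? null : value.topic,
    pool: value.pool,
  };
}

const example = parseHookLine(
  'backend.mongo.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1", "payload_format": "mongo_json"}'
);
console.log(example);
```

Reading the entries this way makes it easy to see that several actions can be attached to the same hook point, distinguished only by the trailing sequence number.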
Description of MongoDB Persistence Hooks
use mqtt
db.createCollection("mqtt_client")
db.createCollection("mqtt_sub")
db.createCollection("mqtt_msg")
db.createCollection("mqtt_retain")
db.createCollection("mqtt_acked")
db.mqtt_client.createIndex({clientid: 1, node: 1})
db.mqtt_sub.createIndex({clientid: 1})
db.mqtt_msg.createIndex({sender: 1, topic: 1})
db.mqtt_retain.createIndex({topic: 1})
TIP
The database name can be chosen freely.
MongoDB MQTT Client Collection
{
clientid: string,
state: 0,1, // 0: disconnected, 1: connected
node: string,
online_at: timestamp,
offline_at: timestamp
}
Query a client’s connection state:
db.mqtt_client.findOne({clientid: ${clientid}})
E.g., if client ‘test’ is online:
db.mqtt_client.findOne({clientid: "test"})
{
"_id" : ObjectId("58646c9bdde89a9fb9f7fb73"),
"clientid" : "test",
"state" : 1,
"node" : "emqx@127.0.0.1",
"online_at" : 1482976411,
"offline_at" : null
}
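The state field in the document above is what distinguishes a connected client from a disconnected one. A minimal sketch of interpreting such a document in application code might look like the following. The helper name describeClient is hypothetical, and the null check reflects that findOne returns null when no document matches.

```javascript
// Hypothetical helper: turn an mqtt_client document into a readable status.
function describeClient(doc) {
  // findOne() yields null when the clientid has never been recorded.
  if (doc === null) return "unknown client";
  // state: 0 = disconnected, 1 = connected (as in the schema above).
  return doc.state === 1
    ? doc.clientid + " is connected to " + doc.node
    : doc.clientid + " is disconnected";
}

console.log(
  describeClient({
    clientid: "test",
    state: 1,
    node: "emqx@127.0.0.1",
    online_at: 1482976411,
    offline_at: null,
  })
);
```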
Client ‘test’ is offline:
MongoDB Subscription Collection
mqtt_sub stores subscriptions of clients:
{
clientid: string,
topic: string,
qos: 0,1,2
}
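E.g., client ‘test’ subscribes to topics ‘test_topic1’ and ‘test_topic2’:
db.mqtt_sub.insert({clientid: "test", topic: "test_topic1", qos: 1})
db.mqtt_sub.insert({clientid: "test", topic: "test_topic2", qos: 2})
Query the subscriptions of client ‘test’:
db.mqtt_sub.find({clientid: "test"})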
{ "_id" : ObjectId("58646d96c65dff6ac9668ca2"), "clientid" : "test", "topic" : "test_topic2", "qos" : 2 }
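Because mqtt_sub documents are inserted by hand, it can help to check the fields before writing them. The following is a minimal sketch of such a check, assuming a hypothetical helper named makeSubscription. It only enforces the schema shown above (string clientid and topic, QoS of 0, 1, or 2) and does not talk to MongoDB.

```javascript
// Hypothetical helper: build a document matching the mqtt_sub schema.
function makeSubscription(clientid, topic, qos) {
  if (typeof clientid !== "string" || clientid === "") {
    throw new TypeError("clientid must be a non-empty string");
  }
  if (typeof topic !== "string" || topic === "") {
    throw new TypeError("topic must be a non-empty string");
  }
  // MQTT defines exactly three QoS levels.
  if (![0, 1, 2].includes(qos)) {
    throw new RangeError("qos must be 0, 1, or 2");
  }
  return { clientid, topic, qos };
}

console.log(makeSubscription("test", "test_topic1", 1));
```

The returned object can then be passed to db.mqtt_sub.insert() in the shell or to a driver's insert call.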
MongoDB Message Collection
mqtt_msg stores MQTT messages:
{
_id: int,
topic: string,
msgid: string,
sender: string,
qos: 0,1,2,
retain: boolean (true, false),
payload: string,
arrived: timestamp
}
Query messages published by a client:
db.mqtt_msg.find({sender: ${clientid}})
Query messages published by client ‘test’:
db.mqtt_msg.find({sender: "test"})
MongoDB Retained Message Collection
mqtt_retain stores retained messages:
{
topic: string,
msgid: string,
sender: string,
qos: 0,1,2,
payload: string,
arrived: timestamp
}
Query retained messages:
db.mqtt_retain.findOne({topic: ${topic}})
Query the retained message for topic ‘/World’:
db.mqtt_retain.findOne({topic: "/World"})
{
"_id" : ObjectId("58646dd9dde89a9fb9f7fb75"),
"topic" : "/World",
"msgid" : "AAVEwm0la4RufgAABeIAAQ==",
"sender" : "c1",
"qos" : 1,
"payload" : "Hello world!",
"arrived" : 1482976729
}
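The arrived value in the example above (1482976729) looks like a Unix timestamp in seconds. Under that assumption, which the document does not state explicitly, a small conversion to a JavaScript Date could be sketched as follows. The helper name arrivedToDate is hypothetical.

```javascript
// Hypothetical helper. Assumption: `arrived` is a Unix timestamp in seconds.
function arrivedToDate(arrived) {
  // JavaScript Date expects milliseconds since the epoch.
  return new Date(arrived * 1000);
}

// Value taken from the retained message example above.
console.log(arrivedToDate(1482976729).toISOString());
```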
MongoDB Acknowledgement Collection
mqtt_acked stores acknowledgements from the clients:
{
clientid: string,
topic: string,
mongo_id: int
}
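One plausible reading of this schema is that mongo_id holds the _id of the last mqtt_msg document the client has acknowledged on that topic, so that later messages count as unread. The sketch below illustrates that idea on in-memory data only. The interpretation of mongo_id, the helper name unackedMessages, and the sample documents are all assumptions, not behavior confirmed by this document.

```javascript
// Assumption: acked.mongo_id is the _id of the last acknowledged mqtt_msg document.
// Hypothetical helper: return messages on the same topic with a larger _id.
function unackedMessages(messages, acked) {
  return messages
    .filter((m) => m.topic === acked.topic && m._id > acked.mongo_id)
    .sort((a, b) => a._id - b._id);
}

// Made-up sample data shaped like mqtt_msg and mqtt_acked documents.
const sampleMessages = [
  { _id: 1, topic: "t/1", payload: "a" },
  { _id: 2, topic: "t/1", payload: "b" },
  { _id: 3, topic: "t/2", payload: "c" },
  { _id: 4, topic: "t/1", payload: "d" },
];
const sampleAcked = { clientid: "test", topic: "t/1", mongo_id: 1 };

console.log(unackedMessages(sampleMessages, sampleAcked).map((m) => m._id));
```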