MongoDB 消息存储
EMQX 3.1 版本后推出强大的规则引擎用于替换插件,建议您前往使用保存数据到 MongoDB规则引擎中创建 保存数据到 MongoDB
配置文件: emqx_backend_mongo.conf
配置 MongoDB 服务器
支持配置多台 MongoDB 服务器连接池:
backend 消息存储规则包括:
MongoDB 数据库初始化
db.createCollection("mqtt_client")
db.createCollection("mqtt_sub")
db.createCollection("mqtt_msg")
db.createCollection("mqtt_retain")
db.createCollection("mqtt_acked")
db.mqtt_client.ensureIndex({clientid:1, node:2})
db.mqtt_sub.ensureIndex({clientid:1})
db.mqtt_msg.ensureIndex({sender:1, topic:2})
db.mqtt_retain.ensureIndex({topic:1})
mqtt_client 存储设备在线状态:
{
clientid: string,
state: 0,1, //0离线 1在线
node: string,
online_at: timestamp,
offline_at: timestamp
}
db.mqtt_client.findOne({clientid: ${clientid}})
例如 ClientId 为 test 客户端上线:
db.mqtt_client.findOne({clientid: "test"})
{
"_id" : ObjectId("58646c9bdde89a9fb9f7fb73"),
"clientid" : "test",
"state" : 1,
"node" : "emqx@127.0.0.1",
"online_at" : 1482976411,
}
例如 ClientId 为 test 客户端下线:
db.mqtt_client.findOne({clientid: "test"})
{
"_id" : ObjectId("58646c9bdde89a9fb9f7fb73"),
"clientid" : "test",
"state" : 0,
"node" : "emqx@127.0.0.1",
"online_at" : 1482976411,
"offline_at" : 1482976501
}
MongoDB 用户订阅主题集合(Subscription Collection)
mqtt_sub 存储订阅关系:
用户 test 分别订阅主题 test_topic0 test_topic1 test_topic2:
db.mqtt_sub.insert({clientid: "test", topic: "test_topic1", qos: 1})
db.mqtt_sub.insert({clientid: "test", topic: "test_topic2", qos: 2})
某个客户端订阅主题:
db.mqtt_sub.find({clientid: ${clientid}})
db.mqtt_sub.find({clientid: "test"})
{ "_id" : ObjectId("58646d90c65dff6ac9668ca1"), "clientid" : "test", "topic" : "test_topic1", "qos" : 1 }
{ "_id" : ObjectId("58646d96c65dff6ac9668ca2"), "clientid" : "test", "topic" : "test_topic2", "qos" : 2 }
MongoDB 发布消息集合(Message Collection)
mqtt_msg 存储 MQTT 消息:
{
_id: int,
topic: string,
msgid: string,
qos: 0,1,2,
retain: boolean (true, false),
payload: string,
}
查询某个客户端发布的消息:
db.mqtt_msg.find({sender: ${clientid}})
查询 ClientId 为 “test” 的客户端发布的消息:
mqtt_retain 存储 Retain 消息:
{
topic: string,
msgid: string,
sender: string,
qos: 0,1,2,
payload: string,
arrived: timestamp
}
查询 retain 消息:
db.mqtt_retain.findOne({topic: ${topic}})
db.mqtt_retain.findOne({topic: "t/retain"})
{
"_id" : ObjectId("58646dd9dde89a9fb9f7fb75"),
"topic" : "t/retain",
"msgid" : "AAVEwm0la4RufgAABeIAAQ==",
"sender" : "c1",
"qos" : 1,
"payload" : "Hello world!",
"arrived" : 1482976729
}
MongoDB 接收消息 ack 集合(Message Acked Collection)
mqtt_acked 存储客户端消息确认:
{
clientid: string,
topic: string,
mongo_id: int