MongoDB 消息存储

    EMQX 3.1 版本后推出强大的规则引擎用于替换插件,建议您前往使用保存数据到 MongoDB规则引擎中创建 保存数据到 MongoDB

    配置文件: emqx_backend_mongo.conf

    配置 MongoDB 服务器

    支持配置多台 MongoDB 服务器连接池:

    backend 消息存储规则包括:

    MongoDB 数据库初始化

    1. db.createCollection("mqtt_client")
    2. db.createCollection("mqtt_sub")
    3. db.createCollection("mqtt_msg")
    4. db.createCollection("mqtt_retain")
    5. db.createCollection("mqtt_acked")
    6. db.mqtt_client.ensureIndex({clientid:1, node:2})
    7. db.mqtt_sub.ensureIndex({clientid:1})
    8. db.mqtt_msg.ensureIndex({sender:1, topic:2})
    9. db.mqtt_retain.ensureIndex({topic:1})

    mqtt_client 存储设备在线状态:

    1. {
    2. clientid: string,
    3. state: 0,1, //0离线 1在线
    4. node: string,
    5. online_at: timestamp,
    6. offline_at: timestamp
    7. }
    1. db.mqtt_client.findOne({clientid: ${clientid}})

    例如 ClientId 为 test 客户端上线:

    1. db.mqtt_client.findOne({clientid: "test"})
    2. {
    3. "_id" : ObjectId("58646c9bdde89a9fb9f7fb73"),
    4. "clientid" : "test",
    5. "state" : 1,
    6. "node" : "emqx@127.0.0.1",
    7. "online_at" : 1482976411,
    8. }

    例如 ClientId 为 test 客户端下线:

    1. db.mqtt_client.findOne({clientid: "test"})
    2. {
    3. "_id" : ObjectId("58646c9bdde89a9fb9f7fb73"),
    4. "clientid" : "test",
    5. "state" : 0,
    6. "node" : "emqx@127.0.0.1",
    7. "online_at" : 1482976411,
    8. "offline_at" : 1482976501
    9. }

    MongoDB 用户订阅主题集合(Subscription Collection)

    mqtt_sub 存储订阅关系:

    用户 test 分别订阅主题 test_topic0 test_topic1 test_topic2:

    1. db.mqtt_sub.insert({clientid: "test", topic: "test_topic1", qos: 1})
    2. db.mqtt_sub.insert({clientid: "test", topic: "test_topic2", qos: 2})

    某个客户端订阅主题:

    1. db.mqtt_sub.find({clientid: ${clientid}})
    1. db.mqtt_sub.find({clientid: "test"})
    2. { "_id" : ObjectId("58646d90c65dff6ac9668ca1"), "clientid" : "test", "topic" : "test_topic1", "qos" : 1 }
    3. { "_id" : ObjectId("58646d96c65dff6ac9668ca2"), "clientid" : "test", "topic" : "test_topic2", "qos" : 2 }

    MongoDB 发布消息集合(Message Collection)

    mqtt_msg 存储 MQTT 消息:

    1. {
    2. _id: int,
    3. topic: string,
    4. msgid: string,
    5. qos: 0,1,2,
    6. retain: boolean (true, false),
    7. payload: string,
    8. }

    查询某个客户端发布的消息:

    1. db.mqtt_msg.find({sender: ${clientid}})

    查询 ClientId 为 “test” 的客户端发布的消息:

    mqtt_retain 存储 Retain 消息:

    1. {
    2. topic: string,
    3. msgid: string,
    4. sender: string,
    5. qos: 0,1,2,
    6. payload: string,
    7. arrived: timestamp
    8. }

    查询 retain 消息:

    1. db.mqtt_retain.findOne({topic: ${topic}})
    1. db.mqtt_retain.findOne({topic: "t/retain"})
    2. {
    3. "_id" : ObjectId("58646dd9dde89a9fb9f7fb75"),
    4. "topic" : "t/retain",
    5. "msgid" : "AAVEwm0la4RufgAABeIAAQ==",
    6. "sender" : "c1",
    7. "qos" : 1,
    8. "payload" : "Hello world!",
    9. "arrived" : 1482976729
    10. }

    MongoDB 接收消息 ack 集合(Message Acked Collection)

    mqtt_acked 存储客户端消息确认:

    1. {
    2. clientid: string,
    3. topic: string,
    4. mongo_id: int

    启用 MongoDB 数据存储插件