DynamoDB 消息存储

    EMQX 3.1 版本后推出强大的规则引擎用于替换插件,建议您前往使用保存数据到 DynamoDB规则引擎中创建 保存数据到 DynamoDB

    配置文件: etc/plugins/emqx_backend_dynamo.conf

    backend 消息存储规则包括:

    DynamoDB 数据库创建表

    mqtt_client 表定义(存储设备在线状态):

    1. {
    2. "TableName": "mqtt_client",
    3. "KeySchema": [
    4. { "AttributeName": "clientid", "KeyType": "HASH" }
    5. ],
    6. "AttributeDefinitions": [
    7. { "AttributeName": "clientid", "AttributeType": "S" }
    8. ],
    9. "ProvisionedThroughput": {
    10. "ReadCapacityUnits": 5,
    11. "WriteCapacityUnits": 5
    12. }
    13. }

    查询设备在线状态:

    1. aws dynamodb scan --table-name mqtt_client --region us-west-2 --endpoint-url http://localhost:8000
    2. {
    3. "Items": [
    4. {
    5. "offline_at": { "N": "0" },
    6. "node": { "S": "emqx@127.0.0.1" },
    7. "clientid": { "S": "mqttjs_384b9c73a9" },
    8. "connect_state": { "N": "1" },
    9. "online_at": { "N": "1562224940" }
    10. }
    11. ],
    12. "Count": 1,
    13. "ScannedCount": 1,
    14. "ConsumedCapacity": null
    15. }

    DynamoDB 用户订阅主题(Subscription Table)

    1. {
    2. "TableName": "mqtt_sub",
    3. "KeySchema": [
    4. { "AttributeName": "clientid", "KeyType": "HASH" },
    5. { "AttributeName": "topic", "KeyType": "RANGE" }
    6. ],
    7. "AttributeDefinitions": [
    8. { "AttributeName": "clientid", "AttributeType": "S" },
    9. { "AttributeName": "topic", "AttributeType": "S" }
    10. ],
    11. "ProvisionedThroughput": {
    12. "WriteCapacityUnits": 5
    13. }
    14. }

    查询 ClientId 为 “test-dynamo” 的客户端已订阅主题:

    mqtt_msg 表定义(存储 MQTT 消息):

    1. {
    2. "KeySchema": [
    3. { "AttributeName": "msgid", "KeyType": "HASH" }
    4. ],
    5. "AttributeDefinitions": [
    6. { "AttributeName": "msgid", "AttributeType": "S" }
    7. ],
    8. "ProvisionedThroughput": {
    9. "ReadCapacityUnits": 5,
    10. "WriteCapacityUnits": 5
    11. }
    12. }

    mqtt_topic_msg_map 表定义(存储主题和消息的映射关系):

    1. {
    2. "TableName": "mqtt_topic_msg_map",
    3. "KeySchema": [
    4. { "AttributeName": "topic", "KeyType": "HASH" }
    5. ],
    6. "AttributeDefinitions": [
    7. { "AttributeName": "topic", "AttributeType": "S" }
    8. ],
    9. "ProvisionedThroughput": {
    10. "ReadCapacityUnits": 5,
    11. "WriteCapacityUnits": 5
    12. }
    13. }

    某个客户端向主题 test 发布消息后,查询 mqtt_msg 表和 mqtt_topic_msg_map 表:

    查询 mqtt_msg 表:

    1. aws dynamodb scan --table-name mqtt_msg --region us-west-2 --endpoint-url http://localhost:8000
    2. > - {
    3. > - "Items": \[
    4. > - {
    5. > "arrived": { "N": "1562308553" }, "qos": { "N": "1" },
    6. > "sender": { "S": "mqttjs_231b962d5c" }, "payload": { "S":
    7. > "{ "msg": "Hello, World\!" }"}, "retain": { "N": "0" },
    8. > "msgid": { "S":
    9. > "Mjg4MTk1MDYwNTk0NjYwNzYzMTg4MDk3OTQ2MDU2Nzg1OTD" },
    10. > "topic": { "S": "test" }
    11. > }
    12. > \], "Count": 1, "ScannedCount": 1, "ConsumedCapacity": null
    13. > }
    1. aws dynamodb scan --table-name mqtt_topic_msg_map --region us-west-2 --endpoint-url http://localhost:8000
    2. > - {
    3. > - "Items": \[
    4. > "topic": { "S": "test" }, "MsgId": { "SS": \[
    5. > }
    6. > \], "Count": 1, "ScannedCount": 1, "ConsumedCapacity": null
    7. > }

    DynamoDB 保留消息(Retain Message Table)

    mqtt_retain 表定义(存储 retain 消息):

    某个客户端向主题 test 发布消息后,查询 mqtt_retain 表:

    1. {
    2. "Items": [
    3. {
    4. "arrived": { "N": "1562312113" },
    5. "qos": { "N": "1" },
    6. "sender": { "S": "mqttjs_d0513acfce" },
    7. "payload": { "S": "test" },
    8. "retain": { "N": "1" },
    9. "msgid": { "S": "Mjg4MTk1NzE3MTY4MjYxMjA5MDExMDg0NTk5ODgzMjAyNTH" },
    10. "topic": { "S": "testtopic" }
    11. }
    12. ],
    13. "Count": 1,
    14. "ScannedCount": 1,
    15. "ConsumedCapacity": null
    16. }

    mqtt_acked 表定义(存储确认的消息):

    1. {
    2. "TableName": "mqtt_acked",
    3. "KeySchema": [
    4. { "AttributeName": "topic", "KeyType": "HASH" },
    5. { "AttributeName": "clientid", "KeyType": "RANGE" }
    6. ],
    7. "AttributeDefinitions": [
    8. { "AttributeName": "topic", "AttributeType": "S" },
    9. { "AttributeName": "clientid", "AttributeType": "S" }
    10. ],
    11. "ProvisionedThroughput": {
    12. "ReadCapacityUnits": 5,
    13. "WriteCapacityUnits": 5
    14. }
    15. }

    某个客户端向主题 test 发布消息后,查询 mqtt_acked 表:

    1. {
    2. "Items": [
    3. {
    4. "topic": { "S": "test" },
    5. "msgid": { "S": "Mjg4MTk1MDYwNTk0NjYwNzYzMTg4MDk3OTQ2MDU2Nzg1OTD" },
    6. "clientid": { "S": "mqttjs_861e582a70" }
    7. }
    8. ],
    9. "Count": 1,
    10. "ScannedCount": 1,
    11. "ConsumedCapacity": null

    启用 DynamoDB 消息存储: