DynamoDB Backend
After EMQX version 3.1, a powerful rule engine is introduced to replace plug-ins. It is recommended that you use it. See Save data to DynamoDB to setup Save data to DynamoDB in rule engine.
Config file: etc/plugins/emqx_backend_dynamo.conf
Description of DynamoDB Persistence Hooks
Create DynamoDB DB
TIP
DB name is free of choice
mqtt_client stores client connection states:
{
"TableName": "mqtt_client",
"KeySchema": [
{ "AttributeName": "clientid", "KeyType": "HASH" }
],
"AttributeDefinitions": [
{ "AttributeName": "clientid", "AttributeType": "S" }
],
"ProvisionedThroughput": {
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5
}
}
aws dynamodb scan --table-name mqtt_client --region us-west-2 --endpoint-url http://localhost:8000
{
"Items": [
{
"offline_at": { "N": "0" },
"node": { "S": "emqx@127.0.0.1" },
"clientid": { "S": "mqttjs_384b9c73a9" },
"connect_state": { "N": "1" },
"online_at": { "N": "1562224940" }
}
],
"Count": 1,
"ScannedCount": 1,
"ConsumedCapacity": null
}
DynamoDB Subscription Table
mqtt_sub table stores MQTT subscriptions of clients:
{
"TableName": "mqtt_sub",
"KeySchema": [
{ "AttributeName": "clientid", "KeyType": "HASH" },
{ "AttributeName": "topic", "KeyType": "RANGE" }
],
"AttributeDefinitions": [
{ "AttributeName": "clientid", "AttributeType": "S" },
{ "AttributeName": "topic", "AttributeType": "S" }
"ProvisionedThroughput": {
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5
}
}
Query topics subscribed by the client named “test-dynamo”:
DynamoDB Message Table
mqtt_msg stores MQTT messages:
"TableName": "mqtt_msg",
"KeySchema": [
{ "AttributeName": "msgid", "KeyType": "HASH" }
],
"AttributeDefinitions": [
{ "AttributeName": "msgid", "AttributeType": "S" }
],
"ProvisionedThroughput": {
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5
}
}
mqtt_topic_msg_map stores the mapping between topics and messages:
{
"TableName": "mqtt_topic_msg_map",
"KeySchema": [
{ "AttributeName": "topic", "KeyType": "HASH" }
],
"AttributeDefinitions": [
{ "AttributeName": "topic", "AttributeType": "S" }
],
"ProvisionedThroughput": {
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5
}
}
Query mqtt_msg and mqtt_topic_msg_map after a client publishes a message to the “test” topic:
aws dynamodb scan --table-name mqtt_msg --region us-west-2 --endpoint-url http://localhost:8000
> - {
> - "Items": [
> - {
> "arrived": { "N": "1562308553" }, "qos": { "N": "1" },
> "sender": { "S": "mqttjs\_231b962d5c" }, "payload": { "S":
> "{ "msg": "Hello, World\!" }"}, "retain": { "N": "0" },
> "msgid": { "S":
> "Mjg4MTk1MDYwNTk0NjYwNzYzMTg4MDk3OTQ2MDU2Nzg1OTD" },
> "topic": { "S": "test" }
> }
> ], "Count": 1, "ScannedCount": 1, "ConsumedCapacity": null
> }
Query mqtt_topic_msg_map:
> - {
> - "Items": \[
> - {
> "topic": { "S": "test" }, "MsgId": { "SS": \[
> }
> }
mqtt_retain stores retained messages:
{
"TableName": "mqtt_retain",
"KeySchema": [
{ "AttributeName": "topic", "KeyType": "HASH" }
],
"AttributeDefinitions": [
{ "AttributeName": "topic", "AttributeType": "S" }
],
"ProvisionedThroughput": {
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5
}
}
Query mqtt_retain after a client publishes a message to the “test” topic:
{
"Items": [
{
"arrived": { "N": "1562312113" },
"qos": { "N": "1" },
"sender": { "S": "mqttjs_d0513acfce" },
"payload": { "S": "test" },
"retain": { "N": "1" },
"msgid": { "S": "Mjg4MTk1NzE3MTY4MjYxMjA5MDExMDg0NTk5ODgzMjAyNTH" },
"topic": { "S": "testtopic" }
}
],
"Count": 1,
"ScannedCount": 1,
"ConsumedCapacity": null
}
DynamoDB Acknowledgement Table
mqtt_acked stores acknowledgements from the clients:
{
"TableName": "mqtt_acked",
"KeySchema": [
{ "AttributeName": "topic", "KeyType": "HASH" },
{ "AttributeName": "clientid", "KeyType": "RANGE" }
],
"AttributeDefinitions": [
{ "AttributeName": "topic", "AttributeType": "S" },
{ "AttributeName": "clientid", "AttributeType": "S" }
],
"ProvisionedThroughput": {
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5
}
Query mqtt_acked after a client publishes a message to the “test” topic:
Enable DynamoDB Backend
./bin/emqx_ctl plugins load emqx_backend_dynamo