MySQL Backend

    Since EMQX 3.1, a powerful rule engine has been available to replace plug-ins, and using it is recommended. See Save data to MySQL for how to set up saving data to MySQL in the rule engine.

    Config file: emqx_backend_mysql.conf

    Connection pools for multiple MySQL servers are supported.

    Configure MySQL Persistence Hooks

    ## Client Connected Record
    backend.mysql.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
    ## Subscribe Lookup Record
    backend.mysql.hook.client.connected.2 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
    ## Client Disconnected Record
    backend.mysql.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
    ## Lookup Unread Message QoS > 0
    backend.mysql.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}
    ## Lookup Retain Message
    backend.mysql.hook.session.subscribed.2 = {"topic": "#", "action": {"function": "on_retain_lookup"}, "pool": "pool1"}
    ## Store Publish Message QoS > 0
    backend.mysql.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
    ## Store Retain Message
    backend.mysql.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}
    ## Delete Retain Message
    backend.mysql.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
    ## Store Ack
    backend.mysql.hook.message.acked.1 = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}
    ## Get offline messages
    ### "offline_opts": Options for fetching offline messages
    ### max_returned_count: Maximum number of offline messages fetched at a time
    ### time_range: Fetch only messages within the current time range
    ## backend.mysql.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "offline_opts": {"max_returned_count": 500, "time_range": "2h"}, "pool": "pool1"}
    ## To store QoS 0 messages as well, enable the following configuration
    ## Tip: When the following configuration is enabled, 'on_message_fetch' needs to be disabled; otherwise QoS 1 and QoS 2 messages will be stored twice
    ## backend.mysql.hook.message.publish.4 = {"topic": "#", "action": {"function": "on_message_store"}, "pool": "pool1"}
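Each hook entry follows the same shape: a key ending in the hook point and an index, followed by a JSON object. As a rough illustration, the Python sketch below splits one entry into those parts. The helper name `parse_hook_line` is hypothetical, and the parsing rules are assumptions inferred from the entries in this section; this is not how EMQX itself reads the file.

```python
import json
import re

# Assumed shape, inferred from the hook entries in this section:
#   backend.mysql.hook.<hook point>.<index> = <JSON object>
HOOK_LINE = re.compile(r"^backend\.mysql\.hook\.([a-z_.]+)\.(\d+)\s*=\s*(.+)$")


def parse_hook_line(line):
    """Return (hook, index, options), or None for comments, blanks, and other keys."""
    stripped = line.strip()
    if not stripped or stripped.startswith("#"):
        return None
    match = HOOK_LINE.match(stripped)
    if match is None:
        return None
    hook, index, value = match.groups()
    return hook, int(index), json.loads(value)


line = ('backend.mysql.hook.session.subscribed.1 = '
        '{"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}')
hook, index, options = parse_hook_line(line)
print(hook, index, options["action"]["function"], options["pool"])
# prints: session.subscribed 1 on_message_fetch pool1
```

Lines starting with `#` are skipped, so the commented-out entries (such as the `offline_opts` variant) are ignored until they are uncommented.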

    Description of MySQL Persistence Hooks

    SQL Parameters Description

    | Hook                 | Parameters                           | Example (${name} represents an available parameter)            |
    |----------------------|--------------------------------------|----------------------------------------------------------------|
    | client.connected     | clientid                             | insert into conn(clientid) values(${clientid})                 |
    | client.disconnected  | clientid                             | insert into disconn(clientid) values(${clientid})              |
    | session.subscribed   | clientid, topic, qos                 | insert into sub(topic, qos) values(${topic}, ${qos})           |
    | session.unsubscribed | clientid, topic                      | delete from sub where topic = ${topic}                         |
    | message.publish      | msgid, topic, payload, qos, clientid | insert into msg(msgid, topic) values(${msgid}, ${topic})       |
    | message.acked        | msgid, topic, clientid               | insert into ack(msgid, topic) values(${msgid}, ${topic})       |
    | message.delivered    | msgid, topic, clientid               | insert into delivered(msgid, topic) values(${msgid}, ${topic}) |
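How the backend binds `${name}` placeholders internally is not described here. Purely as an illustration, the Python sketch below turns such a template into a parameterized statement with `%s` markers (the style used by common Python MySQL drivers) plus a tuple of values. The function name `bind_params` and the sample values are hypothetical.

```python
import re

# Matches ${name} placeholders as written in the parameter table.
PLACEHOLDER = re.compile(r"\$\{(\w+)\}")


def bind_params(template, params):
    """Replace each ${name} with a %s marker and collect values in order of appearance."""
    values = []

    def substitute(match):
        name = match.group(1)
        if name not in params:
            raise KeyError(f"parameter {name!r} is not available for this hook")
        values.append(params[name])
        return "%s"

    return PLACEHOLDER.sub(substitute, template), tuple(values)


sql, args = bind_params(
    "insert into sub(topic, qos) values(${topic}, ${qos})",
    {"clientid": "test", "topic": "test_topic1", "qos": 1},
)
print(sql)   # insert into sub(topic, qos) values(%s, %s)
print(args)  # ('test_topic1', 1)
```

Passing the values separately, rather than pasting them into the SQL string, leaves quoting and escaping to the database driver.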

    MySQL backend supports SQL in ‘action’:

    ## After a client connects to the EMQX server, an SQL command is executed (multiple SQL commands are also supported)

    Create MySQL DB

    create database mqtt;

    Import MySQL DB & Table Schema

    mysql -u root -p mqtt < etc/sql/emqx_backend_mysql.sql

    TIP

    You can choose any database name.

    MySQL Client Connection Table

    DROP TABLE IF EXISTS `mqtt_client`;
    CREATE TABLE `mqtt_client` (
      `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
      `clientid` varchar(64) DEFAULT NULL,
      `state` varchar(3) DEFAULT NULL,
      `node` varchar(100) DEFAULT NULL,
      `online_at` datetime DEFAULT NULL,
      `offline_at` datetime DEFAULT NULL,
      `created` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
      PRIMARY KEY (`id`),
      KEY `mqtt_client_idx` (`clientid`),
      UNIQUE KEY `mqtt_client_key` (`clientid`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

    Query the client connection state:

    select * from mqtt_client where clientid = ${clientid};

    If client ‘test’ is online:

    If client ‘test’ is offline:

    select * from mqtt_client where clientid = "test";
    +----+----------+-------+----------------+---------------------+---------------------+---------------------+
    | id | clientid | state | node           | online_at           | offline_at          | created             |
    +----+----------+-------+----------------+---------------------+---------------------+---------------------+
    |  1 | test     | 0     | emqx@127.0.0.1 | 2016-11-15 09:40:40 | 2016-11-15 09:46:10 | 2016-12-24 09:40:22 |
    +----+----------+-------+----------------+---------------------+---------------------+---------------------+
    1 row in set (0.00 sec)

    mqtt_sub table stores MQTT subscriptions of clients:

    DROP TABLE IF EXISTS `mqtt_sub`;
    CREATE TABLE `mqtt_sub` (
      `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
      `clientid` varchar(64) DEFAULT NULL,
      `topic` varchar(255) DEFAULT NULL,
      `qos` int(3) DEFAULT NULL,
      `created` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
      PRIMARY KEY (`id`),
      KEY `mqtt_sub_idx` (`clientid`,`topic`(255),`qos`),
      UNIQUE KEY `mqtt_sub_key` (`clientid`,`topic`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

    E.g., client ‘test’ subscribes to ‘test_topic1’ and ‘test_topic2’:

    insert into mqtt_sub(clientid, topic, qos) values("test", "test_topic1", 1);
    insert into mqtt_sub(clientid, topic, qos) values("test", "test_topic2", 2);

    Query subscription of a client:

    select * from mqtt_sub where clientid = ${clientid};

    select * from mqtt_sub where clientid = "test";
    +----+--------------+-------------+------+---------------------+
    | id | clientid     | topic       | qos  | created             |
    +----+--------------+-------------+------+---------------------+
    |  1 | test         | test_topic1 |    1 | 2016-12-24 17:09:05 |
    |  2 | test         | test_topic2 |    2 | 2016-12-24 17:12:51 |
    +----+--------------+-------------+------+---------------------+
    2 rows in set (0.00 sec)

    MySQL Message Table

    mqtt_msg stores MQTT messages:

    DROP TABLE IF EXISTS `mqtt_msg`;
    CREATE TABLE `mqtt_msg` (
      `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
      `msgid` varchar(60) DEFAULT NULL,
      `topic` varchar(1024) NOT NULL,
      `sender` varchar(1024) DEFAULT NULL,
      `node` varchar(60) DEFAULT NULL,
      `qos` int(2) DEFAULT NULL,
      `retain` tinyint(2) DEFAULT NULL,
      `payload` blob,
      `arrived` datetime NOT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

    Query messages published by a client:

    select * from mqtt_msg where sender = ${clientid};

    Query messages published by client ‘test’:

    select * from mqtt_msg where sender = "test";
    +----+-------------------------------+----------+--------+------+-----+--------+---------+---------------------+
    | id | msgid                         | topic    | sender | node | qos | retain | payload | arrived             |
    +----+-------------------------------+----------+--------+------+-----+--------+---------+---------------------+
    |  1 | 53F98F80F66017005000004A60003 | hello    | test   | NULL |   1 |      0 | hello   | 2016-12-24 17:25:12 |
    |  2 | 53F98F9FE42AD7005000004A60004 | world    | test   | NULL |   1 |      0 | world   | 2016-12-24 17:25:45 |
    +----+-------------------------------+----------+--------+------+-----+--------+---------+---------------------+
    2 rows in set (0.00 sec)

    MySQL Retained Message Table

    mqtt_retain stores retained messages:

    DROP TABLE IF EXISTS `mqtt_retain`;
    CREATE TABLE `mqtt_retain` (
      `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
      `topic` varchar(200) DEFAULT NULL,
      `msgid` varchar(60) DEFAULT NULL,
      `sender` varchar(100) DEFAULT NULL,
      `node` varchar(100) DEFAULT NULL,
      `qos` int(2) DEFAULT NULL,
      `payload` blob,
      `arrived` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
      PRIMARY KEY (`id`),
      UNIQUE KEY `mqtt_retain_key` (`topic`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

    Query retained messages:

    select * from mqtt_retain where topic = ${topic};

    Query retained messages with topic ‘retain’:

    select * from mqtt_retain where topic = "retain";
    +----+----------+-------------------------------+---------+------+------+---------+---------------------+
    | id | topic    | msgid                         | sender  | node | qos  | payload | arrived             |
    +----+----------+-------------------------------+---------+------+------+---------+---------------------+
    |  1 | retain   | 53F33F7E4741E7007000004B70001 | test    | NULL |    1 | www     | 2016-12-24 16:55:18 |
    +----+----------+-------------------------------+---------+------+------+---------+---------------------+
    1 row in set (0.00 sec)

    MySQL Acknowledgement Table

    DROP TABLE IF EXISTS `mqtt_acked`;
    CREATE TABLE `mqtt_acked` (
      `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
      `clientid` varchar(200) DEFAULT NULL,
      `topic` varchar(200) DEFAULT NULL,
      `mid` int(200) DEFAULT NULL,
      `created` timestamp NULL DEFAULT NULL,
      PRIMARY KEY (`id`),
      UNIQUE KEY `mqtt_acked_key` (`clientid`,`topic`)