PostgreSQL Backend

    Starting with EMQX 3.1, a rule engine is available as a replacement for plug-ins, and it is the recommended approach. See "Save data to PostgreSQL" to set up PostgreSQL persistence in the rule engine.

    Config file: emqx_backend_pgsql.conf

    TIP

    PostgreSQL 13 and earlier versions are supported.

    Connection pools for multiple PostgreSQL servers are supported.

    Configure PostgreSQL Persistence Hooks

        ## Client Connected Record
        backend.pgsql.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
        ## Subscribe Lookup Record
        backend.pgsql.hook.client.connected.2 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
        ## Client Disconnected Record
        backend.pgsql.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
        ## Lookup Unread Message QoS > 0
        backend.pgsql.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}
        ## Lookup Retain Message
        backend.pgsql.hook.session.subscribed.2 = {"topic": "#", "action": {"function": "on_retain_lookup"}, "pool": "pool1"}
        ## Store Publish Message QoS > 0
        backend.pgsql.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
        ## Store Retain Message
        backend.pgsql.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}
        ## Delete Retain Message
        backend.pgsql.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
        ## Store Ack
        backend.pgsql.hook.message.acked.1 = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}
        ## Get offline messages
        ### "offline_opts": Options for fetching offline messages
        ### max_returned_count: Maximum number of offline messages fetched at a time
        ## backend.pgsql.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "offline_opts": {"max_returned_count": 500, "time_range": "2h"}, "pool": "pool1"}
        ## If you need to store QoS 0 messages, you can enable the following configuration
        ## Tip: When the following configuration is enabled, 'on_message_fetch' needs to be disabled, otherwise QoS 1 and QoS 2 messages will be stored twice
        ## backend.pgsql.hook.message.publish.4 = {"topic": "#", "action": {"function": "on_message_store"}, "pool": "pool1"}
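
    Each hook entry above follows the pattern backend.pgsql.hook.<hook point>.<index> = <JSON options>. As a minimal sketch of that structure, the Python below splits one such line into its parts. It is not EMQX's own parser: the function name parse_hook_line and the layout of the returned dictionary are hypothetical and exist only for illustration.

```python
import json
import re

# Hypothetical helper for illustration only; EMQX does not ship this function.
_HOOK_KEY = re.compile(r"^backend\.pgsql\.hook\.([a-z]+\.[a-z]+)\.(\d+)$")


def parse_hook_line(line):
    """Split 'backend.pgsql.hook.<hook>.<n> = <json>' into its parts.

    Returns None for blank lines and for lines commented out with '#'.
    """
    text = line.strip()
    if not text or text.startswith("#"):
        return None
    # The key never contains '=', so splitting on the first '=' is safe
    # even when the JSON value holds SQL such as "where topic = ${topic}".
    key, _, value = text.partition("=")
    match = _HOOK_KEY.match(key.strip())
    if match is None:
        raise ValueError("not a PostgreSQL backend hook entry: " + key.strip())
    options = json.loads(value)
    return {
        "hook": match.group(1),          # e.g. "message.publish"
        "index": int(match.group(2)),    # e.g. 1
        "topic": options.get("topic"),   # None when the hook has no topic filter
        "function": options["action"].get("function"),
        "sql": options["action"].get("sql"),
        "pool": options.get("pool"),
    }


entry = parse_hook_line(
    'backend.pgsql.hook.message.publish.1 = '
    '{"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}'
)
print(entry["hook"], entry["index"], entry["function"], entry["pool"])
```

    Reading the entries this way makes it easy to see that several actions can be attached to the same hook point, distinguished only by the trailing index.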

    Description of PostgreSQL Persistence Hooks

    SQL Parameters Description

    Hook                 | Parameters                           | Example (${name} represents an available parameter)
    ---------------------+--------------------------------------+------------------------------------------------------------
    client.connected     | clientid                             | insert into conn(clientid) values(${clientid})
    client.disconnected  | clientid                             | insert into disconn(clientid) values(${clientid})
    session.subscribed   | clientid, topic, qos                 | insert into sub(topic, qos) values(${topic}, ${qos})
    session.unsubscribed | clientid, topic                      | delete from sub where topic = ${topic}
    message.publish      | msgid, topic, payload, qos, clientid | insert into msg(msgid, topic) values(${msgid}, ${topic})
    message.acked        | msgid, topic, clientid               | insert into ack(msgid, topic) values(${msgid}, ${topic})
    message.delivered    | msgid, topic, clientid               | insert into delivered(msgid, topic) values(${msgid}, ${topic})

    The PostgreSQL backend also supports SQL statements in ‘action’:

        ## After a client connects to the EMQX server, execute an SQL statement (multiple statements are also supported)
        backend.pgsql.hook.client.connected.3 = {"action": {"sql": ["insert into conn(clientid) values(${clientid})"]}, "pool": "pool1"}
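
    To make the ${name} substitution concrete, here is a minimal Python sketch that rewrites such a template into a parameterized statement with PostgreSQL-style $1, $2 placeholders and an ordered list of values. It only illustrates the semantics of the placeholders. How the backend binds values internally is not described in this document, and the function name render_sql is hypothetical.

```python
import re

# Matches ${name} placeholders as used in the hook SQL templates.
_PLACEHOLDER = re.compile(r"\$\{(\w+)\}")


def render_sql(template, params):
    """Replace each ${name} with a positional placeholder ($1, $2, ...).

    Returns the rewritten SQL and the values in placeholder order.
    Raises KeyError if the template names a parameter that is not in params.
    """
    values = []

    def substitute(match):
        values.append(params[match.group(1)])
        return "$" + str(len(values))

    return _PLACEHOLDER.sub(substitute, template), values


sql, values = render_sql(
    "insert into msg(msgid, topic) values(${msgid}, ${topic})",
    {"msgid": "53F98F80F66017005000004A60003", "topic": "hello", "qos": 1},
)
print(sql)     # insert into msg(msgid, topic) values($1, $2)
print(values)  # ['53F98F80F66017005000004A60003', 'hello']
```

    Binding values as parameters, rather than splicing them into the SQL text, avoids quoting problems with client-supplied topics and payloads.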

    Create PostgreSQL DB

        createdb mqtt -E UTF8 -e

    Import PostgreSQL DB & Table Schema

    PostgreSQL Client Connection Table

    mqtt_client stores client connection states:

        CREATE TABLE mqtt_client(
          id SERIAL primary key,
          clientid character varying(100),
          state integer,
          node character varying(100),
          online_at timestamp,
          offline_at timestamp,
          created timestamp without time zone,
          UNIQUE (clientid)
        );

    Query a client’s connection state:

        select * from mqtt_client where clientid = ${clientid};

    E.g., when client ‘test’ is offline:

        select * from mqtt_client where clientid = 'test';
         id | clientid | state |      node      |      online_at      |     offline_at      |       created
        ----+----------+-------+----------------+---------------------+---------------------+---------------------
          1 | test     |     0 | emqx@127.0.0.1 | 2016-11-15 09:40:40 | 2016-11-15 09:46:10 | 2016-12-24 09:40:22
        (1 row)

    mqtt_sub stores subscriptions of clients:

        CREATE TABLE mqtt_sub(
          id SERIAL primary key,
          clientid character varying(100),
          topic character varying(200),
          qos integer,
          created timestamp without time zone,
          UNIQUE (clientid, topic)
        );

    E.g., client ‘test’ subscribes to topics ‘test_topic1’ and ‘test_topic2’:

        insert into mqtt_sub(clientid, topic, qos) values('test', 'test_topic1', 1);
        insert into mqtt_sub(clientid, topic, qos) values('test', 'test_topic2', 2);

    Query the subscriptions of a client:

        select * from mqtt_sub where clientid = ${clientid};

    Query the subscriptions of client ‘test’:

        select * from mqtt_sub where clientid = 'test';
         id |   clientid   |    topic    | qos  |       created
        ----+--------------+-------------+------+---------------------
          1 | test         | test_topic1 |    1 | 2016-12-24 17:09:05
          2 | test         | test_topic2 |    2 | 2016-12-24 17:12:51

    PostgreSQL Message Table

    mqtt_msg stores MQTT messages:

        CREATE TABLE mqtt_msg (
          id SERIAL primary key,
          msgid character varying(60),
          sender character varying(100),
          topic character varying(200),
          qos integer,
          retain integer,
          payload text,
          arrived timestamp without time zone
        );

    Query messages published by a client:

        select * from mqtt_msg where sender = ${clientid};

    Query messages published by client ‘test’:

        select * from mqtt_msg where sender = 'test';
         id |             msgid             |  topic   | sender | node | qos | retain | payload |       arrived
        ----+-------------------------------+----------+--------+------+-----+--------+---------+---------------------
          1 | 53F98F80F66017005000004A60003 | hello    | test   | NULL |   1 |      0 | hello   | 2016-12-24 17:25:12
          2 | 53F98F9FE42AD7005000004A60004 | world    | test   | NULL |   1 |      0 | world   | 2016-12-24 17:25:45
        (2 rows)

    PostgreSQL Retained Message Table

    mqtt_retain stores retained messages:

        CREATE TABLE mqtt_retain(
          id SERIAL primary key,
          topic character varying(200),
          msgid character varying(60),
          sender character varying(100),
          qos integer,
          payload text,
          arrived timestamp without time zone,
          UNIQUE (topic)
        );

    Query retained messages:

        select * from mqtt_retain where topic = ${topic};

    Query retained messages with topic ‘retain’:

        select * from mqtt_retain where topic = 'retain';
         id |  topic   |             msgid             | sender  | node | qos  | payload |       arrived
        ----+----------+-------------------------------+---------+------+------+---------+---------------------
          1 | retain   | 53F33F7E4741E7007000004B70001 | test    | NULL |    1 | www     | 2016-12-24 16:55:18
        (1 row)

    PostgreSQL Acknowledgement Table

    mqtt_acked stores acknowledgements from the clients:

        CREATE TABLE mqtt_acked (
          id SERIAL primary key,
          clientid character varying(100),
          topic character varying(100),
          mid integer,
          created timestamp without time zone,
          UNIQUE (clientid, topic)
        );
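
    The UNIQUE (clientid, topic) constraint means the table holds one acknowledgement row per client and topic. The sketch below shows how such a row could be used to find a client's unread messages. It rests on an assumption that this document does not confirm: that mid holds the mqtt_msg.id of the last message the client acknowledged. To stay runnable without a PostgreSQL server it uses Python's built-in sqlite3 module with simplified stand-in tables, and the function name unread_messages is hypothetical.

```python
import sqlite3

# SQLite in-memory stand-in so the sketch runs without a PostgreSQL server.
# ASSUMPTION: mqtt_acked.mid is the mqtt_msg.id of the last acknowledged message.
db = sqlite3.connect(":memory:")
db.executescript("""
    CREATE TABLE mqtt_msg (id INTEGER PRIMARY KEY, topic TEXT, payload TEXT);
    CREATE TABLE mqtt_acked (clientid TEXT, topic TEXT, mid INTEGER,
                             UNIQUE (clientid, topic));
    INSERT INTO mqtt_msg (id, topic, payload) VALUES
        (1, 'hello', 'a'), (2, 'hello', 'b'), (3, 'hello', 'c');
    INSERT INTO mqtt_acked (clientid, topic, mid) VALUES ('test', 'hello', 1);
""")


def unread_messages(conn, clientid, topic):
    """Return (id, payload) rows stored after the client's last acknowledged id.

    A client with no acknowledgement row for the topic gets every message.
    """
    return conn.execute(
        """
        SELECT m.id, m.payload
        FROM mqtt_msg AS m
        WHERE m.topic = ?
          AND m.id > COALESCE(
              (SELECT a.mid FROM mqtt_acked AS a
               WHERE a.clientid = ? AND a.topic = ?), 0)
        ORDER BY m.id
        """,
        (topic, clientid, topic),
    ).fetchall()


print(unread_messages(db, "test", "hello"))
```

    The sketch matches topics exactly; wildcard subscriptions such as "#" would need additional topic matching that is outside its scope.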