Cassandra Backend

    After EMQX version 3.1, a powerful rule engine is introduced to replace plug-ins. It is recommended that you use it. See Save data to Cassandra to setup Save data to Cassandra in rule engine.

    Config file: etc/plugins/emqx_backend_cassa.conf

    Multi node Cassandra cluster is supported:

    Configure Cassandra Persistence Hooks

    1. backend.cassa.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
    2. ## Subscribe Lookup Record
    3. backend.cassa.hook.client.connected.2 = {"action": {"function": "on_subscription_lookup"}, "pool": "pool1"}
    4. ## Client DisConnected Record
    5. backend.cassa.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
    6. ## Lookup Unread Message QOS > 0
    7. backend.cassa.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}
    8. ## Lookup Retain Message
    9. backend.cassa.hook.session.subscribed.2 = {"action": {"function": "on_retain_lookup"}, "pool": "pool1"}
    10. ## Store Publish Message QOS > 0
    11. backend.cassa.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
    12. ## Delete Acked Record
    13. backend.cassa.hook.session.unsubscribed.1= {"topic": "#", "action": {"cql": ["delete from acked where clientid = ${clientid} and topic = ${topic}"]}, "pool": "pool1"}
    14. ## Store Retain Message
    15. backend.cassa.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}
    16. ## Delete Retain Message
    17. backend.cassa.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
    18. backend.cassa.hook.message.acked.1 = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}
    19. ## Get offline messages
    20. ### "offline_opts": Get configuration for offline messages
    21. ### max_returned_count: Maximum number of offline messages get at a time
    22. ### time_range: Get only messages in the current time range
    23. ## backend.cassa.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "offline_opts": {"max_returned_count": 500, "time_range": "2h"}, "pool": "pool1"}
    24. ## If you need to store Qos0 messages, you can enable the following configuration
    25. ## Tip: When the following configuration is enabled, 'on_message_fetch' needs to be disabled, otherwise qos1, qos2 messages will be stored twice
    26. ## backend.cassa.hook.message.publish.4 = {"topic": "#", "action": {"function": "on_message_store"}, "pool": "pool1"}

    Description of Cassandra Persistence Hooks

    CQL Parameters Description

    Customized CQL command parameters includes:

    hookParameterExample (${name} in CQL represents available parameter
    client.connectedclientidinsert into conn(clientid) values(${clientid})
    client.disconnectedclientidinsert into disconn(clientid) values(${clientid})
    session.subscribedclientid, topic, qosinsert into sub(topic, qos) values(${topic}, ${qos})
    session.unsubscribedclientid, topicdelete from sub where topic = ${topic}
    message.publishmsgid, topic, payload, qos, clientidinsert into msg(msgid, topic) values(${msgid}, ${topic})
    message.ackedmsgid, topic, clientidinsert into ack(msgid, topic) values(${msgid}, ${topic})
    message.deliveredmsgid, topic, clientidinsert into delivered(msgid, topic) values(${msgid}, ${topic})

    Cassandra backend supports CLQ in ‘action’:

    1. ## After a client is connected to the EMQX server, it executes a CQL command(multiple command also supported):
    2. backend.cassa.hook.client.connected.3 = {"action": {"cql": ["insert into conn(clientid) values(${clientid})"]}, "pool": "pool1"}

    Initializing Cassandra

    Create KeySpace:

    1. CREATE KEYSPACE mqtt WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
    2. USE mqtt;

    Import Cassandra tables:

    1. cqlsh -e "SOURCE 'emqx_backend_cassa.cql'"

    KeySpace is free of choice

    Cassandra Client Connection Table

    mqtt.client stores client connection states:

    1. CREATE TABLE mqtt.client (
    2. clientid text,
    3. node text,
    4. state int,
    5. connected timestamp,
    6. disconnected timestamp,
    7. PRIMARY KEY(clientid)
    8. );

    Query a client’s connection state:

    1. select * from mqtt.client where clientid = ${clientid};

    If client ‘test’ is online:

    Client ‘test’ is offline:

    1. select * from mqtt.client where clientid = 'test';
    2. clientid | connected | disconnected | node | state
    3. -----------+---------------------------------+---------------------------------+---------------+-------
    4. test | 2017-02-14 08:27:29.872000+0000 | 2017-02-14 08:27:35.872000+0000 | emqx@127.0.0.1| 0

    Cassandra Subscription Table

    mqtt.sub stores subscriptions of clients:

    1. CREATE TABLE mqtt.sub (
    2. clientid text,
    3. topic text,
    4. qos int,
    5. PRIMARY KEY(clientid, topic)
    6. );

    Client ‘test’ subscribes to topic ‘test_topic1’ and ‘test_topic2’:

    1. insert into mqtt.sub(clientid, topic, qos) values('test', 'test_topic1', 1);
    2. insert into mqtt.sub(clientid, topic, qos) values('test', 'test_topic2', 2);
    1. select * from mqtt_sub where clientid = ${clientid};

    Query subscriptions of client ‘test’:

    1. clientid | topic | qos
    2. test | test_topic1 | 1
    3. test | test_topic2 | 2

    mqtt.msg stores MQTT messages:

    1. CREATE TABLE mqtt.msg (
    2. topic text,
    3. msgid text,
    4. sender text,
    5. qos int,
    6. retain int,
    7. payload blob,
    8. arrived timestamp,
    9. PRIMARY KEY(topic, msgid)
    10. ) WITH CLUSTERING ORDER BY (msgid DESC);

    Query messages published by a client:

    Query messages published by client ‘test’:

    1. select * from mqtt_msg where sender = 'test';
    2. topic | msgid | arrived | payload | qos | retain | sender
    3. -------+----------------------+---------------------------------+--------------+-----+--------+--------
    4. hello | 2PguFrHsrzEvIIBdctmb | 2017-02-14 09:07:13.785000+0000 | Hello world! | 1 | 0 | test
    5. world | 2PguFrHsrzEvIIBdctmb | 2017-02-14 09:07:13.785000+0000 | Hello world! | 1 | 0 | test

    Cassandra Retained Message Table

    mqtt.retain stores retained messages:

    1. CREATE TABLE mqtt.retain (
    2. topic text,
    3. msgid text,
    4. PRIMARY KEY(topic)
    5. );

    Query retained messages:

    1. select * from mqtt_retain where topic = ${topic};

    Query retained messages with topic ‘retain’:

    1. select * from mqtt_retain where topic = 'retain';
    2. topic | msgid
    3. --------+----------------------
    4. retain | 2PguFrHsrzEvIIBdctmb

    Cassandra Acknowledgement Table

    1. CREATE TABLE mqtt.acked (
    2. clientid text,
    3. topic text,
    4. msgid text,
    5. PRIMARY KEY(clientid, topic)

    Enable Cassandra Backend

    1. ./bin/emqx_ctl plugins load emqx_backend_cassa