Cassandra 消息存储

    EMQX 3.1 版本后推出强大的规则引擎用于替换插件,建议您前往使用保存数据到 Cassandra规则引擎中创建 保存数据到 Cassandra

    配置文件: emqx_backend_cassa.conf

    支持配置多台Cassandra服务器连接池:

    backend 消息存储规则包括:

    自定义 CQL 语句 可用参数包括:

    hook可用参数示例(cql语句中${name} 表示可获取的参数)
    client.connectedclientidinsert into conn(clientid) values(${clientid})
    client.disconnectedclientidinsert into disconn(clientid) values(${clientid})
    session.subscribedclientid, topic, qosinsert into sub(topic, qos) values(${topic}, ${qos})
    session.unsubscribedclientid, topicdelete from sub where topic = ${topic}
    message.publishmsgid, topic, payload, qos, clientidinsert into msg(msgid, topic) values(${msgid}, ${topic})
    message.ackedmsgid, topic, clientidinsert into ack(msgid, topic) values(${msgid}, ${topic})
    message.delivermsgid, topic, clientidinsert into deliver(msgid, topic) values(${msgid}, ${topic})

    支持 CQL 语句配置:

    考虑到用户的需求不同, backend cassandra 自带的函数无法满足用户需求, 用户可根据自己的需求配置 cql 语句

    1. backend.cassa.hook.client.connected.3 = {"action": {"cql": ["insert into conn(clientid) values(${clientid})"]}, "pool": "pool1"}

    Cassandra 创建一个 Keyspace

    1. CREATE KEYSPACE mqtt WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} AND durable_writes = true;
    2. USE mqtt;

    导入 Cassandra 表结构

    1. cqlsh -e "SOURCE 'emqx_backend_cassa.cql'"

    mqtt.client 存储设备在线状态:

    1. CREATE TABLE mqtt.client (
    2. clientid text PRIMARY KEY,
    3. connected timestamp,
    4. disconnected timestamp,
    5. node text,
    6. );

    查询设备在线状态:

    1. select * from mqtt.client where clientid = ${clientid};

    例如 ClientId 为 test 的客户端上线:

    例如ClientId为test客户端下线:

    1. select * from mqtt.client where clientid = 'test';
    2. clientid | connected | disconnected | node | state
    3. -----------+---------------------------------+---------------------------------+-----------------+-------

    Cassandra 用户订阅主题表(Sub Table)

    mqtt.sub 存储订阅关系:

    1. CREATE TABLE mqtt.sub (
    2. clientid text,
    3. topic text,
    4. qos int,
    5. PRIMARY KEY (clientid, topic)
    6. );

    用户test分别订阅主题test_topic1 test_topic2:

    1. insert into mqtt.sub(clientid, topic, qos) values('test', 'test_topic1', 1);
    2. insert into mqtt.sub(clientid, topic, qos) values('test', 'test_topic2', 2);

    某个客户端订阅主题:

    1. select * from mqtt_sub where clientid = ${clientid};
    1. select * from mqtt_sub where clientid = 'test';
    2. clientid | topic | qos
    3. test | test_topic1 | 1
    4. test | test_topic2 | 2

    Cassandra 发布消息表(Msg Table)

    mqtt.msg 存储MQTT消息:

    查询某个客户端发布的消息:

    1. select * from mqtt_msg where sender = ${clientid};

    查询ClientId为’test’的客户端发布的消息:

    1. topic | msgid | arrived | payload | qos | retain | sender
    2. -------+----------------------+---------------------------------+--------------+-----+--------+--------
    3. hello | 2PguFrHsrzEvIIBdctmb | 2017-02-14 09:07:13.785000+0000 | Hello world! | 1 | 0 | test
    4. world | 2PguFrHsrzEvIIBdctmb | 2017-02-14 09:07:13.785000+0000 | Hello world! | 1 | 0 | test

    mqtt.retain 存储 Retain 消息:

    1. CREATE TABLE mqtt.retain (
    2. topic text PRIMARY KEY,
    3. msgid text
    4. );

    查询 retain 消息:

    1. select * from mqtt_retain where topic = ${topic};

    查询 topic 为 ‘t/retain’ 的 retain 消息:

    1. select * from mqtt_retain where topic = 't/retain';
    2. topic | msgid
    3. --------+----------------------
    4. retain | 2PguFrHsrzEvIIBdctmb

    Cassandra 接收消息 ack 表(Message Acked Table)

    mqtt.acked 存储客户端消息确认:

    启用 Cassandra 存储插件