Cassandra 消息存储
EMQX 3.1 版本后推出强大的规则引擎用于替换插件,建议您前往使用保存数据到 Cassandra规则引擎中创建 保存数据到 Cassandra
配置文件: emqx_backend_cassa.conf
支持配置多台Cassandra服务器连接池:
backend 消息存储规则包括:
自定义 CQL 语句 可用参数包括:
hook | 可用参数 | 示例(cql语句中${name} 表示可获取的参数) |
---|---|---|
client.connected | clientid | insert into conn(clientid) values(${clientid}) |
client.disconnected | clientid | insert into disconn(clientid) values(${clientid}) |
session.subscribed | clientid, topic, qos | insert into sub(topic, qos) values(${topic}, ${qos}) |
session.unsubscribed | clientid, topic | delete from sub where topic = ${topic} |
message.publish | msgid, topic, payload, qos, clientid | insert into msg(msgid, topic) values(${msgid}, ${topic}) |
message.acked | msgid, topic, clientid | insert into ack(msgid, topic) values(${msgid}, ${topic}) |
message.deliver | msgid, topic, clientid | insert into deliver(msgid, topic) values(${msgid}, ${topic}) |
支持 CQL 语句配置:
考虑到用户的需求不同, backend cassandra 自带的函数无法满足用户需求, 用户可根据自己的需求配置 cql 语句
backend.cassa.hook.client.connected.3 = {"action": {"cql": ["insert into conn(clientid) values(${clientid})"]}, "pool": "pool1"}
Cassandra 创建一个 Keyspace
CREATE KEYSPACE mqtt WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} AND durable_writes = true;
USE mqtt;
导入 Cassandra 表结构
cqlsh -e "SOURCE 'emqx_backend_cassa.cql'"
mqtt.client 存储设备在线状态:
CREATE TABLE mqtt.client (
clientid text PRIMARY KEY,
connected timestamp,
disconnected timestamp,
node text,
);
查询设备在线状态:
select * from mqtt.client where clientid = ${clientid};
例如 ClientId 为 test 的客户端上线:
例如ClientId为test客户端下线:
select * from mqtt.client where clientid = 'test';
clientid | connected | disconnected | node | state
-----------+---------------------------------+---------------------------------+-----------------+-------
Cassandra 用户订阅主题表(Sub Table)
mqtt.sub 存储订阅关系:
CREATE TABLE mqtt.sub (
clientid text,
topic text,
qos int,
PRIMARY KEY (clientid, topic)
);
用户test分别订阅主题test_topic1 test_topic2:
insert into mqtt.sub(clientid, topic, qos) values('test', 'test_topic1', 1);
insert into mqtt.sub(clientid, topic, qos) values('test', 'test_topic2', 2);
某个客户端订阅主题:
select * from mqtt_sub where clientid = ${clientid};
select * from mqtt_sub where clientid = 'test';
clientid | topic | qos
test | test_topic1 | 1
test | test_topic2 | 2
Cassandra 发布消息表(Msg Table)
mqtt.msg 存储MQTT消息:
查询某个客户端发布的消息:
select * from mqtt_msg where sender = ${clientid};
查询ClientId为’test’的客户端发布的消息:
topic | msgid | arrived | payload | qos | retain | sender
-------+----------------------+---------------------------------+--------------+-----+--------+--------
hello | 2PguFrHsrzEvIIBdctmb | 2017-02-14 09:07:13.785000+0000 | Hello world! | 1 | 0 | test
world | 2PguFrHsrzEvIIBdctmb | 2017-02-14 09:07:13.785000+0000 | Hello world! | 1 | 0 | test
mqtt.retain 存储 Retain 消息:
CREATE TABLE mqtt.retain (
topic text PRIMARY KEY,
msgid text
);
查询 retain 消息:
select * from mqtt_retain where topic = ${topic};
查询 topic 为 ‘t/retain’ 的 retain 消息:
select * from mqtt_retain where topic = 't/retain';
topic | msgid
--------+----------------------
retain | 2PguFrHsrzEvIIBdctmb
Cassandra 接收消息 ack 表(Message Acked Table)
mqtt.acked 存储客户端消息确认: