Timescale 消息存储

    EMQX 3.1 版本后推出强大的规则引擎用于替换插件,建议您前往使用保存数据到 Timescale规则引擎中创建 保存数据到 Timescale

    etc/plugins/emqx_backend_timescale.conf:

    Timescale Backend 消息存储规则参数:

    Example:

    1. backend.timescale.hook.message.publish.1 = {"topic": "sensor/#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
    2. ## Store Publish message with "stat/#" topic
    3. backend.timescale.hook.message.publish.2 = {"topic": "stat/#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

    Timescale Backend 支持 Hook 与 相应内置函数列表:

    Timescale Backend 提供 emqx_backend_timescale.tmpl 模板文件,用于从不同主题的 MQTT Message 中提取数据以写入 Timescale。

    模板文件采用 Json 格式, 组成部分:

    • key - MQTT Topic, 字符串, 支持通配符主题
    • value - Template, Json 对象, 用于将 MQTT Message 转换成 measurement,tag_key=tag_value,... field_key=field_value,... timestamp 的形式以写入 InfluxDB。

    你可以为不同 Topic 定义不同的 Template, 也可以为同一个 Topic 定义多个 Template, 类似:

    1. {
    2. "name": <Name of template>,
    3. "param_keys": <Param Keys>
    4. }

    name, sqlparam_keys 都是必选项。

    name 可以是任意的字符串,确保没有重复即可。

    sql 为 Timescale 可用的 SQL INSERT INTO 语句,例如:insert into sensor_data(time, location, temperature, humidity) values (NOW(), $1, $2, $3)

    是一个数组,它的第一个元素对应 sql 中出现的 $1,并以此类推。

    数组中任意元素都可以是一个固定值, 它支持的数据类型依赖于你定义的数据表。当然更符合实际情况的是,你可以通过我们提供的占位符来获取 MQTT 消息中的数据。

    目前我们支持的占位符如下:

    $payload 与 $<Number>:

    你可以直接使用 $payload 取得完整的消息载荷, 也可以通过 ["$payload", <Key>, ...] 取得消息载荷内部的数据。

    考虑到 Json 还有数组这一数据类型的情况, 我们引入了 $0$<pos_integer>, $0 表示获取数组内所有元素, $<pos_integer> 表示获取数组内第 <pos_integer> 个元素。

    一个简单例子, ["$payload", "$0", "temp"] 将从 [{"temp": 20}, {"temp": 21}] 中取得 [20, 21], 而 ["$payload", "$1", "temp"] 将只取得 20

    值得注意的是, 当你使用 时,我们希望你取得的数据个数都是相等的。因为我们需要将这些数组转换为多条记录写入 Timescale, 而当你一个字段取得了 3 份数据, 另一个字段却取得了 2 份数据, 我们将无从判断应当怎样为你组合这些数据。

    Example

    data/templates 目录下提供了一个示例模板 (emqx_backend_timescale_example.tmpl, 正式使用时请去掉文件名中的 “_example” 后缀) 供用户参考:

    当 Topic 为 “sensor_data” 的 MQTT Message 拥有以下 Payload 时:

    1. {
    2. "data":[
    3. {
    4. "location":"bedroom",
    5. "temperature":21.3,
    6. "humidity":40.3
    7. },
    8. {
    9. "temperature":22.3,
    10. "humidity":61.8
    11. },
    12. {
    13. "location":"kitchen",
    14. "temperature":29.5,
    15. "humidity":58.7
    16. }
    17. ]

    [“$payload”, “data”, “$0”, “location”] 会先获取 MQTT Message 的 Payload,如果 Payload 为 json 格式,则继续尝试读取 data。data 的值是数组,这里我们用到了 “$0” 表示获取数组中所有的元素,因此 [“$payload”, “data”, “$0”, “location”] 将帮我们获得 [“bedroom”, “bathroom”, “kitchen”]。相应的,如果将 “$0” 替换为 “$1”,将只获得 [“bedroom”]。相应的,如果将

    那么在这个场景中,我们将得到以下 SQL 语句: