InfluxDB 消息存储

    EMQX 3.1 版本后推出强大的规则引擎用于替换插件,建议您前往使用保存数据到 InfluxDB规则引擎中创建 保存数据到 InfluxDB

    EMQX 支持通过 UDP 协议连接 InfluxDB,需要修改 InfluxDB 配置文件:

    配置 InfluxDB 消息存储

    配置文件 etc/plugins/emqx_backend_influxdb.conf:

    1. backend.influxdb.pool1.common.write_protocol = udp
    2. ## 批量写入大小
    3. backend.influxdb.pool1.common.batch_size = 1000
    4. ## InfluxDB 写进程池大小
    5. backend.influxdb.pool1.pool_size = 8
    6. ## InfluxDB UDP 主机地址
    7. backend.influxdb.pool1.udp.host = 127.0.0.1
    8. ## InfluxDB UDP 主机端口
    9. backend.influxdb.pool1.udp.port = 8089
    10. ## InfluxDB HTTP/HTTPS 主机地址
    11. backend.influxdb.pool1.http.host = 127.0.0.1
    12. ## InfluxDB HTTP/HTTPS 主机端口
    13. backend.influxdb.pool1.http.port = 8086
    14. ## InflxuDB 数据库名
    15. backend.influxdb.pool1.http.database = mydb
    16. ## 连接到 InfluxDB 的用户名
    17. ## backend.influxdb.pool1.http.username = admin
    18. ## 连接到 InfluxDB 的密码
    19. ## backend.influxdb.pool1.http.password = public
    20. ## 时间戳精度
    21. backend.influxdb.pool1.http.precision = ms
    22. ## 是否启用 HTTPS
    23. backend.influxdb.pool1.http.https_enabled = false
    24. ## 连接 InfluxDB 时使用的 TLS 协议版本
    25. ## backend.influxdb.pool1.http.ssl.version = tlsv1.2
    26. ## 密钥文件
    27. ## backend.influxdb.pool1.http.ssl.keyfile =
    28. ## 证书文件
    29. ## backend.influxdb.pool1.http.ssl.certfile =
    30. ## backend.influxdb.pool1.http.ssl.cacertfile =
    31. ## 存储 PUBLISH 消息
    32. backend.influxdb.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

    InfluxDB Backend 消息存储规则参数:

    Example:

    1. ## 存储主题为 "sensor/#" 的 PUBLISH 消息
    2. backend.influxdb.hook.message.publish.1 = {"topic": "sensor/#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
    3. ## 存储主题为 "stat/#" 的 PUBLISH 消息
    4. backend.influxdb.hook.message.publish.2 = {"topic": "stat/#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

    InfluxDB Backend 支持 Hook 与 相应内置函数列表:

    由于 MQTT Message 无法直接写入 InfluxDB, InfluxDB Backend 提供了 emqx_backend_influxdb.tmpl 模板文件将 MQTT Message 转换为可写入 InfluxDB 的 DataPoint。

    模板文件采用 JSON 格式, 组成部分:

    • key - MQTT Topic, 字符串, 支持通配符
    • value - Template, Json 对象, 用于将 MQTT Message 转换成 measurement,tag_key=tag_value,... field_key=field_value,... timestamp 的形式以写入 InfluxDB。

    你可以为不同 Topic 定义不同的 Template, 也可以为同一个 Topic 定义多个 Template, 类似:

    1. {
    2. "measurement": <Measurement>,
    3. "tags": {
    4. <Tag Key>: <Tag Value>
    5. },
    6. "fields": {
    7. <Field Key>: <Field Value>
    8. },
    9. "timestamp": <Timestamp>
    10. }

    measurementfields 为必选项, tagstimestamp 为可选项。

    所有的值 (例如 <Measurement>) 你都可以直接在 Template 中配置为一个固定值, 它支持的数据类型依赖于你定义的数据表。当然更符合实际情况的是,你可以通过我们提供的占位符来获取 MQTT 消息中的数据。

    目前我们支持的占位符如下:

    $payload 与 $<Number>:

    你可以直接使用 $payload 取得完整的消息载荷, 也可以通过 ["$payload", <Key>, ...] 取得消息载荷内部的数据。

    例如 payload{"data": {"temperature": 23.9}}, 你可以通过占位符 ["$payload", "data", "temperature"] 来获取其中的 23.9

    考虑到 Json 还有数组这一数据类型的情况, 我们引入了 $0$<pos_integer>, $0 表示获取数组内所有元素, $<pos_integer> 表示获取数组内第 <pos_integer> 个元素。

    一个简单例子, ["$payload", "$0", "temp"] 将从 [{"temp": 20}, {"temp": 21}] 中取得 [20, 21], 而 ["$payload", "$1", "temp"] 将只取得 20

    image

    值得注意的是, 当你使用 $0 时,我们希望你取得的数据个数都是相等的。因为我们需要将这些数组转换为多条记录写入 InfluxDB, 而当你一个字段取得了 3 份数据, 另一个字段却取得了 2 份数据, 我们将无从判断应当怎样为你组合这些数据。

    Example

    data/templates 目录下提供了一个示例模板 (emqx_backend_influxdb_example.tmpl, 正式使用时请去掉文件名中的 “_example” 后缀) 供用户参考:

    1. "sample": {
    2. "measurement": "$topic",
    3. "tags": {
    4. "host": ["$payload", "data", "$0", "host"],
    5. "region": ["$payload", "data", "$0", "region"],
    6. "clientid": "$clientid"
    7. },
    8. "fields": {
    9. "temperature": ["$payload", "data", "$0", "temp"]
    10. },
    11. "timestamp": "$timestamp"
    12. }
    13. }

    提示

    当 Template 中设置 timestamp 或插件配置 backend.influxdb.pool1.set_timestamp = true 时,请将 InfluxDB UDP 配置中的 precision 设为 “ms”。

    当 Topic 为 “sample” 的 MQTT Message 拥有以下 Payload 时:

    Backend 会将 MQTT Message 转换为:

    1. [
    2. {
    3. "measurement": "sample",
    4. "tags": {
    5. "clientid": "mqttjs_ebcc36079a",
    6. "host": "serverA",
    7. "qos": "0",
    8. "region": "hangzhou",
    9. },
    10. "fields": {
    11. "temperature": "1"
    12. },
    13. "timestamp": "1560743513626681000"
    14. },
    15. {
    16. "measurement": "sample",
    17. "tags": {
    18. "clientid": "mqttjs_ebcc36079a",
    19. "host": "serverB",
    20. "qos": "0",
    21. "region": "ningbo",
    22. },
    23. "fields": {
    24. "temperature": "2"
    25. },
    26. "timestamp": "1560743513626681000"
    27. }
    28. ]
    1. "sample,clientid=mqttjs_6990f0e886,host=serverA,qos=0,region=hangzhou temperature=\"1\" 1560745505429670000\nsample,clientid=mqttjs_6990f0e886,host=serverB,qos=0,region=ningbo temperature=\"2\" 1560745505429670000\n"

    启用 InfluxDB 消息存储: