规则引擎 SQL 语句中可用的字段

    规则引擎的 SQL 语句可以处理消息发布。 在一个规则语句中,用户可以用 FROM 子句指定一个或者多个主题,当任何消息发布到指定的主题时都会触发该规则。

    示例

    输出

    1. {
    2. "username": "u_emqx",
    3. "topic": "t/a",
    4. "qos": 1,
    5. "payload": "{\"msg\":\"hello\"}",
    6. "msg": "hello",
    7. "clientid": "c_emqx"
    8. }

    使用规则引擎 SQL 语句处理事件

    规则引擎的 SQL 语句既可以处理消息(消息发布),也可以处理事件(客户端上下线、客户端订阅等)。对于消息,FROM 子句后面直接跟主题名;对于事件,FROM 子句后面跟事件主题。

    事件消息的主题以 "$events/" 开头,比如 "$events/client_connected", "$events/session_subscribed"。 如果想让 emqx 将事件消息发布出来,可以在 emqx_rule_engine.conf 文件中配置。

    事件主题名释义
    $events/message_delivered消息投递
    $events/message_acked消息确认
    $events/message_dropped消息丢弃
    $events/client_connected连接完成
    $events/client_disconnected连接断开
    $events/client_connack连接确认
    $events/client_check_acl_complete鉴权结果
    $events/session_subscribed订阅
    $events/session_unsubscribed取消订阅

    $events/message_delivered (消息投递)

    当消息被放入底层socket时触发规则

    字段解释
    idMQTT 消息 ID
    from_clientid消息来源 Client ID
    from_username消息来源用户名
    clientid消息目的 Client ID
    username消息目的用户名
    payloadMQTT 消息体
    peerhost客户端的 IPAddress
    topicMQTT 主题
    qosMQTT 消息的 QoS
    flagsMQTT 消息的 Flags
    pub_propsPUBLISH Properties (仅适用于 MQTT 5.0)
    timestamp事件触发时间 (ms)
    publish_received_atPUBLISH 消息到达 Broker 的时间 (ms)
    node事件触发所在节点

    示例

    1. SELECT
    2. from_clientid,
    3. from_username,
    4. topic,
    5. qos,
    6. node,
    7. timestamp
    8. FROM
    9. "$events/message_delivered"

    输出

    1. {
    2. "topic": "t/a",
    3. "timestamp": 1645002753259,
    4. "qos": 1,
    5. "node": "emqx@127.0.0.1",
    6. "from_username": "u_emqx_1",
    7. "from_clientid": "c_emqx_1"
    8. }

    $events/message_acked (消息确认)

    当消息发送到客户端,并收到客户端回复的ack时触发规则,仅QOS1,QOS2会触发

    字段解释
    idMQTT 消息 ID
    from_clientid消息来源 Client ID
    from_username消息来源用户名
    clientid消息目的 Client ID
    username消息目的用户名
    payloadMQTT 消息体
    peerhost客户端的 IPAddress
    topicMQTT 主题
    qosMQTT 消息的 QoS
    flagsMQTT 消息的 Flags
    pub_propsPUBLISH Properties (仅适用于 MQTT 5.0)
    puback_propsPUBACK Properties (仅适用于 MQTT 5.0)
    timestamp事件触发时间 (ms)
    publish_received_atPUBLISH 消息到达 Broker 的时间 (ms)
    node事件触发所在节点

    示例

    1. SELECT
    2. from_clientid,
    3. from_username,
    4. topic,
    5. qos,
    6. node,
    7. timestamp
    8. FROM
    9. "$events/message_acked"

    输出

    1. {
    2. "topic": "t/a",
    3. "qos": 1,
    4. "node": "emqx@127.0.0.1",
    5. "from_username": "u_emqx_1",
    6. "from_clientid": "c_emqx_1"

    当一条消息无任何订阅者时触发规则

    1. SELECT
    2. reason,
    3. topic,
    4. qos,
    5. node,
    6. timestamp
    7. FROM
    8. "$events/message_dropped"

    输出

    $events/delivery_dropped (消息在投递的过程中被丢弃)

    当订阅者的消息队列已满时触发规则

    字段解释
    idMQTT 消息 ID
    reason消息丢弃原因,可能的原因:
    queue_full: 消息队列已满(QoS>0)
    no_local: 不允许客户端接收自己发布的消息
    expired: 消息或者会话过期
    qos0_msg: QoS0 的消息因为消息队列已满被丢弃
    from_clientid消息来源 Client ID
    from_username消息来源用户名
    clientid消息目的 Client ID
    username消息目的用户名
    payloadMQTT 消息体
    peerhost客户端的 IPAddress
    topicMQTT 主题
    qosMQTT 消息的 QoS
    flagsMQTT 消息的 Flags
    pub_propsPUBLISH Properties (仅适用于 MQTT 5.0)
    timestamp事件触发时间 (ms)
    publish_received_atPUBLISH 消息到达 Broker 的时间 (ms)
    node事件触发所在节点

    示例

    1. SELECT
    2. from_clientid,
    3. from_username,
    4. reason,
    5. topic,
    6. qos
    7. FROM "$events/delivery_dropped"

    输出

    1. {
    2. "topic": "t/a",
    3. "reason": "queue_full",
    4. "qos": 1,
    5. "from_username": "u_emqx_1",
    6. "from_clientid": "c_emqx_1"
    7. }

    $events/client_connected (终端连接成功)

    当终端连接成功时触发规则

    字段解释
    clientid消息目的 Client ID
    username消息目的用户名
    mountpoint主题挂载点(主题前缀)
    peername终端的 IPAddress 和 Port
    socknameemqx 监听的 IPAddress 和 Port
    proto_name协议名字
    proto_ver协议版本
    keepaliveMQTT 保活间隔
    clean_startMQTT clean_start
    expiry_intervalMQTT Session 过期时间
    is_bridge是否为 MQTT bridge 连接
    connected_at终端连接完成时间 (s)
    conn_propsCONNECT Properties (仅适用于 MQTT 5.0)
    timestamp事件触发时间 (ms)
    node事件触发所在节点

    示例

    1. SELECT
    2. clientid,
    3. username,
    4. keepalive,
    5. is_bridge
    6. FROM
    7. "$events/client_connected"

    输出

    1. {
    2. "username": "u_emqx",
    3. "keepalive": 60,
    4. "is_bridge": false,
    5. "clientid": "c_emqx"
    6. }

    当终端连接断开时触发规则

    字段解释
    reason终端连接断开原因:
    normal:客户端主动断开
    kicked:服务端踢出,通过 REST API
    keepalive_timeout: keepalive 超时
    not_authorized: 认证失败,或者 acl_nomatch = disconnect 时没有权限的 Pub/Sub 会主动断开客户端
    tcp_closed: 对端关闭了网络连接
    internal_error: 畸形报文或其他未知错误
    clientid消息目的 Client ID
    username消息目的用户名
    peername终端的 IPAddress 和 Port
    socknameemqx 监听的 IPAddress 和 Port
    disconnected_at终端连接断开时间 (s)
    disconn_propsDISCONNECT Properties (仅适用于 MQTT 5.0)
    timestamp事件触发时间 (ms)
    node事件触发所在节点

    示例

    1. SELECT
    2. clientid,
    3. username,
    4. reason,
    5. disconnected_at,
    6. node
    7. FROM
    8. "$events/client_disconnected"

    输出

    1. {
    2. "reason": "normal",
    3. "node": "emqx@127.0.0.1",
    4. "clientid": "c_emqx"
    5. }

    $events/client_connack (连接确认)

    当服务端向客户端发送CONNACK报文时触发规则, reason_code 包含各种错误原因代码

    MQTT v5.0 协议将返回码重命名为原因码,增加了一个原因码来指示更多类型的错误(Reason code and ACK - MQTT 5.0 new features (opens new window))。 因此reason_code 在MQTT v3.1.1与MQTT v5.0中有很大的不同。

    reason_code描述
    connection_accepted已接受连接
    unacceptable_protocol_version服务器不支持客户端请求的 MQTT 协议
    client_identifier_not_valid客户端 ID 是正确的 UTF-8 字符串,但服务器不允许
    server_unavaliable网络连接已建立,但 MQTT 服务不可用
    malformed_username_or_password用户名或密码中的数据格式错误
    unauthorized_client客户端连接未授权

    MQTT v5.0

    reason_code描述
    success连接成功
    unspecified_error未指定的错误
    malformed_packet畸形数据包
    protocol_error协议错误
    implementation_specific_error实现特定错误
    unsupported_protocol_version不支持的协议版本
    client_identifier_not_valid客户端标识符无效
    bad_username_or_password错误的用户名或密码
    not_authorized未经授权
    server_unavailable服务器无法使用
    server_busy服务器繁忙
    banned禁止访问
    bad_authentication_method错误的身份验证方法
    topic_name_invalid主题名称无效
    packet_too_large数据包太大
    quota_exceeded超出配额
    retain_not_supported不支持的retain
    qos_not_supported不支持的qos
    use_another_server使用另一台服务器
    server_moved服务器迁移了
    connection_rate_exceeded超出连接速率

    示例

    输出

    1. {
    2. "username": "u_emqx",
    3. "reason": "success",
    4. "node": "emqx@127.0.0.1",
    5. "connected_at": 1645003578536,
    6. "clientid": "c_emqx"
    7. }

    $events/client_check_acl_complete (鉴权结果)

    当客户端鉴权结束时触发规则

    字段解释
    clientid消息目的 Client ID
    username消息目的用户名
    peerhost客户端的 IPAddress
    topicMQTT 主题
    actionpublish or subscribe, 发布或者订阅事件
    resultallow or deny,鉴权结果
    is_cachetrue or false,鉴权时数据的来源
    is_cache为true时,鉴权数据来源于cache
    is_cache为false时,鉴权数据来源于插件
    timestamp事件触发时间 (ms)
    node事件触发所在节点

    示例

    1. SELECT
    2. clientid,
    3. username,
    4. topic,
    5. action,
    6. result,
    7. is_cache,
    8. node
    9. FROM
    10. "$events/client_check_acl_complete"

    输出

    1. {
    2. "username": "u_emqx",
    3. "topic": "t/a",
    4. "action": "publish",
    5. "result": "allow",
    6. "is_cache": "false",
    7. "node": "emqx@127.0.0.1",
    8. "clientid": "c_emqx"
    9. }

    当终端订阅成功时触发规则

    示例

    1. SELECT
    2. clientid,
    3. username,
    4. topic,
    5. qos
    6. FROM
    7. "$events/session_subscribed"

    输出

    1. {
    2. "username": "u_emqx",
    3. "topic": "t/a",
    4. "qos": 1,
    5. "clientid": "c_emqx"
    6. }

    $events/session_unsubscribed (取消终端订阅成功)

    当取消终端订阅成功时触发规则

    字段解释
    clientid消息目的 Client ID
    username消息目的用户名
    peerhost客户端的 IPAddress
    topicMQTT 主题
    qosMQTT 消息的 QoS
    unsub_propsUNSUBSCRIBE Properties (仅适用于 5.0)
    timestamp事件触发时间 (ms)
    node事件触发所在节点

    示例

    1. SELECT
    2. clientid,
    3. username,
    4. topic,
    5. qos

    输出