EMQX Broker 中仅适用以下操作:

    • 检查 (调试)
    • 发送数据到 Web 服务
    • 桥接数据到 MQTT Broker
    • 保存数据到 TDengine(使用 发送数据到 Web 服务 实现) 其余均是 EMQX Enterprise 专属功能。

    创建一个测试规则,当有消息发送到 ‘t/a’ 主题时,打印消息内容以及动作参数细节。

    • 规则的筛选 SQL 语句为: SELECT * FROM “t/a”;
    • 动作是: “打印动作参数细节”,需要使用内置动作 ‘inspect’。

    上面的 CLI 命令创建了一个 ID 为 ‘Rule rule:803de6db’ 的规则。

    参数中前两个为必参数:

    • SQL 语句: SELECT * FROM “t/a”
    • 动作列表: [{“name”:”inspect”, “params”: {“a”: 1}}]。动作列表是用 JSON Array 格式表示的。name 字段是动作的名字,params 字段是动作的参数。注意 动作是不需要绑定资源的。

    最后一个可选参数,是规则的描述: ‘Rule for debug’。

    1. $ tail -f log/erlang.log.1
    2. (emqx@127.0.0.1)1> [inspect]
    3. Selected Data: #{clientid => <<"shawn">>,event => 'message.publish',
    4. flags => #{dup => false},
    5. id => <<"5898704A55D6AF4430000083D0002">>,
    6. payload => <<"hello">>,
    7. peername => <<"127.0.0.1:61770">>,qos => 1,
    8. timestamp => 1558587875090,topic => <<"t/a">>,
    9. Envs: #{event => 'message.publish',
    10. flags => #{dup => false},
    11. from => <<"shawn">>,
    12. headers =>
    13. peername => {{127,0,0,1},61770},
    14. username => undefined},
    15. id => <<0,5,137,135,4,165,93,106,244,67,0,0,8,61,0,2>>,
    16. payload => <<"hello">>,qos => 1,
    17. timestamp => {1558,587875,89754},
    18. topic => <<"t/a">>}
    19. Action Init Params: #{<<"a">> => 1}
    • Selected Data 列出的是消息经过 SQL 筛选、提取后的字段,由于我们用的是 select *,所以这里会列出所有可用的字段。
    • Envs 是动作内部可以使用的环境变量。
    • Action Init Params 是初始化动作的时候,我们传递给动作的参数。

    发送数据到 Web 服务

    创建一个规则,将所有发送自 client_id=’Steven’ 的消息,转发到地址为 ‘http://127.0.0.1:9910 (opens new window)‘ 的 Web 服务器:

    • 规则的筛选条件为: SELECT username as u, payload FROM “#” where u=’Steven’;
    • 动作是: “转发到地址为 ‘‘ 的 Web 服务”;
    • 资源类型是: web_hook;

    首先我们创建一个简易 Web 服务,这可以使用 nc ​ 命令实现:

    使用 WebHook 类型创建一个资源,并配置资源参数 url:

    1). 列出当前所有可用的资源类型,确保 ‘web_hook’ 资源类型已存在:

    1. $ ./bin/emqx_ctl resource-types list
    2. resource_type(name='web_hook', provider='emqx_web_hook', params=#{...}}, on_create={emqx_web_hook_actions,on_resource_create}, description='WebHook Resource')

    2). 使用类型 ‘web_hook’ 创建一个新的资源,并配置 “url”=”http://127.0.0.1:9910 (opens new window)“:

    然后创建规则,并选择规则的动作为 ‘data_to_webserver’:

    1). 列出当前所有可用的动作,确保 ‘data_to_webserver’ 动作已存在:

    1. $ ./bin/emqx_ctl rule-actions list
    2. action(name='data_to_webserver', app='emqx_web_hook', for='$any', types=[web_hook], params=#{'$resource' => ...}, title ='Data to Web Server', description='Forward Messages to Web Server')
    3. ...

    2). 创建规则,选择 data_to_webserver 动作,并通过 “$resource” 参数将 资源绑定到动作上:

    上面的 CLI 命令与第一个例子里创建 Inspect 规则时类似,区别在于这里需要把刚才创建的资源 ‘resource:691c29ba‘ 绑定到 ‘data_to_webserver’ 动作上。这个绑定通过给动作设置一个特殊的参数 ‘$resource’ 完成。’data_to_webserver’ 动作的作用是将数据发送到指定的 Web 服务器。

    现在我们使用 username “Steven” 发送 “hello” 到任意主题,上面创建的规则就会被触发,Web Server收到消息并回复 200 OK:

    1. $ while true; do echo -e "HTTP/1.1 200 OK\n\n $(date)" | nc -l 127.0.0.1 9910; done;
    2. POST / HTTP/1.1
    3. content-type: application/json
    4. content-length: 32
    5. te:
    6. host: 127.0.0.1:9910
    7. connection: keep-alive
    8. token: axfw34y235wrq234t4ersgw4t