用户指南 (User Guide)

    程序包下载后,可直接解压启动运行,例如 macOS 平台:

    EMQ X 消息服务器默认占用的 TCP 端口包括:

    MQTT 发布订阅

    MQTT 是为移动互联网、物联网设计的轻量发布订阅模式的消息服务器,目前支持 MQTT 和 v5.0:

    EMQ X 启动后,任何设备或终端可通过 MQTT 协议连接到服务器,通过 发布(Publish)/订阅(Subscribe) 进行交换消息。

    MQTT 客户端库:

    例如,mosquitto_sub/pub 命令行发布订阅消息:

    1. mosquitto_pub -h 127.0.0.1 -p 1883 -t topic -q 1 -m "Hello, MQTT!"

    认证/访问控制

    EMQ X 消息服务器 连接认证访问控制 由一系列的认证插件(Plugins)提供,他们的命名都符合 emqx_auth_<name> 的规则。

    在 EMQ X 中,这两个功能分别是指:

    1. 连接认证: EMQ X 校验每个连接上的客户端是否具有接入系统的权限,若没有则会断开该连接

    2. 访问控制: EMQ X 校验客户端每个 发布(Publish)/订阅(Subscribe) 的权限,以 允许/拒绝 相应操作

    EMQ X 消息服务器认证由一系列认证插件(Plugins)提供,系统支持按用户名密码、ClientID 或匿名认证。

    系统默认开启匿名认证(Anonymous),通过加载认证插件可开启的多个认证模块组成认证链:

    _images/guide_2.png

    开启匿名认证

    etc/emqx.conf 配置启用匿名认证:

    1. 允许匿名访问
    2. ## Value: true | false
    3. allow_anonymous = true

    访问控制(ACL)

    EMQ X 消息服务器通过 ACL(Access Control List) 实现 MQTT 客户端访问控制。

    ACL 访问控制规则定义:

    1. 允许(Allow)|拒绝(Deny) 谁(Who) 订阅(Subscribe)|发布(Publish) 主题列表(Topics)

    MQTT 客户端发起订阅/发布请求时,EMQ X 消息服务器的访问控制模块会逐条匹配 ACL 规则,直到匹配成功为止:

    默认访问控制设置

    EMQ X 消息服务器默认访问控制,在 etc/emqx.conf 中设置:

    1. ## 设置所有 ACL 规则都不能匹配时是否允许访问
    2. ## Value: allow | deny
    3. acl_nomatch = allow
    4. ## 设置存储 ACL 规则的默认文件
    5. ## Value: File Name
    6. acl_file = etc/acl.conf

    ACL 规则定义在 etc/acl.conf,EMQ X 启动时加载到内存:

    1. %% 允许 'dashboard' 用户订阅 '$SYS/#'
    2. {allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}.
    3. %% 允许本机用户发布订阅全部主题
    4. {allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}.
    5. %% 拒绝除本机用户以外的其他用户订阅 '$SYS/#' '#' 主题
    6. {deny, all, subscribe, ["$SYS/#", {eq, "#"}]}.
    7. %% 允许上述规则以外的任何情形
    8. {allow, all}.

    EMQ X 提供的认证插件包括:

    插件

    说明

    emqx_auth_clientid

    ClientId 认证/鉴权插件

    用户名密码认证/鉴权插件

    emqx_auth_jwt

    JWT 认证/鉴权插件

    LDAP 认证/鉴权插件

    emqx_auth_http

    HTTP 认证/鉴权插件

    MySQ L认证/鉴权插件

    emqx_auth_pgsql

    Postgre 认证/鉴权插件

    Redis 认证/鉴权插件

    emqx_auth_mongo

    MongoDB 认证/鉴权插件

    其中,关于每个认证插件的配置及用法,可参考 关于认证部分。

    注解

    auth 插件可以同时启动多个。每次检查的时候,按照优先级从高到低依次检查,同一优先级的,先启动的插件先检查。

    此外 EMQ X 还支持使用 PSK (Pre-shared Key) 的方式来控制客户端的接入,但它并不是使用的上述的 连接认证 链的方式,而是在 SSL 握手期间进行验证。详情参考 Pre-shared Key

    EMQ X R3.0 版本开始支持集群级别的共享订阅功能。共享订阅(Shared Subscription)支持多种消息派发策略:

    _images/guide_4.png

    共享订阅支持两种使用方式:

    订阅前缀

    使用示例

    $queue/

    mosquitto_sub -t ‘$queue/topic’

    $share/<group>/

    mosquitto_sub -t ‘$share/group/topic’

    示例:

    1. mosquitto_sub -t '$share/group/topic'
    2. mosquitto_pub -t 'topic' -m msg -q 2

    EMQ X 通过 etc/emqx.conf 中的 broker.shared_subscription_strategy 字段配置共享消息的派发策略。

    目前支持按以下几种策略派发消息:

    策略

    说明

    random

    在所有共享订阅者中随机

    round_robin

    按订阅顺序

    sticky

    使用上次派发的订阅者

    hash

    根据发送者的 ClientId

    注解

    当所有的订阅者都不在线时,仍会挑选一个订阅者,并存至其 Session 的消息队列中

    节点桥接 (Bridge)

    EMQ X 节点间桥接

    桥接 的概念是 EMQ X 支持将自身某类主题的消息通过某种方式转发到另一个 MQTT Broker。

    桥接集群 的不同在于:桥接不会复制主题树与路由表,只根据桥接规则转发 MQTT 消息。

    目前 EMQ X 支持的桥接方式有:

    • RPC 桥接:RPC 桥接只能在 EMQ X Broker 间使用,且不支持订阅远程节点的主题去同步数据

    • MQTT 桥接:MQTT 桥接同时支持转发和通过订阅主题来实现数据同步两种方式

    其概念如下图所示:

    此外 EMQ X 消息服务器支持多节点桥接模式互联:

    _images/bridges_3.png

    在 EMQ X 中,通过修改 etc/plugins/emqx_bridge_mqtt.conf 来配置 bridge。EMQ X 根据不同的 name 来区分不同的 bridge。例如:

    1. ## Bridge address: node name for local bridge, host:port for remote.
    2. bridge.mqtt.aws.address = 127.0.0.1:1883

    该项配置声明了一个名为 aws 的 bridge 并指定以 MQTT 的方式桥接到 127.0.0.1:1883 这台 MQTT 服务器

    在需要创建多个 bridge 时,可以先复制其全部的配置项,在通过使用不同的 name 来标示(比如 bridge.mqtt.$name.address 其中 $name 指代的为 bridge 的名称)

    接下来两个小节,表述了如何创建 RPC/MQTT 方式的桥接,并创建一条转发传感器(sensor)主题消息的转发规则。假设在两台主机上启动了两个 EMQ X 节点:

    名称

    节点

    MQTT 端口

    emqx1

    emqx1@192.168.1.1

    1883

    emqx2

    1883

    EMQ X 节点 RPC 桥接配置

    以下是 RPC 桥接的基本配置,最简单的 RPC 桥接只需要配置以下三项就可以了:

    1. ## 桥接地址: 使用节点名(nodename@host)则用于 RPC 桥接,使用 host:port 用于 MQTT 连接
    2. bridge.mqtt.emqx2.address = emqx2@192.168.1.2
    3. ## 转发消息的主题
    4. bridge.mqtt.emqx2.forwards = sensor1/#,sensor2/#
    5. ## 桥接的 mountpoint(挂载点)
    6. bridge.mqtt.emqx2.mountpoint = bridge/emqx2/${node}/

    forwards 用于指定桥接的主题。所有发到 forwards 指定主题上的消息都会被转发到远程节点上。

    mountpoint 用于在转发消息时加上主题前缀。例如,以上配置中,主题为 sensor1/hello 的消息,EMQ X 将其转发到对端节点时,会将主题变为 bridge/emqx2/emqx1@192.168.1.1/sensor1/hello。

    RPC 桥接的特点:

    1. RPC 桥接只能将本地的消息转发到远程桥接节点上,无法将远程桥接节点的消息同步到本地节点上;

    2. RPC 桥接只能将两个 EMQ X 桥接在一起,无法桥接 EMQ X 到其他的 MQTT Broker 上;

    EMQ X 可以通过 MQTT Bridge 去订阅远程 MQTT Broker 的主题,再将远程 MQTT Broker 的消息同步到本地。

    EMQ X 的 MQTT Bridge 原理: 作为 MQTT 客户端连接到远程的 MQTT Broker,因此在 MQTT Bridge 的配置中,需要设置 MQTT 客户端连接时所需要的字段:

    1. ## 桥接地址
    2. bridge.mqtt.emqx2.address = 192.168.1.2:1883
    3. ## 桥接的协议版本
    4. ## 枚举值: mqttv3 | mqttv4 | mqttv5
    5. bridge.mqtt.emqx2.proto_ver = mqttv4
    6. ## 客户端的 client_id
    7. bridge.mqtt.emqx2.client_id = bridge_emq
    8. ## 客户端的 clean_start 字段
    9. ## 注: 有些 MQTT Broker 需要将 clean_start 值设成 `true`
    10. bridge.mqtt.emqx2.clean_start = true
    11. ## 客户端的 username 字段
    12. bridge.mqtt.emqx2.username = user
    13. ## 客户端的 password 字段
    14. bridge.mqtt.emqx2.password = passwd
    15. ## 客户端是否使用 ssl 来连接远程服务器
    16. bridge.mqtt.emqx2.ssl = off
    17. ## 客户端 SSL 连接的 CA 证书 (PEM格式)
    18. bridge.mqtt.emqx2.cacertfile = etc/certs/cacert.pem
    19. ## 客户端 SSL 连接的 SSL 证书
    20. bridge.mqtt.emqx2.certfile = etc/certs/client-cert.pem
    21. ## 客户端 SSL 连接的密钥文件
    22. bridge.mqtt.emqx2.keyfile = etc/certs/client-key.pem
    23. ## SSL 加密方式
    24. bridge.mqtt.emqx2.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384
    25. ## TLS PSK 的加密套件
    26. ## 注意 'listener.ssl.external.ciphers' 和 'listener.ssl.external.psk_ciphers' 不能同时配置
    27. ##
    28. ## See 'https://tools.ietf.org/html/rfc4279#section-2'.
    29. ## bridge.mqtt.emqx2.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA
    30. ## 客户端的心跳间隔
    31. bridge.mqtt.emqx2.keepalive = 60s
    32. ## 支持的 TLS 版本
    33. bridge.mqtt.emqx2.tls_versions = tlsv1.2,tlsv1.1,tlsv1
    34. ## 需要被转发的消息的主题
    35. bridge.mqtt.emqx2.forwards = sensor1/#,sensor2/#
    36. ## 挂载点(mountpoint)
    37. bridge.mqtt.emqx2.mountpoint = bridge/emqx2/${node}/
    38. ## 订阅对端的主题
    39. bridge.mqtt.emqx2.subscription.1.topic = cmd/topic1
    40. ## 订阅对端主题的 QoS
    41. bridge.mqtt.emqx2.subscription.1.qos = 1
    42. ## 桥接的重连间隔
    43. ## 默认: 30秒
    44. bridge.mqtt.emqx2.reconnect_interval = 30s
    45. ## QoS1/QoS2 消息的重传间隔
    46. bridge.mqtt.emqx2.retry_interval = 20s
    47. ## Inflight 大小.
    48. bridge.mqtt.emqx2.max_inflight_batches = 32

    EMQ X 桥接缓存配置

    EMQ X 的 Bridge 拥有消息缓存机制,缓存机制同时适用于 RPC 桥接和 MQTT 桥接,当 Bridge 断开(如网络连接不稳定的情况)时,可将 forwards 主题的消息缓存到本地的消息队列上。等到桥接恢复时,再把消息重新转发到远程节点上。关于缓存队列的配置如下:

    1. ## emqx_bridge 内部用于 batch 的消息数量
    2. bridge.mqtt.emqx2.queue.batch_count_limit = 32
    3. ## emqx_bridge 内部用于 batch 的消息字节数
    4. bridge.mqtt.emqx2.queue.batch_bytes_limit = 1000MB
    5. ## 放置 replayq 队列的路径,如果没有在配置中指定该项,那么 replayq
    6. ## 将会以 `mem-only` 的模式运行,消息不会缓存到磁盘上。
    7. bridge.mqtt.emqx2.queue.replayq_dir = data/emqx_emqx2_bridge/
    8. ## Replayq 数据段大小
    9. bridge.mqtt.emqx2.queue.replayq_seg_bytes = 10MB

    bridge.mqtt.emqx2.queue.replayq_seg_bytes 是用于指定缓存在磁盘上的消息队列的最大单个文件的大小,如果消息队列大小超出指定值的话,会创建新的文件来存储消息队列。

    EMQ X 桥接的命令行使用

    启动 emqx_bridge_mqtt 插件:

    桥接 CLI 命令:

    1. $ cd emqx1/ && ./bin/emqx_ctl bridges
    2. bridges list # List bridges
    3. bridges start <Name> # Start a bridge
    4. bridges stop <Name> # Stop a bridge
    5. bridges forwards <Name> # Show a bridge forward topic
    6. bridges add-forward <Name> <Topic> # Add bridge forward topic
    7. bridges del-forward <Name> <Topic> # Delete bridge forward topic
    8. bridges subscriptions <Name> # Show a bridge subscriptions topic
    9. bridges add-subscription <Name> <Topic> <Qos> # Add bridge subscriptions topic

    列出全部 bridge 状态

    1. name: emqx status: Stopped

    启动指定 bridge

    1. $ ./bin/emqx_ctl bridges start emqx
    2. Start bridge successfully.

    停止指定 bridge

    1. $ ./bin/emqx_ctl bridges stop emqx
    2. Stop bridge successfully.

    列出指定 bridge 的转发主题

    1. $ ./bin/emqx_ctl bridges forwards emqx
    2. topic: topic1/#
    3. topic: topic2/#

    添加指定 bridge 的转发主题

    1. $ ./bin/emqx_ctl bridges add-forwards emqx 'topic3/#'
    2. Add-forward topic successfully.

    删除指定 bridge 的转发主题

    1. $ ./bin/emqx_ctl bridges del-forwards emqx 'topic3/#'
    2. Del-forward topic successfully.

    列出指定 bridge 的订阅

    1. $ ./bin/emqx_ctl bridges subscriptions emqx
    2. topic: cmd/topic1, qos: 1
    3. topic: cmd/topic2, qos: 1

    添加指定 bridge 的订阅主题

    1. $ ./bin/emqx_ctl bridges add-subscription emqx 'cmd/topic3' 1
    2. Add-subscription topic successfully.

    删除指定 bridge 的订阅主题

    1. $ ./bin/emqx_ctl bridges del-subscription emqx 'cmd/topic3'
    2. Del-subscription topic successfully.

    注: 如果有创建多个 Bridge 的需求,需要复制默认的 Bridge 配置,再拷贝到 emqx_bridge_mqtt.conf 中,根据需求重命名 bridge.mqtt.${name}.config 中的 name 即可。

    HTTP 发布接口

    EMQ X 消息服务器提供了一个 HTTP 发布接口,应用服务器或 Web 服务器可通过该接口发布 MQTT 消息:

    Web 服务器例如 PHP/Java/Python/NodeJS 或 Ruby on Rails,可通过 HTTP POST 请求发布 MQTT 消息:

    1. curl -v --basic -u user:passwd -H "Content-Type: application/json" -d \
    2. '{"qos":1, "retain": false, "topic":"world", "payload":"test" , "client_id": "C_1492145414740"}' \-k http://localhost:8080/api/v3/mqtt/publish

    HTTP 接口参数:

    参数

    说明

    client_id

    MQTT 客户端 ID

    qos

    QoS: 0 | 1 | 2

    retain

    Retain: true | false

    topic

    主题(Topic)

    payload

    消息载荷

    注解

    HTTP 发布接口采用 Basic 认证。上例中的 userpassword 是来自于 Dashboard 下的 Applications 内的 AppId 和密码

    EMQ X 还支持 WebSocket 连接,Web 浏览器可直接通过 WebSocket 连接至服务器:

    Dashboard 插件提供了一个 MQTT WebSocket 连接的测试页面:

    1. http://127.0.0.1:18083/#/websocket

    $SYS-系统主题

    EMQ X 消息服务器周期性发布自身运行状态、消息统计、客户端上下线事件到 以 $SYS/ 开头系统主题。

    $SYS 主题路径以 $SYS/brokers/{node}/ 开头。 {node} 是指产生该 事件/消息 所在的节点名称,例如:

    1. $SYS/brokers/emqx@127.0.0.1/version
    2. $SYS/brokers/emqx@127.0.0.1/uptime

    注解

    默认只允许 localhost 的 MQTT 客户端订阅 $SYS 主题,可通过 etc/acl.config 修改访问控制规则。

    $SYS 系统消息发布周期,通过 etc/emqx.conf 配置:

    1. ## System interval of publishing $SYS messages.
    2. ##
    3. ## Value: Duration
    4. ## Default: 1m, 1 minute
    5. broker.sys_interval = 1m

    集群状态信息

    主题

    说明

    $SYS/brokers

    集群节点列表

    $SYS/brokers/${node}/version

    EMQ X 服务器版本

    $SYS/brokers/${node}/uptime

    EMQ X 服务器启动时间

    $SYS/brokers/${node}/datetime

    EMQ X 服务器时间

    $SYS/brokers/${node}/sysdescr

    EMQ X 服务器描述

    $SYS 主题前缀: $SYS/brokers/${node}/clients/

    主题(Topic)

    说明

    ${clientid}/connected

    上线事件。当某客户端上线时,会发布该消息

    ${clientid}/disconnected

    下线事件。当某客户端离线时,会发布该消息

    ‘connected’ 事件消息的 Payload 可解析成 JSON 格式:

    1. {
    2. "clientid":"id1",
    3. "username":"u",
    4. "ipaddress":"127.0.0.1",
    5. "connack":0,
    6. "ts":1554047291,
    7. "proto_ver":3,
    8. "proto_name":"MQIsdp",
    9. "clean_start":true,
    10. "keepalive":60
    11. }

    ‘disconnected’ 事件消息的 Payload 可解析成 JSON 格式:

    1. {
    2. "clientid":"id1",
    3. "username":"u",
    4. "reason":"normal",
    5. "ts":1554047291
    6. }

    系统统计(Statistics)

    系统主题前缀: $SYS/brokers/${node}/stats/

    客户端统计

    主题(Topic)

    说明

    connections/count

    当前客户端总数

    connections/max

    最大客户端数量

    会话统计

    主题(Topic)

    说明

    sessions/count

    当前会话总数

    sessions/max

    最大会话数量

    sessions/persistent/count

    当前持久会话总数

    sessions/persistent/max

    最大持久会话数量

    订阅统计

    主题(Topic)

    说明

    suboptions/count

    当前订阅选项个数

    suboptions/max

    最大订阅选项总数

    subscribers/max

    最大订阅者总数

    subscribers/count

    当前订阅者数量

    subscriptions/max

    最大订阅数量

    subscriptions/count

    当前订阅总数

    subscriptions/shared/count

    当前共享订阅个数

    subscriptions/shared/max

    当前共享订阅总数

    主题统计

    路由统计

    主题(Topic)

    说明

    routes/count

    当前 Routes 总数

    routes/max

    最大 Routes 数量

    注解

    topics/counttopics/maxroutes/countroutes/max 数值上是相等的。

    收发流量/报文/消息统计

    系统主题(Topic)前缀: $SYS/brokers/${node}/metrics/

    收发流量统计

    主题(Topic)

    说明

    累计接收流量

    bytes/sent

    累计发送流量

    MQTT报文收发统计

    主题(Topic)

    说明

    packets/received

    累计接收 MQTT 报文

    packets/sent

    累计发送 MQTT 报文

    packets/connect

    累计接收 MQTT CONNECT 报文

    packets/connack

    累计发送 MQTT CONNACK 报文

    packets/publish/received

    累计接收 MQTT PUBLISH 报文

    packets/publish/sent

    累计发送 MQTT PUBLISH 报文

    packets/puback/received

    累计接收 MQTT PUBACK 报文

    packets/puback/sent

    累计发送 MQTT PUBACK 报文

    packets/puback/missed

    累计丢失 MQTT PUBACK 报文

    packets/pubrec/received

    累计接收 MQTT PUBREC 报文

    packets/pubrec/sent

    累计发送 MQTT PUBREC 报文

    packets/pubrec/missed

    累计丢失 MQTT PUBREC 报文

    packets/pubrel/received

    累计接收 MQTT PUBREL 报文

    packets/pubrel/sent

    累计发送 MQTT PUBREL 报文

    packets/pubrel/missed

    累计丢失 MQTT PUBREL 报文

    packets/pubcomp/received

    累计接收 MQTT PUBCOMP 报文

    packets/pubcomp/sent

    累计发送 MQTT PUBCOMP 报文

    packets/pubcomp/missed

    累计丢失 MQTT PUBCOMP 报文

    packets/subscribe

    累计接收 MQTT SUBSCRIBE 报文

    packets/suback

    累计发送 MQTT SUBACK 报文

    packets/unsubscribe

    累计接收 MQTT UNSUBSCRIBE 报文

    packets/unsuback

    累计发送 MQTT UNSUBACK 报文

    packets/pingreq

    累计接收 MQTT PINGREQ 报文

    packets/pingresp

    累计发送 MQTT PINGRESP 报文

    packets/disconnect/received

    累计接收 MQTT DISCONNECT 报文

    packets/disconnect/sent

    累计接收 MQTT DISCONNECT 报文

    packets/auth

    累计接收 Auth 报文

    MQTT 消息收发统计

    主题(Topic)

    说明

    messages/received

    累计接收消息

    messages/sent

    累计发送消息

    messages/expired

    累计发送消息

    messages/retained

    Retained 消息总数

    messages/dropped

    丢弃消息总数

    messages/forward

    节点转发消息总数

    messages/qos0/received

    累计接受 QoS0 消息

    messages/qos0/sent

    累计发送 QoS0 消息

    messages/qos1/received

    累计接受 QoS1 消息

    messages/qos1/sent

    累计发送 QoS1 消息

    messages/qos2/received

    累计接受 QoS2 消息

    messages/qos2/sent

    累计发送 QoS2 消息

    messages/qos2/expired

    QoS2 过期消息总数

    messages/qos2/dropped

    QoS2 丢弃消息总数

    Alarms - 系统告警

    系统主题(Topic)前缀: $SYS/brokers/${node}/alarms/

    主题(Topic)

    说明

    alert

    新产生的告警

    clear

    被清除的告警

    系统主题(Topic)前缀: $SYS/brokers/${node}/sysmon/

    追踪

    EMQ X 消息服务器支持追踪来自某个客户端(Client),或者发布到某个主题(Topic)的全部消息。

    追踪来自客户端(Client)的消息:

    1. $ ./bin/emqx_ctl log primary-level debug
    2. $ ./bin/emqx_ctl trace start client "clientid" "trace_clientid.log" debug

    追踪发布到主题(Topic)的消息:

    1. $ ./bin/emqx_ctl log primary-level debug
    2. $ ./bin/emqx_ctl trace start topic "t/#" "trace_topic.log" debug

    查询追踪:

    1. $ ./bin/emqx_ctl trace list

    停止追踪:

    1. $ ./bin/emqx_ctl trace stop client "clientid"