用户指南 (User Guide)

    系统默认开启匿名认证(anonymous),通过加载认证插件可开启的多个认证模块组成认证链:

    注解

    EMQ 2.0 消息服务器还提供了 MySQL、PostgreSQL、Redis、MongoDB、HTTP、LDAP 认证插件。

    开启匿名认证

    etc/emq.conf 配置启用匿名认证:

    1. mqtt.allow_anonymous = true

    EMQ 2.0 版本提供的认证插件包括:

    用户名密码认证

    基于 MQTT 登录用户名(username)、密码(password)认证。

    etc/plugins/emq_auth_username.conf 中配置默认用户:

    1. auth.user.$N.username = admin
    2. auth.user.$N.password = public

    启用 插件:

    1. ./bin/emqttd_ctl plugins load emq_auth_username

    使用 ./bin/emqttd_ctl users 命令添加用户:

    1. $ ./bin/emqttd_ctl users add <Username> <Password>

    ClientId 认证

    基于 MQTT 客户端 ID 认证。

    etc/plugins/emq_auth_clientid.conf:

    1. auth.client.$N.clientid = clientid
    2. auth.client.$N.password = passwd

    启用 emq_auth_clientid 插件:

    1. ./bin/emqttd_ctl plugins load emq_auth_clientid

    LDAP 插件认证

    etc/plugins/emq_auth_ldap.conf 配置 LDAP 参数:

    1. auth.ldap.servers = 127.0.0.1
    2. auth.ldap.port = 389
    3. auth.ldap.timeout = 30
    4. auth.ldap.user_dn = uid=%u,ou=People,dc=example,dc=com
    5. auth.ldap.ssl = false

    启用 LDAP 认证插件:

    1. ./bin/emqttd_ctl plugins load emq_auth_ldap

    HTTP 插件认证

    etc/plugins/emq_auth_http.conf 配置 ‘super_req’, ‘auth_req’:

    1. ## Variables: %u = username, %c = clientid, %a = ipaddress, %P = password, %t = topic
    2. auth.http.auth_req = http://127.0.0.1:8080/mqtt/auth
    3. auth.http.auth_req.method = post
    4. auth.http.auth_req.params = clientid=%c,username=%u,password=%P
    5. auth.http.super_req = http://127.0.0.1:8080/mqtt/superuser
    6. auth.http.super_req.method = post
    7. auth.http.super_req.params = clientid=%c,username=%u

    启用 HTTP 认证插件:

    1. ./bin/emqttd_ctl plugins load emq_auth_http

    MySQL 插件认证

    通过 MySQL 数据库表认证,可创建如下的 ‘mqtt_user’ 表:

    1. CREATE TABLE `mqtt_user` (
    2. `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
    3. `username` varchar(100) DEFAULT NULL,
    4. `password` varchar(100) DEFAULT NULL,
    5. `salt` varchar(20) DEFAULT NULL,
    6. `is_superuser` tinyint(1) DEFAULT 0,
    7. `created` datetime DEFAULT NULL,
    8. PRIMARY KEY (`id`),
    9. UNIQUE KEY `mqtt_username` (`username`)
    10. ) ENGINE=MyISAM DEFAULT CHARSET=utf8;

    etc/plugins/emq_auth_mysql.conf 配置 ‘super_query’, ‘auth_query’, ‘password_hash’:

    1. ## Mysql Server
    2. auth.mysql.server = 127.0.0.1:3306
    3. ## Mysql Pool Size
    4. auth.mysql.pool = 8
    5. ## Mysql Username
    6. ## auth.mysql.username =
    7. ## Mysql Password
    8. ## auth.mysql.password =
    9. ## Mysql Database
    10. auth.mysql.database = mqtt
    11. ## Variables: %u = username, %c = clientid
    12. ## Authentication Query: select password only
    13. auth.mysql.auth_query = select password from mqtt_user where username = '%u' limit 1
    14. ## Password hash: plain, md5, sha, sha256, pbkdf2
    15. auth.mysql.password_hash = sha256
    16. ## %% Superuser Query
    17. auth.mysql.super_query = select is_superuser from mqtt_user where username = '%u' limit 1

    注解

    如果系统已有MQTT认证表,可通过配置’auth_query’查询语句集成。

    启用 MySQL 认证插件:

    1. ./bin/emqttd_ctl plugins load emq_auth_mysql

    通过 PostgreSQL 数据库表认证,可创建如下的 ‘mqtt_user’ 表:

    1. CREATE TABLE mqtt_user (
    2. id SERIAL primary key,
    3. username character varying(100),
    4. password character varying(100),
    5. salt character varying(40)
    6. );

    etc/plugins/emq_auth_pgsql.conf 配置 ‘auth_query’、’password_hash’:

    1. ## Postgre Server
    2. auth.pgsql.server = 127.0.0.1:5432
    3. auth.pgsql.pool = 8
    4. auth.pgsql.username = root
    5. #auth.pgsql.password =
    6. auth.pgsql.database = mqtt
    7. auth.pgsql.encoding = utf8
    8. auth.pgsql.ssl = false
    9. ## Variables: %u = username, %c = clientid, %a = ipaddress
    10. ## Authentication Query: select password only
    11. auth.pgsql.auth_query = select password from mqtt_user where username = '%u' limit 1
    12. ## Password hash: plain, md5, sha, sha256, pbkdf2
    13. auth.pgsql.password_hash = sha256
    14. ## sha256 with salt prefix
    15. ## auth.pgsql.password_hash = salt sha256
    16. ## sha256 with salt suffix
    17. ## auth.pgsql.password_hash = sha256 salt
    18. ## Superuser Query
    19. auth.pgsql.super_query = select is_superuser from mqtt_user where username = '%u' limit 1

    启用 Postgre 认证插件:

    1. ./bin/emqttd_ctl plugins load emq_auth_pgsql

    Redis 插件认证

    Redis 认证。MQTT 用户记录存储在 Redis Hash, 键值: “mqtt_user:<Username>”

    etc/plugins/emq_auth_redis.conf 设置 ‘super_cmd’、’auth_cmd’、’password_hash’:

    启用 Redis 认证插件:

    1. ./bin/emqttd_ctl plugins load emq_auth_redis

    MongoDB 插件认证

    按 MongoDB 用户集合认证,例如创建 ‘mqtt_user’ 集合:

    1. {
    2. username: "user",
    3. password: "password hash",
    4. is_superuser: boolean (true, false),
    5. created: "datetime"
    6. }

    etc/plugins/emq_auth_mongo.conf 设置 ‘super_query’、’auth_query’:

    1. ## Mongo Server
    2. auth.mongo.server = 127.0.0.1:27017
    3. ## Mongo Pool Size
    4. auth.mongo.pool = 8
    5. ## Mongo User
    6. ## auth.mongo.user =
    7. ## Mongo Password
    8. ## auth.mongo.password =
    9. ## Mongo Database
    10. auth.mongo.database = mqtt
    11. ## auth_query
    12. auth.mongo.auth_query.collection = mqtt_user
    13. auth.mongo.auth_query.password_field = password
    14. auth.mongo.auth_query.password_hash = sha256
    15. auth.mongo.auth_query.selector = username=%u
    16. ## super_query
    17. auth.mongo.super_query.collection = mqtt_user
    18. auth.mongo.super_query.super_field = is_superuser
    19. auth.mongo.super_query.selector = username=%u

    启用 MongoDB 认证插件:

    1. ./bin/emqttd_ctl plugins load emq_auth_mongo

    访问控制(ACL)

    EMQ 消息服务器通过 ACL(Access Control List) 实现 MQTT 客户端访问控制。

    ACL 访问控制规则定义:

    1. 允许(Allow)|拒绝(Deny) 谁(Who) 订阅(Subscribe)|发布(Publish) 主题列表(Topics)

    MQTT 客户端发起订阅/发布请求时,EMQ 消息服务器的访问控制模块,会逐条匹配 ACL 规则,直到匹配成功为止:

    1. --------- --------- ---------
    2. Client -> | Rule1 | --nomatch--> | Rule2 | --nomatch--> | Rule3 | --> Default
    3. --------- --------- ---------
    4. | | |
    5. match match match
    6. \|/ \|/ \|/
    7. allow | deny allow | deny allow | deny

    默认访问控制设置

    EMQ 消息服务器默认访问控制,在 etc/emq.conf 中设置:

    1. ## ACL nomatch
    2. mqtt.acl_nomatch = allow
    3. ## Default ACL File
    4. mqtt.acl_file = etc/acl.conf

    ACL 规则定义在 etc/acl.conf,EMQ 启动时加载到内存:

    1. %% Allow 'dashboard' to subscribe '$SYS/#'
    2. {allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}.
    3. %% Allow clients from localhost to subscribe any topics
    4. {allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}.
    5. %% Deny clients to subscribe '$SYS#' and '#'
    6. {deny, all, subscribe, ["$SYS/#", {eq, "#"}]}.
    7. %% Allow all by default
    8. {allow, all}.

    HTTP 插件访问控制

    HTTP API 实现访问控制:

    配置 etc/plugins/emq_auth_http.conf, 启用 HTTP 认证插件后:

    1. ## 'access' parameter: sub = 1, pub = 2
    2. auth.http.acl_req = http://127.0.0.1:8080/mqtt/acl
    3. auth.http.acl_req.method = get
    4. auth.http.acl_req.params = access=%A,username=%u,clientid=%c,ipaddr=%a,topic=%t

    MySQL 插件访问控制

    MySQL 插件访问控制,通过 mqtt_acl 表定义 ACL 规则:

    1. `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
    2. `allow` int(1) DEFAULT NULL COMMENT '0: deny, 1: allow',
    3. `ipaddr` varchar(60) DEFAULT NULL COMMENT 'IpAddress',
    4. `username` varchar(100) DEFAULT NULL COMMENT 'Username',
    5. `clientid` varchar(100) DEFAULT NULL COMMENT 'ClientId',
    6. `access` int(2) NOT NULL COMMENT '1: subscribe, 2: publish, 3: pubsub',
    7. `topic` varchar(100) NOT NULL DEFAULT '' COMMENT 'Topic Filter',
    8. PRIMARY KEY (`id`)
    9. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    10. INSERT INTO mqtt_acl (id, allow, ipaddr, username, clientid, access, topic)
    11. (1,1,NULL,'$all',NULL,2,'#'),
    12. (2,0,NULL,'$all',NULL,1,'$SYS/#'),
    13. (3,0,NULL,'$all',NULL,1,'eq #'),
    14. (5,1,'127.0.0.1',NULL,NULL,2,'$SYS/#'),
    15. (6,1,'127.0.0.1',NULL,NULL,2,'#'),
    16. (7,1,NULL,'dashboard',NULL,1,'$SYS/#');

    etc/plugins/emq_auth_mysql.conf 配置 ‘acl_query’ 与 ‘acl_nomatch’:

    1. ## ACL Query Command
    2. auth.mysql.acl_query = select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'

    PostgreSQL 插件访问控制,通过 mqtt_acl 表定义 ACL 规则:

    1. CREATE TABLE mqtt_acl (
    2. id SERIAL primary key,
    3. allow integer,
    4. ipaddr character varying(60),
    5. username character varying(100),
    6. clientid character varying(100),
    7. access integer,
    8. topic character varying(100)
    9. );
    10. INSERT INTO mqtt_acl (id, allow, ipaddr, username, clientid, access, topic)
    11. VALUES
    12. (1,1,NULL,'$all',NULL,2,'#'),
    13. (2,0,NULL,'$all',NULL,1,'$SYS/#'),
    14. (3,0,NULL,'$all',NULL,1,'eq #'),
    15. (5,1,'127.0.0.1',NULL,NULL,2,'$SYS/#'),
    16. (6,1,'127.0.0.1',NULL,NULL,2,'#'),
    17. (7,1,NULL,'dashboard',NULL,1,'$SYS/#');

    etc/plugins/emq_auth_pgsql.conf 设置 ‘acl_query’ 与 ‘acl_nomatch’:

    1. ## ACL Query. Comment this query, the acl will be disabled.
    2. auth.pgsql.acl_query = select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'

    Redis 插件访问控制

    Redis Hash 存储一个 MQTT 客户端的访问控制规则:

    1. HSET mqtt_acl:<username> topic1 1
    2. HSET mqtt_acl:<username> topic2 2
    3. HSET mqtt_acl:<username> topic3 3

    etc/plugins/emq_auth_redis.conf 配置 ‘acl_cmd’ 与 ‘acl_nomatch’:

    1. ## ACL Query Command
    2. auth.redis.acl_cmd = HGETALL mqtt_acl:%u

    MongoDB 插件访问控制

    MongoDB 数据库创建 mqtt_acl 集合:

    1. {
    2. username: "username",
    3. clientid: "clientid",
    4. publish: ["topic1", "topic2", ...],
    5. subscribe: ["subtop1", "subtop2", ...],
    6. pubsub: ["topic/#", "topic1", ...]
    7. }

    mqtt_acl 集合插入数据,例如:

    etc/plugins/emq_auth_mongo.conf 配置 ‘acl_query’ 与 ‘acl_nomatch’:

    1. ## acl_query
    2. auth.mongo.acl_query.collection = mqtt_user
    3. auth.mongo.acl_query.selector = username=%u

    MQTT 发布订阅

    MQTT 是为移动互联网、物联网设计的轻量发布订阅模式的消息服务器:

    EMQ 消息服务器安装启动后,任何设备或终端的 MQTT 客户端,可通过 MQTT 协议连接到服务器,发布订阅消息方式互通。

    MQTT 协议客户端库: https://github.com/mqtt/mqtt.github.io/wiki/libraries

    例如,mosquitto_sub/pub 命令行发布订阅消息:

    1. mosquitto_sub -t topic -q 2
    2. mosquitto_pub -t topic -q 1 -m "Hello, MQTT!"

    MQTT V3.1.1 版本协议规范:

    EMQ 消息服务器的 MQTT 协议 TCP 监听器,可在 etc/emq.conf 文件中设置:

    1. ## TCP Listener: 1883, 127.0.0.1:1883, ::1:1883
    2. listener.tcp.external = 1883
    3. ## Size of acceptor pool
    4. listener.tcp.external.acceptors = 8
    5. ## Maximum number of concurrent clients
    6. listener.tcp.external.max_clients = 1024

    MQTT/SSL 监听器,缺省端口8883:

    1. ## SSL Listener: 8883, 127.0.0.1:8883, ::1:8883
    2. listener.ssl.external = 8883
    3. ## Size of acceptor pool
    4. listener.ssl.external.acceptors = 4
    5. ## Maximum number of concurrent clients
    6. listener.ssl.external.max_clients = 512

    HTTP 发布接口

    EMQ 消息服务器提供了一个 HTTP 发布接口,应用服务器或 Web 服务器可通过该接口发布 MQTT 消息:

    1. HTTP POST http://host:8080/mqtt/publish
    1. curl -v --basic -u user:passwd -d "qos=1&retain=0&topic=/a/b/c&message=hello from http..." -k http://localhost:8080/mqtt/publish

    HTTP 接口参数:

    参数

    说明

    client

    MQTT 客户端 ID

    qos

    QoS: 0 | 1 | 2

    retain

    Retain: 0 | 1

    topic

    主题(Topic)

    message

    消息

    注解

    HTTP 发布接口采用 Basic 认证

    注解

    该接口在 v2.3-beta.2 版本变更为: ‘api/v2/mqtt/publish’, 详见文档: 管理监控API (REST API)

    MQTT WebSocket 连接

    EMQ 消息服务器支持 MQTT WebSocket 连接,Web 浏览器可直接通过 MQTT 协议连接服务器:

    WebSocket URI:

    ws(s)://host:8083/mqtt

    Sec-WebSocket-Protocol:

    ‘mqttv3.1’ or ‘mqttv3.1.1’

    Dashboard 插件提供了一个 MQTT WebSocket 连接的测试页面:

    1. http://127.0.0.1:18083/websocket.html

    EMQ 通过内嵌的 HTTP 服务器,实现 MQTT/WebSocket,etc/emq.conf 设置:

    1. ## MQTT/WebSocket Listener
    2. listener.ws.external = 8083
    3. listener.ws.external.acceptors = 4
    4. listener.ws.external.max_clients = 64

    $SYS-系统主题

    EMQ 消息服务器周期性发布自身运行状态、MQTT 协议统计、客户端上下线状态到 $SYS/ 开头系统主题。

    $SYS 主题路径以 “$SYS/brokers/{node}/” 开头,’${node}’ 是 Erlang 节点名称:

    1. $SYS/brokers/emqttd@127.0.0.1/version
    2. $SYS/brokers/emqttd@host2/uptime

    注解

    默认只允许 localhost 的 MQTT 客户端订阅 $SYS 主题,可通过 etc/acl.config 修改访问控制规则。

    $SYS 系统消息发布周期,通过 etc/emq.conf 配置:

    1. ## System Interval of publishing broker $SYS Messages
    2. mqtt.broker.sys_interval = 60

    主题

    说明

    $SYS/brokers

    集群节点列表

    $SYS/brokers/${node}/version

    EMQ 服务器版本

    $SYS/brokers/${node}/uptime

    EMQ 服务器启动时间

    $SYS/brokers/${node}/datetime

    EMQ 服务器时间

    $SYS/brokers/${node}/sysdescr

    EMQ 服务器描述

    MQTT 客户端上下线状态消息

    $SYS 主题前缀: $SYS/brokers/${node}/clients/

    | ts: 1432648482} | |

    ‘connected’ 消息 JSON 数据:

    1. {
    2. ipaddress: "127.0.0.1",
    3. username: "test",
    4. session: false,
    5. protocol: 3,
    6. connack: 0,
    7. ts: 1432648482
    8. }

    ‘disconnected’ 消息 JSON 数据:

    1. {
    2. reason: normal,
    3. ts: 1432648486
    4. }

    系统主题前缀: $SYS/brokers/${node}/stats/

    Clients - 客户端统计

    主题(Topic)

    说明

    clients/count

    当前客户端总数

    clients/max

    最大客户端数量

    Sessions - 会话统计

    主题(Topic)

    说明

    sessions/count

    当前会话总数

    sessions/max

    最大会话数量

    Subscriptions - 订阅统计

    主题(Topic)

    说明

    subscriptions/count

    当前订阅总数

    subscriptions/max

    最大订阅数量

    Topics - 主题统计

    Metrics - 收发流量/报文/消息统计

    系统主题(Topic)前缀: $SYS/brokers/${node}/metrics/

    收发流量统计

    主题(Topic)

    说明

    bytes/received

    累计接收流量

    bytes/sent

    累计发送流量

    MQTT报文收发统计

    主题(Topic)

    说明

    packets/received

    累计接收 MQTT 报文

    packets/sent

    累计发送 MQTT 报文

    packets/connect

    累计接收 MQTT CONNECT 报文

    packets/connack

    累计发送 MQTT CONNACK 报文

    packets/publish/received

    累计接收 MQTT PUBLISH 报文

    packets/publish/sent

    累计发送 MQTT PUBLISH 报文

    packets/subscribe

    累计接收 MQTT SUBSCRIBE 报文

    packets/suback

    累计发送 MQTT SUBACK 报文

    packets/unsubscribe

    累计接收 MQTT UNSUBSCRIBE 报文

    packets/unsuback

    累计发送 MQTT UNSUBACK 报文

    packets/pingreq

    累计接收 MQTT PINGREQ 报文

    packets/pingresp

    累计发送 MQTT PINGRESP 报文

    packets/disconnect

    累计接收 MQTT DISCONNECT 报文

    MQTT 消息收发统计

    主题(Topic)

    说明

    messages/received

    累计接收消息

    messages/sent

    累计发送消息

    messages/retained

    Retained 消息总数

    messages/dropped

    丢弃消息总数

    系统主题(Topic)前缀: $SYS/brokers/${node}/alarms/

    Sysmon - 系统监控

    系统主题(Topic)前缀: $SYS/brokers/${node}/sysmon/

    主题(Topic)

    说明

    long_gc

    GC 时间过长警告

    long_schedule

    调度时间过长警告

    large_heap

    Heap 内存占用警告

    busy_port

    Port 忙警告

    busy_dist_port

    Dist Port 忙警告

    EMQ 消息服务器支持追踪来自某个客户端(Client)的全部报文,或者发布到某个主题(Topic)的全部消息。

    追踪客户端(Client):

    1. ./bin/emqttd_ctl trace client "clientid" "trace_clientid.log"

    追踪主题(Topic):

    1. ./bin/emqttd_ctl trace topic "topic" "trace_topic.log"

    查询追踪:

    1. ./bin/emqttd_ctl trace list

    停止追踪:

    1. ./bin/emqttd_ctl trace client "clientid" off
    2. ./bin/emqttd_ctl trace topic "topic" off