用户指南(User Guide)

    etc/emqttd.config配置启用一个认证模块:

    Note

    “%” 注释一行

    如果同时启用多个认证模块,认证流程如下:

    1. Client --> | Username | -ignore-> | ClientID | -ignore-> | Anonymous |
    2. ---------------- ---------------- -------------
    3. | | |
    4. \|/ \|/ \|/
    5. allow | deny allow | deny allow | deny

    emqtt项目维护开发的认证插件:

    Note

    如果加载认证插件,etc/emqttd.config中配置的认证模块失效

    基于MQTT登录用户名(username)、密码(password)认证:

    1. {username, [{client1, "passwd1"}, {client1, "passwd2"}]},

    两种方式添加认证用户:

    1. 直接配置用户名和明文密码:

      1. {username, [{client1, "passwd1"}, {client1, "passwd2"}]},
    2. 使用’./bin/emqttd_ctl users’命令添加用户:

      1. $ ./bin/emqttd_ctl users add <Username> <Password>

    ClientId认证

    基于MQTT客户端ID(ClientId)认证:

    1. {clientid, [{password, no}, {file, "etc/clients.config"}]},

    etc/clients.config添加客户端ID:

    1. testclientid0
    2. testclientid1 127.0.0.1
    3. testclientid2 192.168.0.1/24

    LDAP认证

    1. {ldap, [
    2. {servers, ["localhost"]},
    3. {port, 389},
    4. {timeout, 30},
    5. {user_dn, "uid=$u,ou=People,dc=example,dc=com"},
    6. {ssl, fasle},
    7. {sslopts, [
    8. {"certfile", "ssl.crt"},
    9. {"keyfile", "ssl.key"}]}
    10. ]},

    匿名认证(Anonymous)

    emqttd消息服务器默认采用匿名认证,允许任何客户端登录:

    1. {anonymous, []}

    HTTP插件认证

    emqttd_auth_http/etc/plugin.config配置’super_req’, ‘auth_req’:

    1. [
    2. {emqttd_auth_http, [
    3. %% Variables: %u = username, %c = clientid, %a = ipaddress, %t = topic
    4. {super_req, [
    5. {method, post},
    6. {url, "http://localhost:8080/mqtt/superuser"},
    7. {params, [
    8. {username, "%u"},
    9. {clientid, "%c"}
    10. ]}
    11. ]},
    12. {auth_req, [
    13. {method, post},
    14. {url, "http://localhost:8080/mqtt/auth"},
    15. {params, [
    16. {clientid, "%c"},
    17. {username, "%u"},
    18. {password, "%P"}
    19. ]}
    20. ]},
    21. ...

    启用插件:

    1. ./bin/emqttd_ctl plugins load emqttd_auth_http

    MySQL插件认证

    通过MySQL数据库表认证,可创建如下的’mqtt_user’表:

    1. CREATE TABLE `mqtt_user` (
    2. `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
    3. `username` varchar(100) DEFAULT NULL,
    4. `password` varchar(100) DEFAULT NULL,
    5. `salt` varchar(20) DEFAULT NULL,
    6. `is_superuser` tinyint(1) DEFAULT 0,
    7. `created` datetime DEFAULT NULL,
    8. PRIMARY KEY (`id`),
    9. UNIQUE KEY `mqtt_username` (`username`)
    10. ) ENGINE=MyISAM DEFAULT CHARSET=utf8;

    emqttd_plugin_mysql/etc/plugin.config配置’superquery’, ‘authquery’, ‘password_hash’:

    1. [
    2. {emqttd_plugin_mysql, [
    3. ...
    4. %% Variables: %u = username, %c = clientid, %a = ipaddress
    5. %% Superuser Query
    6. {superquery, "select is_superuser from mqtt_user where username = '%u' limit 1"},
    7. %% select password only
    8. {authquery, "select password from mqtt_user where username = '%u' limit 1"},
    9. %% hash algorithm: md5, sha, sha256, pbkdf2?
    10. {password_hash, sha256},
    11. %% select password with salt
    12. %% {authquery, "select password, salt from mqtt_user where username = '%u'"},
    13. %% sha256 with salt prefix
    14. %% {password_hash, {salt, sha256}},
    15. %% sha256 with salt suffix
    16. %% {password_hash, {sha256, salt}},
    17. ...
    18. ]}
    19. ].

    Note

    如果系统已有MQTT认证表,可通过配置’authquery’查询语句集成。

    启用插件:

    1. ./bin/emqttd_ctl plugins load emqttd_plugin_mysql

    PostgreSQL插件认证

    通过PostgreSQL数据库表认证,可创建如下的’mqtt_user’表:

    1. CREATE TABLE mqtt_user (
    2. id SERIAL primary key,
    3. is_superuser boolean,
    4. username character varying(100),
    5. password character varying(100),
    6. salt character varying(40)
    7. );

    emqttd_plugin_pgsql/etc/plugin.config配置’authquery’、’password_hash’:

    1. [
    2. {emqttd_plugin_pgsql, [
    3. ...
    4. %% Variables: %u = username, %c = clientid, %a = ipaddress
    5. %% Superuser Query
    6. {superquery, "select is_superuser from mqtt_user where username = '%u' limit 1"},
    7. %% Authentication Query: select password only
    8. {authquery, "select password from mqtt_user where username = '%u' limit 1"},
    9. %% hash algorithm: plain, md5, sha, sha256, pbkdf2?
    10. {password_hash, sha256},
    11. %% select password with salt
    12. %% {authquery, "select password, salt from mqtt_user where username = '%u'"},
    13. %% sha256 with salt prefix
    14. %% {password_hash, {salt, sha256}},
    15. %% sha256 with salt suffix
    16. %% {password_hash, {sha256, salt}},
    17. ...
    18. ]}
    19. ].

    启用插件:

    Redis认证。MQTT用户记录存储在Redis Hash, 键值: “mqtt_user:<Username>”

    emqttd_plugin_redis/etc/plugin.config设置’supercmd’、’authcmd’、’password_hash’:

    1. [
    2. {emqttd_plugin_redis, [
    3. ...
    4. %% HMGET mqtt_user:%u is_superuser
    5. {supercmd, ["HGET", "mqtt_user:%u", "is_superuser"]},
    6. %% HMGET mqtt_user:%u password
    7. {authcmd, ["HGET", "mqtt_user:%u", "password"]},
    8. %% Password hash algorithm: plain, md5, sha, sha256, pbkdf2?
    9. {password_hash, sha256},
    10. ...
    11. ]}
    12. ].

    启用插件:

    1. ./bin/emqttd_ctl plugins load emqttd_plugin_redis

    MongoDB插件认证

    按MongoDB用户集合认证,例如创建’mqtt_user’集合:

    1. {
    2. username: "user",
    3. password: "password hash",
    4. is_superuser: boolean (true, false),
    5. created: "datetime"
    6. }

    emqttd_plugin_mongo/etc/plugin.config设置’superquery’、’authquery’:

    1. [
    2. {emqttd_plugin_mongo, [
    3. ...
    4. %% Variables: %u = username, %c = clientid
    5. %% Superuser Query
    6. {superquery, [
    7. {collection, "mqtt_user"},
    8. {super_field, "is_superuser"},
    9. {selector, {"username", "%u"}}
    10. ]},
    11. %% Authentication Query
    12. {authquery, [
    13. {collection, "mqtt_user"},
    14. {password_field, "password"},
    15. %% Hash Algorithm: plain, md5, sha, sha256, pbkdf2?
    16. {password_hash, sha256},
    17. {selector, {"username", "%u"}}
    18. ]},
    19. ...
    20. ]}
    21. ].

    启用插件:

    1. ./bin/emqttd_ctl plugins load emqttd_plugin_mongodb

    访问控制(ACL)

    emqttd消息服务器通过ACL(Access Control List)实现MQTT客户端访问控制。

    ACL访问控制规则定义:

    1. 允许(Allow)|拒绝(Deny) 谁(Who) 订阅(Subscribe)|发布(Publish) 主题列表(Topics)

    MQTT客户端发起订阅/发布请求时,emqttd消息服务器的访问控制模块,会逐条匹配ACL规则,直到匹配成功为止:

    1. --------- --------- ---------
    2. Client -> | Rule1 | --nomatch--> | Rule2 | --nomatch--> | Rule3 | --> Default
    3. --------- --------- ---------
    4. | | |
    5. match match match
    6. \|/ \|/ \|/
    7. allow | deny allow | deny allow | deny

    Internal访问控制

    emqttd消息服务器默认的访问控制,由一个’internal’模块实现,etc/emqttd.config中配置:

    1. {acl, [
    2. %% Internal ACL module
    3. {internal, [{file, "etc/acl.config"}, {nomatch, allow}]}
    4. ]}

    ACL规则通过etc/acl.config配置,emqttd启动时加载到ETS内存表:

    1. %% Allow 'dashboard' to subscribe '$SYS/#'
    2. {allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}.
    3. %% Allow clients from localhost to subscribe any topics
    4. {allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}.
    5. %% Deny clients to subscribe '$SYS#' and '#'
    6. {deny, all, subscribe, ["$SYS/#", {eq, "#"}]}.
    7. %% Allow all by default
    8. {allow, all}.

    HTTP插件访问控制

    HTTP API实现访问控制:

    启用HTTP认证插件后,配置emqttd_auth_http/etc/plugin.config:

    1. ...
    2. %% Variables: %u = username, %c = clientid, %a = ipaddress, %t = topic
    3. %% 'access' parameter: sub = 1, pub = 2
    4. {acl_req, [
    5. {method, post},
    6. {url, "http://localhost:8080/mqtt/acl"},
    7. {params, [
    8. {access, "%A"},
    9. {username, "%u"},
    10. {clientid, "%c"},
    11. {ipaddr, "%a"},
    12. {topic, "%t"}
    13. ]}
    14. ]}

    MySQL插件访问控制

    MySQL插件访问控制,通过mqtt_acl表定义ACL规则:

    1. CREATE TABLE `mqtt_acl` (
    2. `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
    3. `allow` int(1) DEFAULT NULL COMMENT '0: deny, 1: allow',
    4. `ipaddr` varchar(60) DEFAULT NULL COMMENT 'IpAddress',
    5. `username` varchar(100) DEFAULT NULL COMMENT 'Username',
    6. `clientid` varchar(100) DEFAULT NULL COMMENT 'ClientId',
    7. `access` int(2) NOT NULL COMMENT '1: subscribe, 2: publish, 3: pubsub',
    8. `topic` varchar(100) NOT NULL DEFAULT '' COMMENT 'Topic Filter',
    9. PRIMARY KEY (`id`)
    10. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    11. INSERT INTO mqtt_acl (id, allow, ipaddr, username, clientid, access, topic)
    12. VALUES
    13. (1,1,NULL,'$all',NULL,2,'#'),
    14. (2,0,NULL,'$all',NULL,1,'$SYS/#'),
    15. (3,0,NULL,'$all',NULL,1,'eq #'),
    16. (5,1,'127.0.0.1',NULL,NULL,2,'$SYS/#'),
    17. (6,1,'127.0.0.1',NULL,NULL,2,'#'),
    18. (7,1,NULL,'dashboard',NULL,1,'$SYS/#');

    emqttd_plugin_mysql/etc/plugin.config配置’aclquery’与’acl_nomatch’:

    1. [
    2. {emqttd_plugin_mysql, [
    3. ...
    4. %% comment this query, the acl will be disabled
    5. {aclquery, "select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'"},
    6. %% If no rules matched, return...
    7. {acl_nomatch, allow}
    8. ]}
    9. ].

    PostgreSQL插件访问控制

    PostgreSQL插件访问控制,通过mqtt_acl表定义ACL规则:

    1. CREATE TABLE mqtt_acl (
    2. allow integer,
    3. ipaddr character varying(60),
    4. username character varying(100),
    5. clientid character varying(100),
    6. access integer,
    7. topic character varying(100)
    8. );
    9. INSERT INTO mqtt_acl (id, allow, ipaddr, username, clientid, access, topic)
    10. VALUES
    11. (1,1,NULL,'$all',NULL,2,'#'),
    12. (3,0,NULL,'$all',NULL,1,'eq #'),
    13. (5,1,'127.0.0.1',NULL,NULL,2,'$SYS/#'),
    14. (6,1,'127.0.0.1',NULL,NULL,2,'#'),
    15. (7,1,NULL,'dashboard',NULL,1,'$SYS/#');

    emqttd_plugin_pgsql/etc/plugin.config设置’aclquery’与’acl_nomatch’:

    1. [
    2. {emqttd_plugin_pgsql, [
    3. ...
    4. %% Comment this query, the acl will be disabled. Notice: don't edit this query!
    5. {aclquery, "select allow, ipaddr, username, clientid, access, topic from mqtt_acl
    6. where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'"},
    7. %% If no rules matched, return...
    8. {acl_nomatch, allow}
    9. ...
    10. ]}
    11. ].

    Redis插件访问控制

    Redis List存储一个MQTT客户端的访问控制规则,键值: “mqtt_acl:<Username>”,List存储: 存储”publish <Topic>”, “subscribe <Topic>” 或 “pubsub <Topic>”.

    emqttd_plugin_redis/etc/plugin.config配置’aclcmd’与’acl_nomatch’:

    1. [
    2. {emqttd_plugin_redis, [
    3. ...
    4. %% SMEMBERS mqtt_acl:%u
    5. {aclcmd, ["SMEMBERS", "mqtt_acl:%u"]},
    6. %% If no rules matched, return...
    7. {acl_nomatch, deny},
    8. ...
    9. ]}
    10. ].

    MongoDB数据库创建’mqtt_acl’集合:

    ‘mqtt_acl’集合插入数据,例如:

    1. db.mqtt_acl.insert({username: "test", publish: ["t/1", "t/2"], subscribe: ["user/%u", "client/%c"]})
    2. db.mqtt_acl.insert({username: "admin", pubsub: ["#"]})

    emqttd_plugin_mongodb/etc/plugin.config配置’aclquery’与’acl_nomatch’:

    1. %% ACL Query: "%u" = username, "%c" = clientid
    2. {aclquery, [
    3. {collection, "mqtt_acl"},
    4. {selector, {"username", "%u"}}
    5. ]},
    6. %% If no ACL rules matched, return...
    7. {acl_nomatch, deny}

    MQTT是为移动互联网、物联网设计的轻量发布订阅模式的消息服务器:

    emqttd消息服务器安装启动后,任何设备或终端的MQTT客户端,可通过MQTT协议连接到emqttd,发布订阅消息方式互通。

    MQTT协议客户端库: https://github.com/mqtt/mqtt.github.io/wiki/libraries

    例如,mosquitto_sub/pub命令行发布订阅消息:

    1. mosquitto_sub -t topic -q 2
    2. mosquitto_pub -t topic -q 1 -m "Hello, MQTT!"

    MQTT V3.1.1版本协议规范:

    emqttd消息服务器的MQTT协议TCP监听器,可在etc/emqttd.config文件中设置:

    1. {mqtt, 1883, [
    2. %% Size of acceptor pool
    3. {acceptors, 16},
    4. %% Maximum number of concurrent clients
    5. {max_clients, 512},
    6. %% Socket Access Control
    7. {access, [{allow, all}]},
    8. %% Connection Options
    9. {connopts, [
    10. %% Rate Limit. Format is 'burst, rate', Unit is KB/Sec
    11. %% {rate_limit, "100,10"} %% 100K burst, 10K rate
    12. ]},
    13. %% Socket Options
    14. {sockopts, [
    15. %Set buffer if hight thoughtput
    16. %{recbuf, 4096},
    17. %{sndbuf, 4096},
    18. %{buffer, 4096},
    19. %{nodelay, true},
    20. {backlog, 512}
    21. ]}
    22. ]},

    MQTT(SSL) TCP监听器,缺省端口8883:

    1. {mqtts, 8883, [
    2. %% Size of acceptor pool
    3. {acceptors, 4},
    4. %% Maximum number of concurrent clients
    5. {max_clients, 512},
    6. %% Socket Access Control
    7. {access, [{allow, all}]},
    8. %% SSL certificate and key files
    9. {ssl, [{certfile, "etc/ssl/ssl.crt"},
    10. {keyfile, "etc/ssl/ssl.key"}]},
    11. %% Socket Options
    12. {sockopts, [
    13. {backlog, 1024}
    14. %{buffer, 4096},
    15. ]}
    16. ]},

    HTTP发布接口

    emqttd消息服务器提供了一个HTTP发布接口,应用服务器或Web服务器可通过该接口发布MQTT消息:

    1. HTTP POST http://host:8083/mqtt/publish

    Web服务器例如PHP/Java/Python/NodeJS或Ruby on Rails,可通过HTTP POST请求发布MQTT消息:

    1. curl -v --basic -u user:passwd -d "qos=1&retain=0&topic=/a/b/c&message=hello from http..." -k http://localhost:8083/mqtt/publish

    HTTP接口参数:

    参数

    client

    MQTT客户端ID

    qos

    QoS: 0 | 1 | 2

    retain

    Retain: 0 | 1

    topic

    主题(Topic)

    message

    消息

    Note

    HTTP接口采用Basic认证

    emqttd消息服务器支持MQTT WebSocket连接,Web浏览器可直接通过MQTT协议连接到emqttd:

    WebSocket URI:

    ws(s)://host:8083/mqtt

    Sec-WebSocket-Protocol:

    ‘mqttv3.1’ or ‘mqttv3.1.1’

    Dashboard插件提供了一个MQTT WebSocket连接的测试页面:

    1. http://127.0.0.1:18083/websocket.html

    emqttd通过内嵌的HTTP服务器,实现MQTT WebSocket与HTTP发布接口,etc/emqttd.config设置:

    1. %% HTTP and WebSocket Listener
    2. {http, 8083, [
    3. %% Size of acceptor pool
    4. {acceptors, 4},
    5. %% Maximum number of concurrent clients
    6. {max_clients, 64},
    7. %% Socket Access Control
    8. {access, [{allow, all}]},
    9. %% Socket Options
    10. {sockopts, [
    11. {backlog, 1024}
    12. %{buffer, 4096},
    13. ]}
    14. ]}

    $SYS-系统主题

    emqttd消息服务器周期性发布自身运行状态、MQTT协议统计、客户端上下线状态到’$SYS/’开头系统主题。

    $SYS主题路径以”$SYS/brokers/{node}/”开头,’${node}’是Erlang节点名称:

    1. $SYS/brokers/emqttd@127.0.0.1/version
    2. $SYS/brokers/emqttd@host2/uptime

    Note

    默认只允许localhost的MQTT客户端订阅$SYS主题,可通过etc/acl.config修改访问控制规则。

    $SYS系统消息发布周期,通过etc/emqttd.config配置:

    1. {broker, [
    2. %% System interval of publishing broker $SYS messages
    3. {sys_interval, 60},

    服务器版本、启动时间与描述消息

    主题

    说明

    $SYS/brokers

    集群节点列表

    $SYS/brokers/${node}/version

    emqttd版本

    $SYS/brokers/${node}/uptime

    emqttd启动时间

    $SYS/brokers/${node}/datetime

    emqttd服务器时间

    $SYS/brokers/${node}/sysdescr

    emqttd描述

    MQTT客户端上下线状态消息

    $SYS主题前缀: $SYS/brokers/${node}/clients/

    ‘connected’消息JSON数据:

    1. {
    2. ipaddress: "127.0.0.1",
    3. username: "test",
    4. session: false,
    5. protocol: 3,
    6. connack: 0,
    7. ts: 1432648482
    8. }

    ‘disconnected’消息JSON数据:

    1. {
    2. reason: normal,
    3. ts: 1432648486
    4. }

    Statistics - 系统统计消息

    系统主题前缀: $SYS/brokers/${node}/stats/

    Clients - 客户端统计

    主题(Topic)

    说明

    clients/count

    当前客户端总数

    clients/max

    最大客户端数量

    Sessions - 会话统计

    主题(Topic)

    说明

    sessions/count

    当前会话总数

    sessions/max

    最大会话数量

    Subscriptions - 订阅统计

    主题(Topic)

    说明

    subscriptions/count

    当前订阅总数

    subscriptions/max

    最大订阅数量

    Topics - 主题统计

    Metrics-收发流量/报文/消息统计

    系统主题(Topic)前缀: $SYS/brokers/${node}/metrics/

    收发流量统计

    主题(Topic)

    说明

    bytes/received

    累计接收流量

    bytes/sent

    累计发送流量

    MQTT报文收发统计

    主题(Topic)

    说明

    packets/received

    累计接收MQTT报文

    packets/sent

    累计发送MQTT报文

    packets/connect

    累计接收MQTT CONNECT报文

    packets/connack

    累计发送MQTT CONNACK报文

    packets/publish/received

    累计接收MQTT PUBLISH报文

    packets/publish/sent

    累计发送MQTT PUBLISH报文

    packets/subscribe

    累计接收MQTT SUBSCRIBE报文

    packets/suback

    累计发送MQTT SUBACK报文

    packets/unsubscribe

    累计接收MQTT UNSUBSCRIBE报文

    packets/unsuback

    累计发送MQTT UNSUBACK报文

    packets/pingreq

    累计接收MQTT PINGREQ报文

    packets/pingresp

    累计发送MQTT PINGRESP报文数量

    packets/disconnect

    累计接收MQTT DISCONNECT数量

    MQTT消息收发统计

    主题(Topic)

    说明

    messages/received

    累计接收消息

    messages/sent

    累计发送消息

    messages/retained

    Retained消息总数

    messages/dropped

    丢弃消息总数

    Alarms-系统告警

    系统主题(Topic)前缀: $SYS/brokers/${node}/alarms/

    Sysmon-系统监控

    系统主题(Topic)前缀: $SYS/brokers/${node}/sysmon/

    主题(Topic)

    说明

    long_gc

    GC时间过长警告

    long_schedule

    调度时间过长警告

    large_heap

    Heap内存占用警告

    busy_port

    Port忙警告

    busy_dist_port

    Dist Port忙警告

    emqttd消息服务器支持追踪来自某个客户端(Client)的全部报文,或者发布到某个主题(Topic)的全部消息。

    追踪客户端(Client):

    1. ./bin/emqttd_ctl trace client "clientid" "trace_clientid.log"

    追踪主题(Topic):

    1. ./bin/emqttd_ctl trace topic "topic" "trace_topic.log"

    停止追踪:

    1. ./bin/emqttd_ctl trace client "clientid" off