扩展插件 (Plugins)

    EMQ X 官方提供的插件包括:

    其中插件的加载有四种方式:

    1. 默认加载

    2. 命令行启停插件

    3. 使用 Dashboard 启停插件

    4. 调用管理 API 启停插件

    开启默认加载

    如需在系统启动时就默认启动某插件,则直接在 配置入需要启动的插件,例如默认开启的加载的插件有:

    命令行启停插件

    在运行过程中,我们可以通过 CLI 命令的方式查看可用的插件列表、和启停某插件:

    1. ## 显示所有可用的插件列表
    2. ./bin/emqx_ctl plugins list
    3. ## 加载某插件
    4. ./bin/emqx_ctl plugins load emqx_auth_username
    5. ## 卸载某插件
    6. ./bin/emqx_ctl plugins unload emqx_auth_username
    7. ## 重新加载某插件
    8. ./bin/emqx_ctl plugins reload emqx_auth_username

    使用 Dashboard 启停插件

    如果 EMQ X 开启了 Dashbord 的插件(默认开启) 还可以直接通过访问 http://localhost:18083/plugins 中的插件管理页面启停、或者配置插件。

    EMQ X 消息服务器的 Web 管理控制台, 该插件默认开启。当 EMQ X 启动成功后,可访问 http://localhost:18083 进行查看,默认用户名/密码: admin/public。

    Dashboard 中可查询 EMQ X 消息服务器基本信息、统计数据、负载情况,查询当前客户端列表(Connections)、会话(Sessions)、路由表(Topics)、订阅关系(Subscriptions) 等详细信息。

    除此之外,Dashboard 默认提供了一系列的 REST API 供前端调用。其详情可以参考 Dashboard -> HTTP API 部分。

    etc/plugins/emqx_dashboard.conf:

    1. ## Dashboard 默认用户名/密码
    2. dashboard.default_user.login = admin
    3. dashboard.default_user.password = public
    4. ## Dashboard HTTP 服务端口配置
    5. dashboard.listener.http = 18083
    6. dashboard.listener.http.acceptors = 2
    7. dashboard.listener.http.max_clients = 512
    8. ## Dashboard HTTPS 服务端口配置
    9. ## dashboard.listener.https = 18084
    10. ## dashboard.listener.https.acceptors = 2
    11. ## dashboard.listener.https.max_clients = 512
    12. ## dashboard.listener.https.handshake_timeout = 15s
    13. ## dashboard.listener.https.certfile = etc/certs/cert.pem
    14. ## dashboard.listener.https.keyfile = etc/certs/key.pem
    15. ## dashboard.listener.https.cacertfile = etc/certs/cacert.pem
    16. ## dashboard.listener.https.verify = verify_peer
    17. ## dashboard.listener.https.fail_if_no_peer_cert = true

    HTTP API 与 CLI 管理插件

    emqx_managementEMQ X 消息服务器的 HTTP API 与 CLI 管理插件,该插件默认开启。当 EMQ X 启动成功后,用户即可通过该插件提供的 HTTP API 与 CLI 进行查询当前客户端列表等操作,详见 与 管理命令 (Commands)

    HTTP API 与 CLI 管理设置

    etc/plugins/emqx_management.conf:

    1. ## 最多返回多少条数据,用于分页机制
    2. management.max_row_limit = 10000
    3. ## 默认的应用 secret
    4. # management.application.default_secret = public
    5. ## Management HTTP 服务器端口配置
    6. management.listener.http = 8080
    7. management.listener.http.acceptors = 2
    8. management.listener.http.max_clients = 512
    9. management.listener.http.backlog = 512
    10. management.listener.http.send_timeout = 15s
    11. management.listener.http.send_timeout_close = on
    12. ## Management HTTPS 服务器端口配置
    13. ## management.listener.https = 8081
    14. ## management.listener.https.acceptors = 2
    15. ## management.listener.https.max_clients = 512
    16. ## management.listener.https.backlog = 512
    17. ## management.listener.https.send_timeout = 15s
    18. ## management.listener.https.send_timeout_close = on
    19. ## management.listener.https.certfile = etc/certs/cert.pem
    20. ## management.listener.https.keyfile = etc/certs/key.pem
    21. ## management.listener.https.cacertfile = etc/certs/cacert.pem
    22. ## management.listener.https.verify = verify_peer
    23. ## management.listener.https.fail_if_no_peer_cert = true

    ClientID 认证插件

    目前只支持 连接认证,通过 clientidpassword 认证客户端。此插件在存储密码时会按照配置的 hash 算法将明文加密后存入。

    ClientID 认证配置

    etc/plugins/emqx_auth_clientid.conf:

    1. ## Default usernames Examples
    2. ##auth.client.1.clientid = id
    3. ##auth.client.1.password = passwd
    4. ##auth.client.2.clientid = dev:devid
    5. ##auth.client.2.password = passwd2
    6. ##auth.client.3.clientid = app:appid
    7. ##auth.client.3.password = passwd3
    8. ##auth.client.4.clientid = client~!@#$%^&*()_+
    9. ##auth.client.4.password = passwd~!@#$%^&*()_+
    10. ## 密码加密方式
    11. ## 枚举值: plain | md5 | sha | sha256
    12. auth.client.password_hash = sha256

    Username 认证插件

    emqx_auth_username 目前只支持 连接认证,通过 usernamepassword 认证客户端。此插件在存储密码时会按照配置的 hash 算法将明文加密后存入。

    用户名认证配置

    etc/plugins/emqx_auth_username.conf:

    1. ## Default usernames Examples:
    2. ##auth.user.1.username = admin
    3. ##auth.user.1.password = public
    4. ##auth.user.2.username = feng@emqtt.io
    5. ##auth.user.2.password = public
    6. ##auth.user.3.username = name~!@#$%^&*()_+
    7. ##auth.user.3.password = pwsswd~!@#$%^&*()_+
    8. ## 密码加密方式
    9. ## 枚举值: plain | md5 | sha | sha256
    10. auth.user.password_hash = sha256

    JWT 认证插件

    支持基于 JWT 的方式,对连接的客户端进行认证,只支持 连接认证 功能。它会解析并校验 Token 的合理性和时效性、满足则允许连接。

    JWT 认证配置

    etc/plugins/emqx_auth_jwt.conf:

    1. ## HMAC Hash 算法密钥
    2. auth.jwt.secret = emqxsecret
    3. ## RSA 或 ECDSA 算法的公钥
    4. ## auth.jwt.pubkey = etc/certs/jwt_public_key.pem
    5. ## JWT 串的来源
    6. ## 枚举值: username | password
    7. auth.jwt.from = password

    LDAP 认证/访问控制插件

    支持访问 LDAP 实现 连接认证访问控制 功能。

    LDAP 认证插件配置

    etc/plugins/emqx_auth_ldap.conf:

    1. auth.ldap.servers = 127.0.0.1
    2. auth.ldap.port = 389
    3. auth.ldap.pool = 8
    4. auth.ldap.bind_dn = cn=root,dc=emqx,dc=io
    5. auth.ldap.bind_password = public
    6. auth.ldap.timeout = 30s
    7. auth.ldap.device_dn = ou=device,dc=emqx,dc=io
    8. auth.ldap.match_objectclass = mqttUser
    9. auth.ldap.username.attributetype = uid
    10. auth.ldap.password.attributetype = userPassword
    11. auth.ldap.ssl = false
    12. ## auth.ldap.ssl.certfile = etc/certs/cert.pem
    13. ## auth.ldap.ssl.keyfile = etc/certs/key.pem
    14. ## auth.ldap.ssl.cacertfile = etc/certs/cacert.pem
    15. ## auth.ldap.ssl.verify = verify_peer
    16. ## auth.ldap.ssl.fail_if_no_peer_cert = true

    HTTP 认证/访问控制插件

    插件实现 连接认证访问控制 的功能。它会将每个请求发送到指定的 HTTP 服务,通过其返回值来判断是否具有操作权限。

    该插件总共支持三个请求分别为:

    1. auth.http.auth_req: 连接认证

    2. auth.http.super_req: 判断是否为超级用户

    3. auth.http.acl_req: 访问控制权限查询

    每个请求的参数都支持使用真实的客户端的 Username, IP 地址等进行自定义。

    注解

    其中在 3.1 版本中新增的 %C %d 的支持。

    HTTP 认证插件配置

    etc/plugins/emqx_auth_http.conf:

    1. ## http 请求超时时间, 0 为不设置超时
    2. ## auth.http.request.timeout = 0
    3. ## http 建立 tcp 连接的超时时间, 默认与 'request.timeout' 一致
    4. ## auth.http.request.connect_timout = 0
    5. ## http 请求最大重试次数
    6. auth.http.request.retry_times = 3
    7. ## http 重试间隔
    8. auth.http.request.retry_interval = 1s
    9. ## 重试间隔的退避指数, 实际值 = `interval * backoff ^ times`
    10. auth.http.request.retry_backoff = 2.0
    11. ## https 证书配置
    12. ## auth.http.ssl.cacertfile = {{ platform_etc_dir }}/certs/ca.pem
    13. ## auth.http.ssl.certfile = {{ platform_etc_dir }}/certs/client-cert.pem
    14. ## auth.http.ssl.keyfile = {{ platform_etc_dir }}/certs/client-key.pem
    15. ## 占位符:
    16. ## - %u: username
    17. ## - %c: clientid
    18. ## - %a: ipaddress
    19. ## - %P: password
    20. ## - %C: common name of client TLS cert
    21. ## - %d: subject of client TLS cert
    22. auth.http.auth_req = http://127.0.0.1:8080/mqtt/auth
    23. ## AUTH 请求的 HTTP 方法和参数配置
    24. auth.http.auth_req.method = post
    25. auth.http.auth_req.params = clientid=%c,username=%u,password=%P
    26. auth.http.super_req = http://127.0.0.1:8080/mqtt/superuser
    27. auth.http.super_req.method = post
    28. auth.http.super_req.params = clientid=%c,username=%u
    29. ## 占位符:
    30. ## - %A: 1 | 2, 1 = sub, 2 = pub
    31. ## - %u: username
    32. ## - %c: clientid
    33. ## - %a: ipaddress
    34. ## - %t: topic
    35. auth.http.acl_req = http://127.0.0.1:8080/mqtt/acl
    36. auth.http.acl_req.method = get
    37. auth.http.acl_req.params = access=%A,username=%u,clientid=%c,ipaddr=%a,topic=%t

    HTTP API 返回值处理

    连接认证

    1. ## 认证成功
    2. HTTP Status Code: 200
    3. ## 忽略此次认证
    4. HTTP Status Code: 200
    5. Body: ignore
    6. ## 认证失败
    7. HTTP Status Code: Except 200

    超级用户

    1. ## 确认为超级用户
    2. HTTP Status Code: 200
    3. ## 非超级用户
    4. HTTP Status Code: Except 200

    访问控制

    1. ## 允许 Publish/Subscribe:
    2. HTTP Status Code: 200
    3. ## 忽略此次鉴权:
    4. HTTP Status Code: 200
    5. Body: ignore
    6. ## 拒绝该次 Publish/Subscribe:
    7. HTTP Status Code: Except 200

    MySQL 认证/访问控制插件

    emqx_auth_mysql 支持访问 MySQL 实现 连接认证访问控制 功能。要实现这些功能,我们需要在 MySQL 中创建两张表,其格式如下:

    MQTT 用户表

    1. CREATE TABLE `mqtt_user` (
    2. `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
    3. `username` varchar(100) DEFAULT NULL,
    4. `password` varchar(100) DEFAULT NULL,
    5. `salt` varchar(35) DEFAULT NULL,
    6. `is_superuser` tinyint(1) DEFAULT 0,
    7. `created` datetime DEFAULT NULL,
    8. PRIMARY KEY (`id`),
    9. UNIQUE KEY `mqtt_username` (`username`)
    10. ) ENGINE=MyISAM DEFAULT CHARSET=utf8;

    注解

    插件同样支持使用自定义结构的表,通过 auth_query 配置查询语句即可。

    MQTT 访问控制表

    1. CREATE TABLE `mqtt_acl` (
    2. `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
    3. `allow` int(1) DEFAULT NULL COMMENT '0: deny, 1: allow',
    4. `ipaddr` varchar(60) DEFAULT NULL COMMENT 'IpAddress',
    5. `username` varchar(100) DEFAULT NULL COMMENT 'Username',
    6. `clientid` varchar(100) DEFAULT NULL COMMENT 'ClientId',
    7. `access` int(2) NOT NULL COMMENT '1: subscribe, 2: publish, 3: pubsub',
    8. PRIMARY KEY (`id`)
    9. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    10. INSERT INTO `mqtt_acl` (`id`, `allow`, `ipaddr`, `username`, `clientid`, `access`, `topic`)
    11. VALUES
    12. (1,1,NULL,'$all',NULL,2,'#'),
    13. (2,0,NULL,'$all',NULL,1,'$SYS/#'),
    14. (3,0,NULL,'$all',NULL,1,'eq #'),
    15. (5,1,'127.0.0.1',NULL,NULL,2,'$SYS/#'),
    16. (6,1,'127.0.0.1',NULL,NULL,2,'#'),
    17. (7,1,NULL,'dashboard',NULL,1,'$SYS/#');

    配置 MySQL 认证鉴权插件

    etc/plugins/emqx_auth_mysql.conf:

    1. ## Mysql 服务器地址
    2. auth.mysql.server = 127.0.0.1:3306
    3. ## Mysql 连接池大小
    4. auth.mysql.pool = 8
    5. ## Mysql 连接用户名
    6. ## auth.mysql.username =
    7. ## Mysql 连接密码
    8. ## auth.mysql.password =
    9. ## Mysql 认证用户表名
    10. auth.mysql.database = mqtt
    11. ## Mysql 查询超时时间
    12. auth.mysql.query_timeout = 5s
    13. ## 可用占位符:
    14. ## - %u: username
    15. ## - %c: clientid
    16. ## - %C: common name of client TLS cert
    17. ## - %d: subject of client TLS cert
    18. ## 注: 该条 SQL 必须且仅需查询 `password` 字段
    19. auth.mysql.auth_query = select password from mqtt_user where username = '%u' limit 1
    20. ## 密码加密方式: plain, md5, sha, sha256, pbkdf2
    21. auth.mysql.password_hash = sha256
    22. ## 超级用户查询语句
    23. auth.mysql.super_query = select is_superuser from mqtt_user where username = '%u' limit 1
    24. ## 注: 可以增加 'ORDER BY' 子句以控制 ACL 规则的生效顺序
    25. auth.mysql.acl_query = select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'

    此外,为防止密码域过于简单而带来安全的隐患问题,该插件还支持密码加盐操作:

    1. ## 加盐密文格式
    2. ## auth.mysql.password_hash = salt,sha256
    3. ## auth.mysql.password_hash = salt,bcrypt
    4. ## auth.mysql.password_hash = sha256,salt
    5. ## pbkdf2 带 macfun 格式
    6. ## macfun: md4, md5, ripemd160, sha, sha224, sha256, sha384, sha512
    7. ## auth.mysql.password_hash = pbkdf2,sha256,1000,20

    注解

    3.1 版本新增 %C %d 支持。

    通过访问 Postgres 实现 连接认证访问控制 功能。同样需要定义两张表如下:

    Postgres MQTT 用户表

    1. CREATE TABLE mqtt_user (
    2. id SERIAL primary key,
    3. is_superuser boolean,
    4. username character varying(100),
    5. password character varying(100),
    6. salt character varying(40)
    7. );
    1. CREATE TABLE mqtt_acl (
    2. id SERIAL primary key,
    3. allow integer,
    4. ipaddr character varying(60),
    5. username character varying(100),
    6. clientid character varying(100),
    7. access integer,
    8. topic character varying(100)
    9. );
    10. INSERT INTO mqtt_acl (id, allow, ipaddr, username, clientid, access, topic)
    11. VALUES
    12. (1,1,NULL,'$all',NULL,2,'#'),
    13. (2,0,NULL,'$all',NULL,1,'$SYS/#'),
    14. (3,0,NULL,'$all',NULL,1,'eq #'),
    15. (5,1,'127.0.0.1',NULL,NULL,2,'$SYS/#'),
    16. (6,1,'127.0.0.1',NULL,NULL,2,'#'),
    17. (7,1,NULL,'dashboard',NULL,1,'$SYS/#');

    配置 Postgres 认证鉴权插件

    etc/plugins/emqx_auth_pgsql.conf:

    1. ## PostgreSQL 服务地址
    2. auth.pgsql.server = 127.0.0.1:5432
    3. ## PostgreSQL 连接池大小
    4. auth.pgsql.pool = 8
    5. auth.pgsql.username = root
    6. ## auth.pgsql.password =
    7. auth.pgsql.database = mqtt
    8. auth.pgsql.encoding = utf8
    9. ## 连接认证查询 SQL
    10. ## 占位符:
    11. ## - %u: username
    12. ## - %c: clientid
    13. ## - %C: common name of client TLS cert
    14. ## - %d: subject of client TLS cert
    15. auth.pgsql.auth_query = select password from mqtt_user where username = '%u' limit 1
    16. ## 加密方式: plain | md5 | sha | sha256 | bcrypt
    17. auth.pgsql.password_hash = sha256
    18. ## 超级用户查询语句 (占位符与认证一致)
    19. auth.pgsql.super_query = select is_superuser from mqtt_user where username = '%u' limit 1
    20. ## ACL 查询语句
    21. ##
    22. ## 占位符:
    23. ## - %a: ipaddress
    24. ## - %u: username
    25. ## - %c: clientid
    26. ## 注: 可以增加 'ORDER BY' 子句以控制 ACL 规则的生效顺序
    27. auth.pgsql.acl_query = select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'

    同样的 password_hash 可以配置为更为安全的模式:

    开启以下配置,则可支持 TLS 连接到 Postgres:

    1. ## 是否开启 SSL
    2. auth.pgsql.ssl = false
    3. ## 证书配置
    4. ## auth.pgsql.ssl_opts.keyfile =
    5. ## auth.pgsql.ssl_opts.certfile =
    6. ## auth.pgsql.ssl_opts.cacertfile =

    注解

    3.1 版本新增 %C %d 支持。

    Redis 认证/访问控制插件

    emqx_auth_redis 通过访问 Redis 数据以实现 连接认证访问控制 的功能。

    配置 Redis 认证插件

    etc/plugins/emqx_auth_redis.conf:

    1. ## Redis 服务集群类型
    2. ## 枚举值: single | sentinel | cluster
    3. auth.redis.type = single
    4. ## Redis 服务器地址
    5. ##
    6. ## Single Redis Server: 127.0.0.1:6379, localhost:6379
    7. ## Redis Sentinel: 127.0.0.1:26379,127.0.0.2:26379,127.0.0.3:26379
    8. ## Redis Cluster: 127.0.0.1:6379,127.0.0.2:6379,127.0.0.3:6379
    9. auth.redis.server = 127.0.0.1:6379
    10. ## Redis sentinel 名称
    11. ## auth.redis.sentinel = mymaster
    12. ## Redis 连接池大小
    13. auth.redis.pool = 8
    14. ## Redis database 序号
    15. auth.redis.database = 0
    16. ## Redis password.
    17. ## auth.redis.password =
    18. ## Redis 查询超时时间
    19. auth.redis.query_timeout = 5s
    20. ## 认证查询指令
    21. ## 占位符:
    22. ## - %u: username
    23. ## - %c: clientid
    24. ## - %C: common name of client TLS cert
    25. ## - %d: subject of client TLS cert
    26. auth.redis.auth_cmd = HMGET mqtt_user:%u password
    27. ## 密码加密方式.
    28. ## 枚举: plain | md5 | sha | sha256 | bcrypt
    29. auth.redis.password_hash = plain
    30. ## 超级用户查询指令 (占位符与认证一致)
    31. auth.redis.super_cmd = HGET mqtt_user:%u is_superuser
    32. ## ACL 查询指令
    33. ## 占位符:
    34. ## - %u: username
    35. ## - %c: clientid
    36. auth.redis.acl_cmd = HGETALL mqtt_acl:%u

    同样,该插件支持更安全的密码格式:

    1. ## 加盐密文格式
    2. ## auth.redis.password_hash = salt,sha256
    3. ## auth.redis.password_hash = sha256,salt
    4. ## auth.redis.password_hash = salt,bcrypt
    5. ## pbkdf2 macfun 格式
    6. ## macfun: md4, md5, ripemd160, sha, sha224, sha256, sha384, sha512
    7. ## auth.redis.password_hash = pbkdf2,sha256,1000,20

    注解

    3.1 版本新增 %C %d 支持。

    Redis 用户 Hash

    默认基于用户 Hash 认证:

    1. HSET mqtt_user:<username> is_superuser 1
    2. HSET mqtt_user:<username> password "passwd"
    3. HSET mqtt_user:<username> salt "salt"

    Redis ACL 规则 Hash

    默认采用 Hash 存储 ACL 规则:

    1. HSET mqtt_acl:<username> topic1 1
    2. HSET mqtt_acl:<username> topic2 2
    3. HSET mqtt_acl:<username> topic3 3

    注解

    1: subscribe, 2: publish, 3: pubsub

    MongoDB 认证/访问控制插件

    通过访问 MongoDB 实现 连接认证访问控制 功能。

    配置 MongoDB 认证插件

    etc/plugins/emqx_auth_mongo.conf:

    1. ## MongoDB 拓扑类型
    2. ## 枚举: single | unknown | sharded | rs
    3. auth.mongo.type = single
    4. ## rs 模式下的 `set name`
    5. ## auth.mongo.rs_set_name =
    6. ## MongoDB 服务地址
    7. auth.mongo.server = 127.0.0.1:27017
    8. ## MongoDB 连接池大小
    9. auth.mongo.pool = 8
    10. ## 连接认证信息
    11. ## auth.mongo.login =
    12. ## auth.mongo.password =
    13. ## auth.mongo.auth_source = admin
    14. ## 认证数据表名
    15. auth.mongo.database = mqtt
    16. ## 查询超时时间
    17. auth.mongo.query_timeout = 5s
    18. ## 认证查询配置
    19. auth.mongo.auth_query.collection = mqtt_user
    20. auth.mongo.auth_query.password_field = password
    21. auth.mongo.auth_query.password_hash = sha256
    22. ## 连接认证查询字段列表
    23. ## 占位符:
    24. ## - %u: username
    25. ## - %c: clientid
    26. ## - %C: common name of client TLS cert
    27. ## - %d: subject of client TLS cert
    28. auth.mongo.auth_query.selector = username=%u
    29. ## 超级用户查询
    30. auth.mongo.super_query = on
    31. auth.mongo.super_query.collection = mqtt_user
    32. auth.mongo.super_query.super_field = is_superuser
    33. auth.mongo.super_query.selector = username=%u
    34. ## ACL 查询配置
    35. auth.mongo.acl_query = on
    36. auth.mongo.acl_query.collection = mqtt_acl
    37. auth.mongo.acl_query.selector = username=%u

    注解

    3.1 版本新增 %C %d 支持。

    MongoDB 数据库

    1. use mqtt
    2. db.createCollection("mqtt_user")
    3. db.createCollection("mqtt_acl")
    4. db.mqtt_user.ensureIndex({"username":1})

    注解

    数据库、集合名称可自定义。

    MongoDB 用户集合

    1. {
    2. username: "user",
    3. password: "password hash",
    4. is_superuser: boolean (true, false),
    5. created: "datetime"
    6. }

    示例:

    1. db.mqtt_user.insert({username: "test", password: "password hash", is_superuser: false})
    2. db.mqtt_user:insert({username: "root", is_superuser: true})

    MongoDB ACL 集合

    1. {
    2. username: "username",
    3. clientid: "clientid",
    4. publish: ["topic1", "topic2", ...],
    5. subscribe: ["subtop1", "subtop2", ...],
    6. pubsub: ["topic/#", "topic1", ...]
    7. }

    示例:

    1. db.mqtt_acl.insert({username: "test", publish: ["t/1", "t/2"], subscribe: ["user/%u", "client/%c"]})
    2. db.mqtt_acl.insert({username: "admin", pubsub: ["#"]})

    PSK 认证插件

    emqx_psk_file 插件主要提供了 PSK 支持。其目的是用于在客户端建立 TLS/DTLS 连接时,通过 PSK 方式实现 连接认证 的功能。

    配置 PSK 认证插件

    etc/plugins/emqx_psk_file.conf:

    1. psk.file.path = etc/psk.txt

    WebHook 插件

    插件可以将所有 EMQ X 的事件及消息都发送到指定的 HTTP 服务器。

    配置 WebHook 插件

    etc/plugins/emqx_web_hook.conf:

    1. ## 回调的 Web Server 地址
    2. web.hook.api.url = http://127.0.0.1:8080
    3. ## 编码 Payload 字段
    4. ## 枚举值: undefined | base64 | base62
    5. ## 默认值: undefined (不进行编码)
    6. ## web.hook.encode_payload = base64
    7. ## 消息、事件配置
    8. web.hook.rule.client.connected.1 = {"action": "on_client_connected"}
    9. web.hook.rule.client.disconnected.1 = {"action": "on_client_disconnected"}
    10. web.hook.rule.client.subscribe.1 = {"action": "on_client_subscribe"}
    11. web.hook.rule.client.unsubscribe.1 = {"action": "on_client_unsubscribe"}
    12. web.hook.rule.session.created.1 = {"action": "on_session_created"}
    13. web.hook.rule.session.subscribed.1 = {"action": "on_session_subscribed"}
    14. web.hook.rule.session.unsubscribed.1 = {"action": "on_session_unsubscribed"}
    15. web.hook.rule.session.terminated.1 = {"action": "on_session_terminated"}
    16. web.hook.rule.message.publish.1 = {"action": "on_message_publish"}
    17. web.hook.rule.message.deliver.1 = {"action": "on_message_deliver"}
    18. web.hook.rule.message.acked.1 = {"action": "on_message_acked"}

    Lua 插件

    emqx_lua_hook 插件将所有的事件和消息都发送到指定的 Lua 函数上。其具体使用参见其 README。

    Retainer 插件

    该插件设置为默认启动,为 EMQ X 提供 Retained 类型的消息支持。它会将所有主题的 Retained 消息存储在集群的数据库中,并待有客户端订阅该主题的时候将该消息投递出去。

    配置 Retainer 插件

    etc/plugins/emqx_retainer.conf:

    1. ## retained 消息存储方式
    2. ## - ram: 仅内存
    3. ## - disc: 内存和磁盘
    4. ## - disc_only: 仅磁盘
    5. ## 最大存储数 (0表示未限制)
    6. retainer.max_retained_messages = 0
    7. ## 单条最大可存储消息大小
    8. retainer.max_payload_size = 1MB
    9. ## 过期时间, 0 表示永不过期
    10. ## 单位: h 小时; m 分钟; s 秒。如 60m 表示 60 分钟
    11. retainer.expiry_interval = 0

    MQTT 消息桥接插件

    桥接 的概念是 EMQ X 支持将自身某类主题的消息通过某种方式转发到另一个 MQTT Broker。

    目前 MQTT 消息插件支持的桥接方式有:

    • RPC 桥接:RPC 桥接只能在 EMQ X Broker 间使用,且不支持订阅远程节点的主题去同步数据

    • MQTT 桥接:MQTT 桥接同时支持转发和通过订阅主题来实现数据同步两种方式

    在 EMQ X 中,通过修改 etc/plugins/emqx_bridge_mqtt.conf 来配置 bridge。EMQ X 根据不同的 name 来区分不同的 bridge。例如:

    1. ## 桥接地址: 使用节点名(nodename@host)则用于 RPC 桥接,使用 host:port 用于 MQTT 连接
    2. bridge.mqtt.aws.address = 127.0.0.1:1883

    该项配置声明了一个名为 aws 的 bridge 并指定以 MQTT 的方式桥接到 127.0.0.1:1883 这台 MQTT 服务器

    在需要创建多个 bridge 时,可以先复制其全部的配置项,在通过使用不同的 name 来标示(比如 bridge.mqtt.$name.address 其中 $name 指代的为 bridge 的名称)

    etc/plugins/emqx_bridge_mqtt.conf

    1. ## 桥接地址: 使用节点名(nodename@host)则用于 RPC 桥接,使用 host:port 用于 MQTT 连接
    2. bridge.mqtt.aws.address = 192.168.1.2:1883
    3. ## 桥接的协议版本
    4. ## 枚举值: mqttv3 | mqttv4 | mqttv5
    5. bridge.mqtt.aws.proto_ver = mqttv4
    6. ## 客户端的 client_id
    7. bridge.mqtt.aws.client_id = bridge_emq
    8. ## 注: 有些 MQTT Broker 需要将 clean_start 值设成 `true`
    9. bridge.mqtt.aws.clean_start = true
    10. ## 客户端的 username 字段
    11. bridge.mqtt.aws.username = user
    12. ## 客户端的 password 字段
    13. bridge.mqtt.aws.password = passwd
    14. ## 客户端是否使用 ssl 来连接远程服务器
    15. bridge.mqtt.aws.ssl = off
    16. ## 客户端 SSL 连接的 CA 证书 (PEM格式)
    17. bridge.mqtt.aws.cacertfile = etc/certs/cacert.pem
    18. ## 客户端 SSL 连接的 SSL 证书
    19. bridge.mqtt.aws.certfile = etc/certs/client-cert.pem
    20. ## 客户端 SSL 连接的密钥文件
    21. bridge.mqtt.aws.keyfile = etc/certs/client-key.pem
    22. ## SSL 加密方式
    23. bridge.mqtt.aws.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384
    24. ## TLS PSK 的加密套件
    25. ## 注意 'listener.ssl.external.ciphers' 和 'listener.ssl.external.psk_ciphers' 不能同时配置
    26. ##
    27. ## See 'https://tools.ietf.org/html/rfc4279#section-2'.
    28. ## bridge.mqtt.aws.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA
    29. ## 客户端的心跳间隔
    30. bridge.mqtt.aws.keepalive = 60s
    31. ## 支持的 TLS 版本
    32. bridge.mqtt.aws.tls_versions = tlsv1.2,tlsv1.1,tlsv1
    33. ## 需要被转发的消息的主题
    34. bridge.mqtt.aws.forwards = sensor1/#,sensor2/#
    35. ## 挂载点(mountpoint)
    36. bridge.mqtt.aws.mountpoint = bridge/emqx2/${node}/
    37. ## 订阅对端的主题
    38. bridge.mqtt.aws.subscription.1.topic = cmd/topic1
    39. ## 订阅对端主题的 QoS
    40. bridge.mqtt.aws.subscription.1.qos = 1
    41. ## 桥接的重连间隔
    42. ## 默认: 30秒
    43. bridge.mqtt.aws.reconnect_interval = 30s
    44. ## QoS1/QoS2 消息的重传间隔
    45. bridge.mqtt.aws.retry_interval = 20s
    46. ## Inflight 大小.
    47. bridge.mqtt.aws.max_inflight_batches = 32
    48. ## emqx_bridge 内部用于 batch 的消息数量
    49. bridge.mqtt.aws.queue.batch_count_limit = 32
    50. ## emqx_bridge 内部用于 batch 的消息字节数
    51. bridge.mqtt.aws.queue.batch_bytes_limit = 1000MB
    52. ## 放置 replayq 队列的路径,如果没有在配置中指定该项,那么 replayq
    53. ## 将会以 `mem-only` 的模式运行,消息不会缓存到磁盘上。
    54. bridge.mqtt.aws.queue.replayq_dir = data/emqx_aws_bridge/
    55. ## Replayq 数据段大小
    56. bridge.mqtt.aws.queue.replayq_seg_bytes = 10MB

    emqx_delayed_publish 提供了延迟发送消息的功能。当客户端使用特殊主题前缀 $delayed/<seconds>/ 发布消息到 EMQ X 时,EMQ X 将在 <seconds> 秒后发布该主题消息。

    CoAP 协议插件

    提供对 CoAP 协议(RFC 7252)的支持。

    配置 CoAP 协议插件

    etc/plugins/emqx_coap.conf:

    1. coap.port = 5683
    2. coap.keepalive = 120s
    3. coap.enable_stats = off

    若开启以下配置,则可以支持 DTLS:

    1. ## DTLS 监听端口
    2. coap.dtls.port = 5684
    3. coap.dtls.keyfile = {{ platform_etc_dir }}/certs/key.pem
    4. coap.dtls.certfile = {{ platform_etc_dir }}/certs/cert.pem
    5. ## 双向认证相关
    6. ## coap.dtls.verify = verify_peer
    7. ## coap.dtls.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem
    8. ## coap.dtls.fail_if_no_peer_cert = false

    测试 CoAP 插件

    我们可以通过安装 libcoap 来测试 EMQ X 对 CoAP 协议的支持情况。

    LwM2M 协议插件

    提供对 LwM2M 协议的支持。

    配置 LwM2M 插件

    etc/plugins/emqx_lwm2m.conf:

    1. ## LwM2M 监听端口
    2. lwm2m.port = 5683
    3. ## Lifetime 限制
    4. lwm2m.lifetime_min = 1s
    5. lwm2m.lifetime_max = 86400s
    6. ## Q Mode 模式下 `time window` 长度, 单位秒。
    7. ## 超过该 window 的消息都将被缓存
    8. #lwm2m.qmode_time_window = 22
    9. ## LwM2M 是否部署在 coaproxy 后
    10. #lwm2m.lb = coaproxy
    11. ## 设备上线后,主动 observe 所有的 objects
    12. #lwm2m.auto_observe = off
    13. # 主题挂载点
    14. # Placeholders supported:
    15. # '%e': Endpoint Name
    16. # '%a': IP Address
    17. lwm2m.mountpoint = lwm2m/%e/
    18. ## client register 成功后主动向 EMQ X 订阅的主题
    19. ## 占位符:
    20. ## '%e': Endpoint Name
    21. ## '%a': IP Address
    22. lwm2m.topics.command = dn/#
    23. ## client 应答消息(response) 到 EMQ X 的主题
    24. lwm2m.topics.response = up/resp
    25. ## client 通知类消息(noify message) 到 EMQ X 的主题
    26. lwm2m.topics.notify = up/notify
    27. ## client 注册类消息(register message) 到 EMQ X 的主题
    28. lwm2m.topics.register = up/resp
    29. # client 更新类消息(update message) 到 EMQ X 的主题
    30. lwm2m.topics.update = %e/up/resp
    31. # Object 定义的 xml 文件位置
    32. lwm2m.xml_dir = etc/lwm2m_xml

    同样可以通过以下配置打开 DTLS 支持:

    1. # DTLS 证书配置
    2. lwm2m.certfile = etc/certs/cert.pem
    3. lwm2m.keyfile = etc/certs/key.pem

    MQTT-SN 协议插件

    emqx_sn 插件提供对 协议的支持。

    配置 MQTT-SN 协议插件

    etc/plugins/emqx_sn.conf:

    1. mqtt.sn.port = 1884

    Stomp 协议插件

    emqx_stomp 提供对 Stomp 协议的支持。支持客户端通过 Stomp 1.0/1.1/1.2 协议连接 EMQ X,发布订阅 MQTT 消息。

    配置 Stomp 插件

    注解

    Stomp 协议端口: 61613

    etc/plugins/emqx_stomp.conf:

    1. stomp.default_user.login = guest
    2. stomp.default_user.passcode = guest
    3. stomp.allow_anonymous = true
    4. stomp.frame.max_headers = 10
    5. stomp.frame.max_header_length = 1024
    6. stomp.frame.max_body_length = 8192
    7. stomp.listener = 61613
    8. stomp.listener.acceptors = 4
    9. stomp.listener.max_clients = 512

    Recon 性能调试插件

    插件集成了 recon 性能调测库,可用于查看当前系统的一些状态信息,例如:

    1. ./bin/emqx_ctl recon
    2. recon memory #recon_alloc:memory/2
    3. recon allocated #recon_alloc:memory(allocated_types, current|max)
    4. recon bin_leak #recon:bin_leak(100)
    5. recon node_stats #recon:node_stats(10, 1000)
    6. recon remote_load Mod #recon:remote_load(Mod)

    配置 Recon 插件

    etc/plugins/emqx_recon.conf:

    1. %% Garbage Collection: 10 minutes
    2. recon.gc_interval = 600

    Reloader 热加载插件

    emqx_reloader 用于开发调试的代码热升级插件。加载该插件后 EMQ X 会根据配置的时间间隔自动热升级更新代码。

    同时,也提供了 CLI 命令来指定 reload 某一个模块:

    1. ./bin/emqx_ctl reload <Module>

    注解

    产品部署环境不建议使用该插件。

    配置 Reloader 插件

    etc/plugins/emqx_reloader.conf:

    1. reloader.interval = 60
    2. reloader.logfile = log/reloader.log

    插件开发模版

    是一个 EMQ X 插件模板,在功能上并无任何意义。

    开发者需要自定义插件时,可以查看该插件的代码和结构,以更快地开发一个标准的 EMQ X 插件。插件实际是一个普通的 Erlang Application,其配置文件为: etc/${PluginName}.config

    创建插件项目

    参考 emqx_plugin_template 插件模版创建新的插件项目。

    注解

    <plugin name>_app.erl 文件中必须加上标签 -emqx_plugin(?MODULE). 以表明这是一个 EMQ X 的插件。

    创建认证/访问控制模块

    认证演示模块 - emqx_auth_demo.erl

    1. -module(emqx_auth_demo).
    2. -export([ init/1
    3. , check/2
    4. , description/0
    5. ]).
    6. init(Opts) -> {ok, Opts}.
    7. check(_Credentials = #{client_id := ClientId, username := Username, password := Password}, _State) ->
    8. io:format("Auth Demo: clientId=~p, username=~p, password=~p~n", [ClientId, Username, Password]),
    9. ok.
    10. description() -> "Auth Demo Module".

    访问控制演示模块 - emqx_acl_demo.erl

    1. -module(emqx_acl_demo).
    2. -include_lib("emqx/include/emqx.hrl").
    3. %% ACL callbacks
    4. -export([ init/1
    5. , check_acl/5
    6. , reload_acl/1
    7. , description/0
    8. ]).
    9. init(Opts) ->
    10. {ok, Opts}.
    11. check_acl({Credentials, PubSub, _NoMatchAction, Topic}, _State) ->
    12. io:format("ACL Demo: ~p ~p ~p~n", [Credentials, PubSub, Topic]),
    13. allow.
    14. reload_acl(_State) ->
    15. ok.
    16. description() -> "ACL Demo Module".

    注册认证、访问控制模块 - emqx_plugin_template_app.erl

    1. ok = emqx:hook('client.authenticate', fun emqx_auth_demo:check/2, []),
    2. ok = emqx:hook('client.check_acl', fun emqx_acl_demo:check_acl/5, []).

    注册钩子(Hooks)

    通过钩子(Hook)处理客户端上下线、主题订阅、消息收发。

    emqx_plugin_template.erl:

    1. %% Called when the plugin application start
    2. load(Env) ->
    3. emqx:hook('client.authenticate', fun ?MODULE:on_client_authenticate/2, [Env]),
    4. emqx:hook('client.check_acl', fun ?MODULE:on_client_check_acl/5, [Env]),
    5. emqx:hook('client.connected', fun ?MODULE:on_client_connected/4, [Env]),
    6. emqx:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]),
    7. emqx:hook('client.subscribe', fun ?MODULE:on_client_subscribe/3, [Env]),
    8. emqx:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/3, [Env]),
    9. emqx:hook('session.created', fun ?MODULE:on_session_created/3, [Env]),
    10. emqx:hook('session.resumed', fun ?MODULE:on_session_resumed/3, [Env]),
    11. emqx:hook('session.subscribed', fun ?MODULE:on_session_subscribed/4, [Env]),
    12. emqx:hook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4, [Env]),
    13. emqx:hook('session.terminated', fun ?MODULE:on_session_terminated/3, [Env]),
    14. emqx:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
    15. emqx:hook('message.deliver', fun ?MODULE:on_message_deliver/3, [Env]),
    16. emqx:hook('message.acked', fun ?MODULE:on_message_acked/3, [Env]),
    17. emqx:hook('message.dropped', fun ?MODULE:on_message_dropped/3, [Env]).

    所有可用钩子(Hook)说明:

    钩子

    说明

    client.authenticate

    连接认证

    client.check_acl

    ACL 校验

    client.connected

    客户端上线

    client.disconnected

    客户端连接断开

    client.subscribe

    客户端订阅主题

    client.unsubscribe

    客户端取消订阅主题

    session.created

    会话创建

    session.resumed

    会话恢复

    session.subscribed

    会话订阅主题后

    session.unsubscribed

    会话取消订阅主题后

    session.terminated

    会话终止

    message.publish

    MQTT 消息发布

    message.deliver

    MQTT 消息进行投递

    message.acked

    MQTT 消息回执

    message.dropped

    MQTT 消息丢弃

    注册 CLI 命令

    扩展命令行演示模块 - emqx_cli_demo.erl

    1. -module(emqx_cli_demo).
    2. -export([cmd/1]).
    3. cmd(["arg1", "arg2"]) ->
    4. emqx_cli:print("ok");
    5. cmd(_) ->
    6. emqx_cli:usage([{"cmd arg1 arg2", "cmd demo"}]).

    注册命令行模块 - emqx_plugin_template_app.erl

    1. ok = emqx_ctl:register_command(cmd, {emqx_cli_demo, cmd}, []),

    插件加载后,./bin/emqx_ctl 新增命令行:

    1. ./bin/emqx_ctl cmd arg1 arg2

    插件自带配置文件放置在 etc/${plugin_name}.conf|configEMQ X 支持两种插件配置格式:

    1. Erlang 原生配置文件格式 - ${plugin_name}.config:

      1. [
      2. {plugin_name, [
      3. {key, value}
      4. ]}
      5. ].
    2. sysctl 的 k = v 通用格式 - ${plugin_name}.conf:

      1. plugin_name.key = value

    k = v 格式配置需要插件开发者创建 priv/plugin_name.schema 映射文件。

    编译发布插件

    1. clone emqx-rel 项目:
    1. git clone https://github.com/emqx/emqx-rel.git
    1. rebar.config 添加依赖:
    1. rebar.config 中 relx 段落添加:
    1. {relx,
    2. [...
    3. , ...
    4. , {release, {emqx, git_describe},
    5. [
    6. {plugin_name, load},
    7. ]
    8. }
    9. ]