保存数据到 DolphinDB

    EMQX 用 Erlang 实现了 DolphinDB 的客户端 API,它通过 TCP 的方式将数据传输到 DolphinDB 进行存储。

    目前,EMQX 仅适配 DolphinDB 1.20.7 的版本。

    以 Linux 版本为例,前往官网下载社区最新版本的 Linux64 安装包:

    将安装包的 server 目录上传至服务器目录 ,并测试启动是否正常:

    启动成功,并得到正确输出,表示成功安装 DolphinDB。然后使用 <CRTL+D> 关闭 DolphinDB。

    现在,我们需要打开 DolphinDB 的 StreamTable 的发布/订阅的功能,并创建相关数据表,以实现 EMQX 消息存储并持久化的功能:

    1. 修改 DolphinDB 的配置文件 vim dolphindb.cfg 加入以下配置项,以打开 发布/订阅 的功能:
    1. ## Publisher for streaming
    2. maxPubConnections=10
    3. persistenceDir=/ddb/pubdata/
    4. #persistenceWorkerNum=
    5. #maxPersistenceQueueDepth=
    6. #maxMsgNumPerBlock=
    7. #maxPubQueueDepthPerSite=
    8. ## Subscriber for streaming
    9. subPort=8000
    10. #subExecutorPooling=
    11. #maxSubQueueDepth=
    1. 后台启动 DolphinDB 服务:
    1. 前往 DolphinDB 官网,下载合适的 GUI 客户端连接 DolphinDB 服务:

      • 前往 下载页 (opens new window) 下载 DolphinDB GUI
      • DolphinDB GUI 客户端依赖 Java 环境,先确保已经安装 Java
      • 前往 DolphinDB GUI 目录中执行 sh gui.sh 启动客户端
      • 在客户端中添加 Server 并创建一个 Project,和脚本文件。
    2. 创建分布式数据库,和 StreamTable 表;并将 StreamTable 的数据持久化到分布式表中:
    1. // 创建一个名为 emqx 的 分布式文件数据库
    2. // 并创建一张名为 `msg` 表,按 `clientid` 和 `topic` 的 HASH 值进行分区:
    3. schema = table(1:0, `clientid`topic`qos`payload, [STRING, STRING, INT, STRING])
    4. db1 = database("", HASH, [STRING, 8])
    5. db2 = database("", HASH, [STRING, 8])
    6. db.createPartitionedTable(schema, "msg",`clientid`topic)
    7. // 创建名为 `st_msg` 的 StreamTable 表,并将数据持久化到 `msg` 表。
    8. share streamTable(10000:0,`clientid`topic`qos`payload, [STRING,STRING,INT,STRING]) as st_msg
    9. msg_ref= loadTable("dfs://emqx", "msg")
    10. subscribeTable(, "st_msg", "save_msg_to_dfs", 0, msg_ref, true)
    11. // 查询 msg_ref;检查是否创建成功
    12. select * from msg_ref;

    完成后,可以看到一张空的 msg_ref 已创建成功:

    Create DolphinDB Table

    至此,DolphinDB 的配置已经完成了。

    详细的 DolphinDB 使用文档请参考:

    配置规则引擎

    打开 EMQX Dashboard (opens new window),选择左侧的 “规则” 选项卡。

    填写规则 SQL:

    image

    关联动作:

    在 “响应动作” 界面选择 “添加”,然后在 “动作” 下拉框里选择 “保存数据到 DolphinDB”。

    填写动作参数:

    “保存数据到 DolphinDB” 动作需要两个参数:

    1). SQL 模板。这个例子里我们向流表 st_msg 中插入一条数据,SQL 模板为:

    1. insert into st_msg values('${clientid}', '${topic}', ${qos}, '${payload}')

    2). 关联资源的 ID。现在资源下拉框为空,可以点击右上角的 “新建资源” 来创建一个DolphinDB 资源:

    填写资源配置:

    image

    点击 “确定” 按钮。

    返回响应动作界面,点击 “确定”。

    返回规则创建界面,点击 “创建”。

    image

    在规则列表里,点击 “查看” 按钮或规则 ID 连接,可以预览刚才创建的规则:

    规则已经创建完成,现在发一条数据:

    然后检查持久化的 表,新的数据是否添加成功:

    image