Save data to DolphinDB
EMQX implements the DolphinDB client API in Erlang and sends data to DolphinDB for storage over TCP.
Currently, EMQX supports only DolphinDB version 1.20.7.
Taking the Linux version as an example, go to the official website and download the latest community version of the Linux64 installation package:
Upload the server directory from the installation package to the target directory on your server, and test whether DolphinDB starts normally:
If the startup succeeds and the expected output is shown, DolphinDB is installed correctly. Then press <CTRL+D> to close DolphinDB.
Now, we need to enable the publish/subscribe function for stream tables in DolphinDB and create the relevant data tables, so that EMQX messages can be stored and persisted:
- Modify the DolphinDB configuration file (vim dolphindb.cfg) and add the following configuration items to enable the publish/subscribe function:
## Publisher for streaming
maxPubConnections=10
persistenceDir=/ddb/pubdata/
#persistenceWorkerNum=
#maxPersistenceQueueDepth=
#maxMsgNumPerBlock=
#maxPubQueueDepthPerSite=
## Subscriber for streaming
subPort=8000
#subExecutorPooling=
#maxSubQueueDepth=
- Start the DolphinDB service in the background:
Go to the official website of DolphinDB and download a suitable GUI client to connect to the DolphinDB service:
- Go to the download page to download DolphinDB GUI
- The DolphinDB GUI client depends on the Java environment, so make sure that Java is installed first
- Go to the DolphinDB GUI directory and execute sh gui.sh to start the client
- Add Server in the client and create a Project with script files.
- Create a distributed database and a stream table, and persist the stream table's data into the distributed table:
// Create a distributed file database named emqx
// And create a table named `msg`, partition by the hash values of `clientid` and `topic`:
schema = table(1:0, `clientid`topic`qos`payload, [STRING, STRING, INT, STRING])
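db1 = database("", HASH, [STRING, 8])
// Second hash level (for `topic`); assumed to mirror db1, because COMPO below needs both
db2 = database("", HASH, [STRING, 8])
db = database("dfs://emqx", COMPO, [db1, db2])
// Create the partitioned table `msg` that loadTable reads further down
db.createPartitionedTable(schema, "msg", `clientid`topic)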
// Create a StreamTable table named `st_msg` and persist the data to the `msg` table.
share streamTable(10000:0,`clientid`topic`qos`payload, [STRING,STRING,INT,STRING]) as st_msg
msg_ref= loadTable("dfs://emqx", "msg")
subscribeTable(, "st_msg", "save_msg_to_dfs", 0, msg_ref, true)
// Query msg_ref to check whether the creation is successful
select * from msg_ref;
After that, the query returns an empty msg_ref table, which shows that it has been created successfully.
So far, the configuration of DolphinDB has been completed.
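Before moving on, it can help to double-check the streaming settings added to dolphindb.cfg earlier. The following is a minimal sketch, not a DolphinDB or EMQX tool: the helper names parse_cfg and missing_streaming_keys are hypothetical, and treating maxPubConnections, persistenceDir, and subPort as the keys to check is an assumption based only on the fact that they are the items set explicitly in the example configuration.

```python
# Hypothetical helper, not part of DolphinDB or EMQX.
# Assumption: these are the keys worth checking, because they are the
# only ones the example configuration sets explicitly.
REQUIRED_KEYS = ("maxPubConnections", "persistenceDir", "subPort")


def parse_cfg(text):
    """Parse key=value lines, skipping blank lines and '#' comments."""
    settings = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            settings[key.strip()] = value.strip()
    return settings


def missing_streaming_keys(text):
    """Return the checked keys that are absent or have an empty value."""
    settings = parse_cfg(text)
    return [key for key in REQUIRED_KEYS if not settings.get(key)]


# Sample content copied from the configuration items shown earlier.
SAMPLE = """\
## Publisher for streaming
maxPubConnections=10
persistenceDir=/ddb/pubdata/
#persistenceWorkerNum=
## Subscriber for streaming
subPort=8000
#maxSubQueueDepth=
"""

if __name__ == "__main__":
    print(missing_streaming_keys(SAMPLE))  # prints []
```

In practice, you would read the real file contents (for example with open("dolphindb.cfg").read()) and pass them to missing_streaming_keys instead of SAMPLE.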
- User Guide:
- Examples of IoT scenarios: https://gitee.com/dolphindb/Tutorials_CN/blob/master/iot_examples.md
- Stream processing guidelines:
- Programming manual: https://www.dolphindb.cn/cn/help/index.html
Configure the rules engine
Create rules:
Open EMQX Dashboard and select the “Rules” tab on the left.
Fill in the rule SQL:
Related actions:
On the “Response Action” interface, select “Add”, and then select “Save Data to DolphinDB” in the “Action” drop-down box.
Fill in the action parameters:
The “Save data to DolphinDB” action requires two parameters:
1). SQL template. In this example, we insert a piece of data into the stream table st_msg, and the SQL template is:
insert into st_msg values(${clientid}, ${topic}, ${qos}, ${payload})
Fill in the server address of the DolphinDB server deployed above. The user name is admin and the password is 123456.
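To make the placeholder mechanism in the SQL template above more concrete, here is a minimal sketch of how a ${field} template could be expanded from the fields of a message. It is an illustration only: the render and to_literal helpers are hypothetical, the sample field values are made up, and the quoting rule (numbers left bare, everything else single-quoted) is an assumption rather than a description of how EMQX actually builds the statement.

```python
import re

# Matches placeholders of the form ${name}.
PLACEHOLDER = re.compile(r"\$\{(\w+)\}")

TEMPLATE = "insert into st_msg values(${clientid}, ${topic}, ${qos}, ${payload})"


def to_literal(value):
    """Render a value as a script literal.

    Assumed rule for illustration: numbers are emitted as-is, anything
    else is single-quoted with backslashes and quotes escaped.
    """
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        return str(value)
    escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return "'" + escaped + "'"


def render(template, fields):
    """Replace each ${name} with the literal for fields[name].

    Raises KeyError if the template names a field that is not present.
    """
    return PLACEHOLDER.sub(lambda m: to_literal(fields[m.group(1)]), template)


if __name__ == "__main__":
    # Made-up message fields matching the columns of st_msg.
    message = {"clientid": "c1", "topic": "t/1", "qos": 1, "payload": "hello"}
    print(render(TEMPLATE, message))
    # prints: insert into st_msg values('c1', 't/1', 1, 'hello')
```

The order of the placeholders matters: the values are inserted positionally, so they must follow the column order of st_msg (clientid, topic, qos, payload).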
Click the “OK” button.
Return to the response action interface and click “OK”.
Return to the rule creation interface and click “Create”.
In the rule list, click the “View” button or the rule ID link to preview the rule just created:
The rule has been created. Now, send a piece of data: