Save data to DolphinDB

    EMQX uses Erlang to implement DolphinDB’s client API, which transmits data to DolphinDB for storage through TCP.

    Currently, EMQX only adapts to DolphinDB 1.20.7 version.

    Taking the Linux version as an example, you can go to the official website to download the latest version of the Linux64 installation package from the community:

    Upload the server directory of the installation package to the server directory , and test whether the startup is normal:

    If the startup is successful and the correct output is obtained, it indicates that DolphinDB is successfully installed. Then use <CRTL+D> to close DolphinDB.

    Now, we need to open the publish / subscribe function of streamtable in dolphin dB and create relevant data tables to realize the function of EMQ x message storage and persistence:

    1. Modify the DolphinDB’s configuration file vim dolphindb.cfg and add the following configuration items to enable the publish/subscribe function:
    1. ## Publisher for streaming
    2. maxPubConnections=10
    3. persistenceDir=/ddb/pubdata/
    4. #persistenceWorkerNum=
    5. #maxPersistenceQueueDepth=
    6. #maxMsgNumPerBlock=
    7. #maxPubQueueDepthPerSite=
    8. ## Subscriber for streaming
    9. subPort=8000
    10. #subExecutorPooling=
    11. #maxSubQueueDepth=
    1. Start the dolphin DB service from the background:
    1. Go to the official website of DolphinDB and download a suitable GUI client to connect to the DolphinDB service:

      • Go to download page (opens new window) to download DolphinDB GUI
      • DolphinDB GUI client depends on the Java environment, therefore, make sure that Java is installed at first
      • Go to the DolphinDB GUI directory and execute sh gui.sh to start the client
      • Add Server in the client and create a Project with script files.
    2. Create a distributed database and a streamtable table, and persist the data of streamtable into the distributed table
    1. // Create a distributed file database named emqx
    2. // And create a table named `msg`, partition by the hash values of `clientid` and `topic`:
    3. schema = table(1:0, `clientid`topic`qos`payload, [STRING, STRING, INT, STRING])
    4. db1 = database("", HASH, [STRING, 8])
    5. db = database("dfs://emqx", COMPO, [db1, db2])
    6. // Create a StreamTable table named `st_msg` and persist the data to the `msg` table.
    7. share streamTable(10000:0,`clientid`topic`qos`payload, [STRING,STRING,INT,STRING]) as st_msg
    8. msg_ref= loadTable("dfs://emqx", "msg")
    9. subscribeTable(, "st_msg", "save_msg_to_dfs", 0, msg_ref, true)
    10. // Query msg_ref to check whether the creation is successful
    11. select * from msg_ref;

    After that, you can see that an empty msg_ref has been created successfully:

    Create DolphinDB Table

    So far, the configuration of DolphinDB has been completed.

    Configure the rules engine

    Create rules:

    Open EMQX Dashboard (opens new window) and select the “Rules” tab on the left.

    Fill in the rule SQL:

    image

    Related actions:

    On the “Response Action” interface, select “Add”, and then select “Save Data to DolphinDB” in the “Action” drop-down box.

    Fill in the action parameters:

    The “Save data to DolphinDB” action requires two parameters:

    1). SQL template. In this example, we insert a piece of data into the stream table st_msg, and the SQL template is:

    1. insert into st_msg values(${clientid}, ${topic}, ${qos}, ${payload})

    Fill in the server address corresponding to the DolphinDB server deployed above. The user name is admin and the password is 123456

    image

    Click the “OK” button.

    Return to the response action interface and click “OK”.

    Return to the rule creation interface and click “Create”.

    image

    In the rule list, click the “View” button or the rule ID connection to preview the rule just created:

    The rule has been created. Now, send a piece of data:

    image