TDengine内嵌支持轻量级的消息订阅与推送服务。 使用系统提供的API,用户可使用普通查询语句订阅数据库中的一张或多张表。 订阅的逻辑和操作状态的维护均是由客户端完成,客户端定时轮询服务器是否有新的记录到达, 有新的记录到达就会将结果反馈到客户。

    TDengine的订阅与推送服务的状态是客户端维持,TDengine服务器并不维持。 因此如果应用重启,从哪个时间点开始获取最新数据,由应用决定。

    TDengine的API中,与订阅相关的主要有以下三个:

    这些API的文档请见 , 下面仍以智能电表场景为例介绍一下它们的具体用法(超级表和子表结构请参考上一节“连续查询”), 完整的示例代码可以在 这里 找到。

    如果我们希望当某个电表的电流超过一定限制(比如10A)后能得到通知并进行一些处理, 有两种方法: 一是分别对每张子表进行查询,每次查询后记录最后一条数据的时间戳,后续只查询这个时间戳之后的数据:

    1. select * from D1002 where ts > {last_timestamp2} and current > 10;
    2. ...

    这确实可行,但随着电表数量的增加,查询数量也会增加,客户端和服务端的性能都会受到影响, 当电表数增长到一定的程度,系统就无法承受了。

    另一种方法是对超级表进行查询。这样,无论有多少电表,都只需一次查询:

    1. select * from meters where ts > {last_timestamp} and current > 10;

    但是,如何选择 last_timestamp 就成了一个新的问题。 因为,一方面数据的产生时间(也就是数据时间戳)和数据入库的时间一般并不相同,有时偏差还很大; 另一方面,不同电表的数据到达TDengine的时间也会有差异。 所以,如果我们在查询中使用最慢的那台电表的数据的时间戳作为 last_timestamp, 就可能重复读入其它电表的数据; 如果使用最快的电表的时间戳,其它电表的数据就可能被漏掉。

    TDengine的订阅功能为上面这个问题提供了一个彻底的解决方案。

    首先是使用taos_subscribe创建订阅:

    1. TAOS_SUB* tsub = NULL;
    2. if (async) {
    3.   // create an asynchronized subscription, the callback function will be called every 1s
    4.   tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, &blockFetch, 1000);
    5. } else {
    6.   // create an synchronized subscription, need to call 'taos_consume' manually
    7.   tsub = taos_subscribe(taos, restart, topic, sql, NULL, NULL, 0);
    8. }

    参数taos是一个已经建立好的数据库连接,在同步模式下无特殊要求。 但在异步模式下,需要注意它不会被其它线程使用,否则可能导致不可预计的错误, 因为回调函数在API的内部线程中被调用,而TDengine的部分API不是线程安全的。

    参数sql是查询语句,可以在其中使用where子句指定过滤条件。 在我们的例子中,如果只想订阅电流超过10A时的数据,可以这样写:

    1. select * from meters where current > 10;

    注意,这里没有指定起始时间,所以会读到所有时间的数据。 如果只想从一天前的数据开始订阅,而不需要更早的历史数据,可以再加上一个时间条件:

    订阅的topic实际上是它的名字,因为订阅功能是在客户端API中实现的, 所以没必要保证它全局唯一,但需要它在一台客户端机器上唯一。

    如果名topic的订阅不存在,参数restart没有意义; 但如果用户程序创建这个订阅后退出,当它再次启动并重新使用这个topic时, restart就会被用于决定是从头开始读取数据,还是接续上次的位置进行读取。 本例中,如果restarttrue(非零值),用户程序肯定会读到所有数据。 但如果这个订阅之前就存在了,并且已经读取了一部分数据, 且restartfalse0),用户程序就不会读到之前已经读取的数据了。

    taos_subscribe的最后一个参数是以毫秒为单位的轮询周期。 在同步模式下,如果前后两次调用taos_consume的时间间隔小于此时间, taos_consume会阻塞,直到间隔超过此时间。 异步模式下,这个时间是两次调用回调函数的最小时间间隔。

    taos_subscribe的倒数第二个参数用于用户程序向回调函数传递附加参数, 订阅API不对其做任何处理,只原样传递给回调函数。此参数在同步模式下无意义。

    订阅创建以后,就可以消费其数据了,同步模式下,示例代码是下面的 else 部分:

    1. if (async) {
    2.   getchar();
    3. } else while(1) {
    4.   TAOS_RES* res = taos_consume(tsub);
    5.   if (res == NULL) {
    6.     printf("failed to consume data.");
    7.     break;
    8.   } else {
    9.     print_result(res, blockFetch);
    10.     getchar();
    11.   }
    12. }

    这里是一个 while 循环,用户每按一次回车键就调用一次taos_consume, 而taos_consume的返回值是查询到的结果集,与taos_use_result完全相同, 例子中使用这个结果集的代码是函数print_result

    1. void print_result(TAOS_RES* res, int blockFetch) {
    2.   TAOS_ROW row = NULL;
    3.   int num_fields = taos_num_fields(res);
    4.   TAOS_FIELD* fields = taos_fetch_fields(res);
    5.   if (blockFetch) {
    6.     nRows = taos_fetch_block(res, &row);
    7.     for (int i = 0; i < nRows; i++) {
    8.       char temp[256];
    9.       taos_print_row(temp, row + i, fields, num_fields);
    10.       puts(temp);
    11.     }
    12.   } else {
    13.     while ((row = taos_fetch_row(res))) {
    14.       char temp[256];
    15.       taos_print_row(temp, row, fields, num_fields);puts(temp);
    16.     }
    17.   }
    18.   printf("%d rows consumed.\n", nRows);
    19. }

    其中的 taos_print_row 用于处理订阅到数据,在我们的例子中,它会打印出所有符合条件的记录。 而异步模式下,消费订阅到的数据则显得更为简单:

    1. void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
    2.   print_result(res, *(int*)param);
    3. }
    1. taos_unsubscribe(tsub, keep);

    其第二个参数,用于决定是否在客户端保留订阅的进度信息。 如果这个参数是false0),那无论下次调用taos_subscribe的时的restart参数是什么, 订阅都只能重新开始。 另外,进度信息的保存位置是 {DataDir}/subscribe/ 这个目录下, 每个订阅有一个与其topic同名的文件,删掉某个文件,同样会导致下次创建其对应的订阅时只能重新开始。

    代码介绍完毕,我们来看一下实际的运行效果。假设:

    • 示例代码已经下载到本地
    • TDengine 也已经在同一台机器上安装好
    • 示例所需的数据库、超级表、子表已经全部创建好

    则可以在示例代码所在目录执行以下命令来编译并启动示例程序:

    示例程序启动后,打开另一个终端窗口,启动 TDengine 的 shell 向 D1001 插入一条电流为 12A 的数据:

    1. $ taos
    2. > use test;
    3. > insert into D1001 values(now, 12, 220, 1);

    这时,因为电流超过了10A,您应该可以看到示例程序将它输出到了屏幕上。 您可以继续插入一些数据观察示例程序的输出。

    订阅功能也提供了 Java 开发接口,相关说明请见 。需要注意的是,目前 Java 接口没有提供异步订阅模式,但用户程序可以通过创建 TimerTask 等方式达到同样的效果。

    下面以一个示例程序介绍其具体使用方法。它所完成的功能与前面介绍的 C 语言示例基本相同,也是订阅数据库中所有电流超过 10A 的记录。

    准备数据

    1. # 创建 power 库
    2. taos> create database power;
    3. # 切换库
    4. taos> use power;
    5. # 创建超级表
    6. taos> create table meters(ts timestamp, current float, voltage int, phase int) tags(location binary(64), groupId int);
    7. # 创建表
    8. taos> create table d1001 using meters tags ("Beijing.Chaoyang", 2);
    9. taos> create table d1002 using meters tags ("Beijing.Haidian", 2);
    10. # 插入测试数据
    11. taos> insert into d1001 values("2020-08-15 12:00:00.000", 12, 220, 1),("2020-08-15 12:10:00.000", 12.3, 220, 2),("2020-08-15 12:20:00.000", 12.2, 220, 1);
    12. taos> insert into d1002 values("2020-08-15 12:00:00.000", 9.9, 220, 1),("2020-08-15 12:10:00.000", 10.3, 220, 1),("2020-08-15 12:20:00.000", 11.2, 220, 1);
    13. # 从超级表 meters 查询电流大于 10A 的记录
    14. taos> select * from meters where current > 10;
    15. ts | current | voltage | phase | location | groupid |
    16. ===========================================================================================================
    17. 2020-08-15 12:10:00.000 | 10.30000 | 220 | 1 | Beijing.Haidian | 2 |
    18. 2020-08-15 12:20:00.000 | 11.20000 | 220 | 1 | Beijing.Haidian | 2 |
    19. 2020-08-15 12:00:00.000 | 12.00000 | 220 | 1 | Beijing.Chaoyang | 2 |
    20. 2020-08-15 12:10:00.000 | 12.30000 | 220 | 2 | Beijing.Chaoyang | 2 |
    21. 2020-08-15 12:20:00.000 | 12.20000 | 220 | 1 | Beijing.Chaoyang | 2 |
    22. Query OK, 5 row(s) in set (0.004896s)

    示例程序

    1. public class SubscribeDemo {
    2. private static final String sql = "select * from meters where current > 10";
    3. public static void main(String[] args) {
    4. Connection connection = null;
    5. TSDBSubscribe subscribe = null;
    6. Class.forName("com.taosdata.jdbc.TSDBDriver");
    7. Properties properties = new Properties();
    8. properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
    9. properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
    10. String jdbcUrl = "jdbc:TAOS://127.0.0.1:6030/power?user=root&password=taosdata";
    11. connection = DriverManager.getConnection(jdbcUrl, properties);
    12. subscribe = ((TSDBConnection) connection).subscribe(topic, sql, true); // 创建订阅
    13. int count = 0;
    14. while (count < 10) {
    15. TimeUnit.SECONDS.sleep(1); // 等待1秒,避免频繁调用 consume,给服务端造成压力
    16. TSDBResultSet resultSet = subscribe.consume(); // 消费数据
    17. if (resultSet == null) {
    18. continue;
    19. }
    20. ResultSetMetaData metaData = resultSet.getMetaData();
    21. while (resultSet.next()) {
    22. int columnCount = metaData.getColumnCount();
    23. for (int i = 1; i <= columnCount; i++) {
    24. System.out.print(metaData.getColumnLabel(i) + ": " + resultSet.getString(i) + "\t");
    25. }
    26. System.out.println();
    27. count++;
    28. }
    29. }
    30. } catch (Exception e) {
    31. e.printStackTrace();
    32. } finally {
    33. try {
    34. if (null != subscribe)
    35. subscribe.close(true); // 关闭订阅
    36. if (connection != null)
    37. connection.close();
    38. } catch (SQLException throwables) {
    39. throwables.printStackTrace();
    40. }
    41. }
    42. }
    43. }

    运行示例程序,首先,它会消费符合查询条件的所有历史数据:

    1. # java -jar subscribe.jar
    2. ts: 1597464000000 current: 12.0 voltage: 220 phase: 1 location: Beijing.Chaoyang groupid : 2
    3. ts: 1597464600000 current: 12.3 voltage: 220 phase: 2 location: Beijing.Chaoyang groupid : 2
    4. ts: 1597465200000 current: 12.2 voltage: 220 phase: 1 location: Beijing.Chaoyang groupid : 2
    5. ts: 1597464600000 current: 10.3 voltage: 220 phase: 1 location: Beijing.Haidian groupid : 2

    接着,使用 taos 客户端向表中新增一条数据:

    因为这条数据的电流大于10A,示例程序会将其消费: