ReactorQL

    1. 处理实时数据
    2. 聚合计算实时数据
    3. 跨数据源联合数据处理

    TIP

    聚合处理实时数据时,必须使用interval函数或者_window函数.

    当温度大于40度时,将数据转发到下一步.

    处理指定多个型号的设备数据

    1. select * from (
    2. select
    3. this.properties.temperature temperature,
    4. this.deviceId deviceId
    5. from
    6. "/device/T0001/*/message/property/**" -- 订阅T0001型号下的所有设备消息
    7. where this.properties.temperature > 40
    8. union all -- 实时数据只能使用 union all
    9. select
    10. this.properties.temperature temperature,
    11. this.deviceId deviceId
    12. from
    13. "/device/T0002/*/message/property/**" -- 订阅T0002型号下的所有设备消息
    14. where this.properties.temperature > 42
    15. )

    计算每5分钟的温度平均值,当平均温度大于40度时,将数据转发到下一步.

    1. select
    2. avg(this.properties.temperature) temperature
    3. from
    4. "/device/*/*/message/property/**" -- 订阅所有设备的所有属性消息
    5. group by interval('5m')
    6. having temperature > 40 --having 必须使用别名.

    计算每10条数据为一个窗口,每2条数据滚动的平均值.

    1. [1,2,3,4,5,6,7,8,9,10] 第一组
    2. [3,4,5,6,7,8,9,10,11,12] 第二组
    3. [5,6,7,8,9,10,11,12,13,14] 第三组
    1. select
    2. avg(this.properties.temperature) temperature
    3. from
    4. "/device/*/*/message/property/**" -- 订阅所有设备的所有属性消息
    5. group by _window(10,2)
    6. having temperature > 40 --having 必须使用别名.

    聚合统计平均值,并且提取聚合结果中的数据.

    1. select
    2. rows_to_array(idList) deviceIdList, --将[{deviceId:1},{deviceId:2}] 转为[1,2]
    3. avgTemp,
    4. from
    5. (
    6. select
    7. collect_list((select this.deviceId deviceId)) idList, --聚合结果里的
    8. avg(temperature) avgTemp ,
    9. from "/device/*/*/message/property/**" ,
    10. group by interval('1m') having avgTemp > 40
    11. )

    5分钟之内只取第一次。

    1. from "/device/*/*/message/event/fire_alarm"
    2. _window('5m'),take(1) -- -1为取最后一次

    限流: 10秒内超过2次则获取最后一条数据

    SQL中的this表示主表当前的数据,如果存在嵌套属性的时候,必须指定this或者以表别名开头. 如: this.properties.temperature ,写成: properties.temperature是无法获取到值到.

    TIP

    以下功能只在企业版中支持

    获取设备已保存的全部最新属性,(注意: 由于使用es存储设备数据,此数据并不是完全实时的)

    1. device.properties(this.deviceId) props,
    2. this.properties reports
    3. from "/device/*/*/message/property/report"

    TIP

    device.properties(this.deviceId,'property1','property2')还可以通过参数获取指定的属性,如果未设置则获取全部属性。

    device.properties.history

    查询设备历史数据

    1. --聚合查询
    2. select * from device.properties.history(
    3. select avg(temperature) avgVal
    4. from "deviceId" -- from 支持: 按设备ID查询: "deviceId", 查询多个设备: device('1','2') 按产品查询: product('id')
    5. where timestamp between now()-86400000 and now()
    6. )
    1. --按时间分组
    2. select * from device.properties.history(
    3. select avg(temperature) avgVal
    4. from "deviceId"
    5. where timestamp between now()-86400000 and now()
    6. group by interval('1d')
    7. )
    1. -- 订阅实时数据,然后查询对应设备的历史数据
    2. select
    3. (
    4. select maxVal,avgVal from
    5. device.properties.history(
    6. select
    7. max(temp3) maxVal,
    8. avg(temp3) avgVal
    9. from device(t.deviceId)
    10. -- 前一天的数据
    11. where timestamp between time('now-1d') and t.timestamp
    12. )
    13. ) $this,
    14. t.properties.temp3 temp3
    15. from "/device/*/*/message/property/**" t

    device.properties.latest

    TIP

    此功能需要开启设备最新数据存储

    1. select * from device.properties.latest(
    2. select
    3. temperature
    4. from "productId" --表名为产品ID
    5. where id = 'deviceId' -- id则为设备ID
    6. )

    聚合查询

    1. select * from device.properties.latest(
    2. select
    3. avg(temperature) temperature
    4. from "productId" --表名为产品ID
    5. )

    获取设备标签信息

    TIP

    device.tags(this.deviceId,'tag1','tag2')还可以通过参数获取指定的标签,如果未设置则获取全部标签。

    device.selector

    选择设备,如:

    1. select dev.deviceId from "/device/*/*/message/property/report" t
    2. -- 获取和上报属性在同一个分组里,并且产品idlight-product的设备
    3. left join (
    4. select this.id deviceId from
    5. device.selector(same_group(t.deviceId),product('light-product'))
    6. ) dev

    支持参数:

    1. in_gourp(‘groupId’) 在指定的设备分组中
    2. in_group_tree(‘groupId’) 在指定分组中(包含下级分组)
    3. same_group(‘deviceId’) 在指定设备的相同分组中
    4. product(‘productId’) 指定产品ID对应的设备
    5. tag(‘tag1Key’,’tag1Value’,’tag2Key’,’tag2Value’) 按指定的标签获取
    6. state(‘online’) 按指定的状态获取
    7. in_tenant(‘租户ID’) 在指定租户中的设备
    8. org(‘机构ID’) 在指定机构中

    mqtt.client.publish

    推送消息到mqtt客户端.

    1. select
    2. mqtt.client.publish(
    3. 'networkId' -- 第一个参数: 网络组件中mqtt客户端的ID
    4. ,'topic' -- 第二个参数: topic
    5. ,this -- 消息体,会根据消息类型转为不同格式的消息
    6. ) publishSuccess -- 返回推送结果 true false
    7. from "/rule-engine/device/alarm/sensor-1/**"

    从mqtt客户端订阅消息

    1. select
    2. t.did deviceId,
    3. t.l location,
    4. t.v value
    5. from mqtt.client.subscribe(
    6. 'networkId' -- 第一个参数: 网络组件中mqtt客户端的ID
    7. ,'JSON' -- 第二个参数: 消息类型: JSON,STRING,BINARY,HEX
    8. ,'topic' -- topic
    9. ,'topic2' -- topic2
    10. ) t
    11. where t.v > 30 -- 过滤条件

    http.request

    发起http请求

    1. select
    2. http.request(
    3. 'networkId' -- 第一个参数: 网络组件中http客户端的ID
    4. -- 下面的参数两两对应组成键值对,注意: 使用逗号(,)分割.
    5. ,'url','https://www.baidu.com'
    6. ,'method','POST'
    7. ,'contentType','application/json'
    8. -- 请求头
    9. ,'headers',new_map('key1','value1','key2','value2')
    10. -- body参数在contentTypeapplication/json时生效
    11. ,'body',new_map('key1','value1','key2','value2')
    12. -- requestParam参数在contentType不为json时生效,相当于:application/x-www-form-urlencoded的处理方式
    13. ,'requestParam',new_map('key1','value1','key2','value2')
    14. -- 直接拼接到url上的参数 https://www.baidu.com?key1=value1&key2=value2
    15. ,'queryParameters',new_map('key1','value1','key2','value2')
    16. ) response
    17. from dual

    message.subscribe

    1. select
    2. t.topic topic,
    3. t.message.deviceId deviceId,
    4. t.message.headers.productId productId,
    5. t.message.timestamp ts
    6. from message.subscribe(
    7. 'false' -- 是否订阅来自集群的消息(可选参数,默认为false)
    8. ,'/device/*/*/online'
    9. ) t

    推送消息到消息网关

    1. select
    2. message.publish(
    3. '/device-online/'||t.message.deviceId -- 推送到此topic
    4. ,t.message -- 消息内容
    5. ) subscribeNumber -- 返回有多少订阅者收到了消息