最佳实践

    设备: 具体的某一个设备.

    网络组件: 用于管理各种网络服务(MQTT,TCP等),动态配置,启停. 只负责接收,发送报文,不负责任何处理逻辑。

    协议: 用于自定义消息解析规则,用于认证、将设备发送给平台报文解析为平台统一的报文,以及处理平台下发给设备的指令。

    设备网关: 负责平台侧统一的设备接入,使用网络组件处理对应的请求以及报文,使用配置的协议解析为平台统一的设备消息(),然后推送到事件总线。

    事件总线: 基于topic和事件驱动,负责进程内的数据转发.(设备告警,规则引擎处理设备数据都是使用事件总线订阅设备消息)。

    平台使用自定义的协议包将设备上报的报文解析为平台统一的消息,来进行统一管理。

    平台统一消息基本于物模型中的定义相同,主要由属性(property),功能(function),事件(event)组成.

    消息主要由deviceId,messageId,headers,timestamp组成.

    deviceId为设备的唯一标识,messageId为消息的唯一标识,headers为消息头,通常用于对自定义消息处理的行为,如是否异步消息, 是否分片消息等.

    常用的Headers (opens new window):

    1. async 是否异步,boolean类型.
    2. timeout 指定超时时间. 毫秒.
    3. frag_msg_id 分片主消息ID,为下发消息的messageId
    4. frag_num 分片总数
    5. frag_part 当前分片索引
    6. frag_last 是否为最后一个分片,当无法确定分片数量的时候,可以将分片设置到足够大,最后一个分片设置:frag_last=true来完成返回.
    7. keepOnlineDeviceOnlineMessage配合使用,在TCP短链接,保持设备一直在线状态,连接断开不会设置设备离线.
    8. keepOnlineTimeoutSeconds 指定在线超时时间,在短链接时,如果超过此间隔没有收到消息则认为设备离线.
    9. ignoreStorage 不存储此消息数据,如: 读写属性回复默认也会记录到属性时序数据库中,设置为true后,将不记录.(1.9版本后支持)
    10. ignoreLog 不记录此消息到日志,如: 设置为true,将不记录此消息的日志.
    11. mergeLatest 是否合并最新属性数据,设置此消息头后,将会把最新的消息合并到消息体里(需要).

    TIP

    messageId通常由平台自动生成,如果设备不支持消息id,可在自定义协议中通过Map的方式来做映射,将设备返回的消息与平台的messageId进行绑定.

    属性相关消息

    1. 获取设备属性(ReadPropertyMessage)对应设备回复的消息ReadPropertyMessageReply.
    2. 修改设备属性(WritePropertyMessage)对应设备回复的消息WritePropertyMessageReply.
    3. 设备上报属性(ReportPropertyMessage) 由设备上报.

    注意

    设备回复的消息是通过messageId进行绑定,messageId应该注意要全局唯一,如果设备无法做到,可以在编解码时通过添加前缀等方式实现.

    消息定义:

    1. WritePropertyMessage{
    2. Map<String,Object> headers;
    3. String deviceId;
    4. String messageId;
    5. long timestamp; //时间戳(毫秒)
    6. Map<String,Object> properties;
    7. }
    8. WritePropertyMessageReply{
    9. Map<String,Object> headers;
    10. String deviceId;
    11. String messageId;
    12. long timestamp; //时间戳(毫秒)
    13. boolean success;
    14. Map<String,Object> properties; //回复被修改的属性最新值
    15. }
    1. ReportPropertyMessage{
    2. Map<String,Object> headers;
    3. String deviceId;
    4. String messageId; //可为空
    5. long timestamp; //时间戳(毫秒)
    6. Map<String,Object> properties;
    7. }

    功能相关消息

    调用设备功能到消息(FunctionInvokeMessage)由平台发往设备,对应到返回消息FunctionInvokeMessageReply.

    消息定义:

    1. FunctionInvokeMessage{
    2. Map<String,Object> headers;
    3. String functionId;//功能标识,在元数据中定义.
    4. String deviceId;
    5. String messageId;
    6. long timestamp; //时间戳(毫秒)
    7. List<FunctionParameter> inputs;//输入参数
    8. }
    9. FunctionParameter{
    10. String name;
    11. Object value;
    12. }
    13. FunctionInvokeMessageReply{
    14. Map<String,Object> headers;
    15. String deviceId;
    16. String messageId;
    17. long timestamp;
    18. boolean success;
    19. Object output; //输出值,需要与元数据定义中的类型一致
    20. }

    事件消息

    事件消息EventMessage由设备端发往平台.

    消息定义:

    1. EventMessage{
    2. Map<String,Object> headers;
    3. String event; //事件标识,在元数据中定义
    4. Object data; //与元数据中定义的类型一致,如果是对象类型,请转为java.util.HashMap,禁止使用自定义类型.
    5. long timestamp; //时间戳(毫秒)
    6. }

    其他消息

    1. DeviceOnlineMessage 设备上线消息,通常用于网关代理的子设备的上线操作.
    2. DeviceOfflineMessage 设备离线消息,通常用于网关代理的子设备的下线操作.
    3. ChildDeviceMessage 子设备消息,通常用于网关代理的子设备的消息.
    4. ChildDeviceMessageReply 子设备消息回复,用于平台向网关代理的子设备发送消息后设备回复给平台的结果.
    5. UpdateTagMessage更新设备标签.
    6. DerivedMetadataMessage 更新设备独立物模型.
    7. DeviceRegisterMessage 设备注册消息,通过设置消息头message.addHeader("deviceName","设备名称");message.addHeader("productId","产品ID")可实现设备自动注册

    消息定义:

    1. ChildDeviceMessage{
    2. String deviceId;
    3. String childDeviceId;
    4. Message childDeviceMessage; //子设备消息
    5. }

    父子设备消息处理请看这里

    协议包将设备上报后的报文解析为平台统一的设备消息后,会将消息转换为对应的topic 并发送到事件总线,可以通过从事件总线订阅消息来处理这些消息。

    所有设备消息的topic的前缀均为: /device/{productId}/{deviceId}.

    如:产品product-1下的设备device-1上线消息: /device/product-1/device-1/online.

    可通过通配符订阅所有设备的指定消息,如:/device/*/*/online,或者订阅所有消息:/device/**.

    TIP

    1. 此topic和mqtt的topic没有任何关系,仅仅作为系统内部通知的方式
    2. 使用通配符订阅可能将收到大量的消息,请保证消息的处理速度,否则会影响系统消息吞吐量.

    注意

    列表中的topic已省略前缀/device/{productId}/{deviceId},使用时请加上.

    在1.6版本后,支持订阅设备分组和租户下的设备消息了.

    • 租户topic前缀: /tenant/{tenantId}
    • 成员topic前缀: /user/{userId}

    flow

    设备接入的核心是协议包,无论是直连设备,或者是云云对接,理论上都可以在自定义协议包里进行处理。

    物模型定义

    在接入一个设备时,首先根据设备以及设备接入文档(报文说明), 将设备物模型的属性,功能以及事件设计好。

    通常情况下:

    对于设备固有不变的信息,建议使用设备标签进行管理.

    属性用于定义一些指标数据,如:电压,温度等. 属性都应该是简单的数据类型,如:int,float,string等,避免使用结构体等复杂类型.

    功能用于定义设备具有的一些可执行动作,例如: 消音,关灯,云台控制.根据情况设计好输入参数和输出参数.

    事件用于定义设备在特定条件时,发生的动作,如:火警,检测到人脸,通常为结构体类型,用于保存比较复杂的数据.

    注意

    在设计物模型时,尽量屏蔽掉非必要参数,让物模型简单化,通用化.

    协议包开发

    建议使用策略模式来定义功能码,以及不同功能码对应的解析规则。如: 使用枚举来定义功能码.

    避免进行array copy,应该使用偏移量直接处理报文.

    常见场景算法实践:

    1. 使用枚举来处理不同类型的数据

    伪代码如下:

    1. @AllArgsConstructor
    2. @Getter
    3. public enum UpstreamCommand {
    4. //注册命令
    5. register((byte) 0x00, RegisterMessage::decode),
    6. //数据命令
    7. byte code;
    8. BiFunction<byte[], Integer, ? extends MyDeviceMessage> decoder;
    9. public static UpstreamCommand of(byte code) {
    10. for (UpstreamCommand value : values()) {
    11. if (value.code == code) {
    12. return value;
    13. }
    14. }
    15. throw new UnsupportedOperationException("不支持的命令:0x" + Hex.encodeHexString(new byte[]{data}));
    16. }
    17. //调用此方法解码数据
    18. public static MyDeviceMessage decode(byte[] body) {
    19. //业务流水号
    20. int msgId = BytesUtils.beToInt(body, 0, 2); //2字节小端转int
    21. //命令
    22. UpstreamCommand command = of(body[4]);
    23. return command.getDecoder().apply(body, 5);
    24. }
    25. }
    1. 使用二进制位来表示状态,0表示正常,1表示异常:

    使用枚举定义二进制位表示的含义,使用位运算来判断对应数据是哪一位

    1. @AllArgsConstructor
    2. @Getter
    3. public enum DataType{
    4. OK(-9999,"正常"),
    5. Bit0(0,"测试"),
    6. Bit1(1,"火警"),
    7. Bit2(2,"故障")
    8. //....
    9. ;
    10. private int bit;
    11. private String text;
    12. public static DataType of(long value) {
    13. if (value == 0) {
    14. return OK;
    15. }
    16. for (DataType type : values()) {
    17. if ((value & (1 << type.bit)) != 0) {
    18. //数据位支持多选的话,装到集合里即可
    19. return partState;
    20. }
    21. }
    22. throw new UnsupportedOperationException("不支持的状态:" + value);
    23. }
    24. }

    注意

    应该将对报文处理的类封装为独立的类,然后在开发过程中,使用单元测试验证处理是否正确。 避免直接在DeviceMessageCodec里编写处理逻辑。

    从1.5.0开始,企业版支持在产品中配置数据存储策略, 不同的策略使用不同的数据存储方式来保存设备数据.

    默认-行式存储

    这是系统默认情况下使用的存储方案,使用elasticsearch存储设备数据. 每一个属性值都保存为一条索引记录.典型应用场景: 设备每次只会上报一部分属性, 以及支持读取部分属性数据的时候.

    优点: 灵活,几乎满足任意场景下的属性数据存储.

    缺点: 设备属性个数较多时,数据量指数增长,可能性能较低.

    警告

    1. 在确定好存储方案后,尽量不要跨类型进行修改,如将: 行式存储修改为列式存储,可能会导致数据结构错乱.
    2. 在创建物模型时,请根据存储策略判断好是否支持此类型以及ID格式

    索引结构:

    1. 属性索引模版: properties_{productId}_template

    mappings:

    1. {
    2. "properties" : {
    3. "geoValue" : { // 地理位置值,物模型配置的类型为地理位置时,此字段有值
    4. "type" : "geo_point"
    5. },
    6. "productId" : { // 产品ID
    7. "ignore_above" : 512,
    8. "type" : "keyword"
    9. },
    10. "objectValue" : { // 结构体值,物模型配置的类型为结构体时,此字段有值
    11. "type" : "nested"
    12. },
    13. "type" : { // 物模型中配置的属性类型
    14. "ignore_above" : 512,
    15. "type" : "keyword"
    16. },
    17. "timeValue" : { //时间类型的值
    18. "format" : "epoch_millis||strict_date_hour_minute_second||strict_date_time||strict_date",
    19. "type" : "date"
    20. },
    21. "deviceId" : { //设备ID
    22. "ignore_above" : 512,
    23. "type" : "keyword"
    24. },
    25. "createTime" : { //平台记录数据的时间
    26. "format" : "epoch_millis||strict_date_hour_minute_second||strict_date_time||strict_date",
    27. "type" : "date"
    28. },
    29. "property" : { //属性ID,与物模型属性ID一致
    30. "ignore_above" : 512,
    31. "type" : "keyword"
    32. },
    33. "numberValue" : { //物模型中属性类型为数字相关类型时,此字段有值
    34. "type" : "double"
    35. },
    36. "id" : {
    37. "ignore_above" : 512,
    38. "type" : "keyword"
    39. },
    40. "value" : { //通用值,所有类型都转为字符串
    41. "type" : "keyword"
    42. },
    43. "timestamp" : { //设备消息中的时间戳
    44. "format" : "epoch_millis||strict_date_hour_minute_second||strict_date_time||strict_date",
    45. "type" : "date"
    46. }
    47. }
    48. }
    1. 事件索引模版: event_{productId}_{eventId}_template

    mappings:

    1. 设备日志索引模版: device_log_{productId}_template

    mappings:

    1. {
    2. "properties" : {
    3. "productId" : { //产品ID
    4. "ignore_above" : 512,
    5. "type" : "keyword"
    6. },
    7. "createTime" : { //创建时间
    8. "format" : "epoch_millis||strict_date_hour_minute_second||strict_date_time||strict_date",
    9. "type" : "date"
    10. },
    11. "id" : {
    12. "ignore_above" : 512,
    13. "type" : "keyword"
    14. "type" : { //日志类型
    15. "ignore_above" : 512,
    16. "type" : "keyword"
    17. },
    18. "deviceId" : { //设备ID
    19. "ignore_above" : 512,
    20. "type" : "keyword"
    21. },
    22. "content" : { //日志内容
    23. "ignore_above" : 512,
    24. "type" : "keyword"
    25. },
    26. "timestamp" : { //设备消息中的时间戳
    27. "format" : "epoch_millis||strict_date_hour_minute_second||strict_date_time||strict_date",
    28. "type" : "date"
    29. }
    30. }
    31. }

    默认-列式存储

    使用elasticsearch存储设备数据. 一个属性作为一列,一条属性消息作为一条索引记录进行存储,适合设备每次都上报所有的属性值的场景.

    优点: 在属性个数较多,并且设备每次都会上报全部属性时,性能更高

    缺点: 设备必须上报全部属性.

    默认情况下:读取属性,修改属性指令,都必须返回全部属性数据. 如果无法返回全部数据,可以在消息中设置头partialProperties,如: message.addHeader("partialProperties",true), 用来标记是返回的部分数据,平台在保存数据前,将获取之前保存的其他属性然后一起保存.(使用此功能将降低系统的吞吐量)

    索引结构:

    1. 属性索引模版: properties_{productId}_template

    mappings:

    1. {
    2. "properties" : {
    3. "productId" : { // 产品ID
    4. "ignore_above" : 512,
    5. "type" : "keyword"
    6. },
    7. "deviceId" : { //设备ID
    8. "ignore_above" : 512,
    9. "type" : "keyword"
    10. },
    11. "createTime" : { //平台记录数据的时间
    12. "format" : "epoch_millis||strict_date_hour_minute_second||strict_date_time||strict_date",
    13. "type" : "date"
    14. },
    15. "id" : {
    16. "ignore_above" : 512,
    17. "type" : "keyword"
    18. },
    19. //根据物模型中配置的属性,一个属性对应一列
    20. //....
    21. "timestamp" : { //设备消息中的时间戳
    22. "format" : "epoch_millis||strict_date_hour_minute_second||strict_date_time||strict_date",
    23. "type" : "date"
    24. }
    25. }
    26. }
    1. 事件和日志与默认-行式存储一致

    默认-行式存储行为一致,使用influxdb进行数据存储.

    measurement说明:

    1. 属性: properties_{productId}
    2. 事件: event_{productId}_{eventId}
    3. 日志: device_log_{productId}

    InfluxDB-列式存储

    默认-列式存储行为一致,使用influxdb进行数据存储

    measurement说明:

    1. 属性: properties_{productId}
    2. 事件: event_{productId}_{eventId}
    3. 日志: device_log_{productId}

    TDEngine-列式存储

    默认-列式存储行为一致,使用TDEngine进行数据存储

    超级表说明:

    1. 属性: properties_{productId}
    2. 事件: event_{productId}_{eventId}
    3. 日志: device_log_{productId}

    ClickHouse-行式存储

    默认-行式存储行为一致,使用ClickHouse进行数据存储.

    说明

    ClickHouse不支持大量并发查询,请根据实际情况选择.

    Cassandra-行式存储

    默认-行式存储行为一致,使用Cassandra进行数据存储.

    分区说明:

    1. 设备属性: (deviceId,property,partition)
    2. 设备事件: (deviceId,partition)
    3. 设备日志: (deviceId,partition)

    partition规则:

    通过jetlinks.device.storage.cassandra.partition-interval=MONTHS进行配置, 支持:MINUTES,HOURS,DAYS,WEEKS,MONTHS,YEARS,FOREVER.

    注意

    1. 在查询条件没有设置时间的时候,默认只会查询最近2个分区周期内的数据。 前端在查询时,建议默认指定时间范围,并且时间间隔不能与配置的分区周期相差太多,否则会影响查询性能.
    2. Cassandra仅支持使用滚动分页,跳转分页深度过大会降低查询速度.执行count的性能也较低,建议查询时,使用.

    在后台代码中实现DeviceDataStoragePolicy接口,然后注入到Spring即可。

    记录设备最新数据到数据库

    1.5.0企业版本中增加了记录最新设备数据到数据库中,以便进行更丰富的统计查询等操作.

    通过配置文件开启:

    1. jetlinks:
    2. device:
    3. storage:
    4. enable-last-data-in-db: true #是否将设备最新到数据存储到数据库

    注意

    开启后,设备的属性,事件属性的最新值将记录到表dev_lsg_{productId}中. 主键id为设备ID. 开启此功能可能降低系统的吞吐量.

    物模型与数据库表字段说明

    1. 属性id即字段
    2. 当事件类型为结构体类型时, 字段为:{eventId}_{结构体属性ID},否则为:{eventId}_value
    3. 在发布产品时,会自动创建或者修改表结构.

    数据类型说明

    1. 数字类型统一为number(32,物模型中配置的精度)类型
    2. 时间类型为timestamp
    3. 结构体(object)和array为Clob
    4. 设置了字符长度expands.maxLength大于2048时,为Clob
    5. 其他为varchar

    提示

    请勿频繁修改物模型类型,可能导致数据格式错误.

    根据最新的设备数据查询设备数量等操作

    在任何设备相关等动态查询条件中.使用自定义条件dev-latest来进行查询.例如:

    1. {
    2. "terms":[
    3. {
    4. "column":"id$dev-latest$demo-device"
    5. ,"value":"temp1 > 10" //查询表达式
    6. }
    7. ]

    id$dev-latest$demo-device说明: id: 表示使用查询主表的id作为设备id dev-latest: 表示使用自定义条件dev-latest,此处为固定值 demo-device: 产品ID

    temp1 > 10说明:

    temp1最新值大于10. 条件支持=,>,<,>=,<=,in,btw,isnull等通用条件,支持嵌套查询.如: temp1 > 10 and (temp2 >30 or state = 1)

    说明