规则引擎说明

    名词说明:

    • RuleModel(规则模型):由多个RuleNode(规则节点),RuleLink(规则连线)组成
    • RuleNode(规则节点): 规则节点描述具体执行的逻辑
    • RuleLink(规则连线): 用于将多个节点连接起来,将上一个节点的输出结果作为下一个节点的输入结果.
    • Input(输入): 规则节点的数据输入
    • Output(输出): 规则节点的数据输出
    • Scheduler(调度器): 负责将模型转为任务(Job),并进行任务调度到Worker
    • Worker(工作器): 负责执行,维护任务.
    • ExecutionContext(执行上下文): 启动任务时的上下文,通过上下文获取输入输出配置信息等进行任务处理.
    • TaskExecutor(任务执行器): 具体执行任务逻辑的实现
    • TaskExecutorProvider(任务执行器提供商): 用于根据模型配置以及上下文创建任务执行器.
    • RuleData(规则数据): 任务执行过程中的数据实例
    1. 实现接口TaskExecutorProvider
    2. 注解@Component
    1. @Component
    2. @AllArgsConstructor
    3. public class SceneRuleTaskExecutorProvider implements TaskExecutorProvider {
    4. private final EventBus eventBus;
    5. @Override
    6. public String getExecutor() {
    7. return "scene";
    8. }
    9. @Override
    10. public Mono<TaskExecutor> createTask(ExecutionContext context) {
    11. return Mono.just(new DeviceSceneTaskExecutor(context));
    12. }
    13. class DeviceSceneTaskExecutor extends FunctionTaskExecutor {
    14. private String id;
    15. private String name;
    16. public DeviceSceneTaskExecutor(ExecutionContext context) {
    17. super("场景联动", context);
    18. reload();
    19. }
    20. @Override
    21. public void reload() {
    22. //从任务配置中获取配置
    23. this.id = (String) getContext().getJob().getConfiguration().get("id");
    24. this.name = (String) getContext().getJob().getConfiguration().get("name");
    25. @Override
    26. protected Publisher<RuleData> apply(RuleData input) {
    27. Map<String, Object> data = new HashMap<>();
    28. data.put("sceneId", id);
    29. data.put("sceneName", name);
    30. data.put("executeTime", System.currentTimeMillis());
    31. input.acceptMap(data::putAll);
    32. return eventBus
    33. .publish(String.join("/", "scene", id), data)
    34. //转换新的数据
    35. .thenReturn(context.newRuleData(input.newData(data)));
    36. }
    37. }
    38. }
    1. {
    2. executor: "timer",
    3. configuration:{"cron":"cron表达式"}
    4. }

    延迟(限流)执行

    1. {
    2. executor: "delay",
    3. configuration:{
    4. pauseType:"延迟类型: delayv(上游节点指定固定延迟),delay(固定延迟),random(随机延迟),rate(速率限制),group(分组速率限制)",
    5. //延迟类型为delay时,使用以下配置
    6. timeout:10,//延迟时间
    7. timeoutUnits: "延迟时间单位:Seconds(秒),Millis(毫秒)",
    8. //延迟类型为random时,使用以下配置
    9. randomFirst: 100, //最小延迟时间
    10. randomLast: 1000 //最大延迟时间
    11. randomUnits: "随机延迟时间单位:Seconds(秒),Millis(毫秒)",
    12. //延迟类型为rate或者group时使用以下配置
    13. rate:10,//速率,如: 10条
    14. nbRateUnits:10,//速率时间单位,如: 1秒
    15. rateUnits:"速率时间单位:Seconds(秒),Millis(毫秒)",
    16. //延迟类型为group时使用以下配置
    17. groupExpression:"deviceId" //分组表达式,表达式语言为jsonata
    18. }
    19. }

    脚本说明

    脚本使用jsr223引擎, 通过调用内置变量handler.onMessage注册消息监听函数,当上游产生数据时,此函数将被调用,并传入数据.

    例如:

    1. var ctx = context;
    2. handler.onMessage(function(ruleData){
    3. var data = ruleData.data; //上游节点的输出
    4. return { // 输出到下一个节点
    5. "key":"value"
    6. }
    7. });

    通过指定输出数量值,可以控制输出到指定的节点,如:

    1. var ctx = context;
    2. handler.onMessage(function(ruleData){
    3. return [
    4. {"to":"node1"}, //输出到第一个节点
    5. {"to":"node2"} //输出到第二个节点
    6. ];
    7. });

    你还可以通过上下文作用域保存,获取数据.

    1. handler.onMessage(function(ruleData){
    2. return ctx.node()
    3. .counter()
    4. .inc(data.value) // 获取当前节点的计数器并递增
    5. .map(function(i){
    6. return {
    7. "total":i;
    8. }
    9. })

    作用域

    • ctx.scope(String id)或者ctx.scope(RuleData ruleData)上下文作用域,根据ruleData.contextId决定.
    • ctx.node()当前节点作用域
    • ctx.node(String id)指定节点作用域
    • ctx.flow()当前流程作用域
    • ctx.flow(String id)指定流程作用域
    • ctx.flow(String id).node(String id)指定流程指定节点的作用域
    • ctx.global()全局作用域

    作用域支持方法:

    • .all(String… key)获取指定key的数据,如果未指定这返回全部,类型为Mono<Map<String,Object>>
    • .get(String key)获取指定key的数据,返回类型为Mono<Object>
    • .put(String key,Object value)设置值,返回类型为Mono<Void>
    • .putAll(Map<String,Object>)设置多个值,参数为Map,返回类型为Mono<Void>
    • .clear()清空作用域,返回类型为Mono<Void>
    • .counter()获取计数器
    • .counter(String name)获取指定名字的计数器
    • .counter().inc(double number)计数器递增,返回最新值:Mono<Double>
    • .counter().dec(double number)计数器递减,返回最新值:Mono<Double>
    • .counter().getAndSet(double number)获取最新值后设置新的值,返回:Mono<Double>
    • .counter().setAndGet(double number)设置最新值后返回最新的值,返回:Mono<Double>

    特别注意

    作用域的返回值均是reactor的API ,注意将操作组合成一个流后返回,如:

    日志输出和错误处理

    ctx.getLogger().debug(“Log message {}”,data); ctx.getLogger().warn(“Warning”); ctx.getLogger().error(“Error”); 使用以下功能触发错误:

    throw new Error(“错误”); throw new java.lang.RuntimeException(“错误”);

    ReactorQL

    1. {
    2. executor:"reactor-ql",
    3. configuration:{
    4. "sql":"ReactorQL语句"
    5. }
    6. }

    说明

    通过ReactorQL可以订阅设备消息等消息,还可以进行分组聚合计算等操作. 见:

    1. {
    2. executor:"device-message-sender",
    3. configuration:{
    4. "productId":"产品ID",
    5. "deviceId":"设备ID,为空时发送到产品下所有设备",
    6. "selector":"设备选择器",//见设备选择器说明
    7. "from":"消息来源:pre-node(上游节点),fixed(固定消息)",
    8. "timeout":"10s",//超时时间
    9. "message":{ //设备指令内容
    10. "messageType":"消息类型"
    11. },
    12. "waitType":"等待类型:sync(等待设备回复),forget(忽略返回结果)"
    13. }
    14. }

    说明

    设备指令内容见:平台统一设备消息定义

    设备选择器说明

    如果下发指令的设备是动态获取的,可使用表达式函数来获取设备并发送到对应到设备。

    例如:

    • 获取分组为demo-group下的设备:in_group('demo-group')

    • 获取当前设备相同分组下的设备:same_group(deviceId)

    • 获取标签supplier为测试厂商下的设备:tag('supplier','测试厂商')

    • 按状态筛选 :state('online'),状态:online,offline,notActive

    • 函数的参数可以是固定的字符串,如:product('demo-device'),也可以是上游节点传递的变量,如: same_group(deviceId)

    • 多个表达式使用,分隔,例如:same_group(deviceId),tag('supplier','测试厂商')

    消息通知

    1. {
    2. executor:"notifier",
    3. configuration:{
    4. "notifyType":"通知类型:sms(短信),email(邮件),voice(语音),dingTalk(钉钉),weixin(微信);",
    5. "notifierId":"通知配置ID",
    6. "templateId":"模版ID"
    7. }

    配置:

    节点输入

    { “url”:”如果为null则使用节点配置中的值”, “method”:”如果为null则使用节点配置中的值”, “contentType”:”application/json”, “headers”:{}, “queryParameters”:{},//拼接到url上的参数 “payload”:{}//post请求时的请求体 }