利用 Function 函数计算服务进行消息处理

    • 本文测试所用设备系统为 Ubuntu 18.04
    • 本文测试使用的函数运行时是 Python3
    • 模拟 MQTT client 行为的客户端为 MQTTBox提示:Darwin 系统可以通过源码安装 Baetyl,可参考 。

    与基于 Hub 服务实现设备间消息转发不同的是,本文主要介绍利用本地函数计算服务进行消息处理。其中 Hub 服务用于建立 Baetyl 与 MQTT 客户端之间的连接,Python 运行时服务用于处理 MQTT 消息,而本地函数计算服务则通过 MQTT 消息上下文衔接本地 Hub 服务与 Python 运行时服务。

    本文将以 TCP 连接方式为例,展示本地函数计算服务的消息处理、计算功能。

    • 步骤一:安装 Baetyl,并导入示例配置包。参考 快速安装 Baetyl 进行操作;
    • 步骤三:通过 MQTTBox 以 TCP 方式与 Baetyl Hub 服务 ;
      • 若成功与 Hub 服务建立连接,则依据配置的主题权限信息向有权限的主题发布消息,同时向拥有订阅权限的主题订阅消息,并观察 Baetyl 日志信息;
        • 若 Baetyl 日志显示已经启动 Python 运行时服务,则表明发布的消息受到了预期的函数处理;
        • 若 Baetyl 日志显示未成功启动 Python 运行时服务,则重复上述步骤,直至看到 Baetyl 主程序成功启动了 Python 运行时服务。
    • 步骤四:通过 MQTTBox 查看对应主题消息的收发状态。基于本地函数计算服务实现设备消息处理流程

    消息处理测试

    依据 步骤一 导入示例配置包后,确认一下应用配置、 Hub 服务配置以及函数计算服务配置。

    将 Baetyl 应用配置改成如下配置:

    Baetyl Hub 服务配置改成如下配置:

    Baetyl 本地函数计算服务相关配置无需修改,具体配置如下:

    Python 函数代码无需修。/usr/local/var/db/baetyl/function-sayhi-code/index.py 实现如下:

    如上配置,假若 MQTTBox 基于上述配置信息已与 Hub 服务建立连接,向 Hub 发送主题为 t 的消息,函数计算服务会降消息分别路由给 python27-sayhipython36-sayhinode85-sayhisql-filter 函数处理,并分别输出主题为 t/py2hi、、t/node8hit/sqlfilter 的消息。这时订阅主题 # 的 MQTT client 将会接收到这这些消息,以及原消息 t 和 Hub 服务直接转主题的消息 t/topic

    提示:凡是在 rules 消息路由配置项中出现、用到的函数,必须在 functions 配置项中进行函数实例的配置,否则无法正常启动函数运行时实例。

    依据 步骤二,执行 sudo systemctl start baetyl 以容器模式启动 Baetyl,如果 Baetyl 已经启动,执行 sudo systemctl restart baetyl 来重启。

    查看 Baetyl 主程序的日志,执行 sudo tail -f -n 40 /usr/local/var/log/baetyl/baetyl.log 显示如下:

    ../_images/function-start-log.pngBaetyl 加载、启动日志

    同样,我们也可以通过执行命令 查看系统当前正在运行的 docker 容器列表,具体如下图示。

    通过 docker ps 命令查看系统当前运行 docker 容器列表

    本次测试中,我们采用 TCP 连接方式对 MQTTBox 进行连接信息配置,然后点击 Add subscriber 按钮订阅主题 # ,该主题用于接收所有 Hub 服务收到的消息。

    通过查看 /usr/local/var/db/baetyl/function-sayhi-code/index.py 代码文件可以发现,在接收到某字典类格式的消息后,函数 handler 会对其进行一系列处理,然后将处理结果返回。返回的结果中包括各种追加的上下文信息,比如 messageTopicfunctionName 等。

    这里,我们通过 MQTTBox 将消息 {"id":1} 发布给主题 t ,然后观察 MQTTBox 接收到的消息如下。

    ../_images/mqttbox-tcp-process-success.pngMQTTBox 接收消息

    发送消息后,我们快速执行命令 docker ps 查看系统当前正在运行的容器列表,所有函数运行时服务实例都被启动了,其结果如下图示。

    通过 docker ps 命令查看系统当前正在运行的容器列表

    综上,我们通过 Hub 服务和函数计算服务模拟了消息在本地处理的过程,可以看出该框架非常适合用于边缘处理消息流。