基于 Paho 集成安全 Agent 最佳实践

前置条件

(1)确认现有设备通过 Paho 实现接入阿里云物联网平台:

假设厂商现有设备当中,是借鉴了阿里云物联网平台 Paho 客户端最佳实践Paho 客户端最佳实践而接入的。

设备端开发环境中存在开源 Paho 库以及接入物联网平台的 Paho 客户端 Demo 包。Demo 程序主要包括以下一些文件:

文件名

说明

CMakeLists.txt

工程的整体配置文件。

src/samples/MQTTAsync_publish.c

该文件包含设备与物联网平台连接和通信的逻辑代码。

src/samples/aliot_mqtt_sign.c

该文件中的代码用于生成 MQTT 建连参数。应用程序调用该文件中定义的aiotMqttSign() 函数,计算出连接参数 username、password 和 clientId。

src/samples/CMakeLists.txt

Demo 的工程配置文件。

首先构建出 Paho 的库文件,可参考上面的最佳实践进行。

然后通过 cmake 可以构建出供设备连接物联网平台的程序示例。本文档后续将基于这个示例展示如果通过改造上述 MQTTAsync_publish.c 源码进一步集成 IoT 安全 Agent。

(2)需要安装 IoT 安全运营中心 Agent

在调试设备上,下载并安装 IoT 安全运营中心 Agent,请参考安装 Agent文档完成安装。

安装好 IoT 安全运营中心 Agent 之后,请确认调试设备的 /usr/<lib/lib64>/ (CPU 架构 32 位是 lib,64 位是 lib64)目录当中存在 libsessionmux.so。这个例子当中正是使用 libsessionmux.so 库来复用 Paho 创建的通道给 IoT 安全运营中心 Agent,供其向服务端收发数据。

1. 改造上云程序并接入安全 Agent

以 src/samples/MQTTAsync_publish.c 为例,讲述如何进一步通过微改造,接入 IoT 安全运营中心 Agent。

不论设备制造商在开发物联网平台接入时,使用了何种 SDK(包括但不限于阿里云物联网平台 LinkSDK、Paho、libmosquitto 等),这样的改造都遵循一定规律,具有一定的要领,主要有下面 4 点:

  • 需要同时启动 IoT 安全 Agent 服务程序和物联网平台连接程序。

  • 要在设备自身的物联网平台接入程序源码里,定义一个 IoT 安全 Agent 的上行数据处理回调,当 IoT 安全 Agent 有数据发送请求时,这个回调会被调用,此时利用当前的 MQTT publish 接口帮助发送数据给 IoT 安全运营中心云服务。同时这个回调函数定义当中也需要调用 MQTT subscribe 接口帮助 IoT 安全 Agent 订阅来自 IoT 安全运营中心云服务的下行 topic。

  • 在设备调用 MQTT 客户端接口成功连接到物联网平台之后,要调用 IoT 安全 Agent 提供的连接附着接口,并向接口输入 ProductKey,DeviceName,MQTT 连接句柄,以及上面第 2 条当中定义的回调函数句柄。

  • 在 MQTT 数据接收处理函数中,对下行 topic 进行匹配过滤,如果是发送给 IoT 安全 Agent 的 topic,则调用 IoT 安全 Agent 提供的接口发送给 Agent。

下面的改造步骤将全程贯穿上面 4 个要领,请逐一理解。

(1)在 Paho 源码目录树的 src/samples 下,复制 MQTTAync_publish.c 为 MQTTAsync_security.c,并确认aliot_mqtt_sign.c 也在 samples 目录下(也可以直接试用第 6 章附件中的代码放置在 src/samples 目录下),并在 MQTTAsync_security.c 源码额外包含两个 .h 文件,并定义一个全局变量:

...
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "MQTTAsync.h"
...
/* 接入IoT安全运营中心, 包含下面两个文件, 并定义一个全局会话变量 */
#include "sagent_defs.h"
#include "session_mux.h"

static channel_session_t *sec_session = NULL;

(2)定义一个函数,用于处理 IoT 安全运营中心 Agent 的发送消息事件,直接复制粘贴即可。

/* 接入IoT安全运营中心消息回调 */
ipc_error_t demo_security_message_handler(void *context, pid_t pid, char *message, uint16_t length) {
 int rc = 0;
 channel_session_t *session = (channel_session_t *) context;
 MQTTAsync client = NULL;
 char topic[MAX_TOPIC_NAME_LEN] = { 0 };

 if (NULL != session) {
 fprintf(stdout, "[SOC] cloud_session found : %s\n", session->session_name);
 client = (MQTTAsync)session->custom_context;
 get_session_pub_topic(session, topic, MAX_TOPIC_NAME_LEN);
 } else {
 fprintf(stderr, "[SOC] channel session is empty\n");
 }

 if (0 == strncmp(message, IPC_DPS_CONNECTED, length)) {
 /* 订阅 MQTT topic */
 char sec_topic_sub[MAX_TOPIC_NAME_LEN] = { 0 };
 get_session_sub_topic(session, sec_topic_sub, MAX_TOPIC_NAME_LEN);
 if ((rc = MQTTAsync_subscribe(client, sec_topic_sub, 0, NULL)) != MQTTASYNC_SUCCESS) {
 fprintf(stderr, "[SOC] Failed to start subscribe, return code %d\n", rc);
 }
 return 0;
 }

 if (NULL != client) {
 MQTTAsync_message pubmsg = MQTTAsync_message_initializer;

 fprintf(stdout, "[SOC] upstream message\n");
 dump_binary_string(message, length, 128);

 pubmsg.payload = message;
 pubmsg.payloadlen = (int)length;
 pubmsg.qos = 0;
 pubmsg.retained = 0;
 rc = MQTTAsync_sendMessage(client, topic, &pubmsg, NULL);
 fprintf(stdout, "[SOC] MQTTAsync_sendMessage result = %d\n", rc);
 } else {
 fprintf(stderr, "[SOC] MQTT client as custom context is empty\n");
 }

 return 0;
}

(3)在 Demo 程序 Paho MQTT Async 异步建立连接成功之后的 onConnect 回调函数里,调用 IoT 安全 Agent 提供的接口:

void onConnect(void *context, MQTTAsync_successData *response) {
 // 此函数被调用,说明 MQTT 连接建立成功了
 MQTTAsync client = (MQTTAsync)context;
 MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
 MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
 int rc;

 printf("Successfully connected\n");
 opts.onSuccess = onSend;
 opts.onFailure = onSendFailure;
 opts.context = client;
 pubmsg.payload = PAYLOAD;
 pubmsg.payloadlen = (int)strlen(PAYLOAD);
 pubmsg.qos = QOS;
 pubmsg.retained = 0;
 if ((rc = MQTTAsync_sendMessage(client, user_topic, &pubmsg, &opts)) != MQTTASYNC_SUCCESS) {
 printf("Failed to start sendMessage, return code %d\n", rc);
 exit(EXIT_FAILURE);
 }

 /* 接入IoT安全运营中心, 在MQTT连接成功之后, 调用下面的函数
 * 参数分别为:product_key, device_name, 定义的消息回调, mqtt_handler句柄 */
 sec_session = attach_to_security_service(product_key, // 传入 ProductKey
 device_name, // 传入 DeviceName
 demo_security_message_handler, // 传入第 2 步中定义的回调函数指针
 client, // 传入 MQTT handler
 "aliot");
}

(4)在 Paho MQTT Aync 异步消息接收处理函数中,对来自 IoT 安全运营中心服务端的下行 topic 进行过滤并转发给安全 Agent。

int messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *m) {
 /* not expecting any messages */

 /* 接入IoT安全运营中心, 处理相关的报文 */
 if (NULL != sec_session) {
 char topic[MAX_TOPIC_NAME_LEN] = { 0 };
 get_session_sub_topic(sec_session, topic, MAX_TOPIC_NAME_LEN);
 if (0 == strncmp(topicName, topic, topicLen)) {
 fprintf(stdout, "[SOC] downstream message\n");
 dump_binary_string(m->payload, m->payloadlen, 128);
 send_to_session(sec_session, m->payload, m->payloadlen);
 }
 }

 return 1;
}

2. 编译和测试

将 IoT 安全运营中心 Agent 相应架构的 libsessionmux.so 拷贝至交叉编译环境 $sysroot/usr/local/lib 下,并将对应的 include 目录中的头文件拷贝至 $sysroot/usr/local/include 目录。

在 Paho 源码树中,修改 src/samples/CMakeLists.txt,添加 MQTTAsync_security 应用程序构建,并为这个应用程序添加上述示范程序对 libsessionmux 的依赖:

INCLUDE_DIRECTORIES(
 .
 ${PROJECT_SOURCE_DIR}/src
 ${PROJECT_BINARY_DIR}
 # 新增头文件搜索路径
 /usr/local/include
)

# 新增动态库搜索路径
LINK_DIRECTORIES( /usr/local/lib )
...
# 新增一个 sample 应用程序 MQTTAsync_security, 依赖上面两个源文件
ADD_EXECUTABLE(MQTTAsync_security MQTTAsync_security.c aliot_mqtt_sign.c)
...
# 追加安全 Agent 依赖的 SEC_AGENT_LIB
TARGET_LINK_LIBRARIES(MQTTAsync_security paho-mqtt3a sessionmux)
...

在 Paho 代码根目录下创建目录 build,进入 build 子目录并执行:

mkdir build //在工程的根目录下执行
cd build
cmake ..
make -j

其它 GNU 构建工具,例如 scons 或者 make 均仅需加入对 $sysroot/usr/local/lib/libsessionmux.so 和对它的两个头文件位于 $sysroot/usr/local/include 的查找依赖即可,本文不再一一举例。

构建出 build/src/samples/MQTTAsync_security 可执行程序,执行时传入物联网平台实例 ID,ProductKey,DeviceName,DeviceSecret。

并确保目标设备已安装好 IoT 安全运营中心 Linux Agent,安全服务启动状态下(sudo systemctl start dpsd.serviec 启动)运行参考程序,即可连接 IoT 安全运营中心,并在IoT安全运营中心控制台资产列表中看到上线设备。

# 首先确认 IoT 安全 Agent 服务已启动
root@bloodless-vm-ubuntu:~/project/Paho# service dpsd status
dpsd start/running, process 1144

# 来到 Paho/build 目录,执行 cmake 和 make
root@bloodless-vm-ubuntu:~/project/Paho# cd build/
root@bloodless-vm-ubuntu:~/project/Paho/build# cmake ..
...
root@bloodless-vm-ubuntu:~/project/Paho/build# make
... 

# 构建生成 Paho/build/src/samples/MQTTAsync_security 可执行程序
# 执行这个程序,传入参数 iot_instance_id, product_key, device_name, device_secret
root@bloodless-vm-ubuntu:~/project/Paho/build# cd src/samples/
root@bloodless-vm-ubuntu:~/project/Paho/build/src/samples# ./MQTTAsync_security iot_instance_id product_key device_name device_secret

3. 固件升级和生产参考

通过上面的改造,在目标设备上已经形成了阿里云物联网平台接入+安全 Agent 的协同工作。如果您的设备固件是通过 rootfs 烧录方式在产线上进行生产,或者是采用 rootfs 进行全量或者差分 FOTA,请注意为固件更新添加以下一些文件系统路径:

  • /system/dps

  • /etc/systemd/system/dpsd_lite.service 或者 /etc/init/dpsd_lite.conf

  • /usr/lib/libsessionmux.so

  • /usr/lib64/libsessionmux.so // 64 位架构,指向 /usr/lib/libsessionmux.so 的软链接

4. 示例代码

可以通过阅读和编译运行下面源代码,进行快速体验,并跑通一个Paho 接入物联网平台 + 安全 Agent 的例子。源文件和 Makefile:下载

实践步骤:

  1. 按照阿里云物联网平台 Paho 上云最佳实践部署 Paho 源码。

  2. 解压缩附件 demo 包,将 CMakeLists.txt 和 MQTTAsync_security.c 放置到 Paho/src/samples 目录,并经头文件和动态库放置到相应的位置,按照上文方法进行编译执行即可。

后续步骤:确认安装效果