基于 libmosquitto 集成安全 Agent 最佳实践

前置条件

(1)确认现有设备通过 libmosquitto 实现接入阿里云物联网平台:

libmosquitto 是著名的开源 MQTT broker Mosquitto 提供的客户端库和 API,用于帮助应用程序接入 MQTT broker。阿里云物联网平台同样可以通过 libmosquitto 进行接入。

在这里可以下载到 Mosquitto 的源代码:https://mosquitto.org/download/ 解压之后进入工程目录,可通过cmake 构建出 mosquitto MQTT broker 和 libmosquitto 库,本文主要使用 libmosquitto 库。

cd Mosquitto
[root@localhost Mosquitto]# cmake CMakeLists.txt
-- WITH_DLT = OFF
-- Could NOT find cJSON (missing: CJSON_INCLUDE_DIR CJSON_LIBRARY) 
-- Optional dependency cJSON not found. Some features will be disabled.
-- Configuring done
-- Generating done
-- Build files have been written to: /root/project/Mosquitto
[root@localhost Mosquitto]# make
[ 0%] Building C object lib/CMakeFiles/libmosquitto.dir/actions.c.o
[ 1%] Building C object lib/CMakeFiles/libmosquitto.dir/callbacks.c.o
[ 1%] Building C object lib/CMakeFiles/libmosquitto.dir/connect.c.o
[ 2%] Building C object lib/CMakeFiles/libmosquitto.dir/handle_auth.c.o
[ 3%] Building C object lib/CMakeFiles/libmosquitto.dir/handle_connack.c.o
[ 3%] Building C object lib/CMakeFiles/libmosquitto.dir/handle_disconnect.c.o
[ 4%] Building C object lib/CMakeFiles/libmosquitto.dir/handle_ping.c.o
...
[100%] Built target mqtt.7 # 构建完成

[root@localhost Mosquitto]# make install # 构建安装
[ 27%] Built target libmosquitto
[ 28%] Built target mosquittopp
[ 32%] Built target mosquitto_rr
[ 35%] Built target mosquitto_sub
[ 38%] Built target mosquitto_pub
...
[100%] Built target mqtt.7
Install the project...
# make install 成功之后 libmosquitto 位于 /usr/local/lib64 或者 /usr/local/lib 下

构建完之后生成的 libmosquitto.so 位于 Linux 的 /usr/local/lib64 或者 /usr/local/lib 目录下。可以从第 6 章节示例代码附件中获取本文档使用的 Demo 库,它包含以下 3 个源文件:

文件名

说明

CMakeLists.txt

Demo 构建用的 CMake 文件

aliot_security_example.c

利用 libmosquitto 接入阿里云物联网平台并复用 MQTT 会话给 IoT 安全 Agent 的示例。拷贝到 Mosquitto 根目录 example/security 子目录下。

aliot_mqtt_sign.c

该文件中的代码用于生成 MQTT 建连参数。应用程序调用该文件中定义的aiotMqttSign() 函数,计算出连接参数 username、password和clientId。

在构建出了 libmosquitto.so 之后,上述三个源代码可独立放置,CMakeLists.txt 当中有查找 libmosquitto.so 的依赖配置。

(2)需要安装 IoT 安全运营中心 Agent

在调试设备上,下载并安装 IoT 安全运营中心 Agent,安装过程中上云方式选择“接入公有云安全运营中心”。

安装好 IoT 安全运营中心 Agent 之后,请确认调试设备的 /usr/local/lib/ 目录当中存在 libsessionmux.so 且 /usr/local/include/ 目录当中存在 sagent_defs.h 以及 session_mux.h 两个头文件。这个例子当中正是使用 libsessionmux.so 库来复用 libmosquitto 创建的通道给 IoT 安全运营中心 Agent,供其向服务端收发数据。

安装好 IoT 安全运营中心 Agent 之后,执行下面的命令替换上云方式:

sudo service dpsd stop
sudo cp /system/dps/etc/service.ini /system/dps/etc/service_linux.ini
sudo cp /system/dps/etc/service_custom.ini /system/dps/etc/service.ini
sudo service dpsd start

1. 改造连接物联网平台程序并接入安全 Agent

以示例代码中的 aliot_security_example.c 源码为例,讲述如何进一步通过微改造,接入 IoT 安全运营中心 Agent。

不论设备制造商在开发物联网平台接入时,使用了何种 SDK(包括但不限于阿里云物联网平台 LinkSDK、Paho、libmosquitto 等),这样的改造都遵循一定规律,具有一定的要领,主要有下面 4 点:

  • 需要同时启动 IoT 安全 Agent 服务程序和物联网平台连接程序。

  • 要在设备自身的物联网平台接入程序源码里,定义一个 IoT 安全 Agent 的上行数据处理回调,当 IoT 安全 Agent 有数据发送请求时,这个回调会被调用,此时利用当前的 MQTT publish 接口帮助发送数据给 IoT 安全运营中心云服务。同时这个回调函数定义当中也需要调用 MQTT subscribe 接口帮助 IoT 安全 Agent 订阅来自 IoT 安全运营中心云服务的下行 topic。

  • 在设备调用 MQTT 客户端接口成功连接到物联网平台之后,要调用 IoT 安全 Agent 提供的连接附着接口,并向接口输入 ProductKey,DeviceName,MQTT 连接句柄,以及上面第 2 条当中定义的回调函数句柄。

  • 在 MQTT 数据接收处理函数中,对下行 topic 进行匹配过滤,如果是发送给 IoT 安全 Agent 的 topic,则调用 IoT 安全 Agent 提供的接口发送给 Agent。

下面的改造步骤将全程贯穿上面 4 个要领,请逐一理解。

(1)在 aliot_security_example.c 源码中包含两个 .h 文件,并定义一个全局变量:

./*
 * This example shows how to publish messages from outside of the Mosquitto network loop.
 */

#include <mosquitto.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* 接入IoT安全运营中心, 包含下面两个文件, 并定义一个全局会话变量 */
#include "sagent_defs.h"
#include "session_mux.h"

static channel_session_t *sec_session = NULL;

(2)定义一个函数,用于处理 IoT 安全运营中心 Agent 的发送消息事件,直接复制粘贴即可。

/* 接入IoT安全运营中心消息回调 */
ipc_error_t demo_security_message_handler(void *context, pid_t pid, char *message, uint16_t length) {
 int rc = 0;
 channel_session_t *session = (channel_session_t *) context;
 struct mosquitto *mosq = NULL;
 char topic[MAX_TOPIC_NAME_LEN] = { 0 };

 fprintf(stdout, "[SOC] message callback : %d\n", length);
 dump_binary_string(message, length, 128);

 if (NULL != session) {
 fprintf(stdout, "[SOC] cloud_session found : %s\n", session->session_name);
 mosq = (struct mosquitto *)session->custom_context;
 get_session_pub_topic(session, topic, MAX_TOPIC_NAME_LEN);
 } else {
 fprintf(stderr, "[SOC] channel session is empty\n");
 }

 if (0 == strncmp(message, IPC_DPS_CONNECTED, strlen(IPC_DPS_CONNECTED))) {
 /* 订阅 MQTT topic */
 struct mosquitto_message *msg;
 char sec_topic_sub[MAX_TOPIC_NAME_LEN] = { 0 };
 get_session_sub_topic(session, sec_topic_sub, MAX_TOPIC_NAME_LEN);

 fprintf(stdout, "DPS CONNECTED received, go subscribe topic : %s\n", sec_topic_sub);
 rc = mosquitto_subscribe(mosq, NULL, sec_topic_sub, 0);
 if (rc) {
		 fprintf(stderr, "[SOC] Error: %s\n", mosquitto_strerror(rc));
	 } else {
 fprintf(stdout, "[SOC] subscribed successfully : %s\n", sec_topic_sub);
 }
 return 0;
 }

 if (NULL != mosq) {
 fprintf(stdout, "[SOC] upstream message\n");
 dump_binary_string(message, length, 128);

 rc = mosquitto_publish(mosq, NULL, topic, length, message, 0, false);
 if (rc != MOSQ_ERR_SUCCESS) {
 fprintf(stderr, "[SOC] Error publishing: %s\n", mosquitto_strerror(rc));
 }
 } else {
 fprintf(stderr, "[SOC] MQTT client as custom context is empty\n");
 }

 return 0;
}

(3)在 Demo 程序 libmosquitto 异步建立连接成功之后的 on_connect 回调函数里,调用 IoT 安全运营中心 Agent 提供的接口:

/* Callback called when the client receives a CONNACK message from the broker. */
void on_connect(struct mosquitto *mosq, void *obj, int reason_code) {
 /* Print out the connection result. mosquitto_connack_string() produces an
 * appropriate string for MQTT v3.x clients, the equivalent for MQTT v5.0
 * clients is mosquitto_reason_string().
 */
 printf("on_connect: %s\n", mosquitto_connack_string(reason_code));
 if (reason_code != 0) {
 /* If the connection fails for any reason, we don't want to keep on
 * retrying in this example, so disconnect. Without this, the client
 * will attempt to reconnect. */
 mosquitto_disconnect(mosq);
 }

 /* You may wish to set a flag here to indicate to your application that the
 * client is now connected. */
 /* 接入IoT安全运营中心, 在MQTT连接成功之后, 调用下面的函数
 * 参数分别为:product_key, device_name, 定义的消息回调, mqtt_handler句柄 */
 sec_session = attach_to_security_service(product_key, // 传入 ProductKey
 device_name, // 传入 DeviceName
 demo_security_message_handler, // 传入第 2 步中定义的回调函数指针
 (void*) mosq, // 传入 MQTT handler
 "aliot");
}

(4)在 libmosquitto 注册的消息回调接收处理函数中,对来自 IoT 安全运营中心服务端的下行 topic 进行过滤并转发给安全 Agent。

void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg) {
	fprintf(stdout, "%s %s (%d)\n", msg->topic, (const char *)msg->payload, msg->payloadlen);

	/* 接入IoT安全运营中心, 处理相关的报文 */
 if (NULL != sec_session) {
 char topic[MAX_TOPIC_NAME_LEN] = { 0 };
 get_session_sub_topic(sec_session, topic, MAX_TOPIC_NAME_LEN);
 if (0 == strncmp(msg->topic, topic, strlen(msg->topic))) {
 fprintf(stdout, "[SOC] downstream message\n");
 dump_binary_string(msg->payload, msg->payloadlen, 128);
 send_to_session(sec_session, msg->payload, msg->payloadlen);
 }
 }
}

2. 编译和测试

将源代码拷贝到目标设备上,或者将 IoT 安全运营中心 Agent 相应架构的 libsessionmux.so 拷贝至交叉编译环境 $sysroot/usr/local/lib 下开始编译。

使用 cmake 构建上面的 Demo 工程,产生可执行程序 aliot_security_demo:

[root@localhost security]# cmake CMakeLists.txt 
...
[root@localhost security]# make
[100%] Built target aliot_security_demo
[root@localhost security]# ./aliot_security_demo iot_instance_id product_key device_name device_secret

执行时传入物联网平台实例 ID,ProductKey,DeviceName,DeviceSecret。并确保目标设备已安装好 IoT 安全运营中心 Agent,安全服务启动状态下(sudo systemctl start dpsd.serviec 启动)运行参考程序,即可连接 IoT 安全运营中心,并在安全运营中心控制台资产列表中看到上线设备。

3. 固件升级和生产参考

通过上面的改造,在目标设备上已经形成了阿里云物联网平台接入+安全 Agent 的协同工作。如果您的设备固件是通过 rootfs 烧录方式在产线上进行生产,或者是采用 rootfs 进行全量或者差分 FOTA,请注意为固件更新添加以下一些文件系统路径:

  • /system/dps

  • /etc/systemd/system/dpsd.service 或者 /etc/init/dpsd.conf

  • /usr/lib/libsessionmux.so 或者 /usr/lib64/libsessionmux.so

  • /usr/local/include

4. 示例代码

可以通过阅读和编译运行下面源代码,进行快速体验,并跑通一个基于 libmosquitto 开发的程序接入物联网平台 + 安全 Agent 的例子。

源文件和 Makefile:下载

实践步骤:

(1)安装 IoT 安全运营中心 Agent,安装时选择使用已经在物联网平台上注册的产品。

(2)下载并构建 libmosquitto 动态库。

(3)解压缩附件 demo 包,按照上文方法进行编译执行即可。运行测试程序之前需要首先执行:

export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/lib

后续步骤:确认集成效果