前置条件
(1)确认现有设备通过 libmosquitto 实现接入阿里云物联网平台:
libmosquitto 是著名的开源 MQTT broker Mosquitto 提供的客户端库和 API,用于帮助应用程序接入 MQTT broker。阿里云物联网平台同样可以通过 libmosquitto 进行接入。
在这里可以下载到 Mosquitto 的源代码:https://mosquitto.org/download/ 解压之后进入工程目录,可通过cmake 构建出 mosquitto MQTT broker 和 libmosquitto 库,本文主要使用 libmosquitto 库。
cd Mosquitto
[root@localhost Mosquitto]# cmake CMakeLists.txt
-- WITH_DLT = OFF
-- Could NOT find cJSON (missing: CJSON_INCLUDE_DIR CJSON_LIBRARY)
-- Optional dependency cJSON not found. Some features will be disabled.
-- Configuring done
-- Generating done
-- Build files have been written to: /root/project/Mosquitto
[root@localhost Mosquitto]# make
[ 0%] Building C object lib/CMakeFiles/libmosquitto.dir/actions.c.o
[ 1%] Building C object lib/CMakeFiles/libmosquitto.dir/callbacks.c.o
[ 1%] Building C object lib/CMakeFiles/libmosquitto.dir/connect.c.o
[ 2%] Building C object lib/CMakeFiles/libmosquitto.dir/handle_auth.c.o
[ 3%] Building C object lib/CMakeFiles/libmosquitto.dir/handle_connack.c.o
[ 3%] Building C object lib/CMakeFiles/libmosquitto.dir/handle_disconnect.c.o
[ 4%] Building C object lib/CMakeFiles/libmosquitto.dir/handle_ping.c.o
...
[100%] Built target mqtt.7 # 构建完成
[root@localhost Mosquitto]# make install # 构建安装
[ 27%] Built target libmosquitto
[ 28%] Built target mosquittopp
[ 32%] Built target mosquitto_rr
[ 35%] Built target mosquitto_sub
[ 38%] Built target mosquitto_pub
...
[100%] Built target mqtt.7
Install the project...
# make install 成功之后 libmosquitto 位于 /usr/local/lib64 或者 /usr/local/lib 下
构建完之后生成的 libmosquitto.so 位于 Linux 的 /usr/local/lib64 或者 /usr/local/lib 目录下。可以从第 6 章节示例代码附件中获取本文档使用的 Demo 库,它包含以下 3 个源文件:
文件名 | 说明 |
CMakeLists.txt | Demo 构建用的 CMake 文件 |
aliot_security_example.c | 利用 libmosquitto 接入阿里云物联网平台并复用 MQTT 会话给 IoT 安全 Agent 的示例。拷贝到 Mosquitto 根目录 example/security 子目录下。 |
aliot_mqtt_sign.c | 该文件中的代码用于生成 MQTT 建连参数。应用程序调用该文件中定义的aiotMqttSign() 函数,计算出连接参数 username、password和clientId。 |
在构建出了 libmosquitto.so 之后,上述三个源代码可独立放置,CMakeLists.txt 当中有查找 libmosquitto.so 的依赖配置。
(2)需要安装 IoT 安全运营中心 Agent
在调试设备上,下载并安装 IoT 安全运营中心 Agent,安装过程中上云方式选择“接入公有云安全运营中心”。
安装好 IoT 安全运营中心 Agent 之后,请确认调试设备的 /usr/local/lib/ 目录当中存在 libsessionmux.so 且 /usr/local/include/ 目录当中存在 sagent_defs.h 以及 session_mux.h 两个头文件。这个例子当中正是使用 libsessionmux.so 库来复用 libmosquitto 创建的通道给 IoT 安全运营中心 Agent,供其向服务端收发数据。
安装好 IoT 安全运营中心 Agent 之后,执行下面的命令替换上云方式:
sudo service dpsd stop
sudo cp /system/dps/etc/service.ini /system/dps/etc/service_linux.ini
sudo cp /system/dps/etc/service_custom.ini /system/dps/etc/service.ini
sudo service dpsd start
1. 改造连接物联网平台程序并接入安全 Agent
以示例代码中的 aliot_security_example.c 源码为例,讲述如何进一步通过微改造,接入 IoT 安全运营中心 Agent。
不论设备制造商在开发物联网平台接入时,使用了何种 SDK(包括但不限于阿里云物联网平台 LinkSDK、Paho、libmosquitto 等),这样的改造都遵循一定规律,具有一定的要领,主要有下面 4 点:
需要同时启动 IoT 安全 Agent 服务程序和物联网平台连接程序。
要在设备自身的物联网平台接入程序源码里,定义一个 IoT 安全 Agent 的上行数据处理回调,当 IoT 安全 Agent 有数据发送请求时,这个回调会被调用,此时利用当前的 MQTT publish 接口帮助发送数据给 IoT 安全运营中心云服务。同时这个回调函数定义当中也需要调用 MQTT subscribe 接口帮助 IoT 安全 Agent 订阅来自 IoT 安全运营中心云服务的下行 topic。
在设备调用 MQTT 客户端接口成功连接到物联网平台之后,要调用 IoT 安全 Agent 提供的连接附着接口,并向接口输入 ProductKey,DeviceName,MQTT 连接句柄,以及上面第 2 条当中定义的回调函数句柄。
在 MQTT 数据接收处理函数中,对下行 topic 进行匹配过滤,如果是发送给 IoT 安全 Agent 的 topic,则调用 IoT 安全 Agent 提供的接口发送给 Agent。
下面的改造步骤将全程贯穿上面 4 个要领,请逐一理解。
(1)在 aliot_security_example.c 源码中包含两个 .h 文件,并定义一个全局变量:
./*
* This example shows how to publish messages from outside of the Mosquitto network loop.
*/
#include <mosquitto.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
/* 接入IoT安全运营中心, 包含下面两个文件, 并定义一个全局会话变量 */
#include "sagent_defs.h"
#include "session_mux.h"
static channel_session_t *sec_session = NULL;
(2)定义一个函数,用于处理 IoT 安全运营中心 Agent 的发送消息事件,直接复制粘贴即可。
/* 接入IoT安全运营中心消息回调 */
ipc_error_t demo_security_message_handler(void *context, pid_t pid, char *message, uint16_t length) {
int rc = 0;
channel_session_t *session = (channel_session_t *) context;
struct mosquitto *mosq = NULL;
char topic[MAX_TOPIC_NAME_LEN] = { 0 };
fprintf(stdout, "[SOC] message callback : %d\n", length);
dump_binary_string(message, length, 128);
if (NULL != session) {
fprintf(stdout, "[SOC] cloud_session found : %s\n", session->session_name);
mosq = (struct mosquitto *)session->custom_context;
get_session_pub_topic(session, topic, MAX_TOPIC_NAME_LEN);
} else {
fprintf(stderr, "[SOC] channel session is empty\n");
}
if (0 == strncmp(message, IPC_DPS_CONNECTED, strlen(IPC_DPS_CONNECTED))) {
/* 订阅 MQTT topic */
struct mosquitto_message *msg;
char sec_topic_sub[MAX_TOPIC_NAME_LEN] = { 0 };
get_session_sub_topic(session, sec_topic_sub, MAX_TOPIC_NAME_LEN);
fprintf(stdout, "DPS CONNECTED received, go subscribe topic : %s\n", sec_topic_sub);
rc = mosquitto_subscribe(mosq, NULL, sec_topic_sub, 0);
if (rc) {
fprintf(stderr, "[SOC] Error: %s\n", mosquitto_strerror(rc));
} else {
fprintf(stdout, "[SOC] subscribed successfully : %s\n", sec_topic_sub);
}
return 0;
}
if (NULL != mosq) {
fprintf(stdout, "[SOC] upstream message\n");
dump_binary_string(message, length, 128);
rc = mosquitto_publish(mosq, NULL, topic, length, message, 0, false);
if (rc != MOSQ_ERR_SUCCESS) {
fprintf(stderr, "[SOC] Error publishing: %s\n", mosquitto_strerror(rc));
}
} else {
fprintf(stderr, "[SOC] MQTT client as custom context is empty\n");
}
return 0;
}
(3)在 Demo 程序 libmosquitto 异步建立连接成功之后的 on_connect 回调函数里,调用 IoT 安全运营中心 Agent 提供的接口:
/* Callback called when the client receives a CONNACK message from the broker. */
void on_connect(struct mosquitto *mosq, void *obj, int reason_code) {
/* Print out the connection result. mosquitto_connack_string() produces an
* appropriate string for MQTT v3.x clients, the equivalent for MQTT v5.0
* clients is mosquitto_reason_string().
*/
printf("on_connect: %s\n", mosquitto_connack_string(reason_code));
if (reason_code != 0) {
/* If the connection fails for any reason, we don't want to keep on
* retrying in this example, so disconnect. Without this, the client
* will attempt to reconnect. */
mosquitto_disconnect(mosq);
}
/* You may wish to set a flag here to indicate to your application that the
* client is now connected. */
/* 接入IoT安全运营中心, 在MQTT连接成功之后, 调用下面的函数
* 参数分别为:product_key, device_name, 定义的消息回调, mqtt_handler句柄 */
sec_session = attach_to_security_service(product_key, // 传入 ProductKey
device_name, // 传入 DeviceName
demo_security_message_handler, // 传入第 2 步中定义的回调函数指针
(void*) mosq, // 传入 MQTT handler
"aliot");
}
(4)在 libmosquitto 注册的消息回调接收处理函数中,对来自 IoT 安全运营中心服务端的下行 topic 进行过滤并转发给安全 Agent。
void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg) {
fprintf(stdout, "%s %s (%d)\n", msg->topic, (const char *)msg->payload, msg->payloadlen);
/* 接入IoT安全运营中心, 处理相关的报文 */
if (NULL != sec_session) {
char topic[MAX_TOPIC_NAME_LEN] = { 0 };
get_session_sub_topic(sec_session, topic, MAX_TOPIC_NAME_LEN);
if (0 == strncmp(msg->topic, topic, strlen(msg->topic))) {
fprintf(stdout, "[SOC] downstream message\n");
dump_binary_string(msg->payload, msg->payloadlen, 128);
send_to_session(sec_session, msg->payload, msg->payloadlen);
}
}
}
2. 编译和测试
将源代码拷贝到目标设备上,或者将 IoT 安全运营中心 Agent 相应架构的 libsessionmux.so 拷贝至交叉编译环境 $sysroot/usr/local/lib 下开始编译。
使用 cmake 构建上面的 Demo 工程,产生可执行程序 aliot_security_demo:
[root@localhost security]# cmake CMakeLists.txt
...
[root@localhost security]# make
[100%] Built target aliot_security_demo
[root@localhost security]# ./aliot_security_demo iot_instance_id product_key device_name device_secret
执行时传入物联网平台实例 ID,ProductKey,DeviceName,DeviceSecret。并确保目标设备已安装好 IoT 安全运营中心 Agent,安全服务启动状态下(sudo systemctl start dpsd.serviec 启动)运行参考程序,即可连接 IoT 安全运营中心,并在安全运营中心控制台资产列表中看到上线设备。
3. 固件升级和生产参考
通过上面的改造,在目标设备上已经形成了阿里云物联网平台接入+安全 Agent 的协同工作。如果您的设备固件是通过 rootfs 烧录方式在产线上进行生产,或者是采用 rootfs 进行全量或者差分 FOTA,请注意为固件更新添加以下一些文件系统路径:
/system/dps
/etc/systemd/system/dpsd.service 或者 /etc/init/dpsd.conf
/usr/lib/libsessionmux.so 或者 /usr/lib64/libsessionmux.so
/usr/local/include
4. 示例代码
可以通过阅读和编译运行下面源代码,进行快速体验,并跑通一个基于 libmosquitto 开发的程序接入物联网平台 + 安全 Agent 的例子。
源文件和 Makefile:下载
实践步骤:
(1)安装 IoT 安全运营中心 Agent,安装时选择使用已经在物联网平台上注册的产品。
(2)下载并构建 libmosquitto 动态库。
(3)解压缩附件 demo 包,按照上文方法进行编译执行即可。运行测试程序之前需要首先执行:
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/lib
后续步骤:确认集成效果