本文以C Link SDK中的Demo文件./mqtt_basic_demo.c
为例,介绍如何调用Link SDK的API,将MQTT协议的设备接入物联网平台并进行消息收发。
步骤一:初始化
- 添加头文件。
#include "aiot_state_api.h"
#include "aiot_sysdep_api.h"
#include "aiot_mqtt_api.h"
- 配置底层依赖和日志输出。
aiot_sysdep_set_portfile(&g_aiot_sysdep_portfile);
aiot_state_set_logcb(demo_state_logcb);
- 调用aiot_mqtt_init,创建MQTT客户端实例,并初始化默认参数。
mqtt_handle = aiot_mqtt_init();
if (mqtt_handle == NULL) {
printf("aiot_mqtt_init failed\n");
return -1;
}
步骤二:配置功能
更多功能的配置项,请参见aiot_mqtt_option_t。
- 配置连接参数。
- 示例代码:
char *product_key = "a18wP******";
char *device_name = "LightSwitch";
char *device_secret = "uwMTmVAMnGGHaAkqmeDY6cHxxB******";
char *mqtt_host = "iot-06z00ax1o******.mqtt.iothub.aliyuncs.com";
...
...
/* 配置MQTT服务器地址。 */
aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_HOST, (void *)url);
/* 配置MQTT服务器端口。 */
aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PORT, (void *)&port);
/* 配置设备ProductKey。 */
aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PRODUCT_KEY, (void *)product_key);
/* 配置设备DeviceName。 */
aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_DEVICE_NAME, (void *)device_name);
/* 配置设备DeviceSecret。 */
aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_DEVICE_SECRET, (void *)device_secret);
/* 配置网络连接的安全凭据。 */
aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_NETWORK_CRED, (void *)&cred);
- 相关参数:
参数 |
示例 |
说明 |
mqtt_host |
iot-06z00ax1o******.mqtt.iothub.aliyuncs.com |
设备的接入域名。
- 企业版实例和新版公共实例:在实例详情页面的开发配置面板,查看接入域名。
- 旧版公共实例:接入域名格式为
${YourProductKey}.iot-as-mqtt.${YourRegionId}.aliyuncs.com 。
新旧版公共实例和企业版实例、以及接入域名的更多信息,请参见查看实例终端节点。
|
product_key |
a18wP****** |
设备认证信息。更多信息,请参见获取设备认证信息。
本例程的身份认证方式为一机一密。
|
device_name |
LightSwitch |
device_secret |
uwMTmVAMnGGHaAkqmeDY6cHxxB****** |
- MQTT保活说明:
C Link SDK具备保活能力,您可以设置以下配置项,自定义设备连接的保活心跳。如果不配置,则取默认值。
配置项 |
默认值 |
说明 |
AIOT_MQTTOPT_HEARTBEAT_MAX_LOST |
2 |
可容忍的心跳丢失阈值。即:心跳请求报文达到设置的次数后,发起重连。 |
AIOT_MQTTOPT_HEARTBEAT_INTERVAL_MS |
25,000 |
每次发起重连之间的间隔时间。单位毫秒, 取值范围:1000~1,200,000。 |
AIOT_MQTTOPT_KEEPALIVE_SEC |
1200 |
可容忍的心跳丢失时间阈值。即:失去心跳后,设置的时间内,允许发起重连。单位秒,取值范围:30~1,200。建议取值大于300。 |
- 配置状态监控和消息回调。
- 配置状态监控回调函数。
- 示例代码:
int main(int argc, char *argv[])
{
...
...
/* 配置MQTT默认消息接收回调函数。 */
aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_RECV_HANDLER, (void *)demo_mqtt_default_recv_handler);
/* 配置MQTT事件回调函数。 */
aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_EVENT_HANDLER, (void *)demo_mqtt_event_handler);
...
...
}
- 相关参数:
配置项 |
示例值 |
说明 |
AIOT_MQTTOPT_RECV_HANDLER |
demo_mqtt_default_recv_handler |
当接收消息时,根据该回调函数定义的处理逻辑,执行对应的处理。 |
AIOT_MQTTOPT_EVENT_HANDLER |
demo_mqtt_event_handler |
当设备连接状态发生变化时,根据该回调函数定义的处理逻辑,执行对应的处理。 |
- 定义状态监控的回调函数。
/* MQTT事件回调函数, 当网络连接、重连或断开时,触发该函数, 事件定义见core/aiot_mqtt_api.h。 */
void demo_mqtt_event_handler(void *handle, const aiot_mqtt_event_t *event, void *userdata)
{
switch (event->type) {
/* 调用了aiot_mqtt_connect()接口, 与MQTT服务器建立连接。 */
case AIOT_MQTTEVT_CONNECT: {
printf("AIOT_MQTTEVT_CONNECT\n");
/* TODO: 处理SDK建立连接成功, 不可在此调用耗时较长的阻塞函数。 */
}
break;
/* SDK因网络状况被动断开连接后, 成功自动发起重连。 */
case AIOT_MQTTEVT_RECONNECT: {
printf("AIOT_MQTTEVT_RECONNECT\n");
/* TODO: 处理SDK重连成功, 不可在此调用耗时较长的阻塞函数。 */
}
break;
/* SDK因网络状况被动断开了连接, network底层读写失败, heartbeat没有按预期得到服务端心跳应答。 */
case AIOT_MQTTEVT_DISCONNECT: {
char *cause = (event->data.disconnect == AIOT_MQTTDISCONNEVT_NETWORK_DISCONNECT) ? ("network disconnect") :
("heartbeat disconnect");
printf("AIOT_MQTTEVT_DISCONNECT: %s\n", cause);
/* TODO: 处理SDK被动断开连接, 不可在此调用耗时较长的阻塞函数。 */
}
break;
default: {
}
}
}
- 定义消息接收的回调函数。
注意
- 请勿将消息的处理逻辑,定义得过于耗时,以免阻塞收包线程。
- 如果您要根据接收的消息做应对处理,可在
TODO
处,按照需要修改代码。
/* MQTT默认消息处理回调, 当SDK从服务器收到MQTT消息时, 且您未设置对应回调的处理时,以下接口被调用。 */
void demo_mqtt_default_recv_handler(void *handle, const aiot_mqtt_recv_t *packet, void *userdata)
{
switch (packet->type) {
case AIOT_MQTTRECV_HEARTBEAT_RESPONSE: {
printf("heartbeat response\n");
/* TODO: 处理服务器对心跳的回应, 一般不处理。 */
}
break;
case AIOT_MQTTRECV_SUB_ACK: {
printf("suback, res: -0x%04X, packet id: %d, max qos: %d\n",
-packet->data.sub_ack.res, packet->data.sub_ack.packet_id, packet->data.sub_ack.max_qos);
/* TODO: 处理服务器对订阅请求的回应, 一般不处理。 */
}
break;
case AIOT_MQTTRECV_PUB: {
printf("pub, qos: %d, topic: %.*s\n", packet->data.pub.qos, packet->data.pub.topic_len, packet->data.pub.topic);
printf("pub, payload: %.*s\n", packet->data.pub.payload_len, packet->data.pub.payload);
/* TODO: 处理服务器下发的业务报文。 */
}
break;
case AIOT_MQTTRECV_PUB_ACK: {
printf("puback, packet id: %d\n", packet->data.pub_ack.packet_id);
/* TODO: 处理服务器对QoS=1上报消息的回应, 一般不处理。 */
}
break;
default: {
}
}
}
步骤三:请求连接
调用aiot_mqtt_connect,根据配置连接的参数,向物联网平台,发起连接认证请求。
/* 与服务器建立MQTT连接。 */
res = aiot_mqtt_connect(mqtt_handle);
if (res < STATE_SUCCESS) {
/* 尝试建立连接失败, 销毁MQTT实例, 回收资源。 */
aiot_mqtt_deinit(&mqtt_handle);
printf("aiot_mqtt_connect failed: -0x%04X\n", -res);
return -1;
}
步骤四:开启保活线程
调用aiot_mqtt_process,向服务器发送心跳报文,使设备保持长连接状态,并重发QoS=1
的未应答报文。
- 开启保活线程。
res = pthread_create(&g_mqtt_process_thread, NULL, demo_mqtt_process_thread, mqtt_handle);
if (res < 0) {
printf("pthread_create demo_mqtt_process_thread failed: %d\n", res);
return -1;
}
- 设置保活线程处理函数。
void *demo_mqtt_process_thread(void *args)
{
int32_t res = STATE_SUCCESS;
while (g_mqtt_process_thread_running) {
res = aiot_mqtt_process(args);
if (res == STATE_USER_INPUT_EXEC_DISABLED) {
break;
}
sleep(1);
}
return NULL;
}
步骤五:开启接收线程
调用aiot_mqtt_recv,收取服务器下发的MQTT消息,根据消息回调函数,执行对应处理。在断线时自动重连,根据事件回调函数,执行对应处理。
- 开启接收线程。
res = pthread_create(&g_mqtt_recv_thread, NULL, demo_mqtt_recv_thread, mqtt_handle);
if (res < 0) {
printf("pthread_create demo_mqtt_recv_thread failed: %d\n", res);
return -1;
}
- 设置接收线程处理函数。
void *demo_mqtt_recv_thread(void *args)
{
int32_t res = STATE_SUCCESS;
while (g_mqtt_recv_thread_running) {
res = aiot_mqtt_recv(args);
if (res < STATE_SUCCESS) {
if (res == STATE_USER_INPUT_EXEC_DISABLED) {
break;
}
sleep(1);
}
}
return NULL;
}
步骤六:订阅Topic
调用aiot_mqtt_sub,订阅指定Topic。
- 示例代码:
{
char *sub_topic = "/a18wP******/LightSwitch/user/get";
res = aiot_mqtt_sub(mqtt_handle, sub_topic, NULL, 1, NULL);
if (res < 0) {
printf("aiot_mqtt_sub failed, res: -0x%04X\n", -res);
return -1;
}
}
- 相关参数:
参数 |
示例 |
说明 |
sub_topic |
/a18wP******/LightSwitch/user/get |
拥有订阅权限的Topic。
a18wP****** 为设备的ProductKey。
LightSwitch 为设备的DeviceName。
本示例为默认的自定义Topic。 设备通过该Topic,可接收物联网平台的消息。关于Topic的更多信息,请参见什么是Topic。
|
步骤七:发送消息
调用aiot_mqtt_pub,向指定Topic发送消息。
- 示例代码:
{
char *pub_topic = "/a18wP******/LightSwitch/user/update";
char *pub_payload = "{\"id\":\"1\",\"version\":\"1.0\",\"params\":{\"LightSwitch\":0}}";
res = aiot_mqtt_pub(mqtt_handle, pub_topic, (uint8_t *)pub_payload, (uint32_t)strlen(pub_payload), 0);
if (res < 0) {
printf("aiot_mqtt_sub failed, res: -0x%04X\n", -res);
return -1;
}
}
- 相关参数:
参数 |
示例 |
说明 |
pub_topic |
/a18wP******/LightSwitch/user/update |
拥有发布权限的Topic。
a18wP****** 为设备的ProductKey。
LightSwitch 为设备的DeviceName。
设备通过该Topic向物联网平台发送消息。关于Topic的更多信息,请参见什么是Topic。
|
pub_payload |
{\"id\":\"1\",\"version\":\"1.0\",\"params\":{\"LightSwitch\":0}} |
上报至物联网平台的消息内容。 由于示例消息的Topic类型为自定义,因此数据格式可自定义。
关于数据格式的更多信息,请参见数据格式。
|
设备与物联网平台建立MQTT通信后,
请确保通信量不超过阈值。
- 通信限制的更多信息,请参见连接通信。
- 如果通信超过阈值,请登录物联网平台查看堆积消息。更多信息,请参见查看和监控消费组。
步骤八:断开连接
说明
MQTT接入常应用于长连接的设备,程序通常不会运行至此。
例程的主线程任务为配置参数并成功建立连接。连接建立后,主线程可进入休眠。
调用aiot_mqtt_disconnect,向物联网平台发送断开连接的报文,然后断开网络连接。
res = aiot_mqtt_disconnect(mqtt_handle);
if (res < STATE_SUCCESS) {
aiot_mqtt_deinit(&mqtt_handle);
printf("aiot_mqtt_disconnect failed: -0x%04X\n", -res);
return -1;
}
步骤九:退出程序
调用
aiot_mqtt_deinit,销毁MQTT
客户端实例,释放资源。 res = aiot_mqtt_deinit(&mqtt_handle);
if (res < STATE_SUCCESS) {
printf("aiot_mqtt_deinit failed: -0x%04X\n", -res);
return -1;
}