本文介绍如何通过阿里自有的MQTT方案调用C SDK的API 接入阿里云物联网平台。

背景信息

设备在使用MQTT连接物联网平台时,会对设备的身份进行认证,因此调用相关API连接物联网平台时需要输入设备的身份信息:ProuductKey、DeviceName、DeviceSecret,设备的身份信息需要事先从物联网平台获取。下面是调用SDK提供的API连接物联网平台以及数据收发的主要流程示意:

数据收发的主要流程

如上图流程所示, 使用MQTT通信时有5个协议标准操作。

  • 建立连接: MQTT是面向连接的通信协议收发数据前, 设备必须先以云平台认可的用户名和密码登录成功, 对应aiot_mqtt_connect
  • 发布消息: MQTT连接建立后, 设备可向云平台上报消息, 消息的去向需要以某个主题(Topic)表达, 对应aiot_mqtt_pub
  • 订阅消息: MQTT连接建立后, 云平台可向设备下推指令, 而设备是否接收来源于某个主题(Topic)的下推, 以订阅动作向云平台表达, 对应aiot_mqtt_sub
  • 维持连接: MQTT是自动保活的长连接, 需要设备周期性的向云平台发送心跳报文, 对应aiot_mqtt_process, 它同时也重发QoS1的未应答报文。
  • 接收消息: MQTT连接建立, 且主题订阅成功后, 设备可在合适时, 读取云平台在已订阅主题上发给设备的报文, 对应aiot_mqtt_recv
说明
  • C SDK提供了一份阿里巴巴自研的MQTT Client实现, 并结合阿里云物联网平台的特性做了性能和稳定性方面的优化, 推荐使用阿里自有的MQTT方案连接云平台。
  • 设备调用aiot_mqtt_pub向云端上报消息太过频繁时, 会被云端限流从而导致设备端不会收到任何来自网络的通知, 这时只能人工在控制台页面查看。

    云端对设备上报的限流阈值, 请实时查看云平台产品限制页面了解。

API的使用

  1. API列表

    以下是完整的MQTT部分API列表及简要说明 (详见core/aiot_mqtt_api.h)。

    接口名 说明
    aiot_mqtt_init 创建MQTT客户端实例并设置默认参数。
    aiot_mqtt_setopt 设置mqtt连接参数, 详见MQTT设置参数说明
    aiot_mqtt_deinit 释放mqtt客户端实例的资源。
    aiot_mqtt_connect 与mqtt服务器建立连接, 同步接口。
    aiot_mqtt_disconnect 与mqtt服务器断开连接但保留客户端实例的资源, 同步接口。
    aiot_mqtt_heartbeat 向mqtt服务器发送PINGREQ报文, 维持连接, 仅在特殊场景使用。
    aiot_mqtt_process 包含定时心跳发送和QoS1消息的重传, 同步接口, 需要用户周期性调用。
    aiot_mqtt_pub 发送一条PUBLISH消息到mqtt服务器, 异步接口, 用于设备上报, 消息QoS为0。
    aiot_mqtt_sub 发送一条SUBSCRIBE消息到mqtt服务器, 异步接口, 用于启动对云端消息的接收。
    aiot_mqtt_unsub 发送一条UNSUBSCRIBE消息到mqtt服务器, 异步接口, 用于停止对云端消息的接收。
    aiot_mqtt_recv 尝试收取mqtt报文并分发给用户的订阅回调函数, 同步接口。
  2. API使用概述

    MQTT模块用于建立与阿里云物联网平台的连接, API使用流程如下:

    1. 调用 aiot_mqtt_init 初始化MQTT会话, 获取会话句柄。
    2. 调用 aiot_mqtt_setopt 配置MQTT会话的参数, 常用配置项见 aiot_mqtt_setopt 的说明。
    3. 调用 aiot_mqtt_connect 建立与阿里云物联网平台的连接。
    4. 启动一个线程, 线程中间歇性调用 aiot_mqtt_process 处理心跳和QoS1的消息。
    5. 启动一个线程, 线程中持续调用 aiot_mqtt_recv 接收网络上的MQTT报文。

    当接收到一条报文时, 按以下顺序检查当前MQTT会话的参数, 当满足某条的描述时, 会通过对应的回调函数进行通知, 并停止检查。

    1. 检查收到的报文topic是否已经通过 aiot_mqtt_setoptAIOT_MQTTOPT_APPEND_TOPIC_MAP 参数配置回调函数。
    2. 检查收到的报文topic是否已经通过 aiot_mqtt_sub API配置回调函数。
    3. 检查是否通过 aiot_mqtt_setoptAIOT_MQTTOPT_RECV_HANDLER 参数配置默认回调函数。

    经过以上步骤后, MQTT连接已建立并能保持与物联网平台的连接, 接下来按自己的场景用 aiot_mqtt_subaiot_mqtt_pub 等API实现业务逻辑即可。

  3. aiot_mqtt_setopt

    下面列出常用的配置选项, 至少需要配置以下选项才可使用MQTT的基本功能。

    其余配置选项均设有默认值, 可按业务需要进行调整。

    • AIOT_MQTTOPT_HOST: 配置连接的阿里云MQTT站点地址。
    • AIOT_MQTTOPT_PORT: 配置连接的阿里云MQTT站点端口号。
    • AIOT_MQTTOPT_PRODUCT_KEY: 配置设备的 productKey。
    • AIOT_MQTTOPT_DEVICE_NAME: 配置设备的 deviceName。
    • AIOT_MQTTOPT_DEVICE_SECRET: 配置设备的 deviceSecret。
    • AIOT_MQTTOPT_NETWORK_CRED: 配置建立MQTT连接时的安全凭据。
    • AIOT_MQTTOPT_RECV_HANDLER: 配置默认的数据接收回调函数。
    • AIOT_MQTTOPT_EVENT_HANDLER: 配置MQTT事件通知回调函数。
  4. aiot_mqtt_connect

    使用aiot_mqtt_setopt配置的mqtt连接参数连接mqtt服务器, 使用的建联参数按如下顺序优先选择。

    若配置了以下选项, 直接用配置的连接参数连接 AIOT_MQTTOPT_HOST 选项指定MQTT服务器的host地址。

    • AIOT_MQTTOPT_USERNAME
    • AIOT_MQTTOPT_PASSWORD
    • AIOT_MQTTOPT_CLIENTID

    若配置了以下选项, 则强制以阿里云平台的签名算法计算连接参数作为MQTT的用户名/密码, 连接阿里云平台。

    • AIOT_MQTTOPT_PRODUCT_KEY
    • AIOT_MQTTOPT_DEVICE_NAME
    • AIOT_MQTTOPT_DEVICE_SECRET
  5. aiot_mqtt_disconnect

    向MQTT服务器发送MQTT DISCONNECT报文, 然后断开网络连接。

    如果需要再次与MQTT服务器建立连接, 调用aiot_mqtt_connect 即可。

  6. aiot_mqtt_heartbeat

    aiot_mqtt_process 包含了定时发送心跳的机制, 如果有特殊需要的话, 可以使用此函数直接发送心跳报文。

  7. aiot_mqtt_process
    • 发送心跳至mqtt broker以维护mqtt连接, 心跳发送间隔由AIOT_MQTTOPT_HEARTBEAT_INTERVAL_MS 配置项控制。
    • 如果一条qos1的mqtt PUBLISH报文在AIOT_MQTTOPT_REPUB_TIMEOUT_MS 时间内没有收到mqtt PUBACK应答报文, 该函数会重发此消息, 直到成功为止。
  8. aiot_mqtt_recv

    除了从网络上接收MQTT报文之外, 本函数也包含了重连机制。

    • 当MQTT心跳丢失超过AIOT_MQTTOPT_HEARTBEAT_MAX_LOST 配置的次数时, 触发重连机制。
    • 当SDK检测到网络断开时, 触发重连机制。
    说明

    重连间隔由 AIOT_MQTTOPT_RECONN_INTERVAL_MS 指定。

常见的建连失败

SDK有2种渠道向外界表达建连失败之类的内部运行状态。

  • 一是在API的返回值处告知用户。
  • 二是从SDK内部, 调用用户传入的日志回调函数。

无论哪种渠道, 都统一用状态码(1个2字节的非正数整型)描述失败的原因。

状态码 数值 说明
STATE_MQTT_CONNACK_RCODE_SERVER_UNAVAILABLE -0x0303 MQTT服务器拒绝提供连接, 服务当前不可用。
STATE_MQTT_CONNACK_RCODE_BAD_USERNAME_PASSWORD -0x0304 MQTT服务器认为连接时的用户名/密码是非法的。
STATE_MQTT_CONNACK_RCODE_NOT_AUTHORIZED -0x0305 MQTT服务器进行连接身份验证失败, 登录密码错误, 一般是设备证书错误。
STATE_PORT_NETWORK_DNS_FAILED -0x0F05 TCP域名解析失败, 检查域名或ip是否配置正确。
STATE_PORT_NETWORK_CONNECT_FAILED -0x0F0C TCP建联失败。
STATE_PORT_TLS_INVALID_MAX_FRAGMENT -0x0F0B TLS报文最大长度被设置为0, 是不合理的参数。
STATE_PORT_TLS_INVALID_SERVER_CERT -0x0F14 TLS服务端证书配置错误, 检查服务端证书是否正确。
STATE_PORT_TLS_INVALID_CLIENT_CERT -0x0F15 TLS设备端证书配置错误, 检查客户端证书是否正确。
STATE_PORT_TLS_INVALID_CLIENT_KEY -0x0F16 TLS客户端密钥配置错误, 检查客户端密钥是否正确。
STATE_PORT_TLS_DNS_FAILED -0x0F17 TLS域名解析失败, 检查域名或ip是否配置正确。
STATE_PORT_TLS_SOCKET_CREATE_FAILED -0x0F18 TLS socket 创建失败。
STATE_PORT_TLS_SOCKET_CONNECT_FAILED -0x0F19 TLS socket 连接失败。
STATE_PORT_TLS_INVALID_RECORD -0x0F1A SSL收到的数据包出错, 一般是由于tls fragment过小。

以上是状态码的一部分, 所有MQTT连接有关的完整状态码清单可以在core/aiot_state_api.h中找到。

连接状态监控

使用函数aiot_mqtt_setopt()注册一个事件监控函数,对事件进行处理,并列出可以监控的事件。具体实现可参见如下:

/* MQTT事件回调函数, 当网络连接/重连/断开时被触发, 事件定义见core/aiot_mqtt_api.h */
void demo_mqtt_event_handler(void *handle, const aiot_mqtt_event_t *event, void *userdata)
{
    switch (event->type) {
        /* SDK因为用户调用了aiot_mqtt_connect()接口, 与mqtt服务器建立连接已成功 */
        case AIOT_MQTTEVT_CONNECT: {
            printf("AIOT_MQTTEVT_CONNECT\n");
            /* TODO: 处理SDK建连成功, 不可以在这里调用耗时较长的阻塞函数 */
        }
        break;

        /* SDK因为网络状况被动断连后, 自动发起重连已成功 */
        case AIOT_MQTTEVT_RECONNECT: {
            printf("AIOT_MQTTEVT_RECONNECT\n");
            /* TODO: 处理SDK重连成功, 不可以在这里调用耗时较长的阻塞函数 */
        }
        break;

        /* SDK因为网络的状况而被动断开了连接, network是底层读写失败, heartbeat是没有按预期得到服务端心跳应答 */
        case AIOT_MQTTEVT_DISCONNECT: {
            char *cause = (event->data.disconnect == AIOT_MQTTDISCONNEVT_NETWORK_DISCONNECT) ? ("network disconnect") :
                          ("heartbeat disconnect");
            printf("AIOT_MQTTEVT_DISCONNECT: %s\n", cause);
            /* TODO: 处理SDK被动断连, 不可以在这里调用耗时较长的阻塞函数 */
        }
        break;

        default: {

        }
    }
}

/* MQTT默认消息处理回调, 当SDK从服务器收到MQTT消息时, 且无对应用户回调处理时被调用 */
void demo_mqtt_default_recv_handler(void *handle, const aiot_mqtt_recv_t *packet, void *userdata)
{
    switch (packet->type) {
        case AIOT_MQTTRECV_HEARTBEAT_RESPONSE: {
            printf("heartbeat response\n");
            /* TODO: 处理服务器对心跳的回应, 一般不处理 */
        }
        break;

        case AIOT_MQTTRECV_SUB_ACK: {
            printf("suback, res: -0x%04X, packet id: %d, max qos: %d\n",
                   -packet->data.sub_ack.res, packet->data.sub_ack.packet_id, packet->data.sub_ack.max_qos);
            /* TODO: 处理服务器对订阅请求的回应, 一般不处理 */
        }
        break;

        case AIOT_MQTTRECV_PUB: {
            printf("pub, qos: %d, topic: %.*s\n", packet->data.pub.qos, packet->data.pub.topic_len, packet->data.pub.topic);
            printf("pub, payload: %.*s\n", packet->data.pub.payload_len, packet->data.pub.payload);
            /* TODO: 处理服务器下发的业务报文 */
        }
        break;

        case AIOT_MQTTRECV_PUB_ACK: {
            printf("puback, packet id: %d\n", packet->data.pub_ack.packet_id);
            /* TODO: 处理服务器对QoS1上报消息的回应, 一般不处理 */
        }
        break;

        default: {

        }
    }
}
            

重连机制

连接因为网络原因断开时, SDK内含了自动重连的功能, 这些重连的动作是在aiot_mqtt_recv()接口中完成的。

SDK周期性向服务器发送心跳报文保活, 正常情况下服务器会应答这些心跳, 而心跳如果达到一定次数未被应答, SDK将会发起自动重连。

  • 可容忍的心跳丢失阈值由AIOT_MQTTOPT_HEARTBEAT_MAX_LOST配置的次数决定, 配置接口是aiot_mqtt_setopt
  • 自动重连启动后, 会反复重试直到某次成功, 重连的间隔由AIOT_MQTTOPT_RECONN_INTERVAL_MS选项配置, 配置接口是aiot_mqtt_setopt

SDK调用底层依赖函数, 从协议栈读或写的时候发现连接已断开, 也会触发重连。重连操作请参见如下:

/* 执行aiot_mqtt_recv的线程, 包含网络自动重连和从服务器收取MQTT消息 */
void *demo_mqtt_recv_thread(void *args)
{
    int32_t res = STATE_SUCCESS;

    while (g_mqtt_recv_thread_running) {
        res = aiot_mqtt_recv(args);
        if (res < STATE_SUCCESS) {
            if (res == STATE_USER_INPUT_EXEC_DISABLED) {
                break;
            }
            sleep(1);
        }
    }
    return NULL;
}

获取离线推送消息

MQTT协议中, 设备一般是先建立连接, 再向云平台订阅某个Topic, 之后用户服务器可通过云平台, 向这个Topic下发指令, 即可达到设备。

  1. 如果用户服务器以QoS1等级向设备下发指令时, 设备不在线(未连接到云平台), 当次的指令下发就无法到达设备。
  2. 之后设备再上线连接到云平台的时, 若指定了cleanSession连接参数为0, 云平台会重发之前未到达设备的QoS1消息。
  3. 设备如果是使用的C-SDK连接阿里云平台, SDK收到消息后会进入用户以AIOT_MQTTOPT_RECV_HANDLER选项设置的默认回调函数。

另外, 如果用户希望设备上线时能够接收到离线期间的云端QoS1消息, 并且不在默认数据接收回调中处理这些离线消息, 可以这样编写代码。

/* MQTT离线消息处理回调, 可注册这个回调给SDK, 当SDK从服务器收到离线MQTT消息时进入 */
void demo_mqtt_offline_recv_handler(void *handle, const aiot_mqtt_recv_t *packet, void *userdata)
{
    if (AIOT_MQTTRECV_PUB != packet->type) {
        return;
    }

    /* TODO: 根据packet参数中的topic和payload编写业务逻辑 */
    printf("pub, qos: %d, topic: %.*s\n", packet->data.pub.qos, packet->data.pub.topic_len, packet->data.pub.topic);
    printf("pub, payload: %.*s\n", packet->data.pub.payload_len, packet->data.pub.payload);
}

int main(int argc, char *argv[])
{
    ...
    ...
    /* 配置SDK的底层依赖 */
    aiot_sysdep_set_portfile(&g_aiot_sysdep_portfile);
    /* 配置SDK的日志输出 */
    aiot_state_set_logcb(demo_state_logcb);

    /* 创建1个MQTT客户端实例并内部初始化默认参数 */
    mqtt_handle = aiot_mqtt_init();
    if (mqtt_handle == NULL) {
        printf("aiot_mqtt_init failed\n");
        return -1; 
    }

    /* 准备一个aiot_mqtt_topic_map_t结构体, 描述topic和回调函数的对应关系 */
    /* TODO: 实际使用时, 请替换sub_topic和topic_map.handler, userdata可酌情使用, SDK调用回调时会传回来 */
    char * sub_topic = "/sys/a13FN5TplKq/mqtt_basic_demo/user/get";
    aiot_mqtt_topic_map_t topic_map = {
        .topic = sub_topic,
        .handler = demo_mqtt_offline_recv_handler,
        .userdata = NULL
    };

    /* 用AIOT_MQTTOPT_APPEND_TOPIC_MAP选项, 可在连接前设置topic和回调函数之间的对应关系 */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_APPEND_TOPIC_MAP, (void *)&topic_map);
    ...
    ...

    /* 设置连接MQTT时, cleanSession参数为0, SDK会向服务器表达接收离线期间的QoS1消息 */
    uint8_t clean = 0;
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_CLEAN_SESSION, (void *)&clean);

    /* 与服务器建立MQTT连接 */
    res = aiot_mqtt_connect(mqtt_handle);
    if (res < STATE_SUCCESS) {
        /* 尝试建立连接失败, 销毁MQTT实例, 回收资源 */
        aiot_mqtt_deinit(&mqtt_handle);
        printf("aiot_mqtt_connect failed: -0x%04X\n", -res);
        return -1;
    }

    /* 这时建连成功, 在
           /sys/a13FN5TplKq/mqtt_basic_demo/user/get
       上到达的离线QoS1消息就会触发
           demo_mqtt_offline_recv_handler */
    ...
}

物联网平台实例URL的确定

设备可以接入物联网平台的公共实例、或者企业实例,设备在使用C SDK连接物联网平台的时候需要指定实例的URL以让设备能够正确连接到相应的实例

  • 公共实例

    公共实例的URL类似"iot-as-mqtt.RegionID.aliyuncs.com",其中的RegionID即代表相应的站点,其取值可以从“地域和可用区”中查看,需要注意的是物联网平台这个服务并不是所有的阿里云地域都进行了部署,因此这里填入的RegionID一定要和产品创建的地域相同,比如“华东2(上海)”的RegionID是cn-shanghai,因此最终的URL为:iot-as-mqtt.cn-shanghai.aliyuncs.com。

  • 企业实例
    用户需要在企业实例中查看实例的接入URL,请参见文档“实例管理”中的“查看实例终端节点”章节了解如何查看实例详情,其中MQTT接入的URL如下图所示:mqtt_url

例程讲解

现对照demos/mqtt_basic_demo.c例程, 分步骤讲解如何使用API。

API的使用流程

本示例适用于支持POSIX线程的Linux设备, 它演示了用SDK配置MQTT参数并建立连接, 之后创建2个线程。

说明
  • 第一个线程用于保活长连接。
  • 第二个线程用于接收消息, 并在有消息到达时进入默认的数据回调, 在连接状态变化时进入事件回调。

需要用户关注或修改的部分,已用 TODO 在注释中标明。

  • 设置设备的设备证书

    例程使用的设备证书是公用的, 所以应用于实际业务时, 请替换如下的TODO部分, 传入用户自己真实的设备证书。

    /* TODO: 替换为自己设备的设备证书 */
    char *product_key       = "xxxxxx";
    char *device_name       = "xxxxxx";
    char *device_secret     = "xxxxxx";
  • 进入程序入口, 给SDK配置全局的底层依赖和日志回调

    底层依赖描述了硬件平台的资源使用方式, 比如怎样获取时钟, 分配内存等, 日志回调是用户的函数, SDK有log输出的时候会进入这个函数。

    int main(int argc, char *argv[])
    {
        ...
        ...
        /* 配置SDK的底层依赖 */
        aiot_sysdep_set_portfile(&g_aiot_sysdep_portfile);
        /* 配置SDK的日志输出 */
        aiot_state_set_logcb(demo_state_logcb);
  • 给MQTT连接配置连接参数

    这些连接参数包括如何建立TLS连接, 服务器地址在哪, 连接时的设备证书是什么, 连接后的数据及事件回调函数是什么等等。请注意下面代码中url以及public_instance的设置一定要正确,对于url的取值请参见上面的章节“物联网平台实例URL的确定”。

       int8_t     public_instance = 1;  /* 用公共实例, 该参数要设置为1. 若用企业实例, 要将该参数设置为0 */
       char       *url = "iot-as-mqtt.cn-shanghai.aliyuncs.com";
    
    /* 创建1个MQTT客户端实例并内部初始化默认参数 */
        mqtt_handle = aiot_mqtt_init();
        if (mqtt_handle == NULL) {
            printf("aiot_mqtt_init failed\n");
            return -1;
        }
    
        /* 配置MQTT服务器地址 */
        aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_HOST, (void *)host);
        /* 配置MQTT服务器端口 */
        aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PORT, (void *)&port);
        /* 配置设备productKey */
        aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PRODUCT_KEY, (void *)product_key);
        /* 配置设备deviceName */
        aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_DEVICE_NAME, (void *)device_name);
        /* 配置设备deviceSecret */
        aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_DEVICE_SECRET, (void *)device_secret);
        /* 配置网络连接的安全凭据, 上面已经创建好了 */
        aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_NETWORK_CRED, (void *)&cred);
        /* 配置MQTT默认消息接收回调函数 */
        aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_RECV_HANDLER, (void *)demo_mqtt_default_recv_handler);
        /* 配置MQTT事件回调函数 */
        aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_EVENT_HANDLER, (void *)demo_mqtt_event_handler);
  • 向服务器发起连接

    使用aiot_mqtt_connect()接口, 以上面设置的连接参数, 向指定的MQTT服务器发起连接请求。

    /* 与服务器建立MQTT连接 */
        res = aiot_mqtt_connect(mqtt_handle);
        if (res < STATE_SUCCESS) {
            /* 尝试建立连接失败, 销毁MQTT实例, 回收资源 */
            aiot_mqtt_deinit(&mqtt_handle);
            printf("aiot_mqtt_connect failed: -0x%04X\n", -res);
            return -1;
        }
  • 建连成功后, 开启专用的保活线程

    保活线程每到心跳周期会在aiot_mqtt_process中向服务器发送心跳报文, 保持长连接不断, 服务器就可以向设备下推消息。

    /* 创建一个单独的线程, 专用于执行aiot_mqtt_process, 它会自动发送心跳保活, 以及重发QoS1的未应答报文 */
        res = pthread_create(&g_mqtt_process_thread, NULL, demo_mqtt_process_thread, mqtt_handle);
        if (res < 0) {
            printf("pthread_create demo_mqtt_process_thread failed: %d\n", res);
            return -1;
        }
  • 建连成功后, 开启专用的接收线程

    接收线程在连接正常时, 读取服务器下推的MQTT消息, 并交给用户的demo_mqtt_event_handler处理, 连接异常时, 则等待aiot_mqtt_recv()自动重连。

    /* 创建一个单独的线程用于执行aiot_mqtt_recv, 它会循环收取服务器下发的MQTT消息, 并在断线时自动重连 */
        res = pthread_create(&g_mqtt_recv_thread, NULL, demo_mqtt_recv_thread, mqtt_handle);
        if (res < 0) {
            printf("pthread_create demo_mqtt_recv_thread failed: %d\n", res);
            return -1;
        }
  • 云端下推消息到达时, 用户需要编写处理逻辑

    demo_mqtt_event_handler是用户的函数, 实际使用时, 用户需要修改的部分已在注释中用TODO标明。

    • 主要对应业务消息, 要处理AIOT_MQTTRECV_PUB, 可根据消息的TopicPayload编写处理逻辑
    • 例程中对业务消息, 仅做了打印示意, 用户使用时需自行填写, 但注意处理逻辑不要耗时太久, 否则收包线程会被阻塞。
    /* MQTT默认消息处理回调, 当SDK从服务器收到MQTT消息时, 且无对应用户回调处理时被调用 */
    void demo_mqtt_default_recv_handler(void *handle, const aiot_mqtt_recv_t *const packet, void *userdata)
    {
        switch (packet->type) {
        ...
            case AIOT_MQTTRECV_PUB: {
                printf("pub, qos: %d, topic: %.*s\n", packet->data.pub.qos, packet->data.pub.topic_len, packet->data.pub.topic);
                printf("pub, payload: %.*s\n", packet->data.pub.payload_len, packet->data.pub.payload);
                /* TODO: 处理服务器下发的业务报文 */
            }
            break;
        ...
    }

    设备建连后, 使用物联网平台控制台监控运维 > 在线调试 页面, 可以模拟业务消息的下推, 例如:

    在线调试

    点击发送指令后, 例程会出现如下打印, 就是在demo_mqtt_default_recv_handler中发生的, 其中不以pub开头的行是日志回调demo_state_logcb中打印。

    [1577603312.688][LK-0309] pub: /sys/a13FN5XXXX/mqtt_basic_demo/thing/service/property/set
    
    [LK-030A] < 7B 22 6D 65 74 68 6F 64  22 3A 22 74 68 69 6E 67 | {"method":"thing
    [LK-030A] < 2E 73 65 72 76 69 63 65  2E 70 72 6F 70 65 72 74 | .service.propert
    [LK-030A] < 79 2E 73 65 74 22 2C 22  69 64 22 3A 22 31 31 36 | y.set","id":"116
    [LK-030A] < 33 39 34 30 30 37 34 22  2C 22 70 61 72 61 6D 73 | 39XXXXX","params
    [LK-030A] < 22 3A 7B 22 4C 69 67 68  74 53 77 69 74 63 68 22 | ":{"LightSwitch"
    [LK-030A] < 3A 30 7D 2C 22 76 65 72  73 69 6F 6E 22 3A 22 31 | :0},"version":"1
    [LK-030A] < 2E 30 2E 30 22 7D                                | .0.0"}
    
    pub, qos: 0, topic: /sys/a13FN5XXXX/mqtt_basic_demo/thing/service/property/set
    pub, payload: {"method":"thing.service.property.set","id":"11639XXXXX","params":{"LightSwitch":0},"version":"1.0.0"}
  • 连接状态变化时, 用户可在事件回调中得知

    连接状态的变化包括SDK检测到网络异常, 自动重连已成功, 或已断开连接等, 若要处理, 可在 TODO 标明处修改代码。

    /* MQTT事件回调函数, 当网络连接/重连/断开时被触发, 事件定义见core/aiot_mqtt_api.h */
    void demo_mqtt_event_handler(void *handle, const aiot_mqtt_event_t *const event, void *userdata)
    {
        switch (event->type) {
            /* SDK因为用户调用了aiot_mqtt_connect()接口, 与mqtt服务器建立连接已成功 */
            case AIOT_MQTTEVT_CONNECT: {
                printf("AIOT_MQTTEVT_CONNECT\n");
                /* TODO: 处理SDK建连成功, 不可以在这里调用耗时较长的阻塞函数 */
            }
            break;
    
            /* SDK因为网络状况被动断连后, 自动发起重连已成功 */
            case AIOT_MQTTEVT_RECONNECT: {
                printf("AIOT_MQTTEVT_RECONNECT\n");
                /* TODO: 处理SDK重连成功, 不可以在这里调用耗时较长的阻塞函数 */
            }
            break;
    
            /* SDK因为网络的状况而被动断开了连接, network是底层读写失败, heartbeat是没有按预期得到服务端心跳应答 */
            case AIOT_MQTTEVT_DISCONNECT: {
                char *cause = (event->data.disconnect == AIOT_MQTTDISCONNEVT_NETWORK_DISCONNECT) ? ("network disconnect") :
                              ("heartbeat disconnect");
                printf("AIOT_MQTTEVT_DISCONNECT: %s\n", cause);
                /* TODO: 处理SDK被动断连, 不可以在这里调用耗时较长的阻塞函数 */
            }
            break;
    
            default: {
    
            }
        }
    }
  • 主循环进入休眠

    主线程的任务就是配置连接参数, 建立连接成功, 之后的业务循环主要由接收线程工作完成, 因此接收线程运行后, 主线程可进入休眠。

    /* 主循环进入休眠 */
        while (1) {
            sleep(1);
        }