使用示例

本文以C Link SDK中的Demo文件为例,介绍如何调用Link SDK的API,将MQTT协议的设备接入物联网平台并进行消息收发。

背景信息

MQTT接入的更多信息,请参见MQTT接入概述

物联网平台提供的C Link SDK中,非云网关设备接入的Demo文件为./mqtt_basic_demo.c,云网关设备接入的Demo文件为./mqtt_userdefine_demo.c

步骤一:初始化

  1. 添加头文件。

    #include "aiot_state_api.h"
    #include "aiot_sysdep_api.h"
    #include "aiot_mqtt_api.h"
  2. 配置底层依赖和日志输出。

        aiot_sysdep_set_portfile(&g_aiot_sysdep_portfile);
        aiot_state_set_logcb(demo_state_logcb);
  3. 调用aiot_mqtt_init,创建MQTT客户端实例,并初始化默认参数。

        mqtt_handle = aiot_mqtt_init();
        if (mqtt_handle == NULL) {
            printf("aiot_mqtt_init failed\n");
            return -1;
        }

步骤二:配置功能

更多功能的配置项,请参见aiot_mqtt_option_t

  1. 配置连接参数。

    非云网关设备

    • 示例代码:

      
          /* TODO: 替换为自己设备的认证信息。 */
          char *product_key       = "a18wP******";
          char *device_name       = "LightSwitch";
          char *device_secret     = "uwMTmVAMnGGHaAkqmeDY6cHxxB******";
          /* TODO: 替换为自己设备的接入域名。 */
          char  *mqtt_host = "iot-06z00ax1o******.mqtt.iothub.aliyuncs.com";
          ...
          ...
      
          /* 以下参数为可选,您可使用SDK文件中默认内容。 */
          /* 配置MQTT服务器地址。 */
          aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_HOST, (void *)url);
          /* 配置MQTT服务器端口。 */
          aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PORT, (void *)&port);
          /* 配置设备ProductKey。 */
          aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PRODUCT_KEY, (void *)product_key);
          /* 配置设备DeviceName。 */
          aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_DEVICE_NAME, (void *)device_name);
          /* 配置设备DeviceSecret。 */
          aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_DEVICE_SECRET, (void *)device_secret);
          /* 配置网络连接的安全凭据。 */
          aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_NETWORK_CRED, (void *)&cred);
                                                                  
    • 相关参数:

      参数

      说明

      mqtt_host

      设备的接入域名。

      • 企业版实例和新版公共实例:在实例详情页面的开发配置面板,查看接入域名。

      • 旧版公共实例:接入域名格式为${YourProductKey}.iot-as-mqtt.${YourRegionId}.aliyuncs.com

      新旧版公共实例和企业版实例、以及接入域名的更多信息,请参见查看实例终端节点

      product_key

      设备认证信息。具体信息,请参见获取设备认证信息

      本示例代码的身份认证方式为一机一密。

      device_name

      device_secret

    • MQTT保活说明:

      重要
      • 设备端在保活时间间隔内,至少需要发送一次报文,包括ping请求。

      • 从物联网平台发送CONNACK响应CONNECT消息时,开始心跳计时。收到PUBLISH、SUBSCRIBE、PING或 PUBACK消息时,会重置计时器。物联网平台每隔30秒定时检测一次设备的保活心跳,设备上线时间点距离最新定时检测时间点的时间,是定时检测的等待时间。定义最大超时时间为:保活心跳时间*1.5+定时检测的等待时间。超过最大超时时间未收到设备消息,服务器会自动断开连接。

      C Link SDK具备保活能力,您可以设置以下配置项,自定义设备连接的保活心跳。如果不配置,则取默认值。

      配置项

      默认值

      说明

      AIOT_MQTTOPT_HEARTBEAT_MAX_LOST

      2

      可容忍的心跳丢失阈值。即:心跳请求报文达到设置的次数后,发起重连。

      AIOT_MQTTOPT_HEARTBEAT_INTERVAL_MS

      25,000

      每次发起重连之间的间隔时间。单位毫秒, 取值范围:1,000~1,200,000。

      AIOT_MQTTOPT_KEEPALIVE_SEC

      1,200

      可容忍的心跳丢失时间阈值。即:失去心跳后,设置的时间内,允许发起重连。单位秒,取值范围:30~1,200。建议取值大于300。

    云网关设备

    • 示例代码:

      /* TODO: 替换为自己设备的设备信息、接入地址(mqtt_host)、端口(port)、证书(user_ca_cert) */
      char *username          = "LightSwitch";
      char *password          = "*******";
      char *client_id         = "client_*******";
      char *mqtt_host         = "iot-******.igw.iothub.aliyuncs.com";
      char *product_key       = "*******";
      uint16_t port           = 1883;
      
      const char *user_ca_cert = \
      {
          "-----BEGIN CERTIFICATE-----\r\n" \
          "MIIC4jCCAco*************************************MQswCQYDVQQGEwJD\r\n" \
          "TjETMBEGA1U*************************************IENBMB4XDTIyMTIw\r\n" \
          "MjE0NDk1Mlo*************************************Q04xEzARBgNVBAoM\r\n" \
          "CkFsaXl1biB*************************************KoZIhvcNAQEBBQAD\r\n" \
          "ggEPADCCAQo*************************************2VaEUrnXNoO40w71\r\n" \
          "i3l4Alchs1M*************************************VVxRGEtybsIH8CYO\r\n" \
          "kyzGgOKbx7M*************************************49l3opCIfg9LOwjF\r\n" \
          "R6x+ZY6yGdv*************************************CDxW2mILl+VwYd9s\r\n" \
          "2udrJ7riJ5i*************************************/P9s+2UaBX89TTUd\r\n" \
          "lYWKe3tHRg+*************************************BgkqhkiG9w0BAQsF\r\n" \
          "AAOCAQEAhr8*************************************odRVrUaVBBLguSAH\r\n" \
          "OZmtwUy0ZUf*************************************aJ27G4prMD2DMoby\r\n" \
          "uTbXKOPYCvT*************************************5smSmfXIrBrbGrG6\r\n" \
          "0CL1Y8DTNlF*************************************TjixfpGS3C/Ogu0H\r\n" \
          "uMMbA4RCFhF*************************************X0VprxMrDarUJAa1\r\n" \
          "tJ************************Iw==\r\n" \
          "-----END CERTIFICATE-----\r\n" \
      };
          
          ...
          ...
            
          /* 配置MQTT服务器地址 */
          aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_HOST, (void *)mqtt_host);
          /* 配置MQTT服务器端口 */
          aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PORT, (void *)&port);
          /* 配置设备username */
          aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_USERNAME, (void *)username);
          /* 配置设备password */
          aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PASSWORD, (void *)password);
          /* 配置设备client_id */
          aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_CLIENTID, (void *)client_id);
          /* 配置设备productKey */
          aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PRODUCT_KEY, (void *)product_key);
          /* 配置设备deviceName */
          aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_DEVICE_NAME, (void *)username);
          /* 配置网络连接的安全凭据, 上面已经创建好了 */
          aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_NETWORK_CRED, (void *)&cred);
          /* 配置MQTT默认消息接收回调函数 */
          aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_RECV_HANDLER, (void *)demo_mqtt_default_recv_handler);
          /* 配置MQTT事件回调函数 */
          aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_EVENT_HANDLER, (void *)demo_mqtt_event_handler);
          /* 关闭对Topic的以'/'为开头的校验 */
          uint8_t topic_check = 0;
          aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_TOPIC_HEADER_CHECK, (void *)&topic_check); 
    • 相关参数:

      参数

      说明

      product_key

      设备所属云网关产品的ProductKey

      username

      设备认证信息。具体信息,请参见获取云网关设备认证信息

      password

      client_id

      客户端ID,需自定义,长度不可超过64个字符。建议使用设备的MAC地址或SN码,方便您识别区分不同的客户端。

      mqtt_host

      MQTT云网关设备接入地址和端口号(默认为1883)。获取方法,请参见创建云网关产品(MQTT)

      port

      user_ca_cert

      设备根证书root-ca.crt的内容。

    • 取消通信Topic以/开头的校验(AIOT_MQTTOPT_TOPIC_HEADER_CHECK):

          /* 关闭对Topic的以'/'为开头的校验 */
          uint8_t topic_check = 0;
          aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_TOPIC_HEADER_CHECK, (void *)&topic_check);

      物联网平台与云网关设备之间通过MQTT协议的Topic实现消息通信,通信Topic符合标准MQTT协议的Topic规范即可,无需以/开头。

      MQTT协议云网关设备通信的更多内容,请参见消息通信说明

  2. 配置状态监控和消息回调。

    1. 配置状态监控回调函数。

      • 示例代码:

         int main(int argc, char *argv[])
        {
            ...
            ...
        
            /* 配置MQTT默认消息接收回调函数。 */
            aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_RECV_HANDLER, (void *)demo_mqtt_default_recv_handler);
            /* 配置MQTT事件回调函数。 */
            aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_EVENT_HANDLER, (void *)demo_mqtt_event_handler);
            ...
            ...
        }
                                      
      • 相关参数:

        配置项

        示例值

        说明

        AIOT_MQTTOPT_RECV_HANDLER

        demo_mqtt_default_recv_handler

        当接收消息时,根据该回调函数定义的处理逻辑,执行对应的处理。

        AIOT_MQTTOPT_EVENT_HANDLER

        demo_mqtt_event_handler

        当设备连接状态发生变化时,根据该回调函数定义的处理逻辑,执行对应的处理。

    2. 定义状态监控的回调函数。

      重要
      • 避免定义过于耗时的事件处理逻辑,以免阻塞收包线程。

      • 连接状态的变化包括网络异常、自动重连已成功、已断开连接等。

      • 如果要根据连接状态的变化做应对处理,可在TODO处,按照需要修改代码。

      /* MQTT事件回调函数, 当网络连接、重连或断开时,触发该函数, 事件定义见core/aiot_mqtt_api.h。 */
      void demo_mqtt_event_handler(void *handle, const aiot_mqtt_event_t *event, void *userdata)
      {
          switch (event->type) {
              /* 调用了aiot_mqtt_connect()接口, 与MQTT服务器建立连接。 */
              case AIOT_MQTTEVT_CONNECT: {
                  printf("AIOT_MQTTEVT_CONNECT\n");
                  /* TODO: 处理SDK建立连接成功, 不可在此调用耗时较长的阻塞函数。 */
              }
              break;
      
              /* SDK因网络状况被动断开连接后, 成功自动发起重连。 */
              case AIOT_MQTTEVT_RECONNECT: {
                  printf("AIOT_MQTTEVT_RECONNECT\n");
                  /* TODO: 处理SDK重连成功, 不可在此调用耗时较长的阻塞函数。 */
              }
              break;
      
              /* SDK因网络状况被动断开了连接, network底层读写失败, heartbeat没有按预期得到服务端心跳应答。 */
              case AIOT_MQTTEVT_DISCONNECT: {
                  char *cause = (event->data.disconnect == AIOT_MQTTDISCONNEVT_NETWORK_DISCONNECT) ? ("network disconnect") :
                                ("heartbeat disconnect");
                  printf("AIOT_MQTTEVT_DISCONNECT: %s\n", cause);
                  /* TODO: 处理SDK被动断开连接, 不可在此调用耗时较长的阻塞函数。 */
              }
              break;
      
              default: {
      
              }
          }
      }
                                      
    3. 定义消息接收的回调函数。

      重要
      • 避免定义过于耗时的事件处理逻辑,以免阻塞收包线程。

      • 如果您要根据接收的消息做应对处理,可在TODO处,按照需要修改代码。

      
      /* MQTT默认消息处理回调, 当SDK从服务器收到MQTT消息时, 且您未设置对应回调的处理时,以下接口被调用。 */
      void demo_mqtt_default_recv_handler(void *handle, const aiot_mqtt_recv_t *packet, void *userdata)
      {
          switch (packet->type) {
              case AIOT_MQTTRECV_HEARTBEAT_RESPONSE: {
                  printf("heartbeat response\n");
                  /* TODO: 处理服务器对心跳的回应, 一般不处理。 */
              }
              break;
      
              case AIOT_MQTTRECV_SUB_ACK: {
                  printf("suback, res: -0x%04X, packet id: %d, max qos: %d\n",
                         -packet->data.sub_ack.res, packet->data.sub_ack.packet_id, packet->data.sub_ack.max_qos);
                  /* TODO: 处理服务器对订阅请求的回应, 一般不处理。 */
              }
              break;
      
              case AIOT_MQTTRECV_PUB: {
                  printf("pub, qos: %d, topic: %.*s\n", packet->data.pub.qos, packet->data.pub.topic_len, packet->data.pub.topic);
                  printf("pub, payload: %.*s\n", packet->data.pub.payload_len, packet->data.pub.payload);
                  /* TODO: 处理服务器下发的业务报文。 */
              }
              break;
      
              case AIOT_MQTTRECV_PUB_ACK: {
                  printf("puback, packet id: %d\n", packet->data.pub_ack.packet_id);
                  /* TODO: 处理服务器对QoS=1上报消息的回应, 一般不处理。 */
              }
              break;
      
              default: {
      
              }
          }
      }

步骤三:请求连接

调用aiot_mqtt_connect,根据配置连接的参数,向物联网平台,发起连接认证请求。

/* 与服务器建立MQTT连接。 */
    res = aiot_mqtt_connect(mqtt_handle);
    if (res < STATE_SUCCESS) {
        /* 尝试建立连接失败, 销毁MQTT实例, 回收资源。 */
        aiot_mqtt_deinit(&mqtt_handle);
        printf("aiot_mqtt_connect failed: -0x%04X\n", -res);
        printf("please check variables like mqtt_host, produt_key, device_name, device_secret in demo\r\n");
        return -1;
    }

步骤四:开启保活线程

调用aiot_mqtt_process,向服务器发送心跳报文,使设备保持长连接状态,并重发QoS=1的未应答报文。

  1. 开启保活线程。

        res = pthread_create(&g_mqtt_process_thread, NULL, demo_mqtt_process_thread, mqtt_handle);
        if (res < 0) {
            printf("pthread_create demo_mqtt_process_thread failed: %d\n", res);
            return -1;
        }
  2. 设置保活线程处理函数。

    void *demo_mqtt_process_thread(void *args)
    {
        int32_t res = STATE_SUCCESS;
    
        while (g_mqtt_process_thread_running) {
            res = aiot_mqtt_process(args);
            if (res == STATE_USER_INPUT_EXEC_DISABLED) {
                break;
            }
            sleep(1);
        }
        return NULL;
    }

步骤五:开启接收线程

调用aiot_mqtt_recv,收取服务器下发的MQTT消息,根据消息回调函数,执行对应处理。在断线时自动重连,根据事件回调函数,执行对应处理。

  1. 开启接收线程。

    
        res = pthread_create(&g_mqtt_recv_thread, NULL, demo_mqtt_recv_thread, mqtt_handle);
        if (res < 0) {
            printf("pthread_create demo_mqtt_recv_thread failed: %d\n", res);
            return -1;
        }
                                        
  2. 设置接收线程处理函数。

    void *demo_mqtt_recv_thread(void *args)
    {
        int32_t res = STATE_SUCCESS;
    
        while (g_mqtt_recv_thread_running) {
            res = aiot_mqtt_recv(args);
            if (res < STATE_SUCCESS) {
                if (res == STATE_USER_INPUT_EXEC_DISABLED) {
                    break;
                }
                sleep(1);
            }
        }
        return NULL;
    }

步骤六:订阅Topic

调用aiot_mqtt_sub,订阅指定Topic。

  • 示例代码:

        {
            char *sub_topic = "/a18wP******/LightSwitch/user/get";
    
            res = aiot_mqtt_sub(mqtt_handle, sub_topic, NULL, 1, NULL);
            if (res < 0) {
                printf("aiot_mqtt_sub failed, res: -0x%04X\n", -res);
                return -1;
            }
        }
    说明

    完成配置后,请删除相关代码两边的注释符号。

  • 相关参数:

    参数

    示例

    说明

    sub_topic

    /a18wP******/LightSwitch/user/get

    拥有订阅权限的Topic。

    • a18wP******为设备的ProductKey。

    • LightSwitch为设备的DeviceName。

    本示例为默认的自定义Topic。

    设备通过该Topic,可接收物联网平台的消息。

    关于Topic的更多信息,请参见什么是Topic

步骤七:发送消息

调用aiot_mqtt_pub,向指定Topic发送消息。

  • 示例代码:

         {
            char *pub_topic = "/a18wP******/LightSwitch/user/update";
            char *pub_payload = "{\"id\":\"1\",\"version\":\"1.0\",\"params\":{\"LightSwitch\":0}}";
    
            res = aiot_mqtt_pub(mqtt_handle, pub_topic, (uint8_t *)pub_payload, (uint32_t)strlen(pub_payload), 0);
            if (res < 0) {
                printf("aiot_mqtt_sub failed, res: -0x%04X\n", -res);
                return -1;
            }
        }
    说明

    完成配置后,请删除相关代码两边的注释符号。

  • 相关参数:

    参数

    示例

    说明

    pub_topic

    /a18wP******/LightSwitch/user/update

    拥有发布权限的Topic。

    • a18wP******为设备的ProductKey。

    • LightSwitch为设备的DeviceName。

    设备通过该Topic向物联网平台发送消息。

    关于Topic的更多信息,请参见什么是Topic

    pub_payload

    {\"id\":\"1\",\"version\":\"1.0\",\"params\":{\"LightSwitch\":0}}

    上报至物联网平台的消息内容。

    由于示例消息的Topic类型为自定义,因此数据格式可自定义。

    关于数据格式的更多信息,请参见数据格式

设备与物联网平台建立MQTT通信后,请确保通信量不超过阈值。

步骤八:断开连接

说明

MQTT接入常应用于长连接的设备,程序通常不会运行至此。

例程的主线程任务为配置参数并成功建立连接。连接建立后,主线程可进入休眠。

调用aiot_mqtt_disconnect,向物联网平台发送断开连接的报文,然后断开网络连接。

    res = aiot_mqtt_disconnect(mqtt_handle);
    if (res < STATE_SUCCESS) {
        aiot_mqtt_deinit(&mqtt_handle);
        printf("aiot_mqtt_disconnect failed: -0x%04X\n", -res);
        return -1;
    }

步骤九:退出程序

调用aiot_mqtt_deinit,销毁MQTT客户端实例,释放资源。

    res = aiot_mqtt_deinit(&mqtt_handle);
    if (res < STATE_SUCCESS) {
        printf("aiot_mqtt_deinit failed: -0x%04X\n", -res);
        return -1;
    }

后续步骤

  • 例程文件配置完成后,需进行编译,生成可执行文件:

    • 非云网关设备:./output/mqtt-basic-demo

    • 云网关设备:./output/mqtt-userdefine-demo

      更多信息,请参见编译与运行

  • 关于运行结果的详细说明,请参见运行日志