本文以C Link SDK中的Demo文件./demos/mqtt_x509_auth_demo.c为例,介绍使用X.509证书,将MQTT协议的设备接入物联网平台并进行消息收发。

背景信息

步骤一:初始化

  1. 添加头文件。
    #include "aiot_state_api.h"
    #include "aiot_sysdep_api.h"
    #include "aiot_mqtt_api.h"
  2. 配置底层依赖和日志输出。
        aiot_sysdep_set_portfile(&g_aiot_sysdep_portfile);
        aiot_state_set_logcb(demo_state_logcb);
  3. 调用aiot_mqtt_init,创建MQTT客户端实例,并初始化默认参数。
        mqtt_handle = aiot_mqtt_init();
        if (mqtt_handle == NULL) {
            printf("aiot_mqtt_init failed\n");
            return -1;
        }

步骤二:配置功能

调用aiot_mqtt_setopt,配置以下功能。

  1. 配置连接参数
  2. 配置设备身份信息的回调
  3. 配置状态监控和消息回调

更多功能的配置项,请参见aiot_mqtt_option_t

  1. 配置连接参数。
    • 示例代码:
      const char client_cert[] = {
          "-----BEGIN CERTIFICATE-----\r\n"
          "MIIDiDCCAnCgAwIBAgIIAJ3GD7c2860wDQYJKoZIhvcNAQELBQAwUzEoMCYGA1UE\r\n"
          … 
          …
          "v4aDacYavCH03JXKQ6zWpAwnwLcYrbW7XdhtDrqFCj+v6VJ6NDZaTGEW3/I=\r\n"
          "-----END CERTIFICATE-----\r\n"
      };
      
      const char client_private_key[] = {
      
          "-----BEGIN RSA PRIVATE KEY-----\r\n"
          "MIIEowIBAAKCAQEApyRaelm4b4sKOlqBywOIR4RIJrYEfNtYIAofMIkkwnClrqgh\r\n"
          …    
          …
          "mPw5JEAkNBy6wOWepJ9Tv1wY8yFEzV2dVsx3P93p5P3UdZb4M7i0\r\n"
          "-----END RSA PRIVATE KEY-----\r\n"
      };
          ...
      int main(int argc, char *argv[])
      {
          int32_t     res = STATE_SUCCESS;
          void       *mqtt_handle = NULL;
          char       *host = "x509.itls.cn-shanghai.aliyuncs.com";
      
          uint16_t    port = 1883; 
          aiot_sysdep_network_cred_t cred; 
      
          char *product_key       = "";
          char *device_name       = "";
          char *device_secret     = "";
      
          ...
          /* 安全凭据结构体, 如果要用TLS, 这个结构体中配置CA证书等参数 */
          aiot_sysdep_network_cred_t cred;
      
          /* 创建SDK的安全凭据, 用于建立TLS连接 */
          memset(&cred, 0, sizeof(aiot_sysdep_network_cred_t));
          cred.option = AIOT_SYSDEP_NETWORK_CRED_SVRCERT_CA;  /* 使用RSA证书校验MQTT服务端 */
          cred.max_tls_fragment = 16384; /* 最大的分片长度为16 KB, 其它可选值还有4 KB、2 KB、1 KB和0.5 KB */
          cred.sni_enabled = 1;                               /* TLS建连时, 支持Server Name Indicator */
          cred.x509_server_cert = ali_ca_crt;                 /* 用来验证MQTT服务端的RSA根证书 */
          cred.x509_server_cert_len = strlen(ali_ca_crt);     /* 用来验证MQTT服务端的RSA根证书长度 */
      
          /* TODO: 请注意以下四行代码, 使用X509双向认证时, 对安全凭据的设置就只要增加这一部分 */
          cred.x509_client_cert = client_cert;
          cred.x509_client_cert_len = strlen(client_cert);
          cred.x509_client_privkey = client_private_key;
          cred.x509_client_privkey_len = strlen(client_private_key);
      
          /* 配置网络连接的安全凭据 */
          aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_NETWORK_CRED, (void *)&cred);
      
          ...
      }
    • 相关参数:
      参数示例说明
      client_cert[]
          "-----BEGIN CERTIFICATE-----\r\n"
          "MIIDiDCCAnCgAwIBAgIIAJ3GD7c2860wDQYJKoZIhvcNAQELBQAwUzEoMCYGA1UE\r\n"
          … 
          …
          "v4aDacYavCH03JXKQ6zWpAwnwLcYrbW7XdhtDrqFCj+v6VJ6NDZaTGEW3/I=\r\n"
          "-----END CERTIFICATE-----\r\n"
      设备的X.509证书信息。

      在物联网平台的设备详情页,单击X.509证书对应的下载按钮,下载证书信息。解压证书文件后,以示例值的格式,将该参数的值替换为.cer文件中的信息。

      证书内容由多行字符串组成。示例中的省略号(…)表示省去的字符串,每一行字符串的开头加",结尾加\r\n"

      client_private_key[]
          "-----BEGIN RSA PRIVATE KEY-----\r\n"
          "MIIEowIBAAKCAQEApyRaelm4b4sKOlqBywOIR4RIJrYEfNtYIAofMIkkwnClrqgh\r\n"
          … 
          …
          "mPw5JEAkNBy6wOWepJ9Tv1wY8yFEzV2dVsx3P93p5P3UdZb4M7i0\r\n"
          "-----END RSA PRIVATE KEY-----\r\n"
      设备的X.509证书密钥。

      在物联网平台的设备详情页,单击X.509证书对应的下载按钮,下载证书信息。解压证书文件后,以示例值的格式,将该参数的值替换为.key文件中的信息。

      证书密钥内容由多行字符串组成。示例中的省略号(…)表示省去的字符串,每一行字符串的开头加",结尾加\r\n"

      hostx509.itls.cn-shanghai.aliyuncs.com设备的接入域名。其格式为x509.itls.${YourRegionId}.aliyuncs.com

      其中,${YourRegionId}为设备接入的地域ID。更多信息,请参见地域和可用区

      port1883端口号。
      product_key""X.509认证无需配置这三个参数,请确保其值为空。
      device_name""
      device_secret""
  2. 配置设备身份信息的回调。
    设备使用X.509证书,与物联网平台建立连接后,物联网平台将含有设备ProductKeyDeviceName的消息发送至设备。因此,您需提前配置用于接收该消息的回调函数。根据定义的回调函数,将这两个参数保存至指定位置,以便后续使用。

    您需自定义函数demo_get_device_info,以下示例代码对该消息做数据解析和打印处理。

    • 示例代码:
      
      static void demo_get_device_info(const char *topic, uint16_t topic_len, const char *payload, uint32_t payload_len)
      {
          const char *target_topic = "/ext/auth/identity/response";
          char *p_product_key = NULL;
          uint32_t product_key_len = 0;
          char *p_device_name = NULL;
          uint32_t device_name_len = 0;
          int32_t res = STATE_SUCCESS;
      
          if (topic_len != strlen(target_topic) || memcmp(topic, target_topic, topic_len) != 0) {
              return;
          }
      
          /* TODO: 为了便于演示, 此处使用了SDK内部接口core_json_value(),该接口仅供演示使用。
      
                   实际使用时, 需要换成设备上可用的JSON解析函数库的接口,以处理Payload, 例如cJSON等
          */
          res = core_json_value(payload, payload_len, "productKey", strlen("productKey"), &p_product_key, &product_key_len);
          if (res < 0) {
              return;
          }
          res = core_json_value(payload, payload_len, "deviceName", strlen("deviceName"), &p_device_name, &device_name_len);
          if (res < 0) {
              return;
          }
      
          if (g_product_key == NULL) {
              g_product_key = malloc(product_key_len + 1);
              if (NULL == g_product_key) {
                  return;
              }
      
              memset(g_product_key, 0, product_key_len + 1);
              memcpy(g_product_key, p_product_key, product_key_len);
          }
          if (g_device_name == NULL) {
              g_device_name = malloc(device_name_len + 1);
              if (NULL == g_product_key) {
                  return;
              }
      
              memset(g_device_name, 0, device_name_len + 1);
              memcpy(g_device_name, p_device_name, device_name_len);
          }
      
          printf("device productKey: %s\r\n", g_product_key);
          printf("device deviceName: %s\r\n", g_device_name);
      }
    • 相关说明:

      物联网平台通过Topic /ext/auth/identity/response下发设备的ProductKeyDeviceNamePayload格式为:

      {
          "productKey":"***",
          "deviceName":"***"
      }
  3. 配置状态监控和消息回调。
    1. 配置状态监控回调函数。
      • 示例代码:
         int main(int argc, char *argv[])
        {
            ...
            ...
        
            /* 配置MQTT默认消息接收回调函数。 */
            aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_RECV_HANDLER, (void *)demo_mqtt_default_recv_handler);
            /* 配置MQTT事件回调函数。 */
            aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_EVENT_HANDLER, (void *)demo_mqtt_event_handler);
            ...
            ...
        }
                                      
      • 相关参数:
        配置项示例值说明
        AIOT_MQTTOPT_RECV_HANDLERdemo_mqtt_default_recv_handler当接收消息时,根据该回调函数定义的处理逻辑,执行对应的处理。
        AIOT_MQTTOPT_EVENT_HANDLERdemo_mqtt_event_handler当设备连接状态发生变化时,根据该回调函数定义的处理逻辑,执行对应的处理。
    2. 定义状态监控的回调函数。
      重要
      • 请勿将事件的处理逻辑,定义得过于耗时,以免阻塞收包线程。
      • 连接状态的变化包括网络异常、自动重连已成功、已断开连接等。
      • 如果要根据连接状态的变化做应对处理,可在TODO处,按照需要修改代码。

      /* MQTT事件回调函数, 当网络连接、重连或断开时,触发该函数, 事件定义见core/aiot_mqtt_api.h。 */
      void demo_mqtt_event_handler(void *handle, const aiot_mqtt_event_t *event, void *userdata)
      {
          switch (event->type) {
              /* 调用了aiot_mqtt_connect()接口, 与MQTT服务器建立连接。 */
              case AIOT_MQTTEVT_CONNECT: {
                  printf("AIOT_MQTTEVT_CONNECT\n");
                  /* TODO: 处理SDK建立连接成功, 不可在此调用耗时较长的阻塞函数。 */
              }
              break;
      
              /* SDK因网络状况被动断开连接后, 成功自动发起重连。 */
              case AIOT_MQTTEVT_RECONNECT: {
                  printf("AIOT_MQTTEVT_RECONNECT\n");
                  /* TODO: 处理SDK重连成功, 不可在此调用耗时较长的阻塞函数。 */
              }
              break;
      
              /* SDK因网络状况被动断开了连接, network底层读写失败, heartbeat没有按预期得到服务端心跳应答。 */
              case AIOT_MQTTEVT_DISCONNECT: {
                  char *cause = (event->data.disconnect == AIOT_MQTTDISCONNEVT_NETWORK_DISCONNECT) ? ("network disconnect") :
                                ("heartbeat disconnect");
                  printf("AIOT_MQTTEVT_DISCONNECT: %s\n", cause);
                  /* TODO: 处理SDK被动断开连接, 不可在此调用耗时较长的阻塞函数。 */
              }
              break;
      
              default: {
      
              }
          }
      }
                                      
    3. 定义消息接收的回调函数。
      重要
      • 请勿将消息的处理逻辑,定义得过于耗时,以免阻塞收包线程。
      • 如果您要根据接收的消息做应对处理,可在TODO处,按照需要修改代码。
      
      /* MQTT默认消息处理回调, 当SDK从服务器收到MQTT消息时, 且您未设置对应回调的处理时,以下接口被调用。 */
      void demo_mqtt_default_recv_handler(void *handle, const aiot_mqtt_recv_t *packet, void *userdata)
      {
          switch (packet->type) {
              case AIOT_MQTTRECV_HEARTBEAT_RESPONSE: {
                  printf("heartbeat response\n");
                  /* TODO: 处理服务器对心跳的回应, 一般不处理。 */
              }
              break;
      
              case AIOT_MQTTRECV_SUB_ACK: {
                  printf("suback, res: -0x%04X, packet id: %d, max qos: %d\n",
                         -packet->data.sub_ack.res, packet->data.sub_ack.packet_id, packet->data.sub_ack.max_qos);
                  /* TODO: 处理服务器对订阅请求的回应, 一般不处理。 */
              }
              break;
      
              case AIOT_MQTTRECV_PUB: {
                  printf("pub, qos: %d, topic: %.*s\n", packet->data.pub.qos, packet->data.pub.topic_len, packet->data.pub.topic);
                  printf("pub, payload: %.*s\n", packet->data.pub.payload_len, packet->data.pub.payload);
                  /* TODO: 处理服务器下发的业务报文。 */
              }
              break;
      
              case AIOT_MQTTRECV_PUB_ACK: {
                  printf("puback, packet id: %d\n", packet->data.pub_ack.packet_id);
                  /* TODO: 处理服务器对QoS=1上报消息的回应, 一般不处理。 */
              }
              break;
      
              default: {
      
              }
          }
      }

步骤三:请求连接

调用aiot_mqtt_connect向物联网平台,发起连接认证请求。

/* 与服务器建立MQTT连接。 */
    res = aiot_mqtt_connect(mqtt_handle);
    if (res < STATE_SUCCESS) {
        /* 尝试建立连接失败, 销毁MQTT实例, 回收资源。 */
        aiot_mqtt_deinit(&mqtt_handle);
        printf("aiot_mqtt_connect failed: -0x%04X\n", -res);
        return -1;
    }

步骤四:开启保活线程

调用aiot_mqtt_process,向服务器发送心跳报文,使设备保持长连接状态,并重发QoS=1的未应答报文。

  1. 开启保活线程。
        res = pthread_create(&g_mqtt_process_thread, NULL, demo_mqtt_process_thread, mqtt_handle);
        if (res < 0) {
            printf("pthread_create demo_mqtt_process_thread failed: %d\n", res);
            return -1;
        }
  2. 设置保活线程处理函数。
    void *demo_mqtt_process_thread(void *args)
    {
        int32_t res = STATE_SUCCESS;
    
        while (g_mqtt_process_thread_running) {
            res = aiot_mqtt_process(args);
            if (res == STATE_USER_INPUT_EXEC_DISABLED) {
                break;
            }
            sleep(1);
        }
        return NULL;
    }

步骤五:开启接收线程

调用aiot_mqtt_recv,收取服务器下发的MQTT消息,根据消息回调函数,执行对应处理。在断线时自动重连,根据事件回调函数,执行对应处理。

  1. 开启接收线程。
    
        res = pthread_create(&g_mqtt_recv_thread, NULL, demo_mqtt_recv_thread, mqtt_handle);
        if (res < 0) {
            printf("pthread_create demo_mqtt_recv_thread failed: %d\n", res);
            return -1;
        }
                                        
  2. 设置接收线程处理函数。
    void *demo_mqtt_recv_thread(void *args)
    {
        int32_t res = STATE_SUCCESS;
    
        while (g_mqtt_recv_thread_running) {
            res = aiot_mqtt_recv(args);
            if (res < STATE_SUCCESS) {
                if (res == STATE_USER_INPUT_EXEC_DISABLED) {
                    break;
                }
                sleep(1);
            }
        }
        return NULL;
    }

步骤六:订阅Topic

调用aiot_mqtt_sub,订阅指定Topic。

  • 示例代码:
        {
            char *sub_topic = "/a18wP******/LightSwitch/user/get";
    
            res = aiot_mqtt_sub(mqtt_handle, sub_topic, NULL, 1, NULL);
            if (res < 0) {
                printf("aiot_mqtt_sub failed, res: -0x%04X\n", -res);
                return -1;
            }
        }
    说明 完成配置后,请删除相关代码两边的注释符号。
  • 相关参数:
    参数示例说明
    sub_topic/a18wP******/LightSwitch/user/get
    拥有订阅权限的Topic。其中:
    • a18wP******为设备的ProductKey。
    • LightSwitch为设备的DeviceName。

    本示例为默认的自定义Topic。

    设备通过该Topic,可接收物联网平台的消息。

    关于Topic的更多信息,请参见什么是Topic

步骤七:发送消息

调用aiot_mqtt_pub,向指定Topic发送消息。

  • 示例代码:
         {
            char *pub_topic = "/a18wP******/LightSwitch/user/update";
            char *pub_payload = "{\"id\":\"1\",\"version\":\"1.0\",\"params\":{\"LightSwitch\":0}}";
    
            res = aiot_mqtt_pub(mqtt_handle, pub_topic, (uint8_t *)pub_payload, (uint32_t)strlen(pub_payload), 0);
            if (res < 0) {
                printf("aiot_mqtt_sub failed, res: -0x%04X\n", -res);
                return -1;
            }
        }
    说明 完成配置后,请删除相关代码两边的注释符号。
  • 相关参数:
    参数示例说明
    pub_topic /a18wP******/LightSwitch/user/update拥有发布权限的Topic。其中:
    • a18wP******为设备的ProductKey。
    • LightSwitch为设备的DeviceName。
    设备通过该Topic向物联网平台发送消息。

    关于Topic的更多信息,请参见什么是Topic

    pub_payload{\"id\":\"1\",\"version\":\"1.0\",\"params\":{\"LightSwitch\":0}}上报至物联网平台的消息内容。

    由于示例消息的Topic类型为自定义,因此数据格式可自定义。

    关于数据格式的更多信息,请参见数据格式

设备与物联网平台建立MQTT通信后,请确保通信量不超过阈值

步骤八:断开连接

说明

MQTT接入常应用于长连接的设备,程序通常不会运行至此。

例程的主线程任务为配置参数并成功建立连接。连接建立后,主线程可进入休眠。

调用aiot_mqtt_disconnect,向物联网平台发送断开连接的报文,然后断开网络连接。

    res = aiot_mqtt_disconnect(mqtt_handle);
    if (res < STATE_SUCCESS) {
        aiot_mqtt_deinit(&mqtt_handle);
        printf("aiot_mqtt_disconnect failed: -0x%04X\n", -res);
        return -1;
    }

步骤九:退出程序

调用aiot_mqtt_deinit,销毁MQTT客户端实例,释放资源。
    res = aiot_mqtt_deinit(&mqtt_handle);
    if (res < STATE_SUCCESS) {
        printf("aiot_mqtt_deinit failed: -0x%04X\n", -res);
        return -1;
    }

后续步骤

  • 例程文件配置完成后,需进行编译,生成可执行文件./output/mqtt-x509-auth-demo

    更多信息,请参见编译与运行

  • 关于运行结果的详细说明,请参见运行日志