全部产品
弹性计算 会员服务 网络 安全 移动云 数加·大数据分析及展现 数加·大数据应用 管理与监控 云通信 阿里云办公 培训与认证 更多
存储与CDN 数据库 域名与网站(万网) 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 智能硬件
消息队列 MQ

C 接入示例

更新时间:2018-03-07 20:44:29

本文主要介绍如何使用 C 客户端收发 MQTT 消息,并给出示例代码供前期开发测试参考。

注意:

本文给出的实例均基于 Eclipse Paho C SDK 实现,该 SDK 下载请参见 MQTT 接入准备。如使用其他第三方的客户端,请适当修改。

1. 资源创建

使用 MQ 提供的 MQTT 服务,首先需要核实应用中使用的 Topic 资源是否已经创建,如果没有请先去控制台创建 Topic,Group ID 等资源。

创建资源时需要根据需求选择对应的 Region,例如,MQTT 需要使用华北 2 的接入点,那么 Topic 等资源就在华北 2 创建,资源创建具体请参见 创建资源

注意:MQTT 使用的多级子 Topic 不需要创建,代码里直接使用即可,没有限制。

2. C 客户端依赖

Paho MQTT C 客户端 SDK 从官网下载源码后,参考文档进行编译安装。可能需要安装以下开发库和工具。

  1. openssl:用于签名计算。
  2. cmake:用于编译示例程序,实际也可以根据习惯选用其他编译工具。

3. MQTT 收发消息

本段示例代码演示如何使用 C 客户端收发 MQTT 消息,其中编译工具使用 CMake。

CMakeLists.txt 文件

  1. cmake_minimum_required(VERSION 3.7)
  2. project(mqttdemo)
  3. INCLUDE_DIRECTORIES(
  4. .
  5. ${CMAKE_SOURCE_DIR}/src
  6. ${CMAKE_BINARY_DIR}
  7. )
  8. SET(CMAKE_BUILD_TYPE "Debug")
  9. SET(CMAKE_CXX_FLAGS_DEBUG "$ENV{CXXFLAGS} -O0 -Wall -g2 -ggdb")
  10. SET(CMAKE_CXX_FLAGS_RELEASE "$ENV{CXXFLAGS} -O3 -Wall")
  11. set(CMAKE_C_STANDARD 99)
  12. add_executable(mqttDemo mqttDemo.c)
  13. TARGET_LINK_LIBRARIES(mqttDemo crypto)
  14. TARGET_LINK_LIBRARIES(mqttDemo paho-mqtt3as)

收发消息程序:

  1. #include "MQTTAsync.h"
  2. #include <signal.h>
  3. #include <memory.h>
  4. #include <stdlib.h>
  5. #include <unistd.h>
  6. #include <openssl/hmac.h>
  7. #include <openssl/bio.h>
  8. volatile int connected = 0;
  9. char *topic;
  10. char *userName;
  11. char *passWord;
  12. int messageDeliveryComplete(void *context, MQTTAsync_token token) {
  13. /* not expecting any messages */
  14. printf("send message %d success\n", token);
  15. return 1;
  16. }
  17. int messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *m) {
  18. /* not expecting any messages */
  19. printf("recv message from %s ,body is %s\n", topicName, (char *) m->payload);
  20. MQTTAsync_freeMessage(&m);
  21. MQTTAsync_free(topicName);
  22. return 1;
  23. }
  24. void onConnectFailure(void *context, MQTTAsync_failureData *response) {
  25. connected = 0;
  26. printf("connect failed, rc %d\n", response ? response->code : -1);
  27. MQTTAsync client = (MQTTAsync) context;
  28. }
  29. void onSubcribe(void *context, MQTTAsync_successData *response) {
  30. printf("subscribe success \n");
  31. }
  32. void onConnect(void *context, MQTTAsync_successData *response) {
  33. connected = 1;
  34. printf("connect success \n");
  35. MQTTAsync client = (MQTTAsync) context;
  36. //do sub when connect success
  37. MQTTAsync_responseOptions sub_opts = MQTTAsync_responseOptions_initializer;
  38. sub_opts.onSuccess = onSubcribe;
  39. int rc = 0;
  40. if ((rc = MQTTAsync_subscribe(client, topic, 1, &sub_opts)) != MQTTASYNC_SUCCESS) {
  41. printf("Failed to subscribe, return code %d\n", rc);
  42. }
  43. }
  44. void onDisconnect(void *context, MQTTAsync_successData *response) {
  45. connected = 0;
  46. printf("connect lost \n");
  47. }
  48. void onPublishFailure(void *context, MQTTAsync_failureData *response) {
  49. printf("Publish failed, rc %d\n", response ? -1 : response->code);
  50. }
  51. int success = 0;
  52. void onPublish(void *context, MQTTAsync_successData *response) {
  53. printf("send success %d\n", ++success);
  54. }
  55. void connectionLost(void *context, char *cause) {
  56. connected = 0;
  57. MQTTAsync client = (MQTTAsync) context;
  58. MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
  59. int rc = 0;
  60. printf("Connecting\n");
  61. conn_opts.keepAliveInterval = 10;
  62. conn_opts.cleansession = 1;
  63. conn_opts.username = userName;
  64. conn_opts.password = passWord;
  65. conn_opts.onSuccess = onConnect;
  66. conn_opts.onFailure = onConnectFailure;
  67. conn_opts.context = client;
  68. //如果使用加密ssl的方式,此处需要初始化,否则设置为NULL
  69. conn_opts.ssl = NULL;
  70. //MQTTAsync_SSLOptions ssl =MQTTAsync_SSLOptions_initializer;
  71. //conn_opts.ssl = &ssl;
  72. if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) {
  73. printf("Failed to start connect, return code %d\n", rc);
  74. exit(EXIT_FAILURE);
  75. }
  76. }
  77. int main(int argc, char **argv) {
  78. MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
  79. MQTTAsync client;
  80. //MQTT 使用的 Topic,其中第一级父 Topic 需要在 MQ 控制台先创建
  81. topic = "XXXXX";
  82. //MQTT 的接入域名,在 MQ 控制台购买付费实例后即分配该域名,如果是使用加密ssl的方式,域名格式是ssl://XXX.mqtt.aliyuncs.com
  83. char *host = "XXX.mqtt.aliyuncs.com";
  84. //MQTT 客户端分组 Id,需要先在 MQ 控制台创建
  85. char *groupId = "GID_XXXXXX";
  86. //MQTT 客户端设备 Id,用户自行生成,需要保证对于所有 TCP 连接全局唯一
  87. char *deviceId = "XXXXXXX";
  88. //帐号 AK
  89. char *accessKey = "XXXXXXX";
  90. //帐号 SK
  91. char *secretKey = "XXXXXXX";
  92. //服务端口,使用 MQTT 协议时设置1883,其他协议参考文档选择合适端口,如果是加密ssl的话端口是8883
  93. int port = 1883;
  94. //QoS,消息传输级别,参考文档选择合适的值
  95. int qos = 1;
  96. //cleanSession,是否设置持久会话,如果需要离线消息则 cleanSession 必须是 false,QoS 必须是1
  97. int cleanSession = 0;
  98. int rc = 0;
  99. char tempData[100];
  100. int len = 0;
  101. HMAC(EVP_sha1(), secretKey, strlen(secretKey), groupId, strlen(groupId), tempData, &len);
  102. char resultData[100];
  103. int passWordLen = EVP_EncodeBlock((unsigned char *) resultData, tempData, len);
  104. resultData[passWordLen] = '\0';
  105. printf("passWord is %s", resultData);
  106. userName = accessKey;
  107. passWord = resultData;
  108. //1.create client
  109. MQTTAsync_createOptions create_opts = MQTTAsync_createOptions_initializer;
  110. create_opts.sendWhileDisconnected = 0;
  111. create_opts.maxBufferedMessages = 10;
  112. char url[100];
  113. sprintf(url, "%s:%d", host, port);
  114. char clientIdUrl[64];
  115. sprintf(clientIdUrl, "%s@@@%s", groupId, deviceId);
  116. rc = MQTTAsync_createWithOptions(&client, url, clientIdUrl, MQTTCLIENT_PERSISTENCE_NONE, NULL, &create_opts);
  117. rc = MQTTAsync_setCallbacks(client, client, connectionLost, messageArrived, NULL);
  118. //2.connect to server
  119. MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
  120. conn_opts.keepAliveInterval = 90;
  121. conn_opts.cleansession = cleanSession;
  122. conn_opts.username = userName;
  123. conn_opts.password = passWord;
  124. conn_opts.onSuccess = onConnect;
  125. conn_opts.onFailure = onConnectFailure;
  126. conn_opts.context = client;
  127. //如果使用加密ssl的方式,此处需要初始化,否则设置为NULL
  128. conn_opts.ssl = NULL;
  129. //MQTTAsync_SSLOptions ssl =MQTTAsync_SSLOptions_initializer;
  130. //conn_opts.ssl = &ssl;
  131. conn_opts.automaticReconnect = 1;
  132. conn_opts.connectTimeout = 3;
  133. if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) {
  134. printf("Failed to start connect, return code %d\n", rc);
  135. exit(EXIT_FAILURE);
  136. }
  137. //3.publish msg
  138. MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer;
  139. pub_opts.onSuccess = onPublish;
  140. pub_opts.onFailure = onPublishFailure;
  141. for (int i = 0; i < 1000; i++) {
  142. do {
  143. char data[100];
  144. sprintf(data, "hello mqtt demo");
  145. rc = MQTTAsync_send(client, topic, strlen(data), data, qos, 0, &pub_opts);
  146. sleep(1);
  147. } while (rc != MQTTASYNC_SUCCESS);
  148. }
  149. sleep(1000);
  150. disc_opts.onSuccess = onDisconnect;
  151. if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS) {
  152. printf("Failed to start disconnect, return code %d\n", rc);
  153. exit(EXIT_FAILURE);
  154. }
  155. while (connected)
  156. sleep(1);
  157. MQTTAsync_destroy(&client);
  158. return EXIT_SUCCESS;
  159. }
本文导读目录