全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 钉钉智能硬件
阿里云物联网套件

远程控制设备并返回结果

更新时间:2017-08-04 14:36:33

使用场景

MQTT协议是基于PUB/SUB的异步通信模式,对于有些场景,服务端发请求给设备端需要同时获得设备端返回结果的不太适用,IoT套件基于MQTT协议做了一套请求和响应的同步机制,无需改动MQTT协议即可实现同步通信。对于服务端我们提供了API,设备端只需要按照固定的格式回复PUB消息,即可将响应同步返回给服务端,具体参考如下代码示例。

Topic格式约定

请求:/sys/${productKey}/${deviceName}/rrpc/request/${messageId}

响应:/sys/${productKey}/${deviceName}/rrpc/response/${messageId}

$表示变量,每个设备不同,messageId为IoT套件生成的消息ID,设备端回复响应时,Topic里的这个值要与请求一致;设备端订阅时可以使用/sys/${productKey}/${deviceName}/rrpc/request/+, 将最末的采用同配符订阅,回复时不可以使用同配符。

服务端实现

  1. RRpcRequest rrpcRequest = new RRpcRequest();
  2. rrpcRequest.setProductKey("..."); //设备所属产品的Key
  3. rrpcRequest.setDeviceName("..."); //设备名称
  4. rrpcRequest.setRequestBase64Byte(Base64.encodeBase64String("{\"action\":\unlock\}".getBytes())); //发给设备的数据,要求二进制数据做一次Base64编码
  5. rrpcRequest.setTimeOut(1000); //超时时间,单位毫秒,如果超过这个时间设备没反应则返回"TIMEOUT"
  6. RRpcResponse rrpcResponse = client.getAcsResponse(rrpcRequest); //得到设备返回的数据信息
  7. System.out.println(rrpcResponse.getPayloadBase64Byte()); //得到的数据是设备返回二进制数据然后再经过Base64编码之后的字符串,需要转换一下才能拿到原始的二进制数据
  8. System.out.println(rrpcResponse.getRrpcCode()); //对应的响应码(UNKNOW/SUCCESS/TIMEOUT/OFFLINE/HALFCONN等)

设备端实现

  1. #define PRODUCT_KEY "..."
  2. #define DEVICE_NAME "..."
  3. #define TOPIC_RRPC_REQ "/sys/"PRODUCT_KEY"/"DEVICE_NAME"/rrpc/request/"
  4. #define TOPIC_RRPC_RSP "/sys/"PRODUCT_KEY"/"DEVICE_NAME"/rrpc/response/"
  5. // 订阅RRPC的Topic
  6. rc = IOT_MQTT_Subscribe(pclient, TOPIC_RRPC_REQ "+", IOTX_MQTT_QOS0, mqtt_rrpc_msg_arrive, NULL);
  7. if (rc < 0) {
  8. IOT_MQTT_Destroy(pclient);
  9. printf("IOT_MQTT_Subscribe() failed, rc = %d\n", rc);
  10. rc = -1;
  11. goto do_exit;
  12. }
  13. // 回复RRPC响应
  14. void mqtt_rrpc_msg_arrive(void *pcontext, void *pclient, iotx_mqtt_event_msg_pt msg)
  15. {
  16. iotx_mqtt_topic_info_pt ptopic_info = (iotx_mqtt_topic_info_pt) msg->msg;
  17. iotx_mqtt_topic_info_t topic_msg;
  18. char msg_pub[MSG_LEN_MAX] = {0};
  19. char topic[TOPIC_LEN_MAX] = {0};
  20. char msg_id[MSG_ID_LEN_MAX] = {0};
  21. // print topic name and topic message
  22. printf("----\n");
  23. printf("Topic: '%.*s' (Length: %d)\n",
  24. ptopic_info->topic_len,
  25. ptopic_info->ptopic,
  26. ptopic_info->topic_len);
  27. printf("Payload: '%.*s' (Length: %d)\n",
  28. ptopic_info->payload_len,
  29. ptopic_info->payload,
  30. ptopic_info->payload_len);
  31. printf("----\n");
  32. if (snprintf(msg_id,
  33. ptopic_info->topic_len - strlen(TOPIC_RRPC_REQ) + 1,
  34. "%s",
  35. ptopic_info->ptopic + strlen(TOPIC_RRPC_REQ))
  36. > sizeof(msg_id)) {
  37. printf("snprintf error!\n");
  38. return;
  39. }
  40. printf("response msg_id = %s\n", msg_id);
  41. if (snprintf(topic, sizeof(topic), "%s%s", TOPIC_RRPC_RSP, msg_id) > sizeof(topic)) {
  42. printf("snprintf error!\n");
  43. return;
  44. }
  45. printf("response topic = %s\n", topic);
  46. sprintf(msg_pub, "rrpc client has received message!\n");
  47. topic_msg.qos = IOTX_MQTT_QOS0;
  48. topic_msg.retain = 0;
  49. topic_msg.dup = 0;
  50. topic_msg.payload = (void *)msg_pub;
  51. topic_msg.payload_len = strlen(msg_pub);
  52. if (IOT_MQTT_Publish(pclient, topic, &topic_msg) < 0) {
  53. printf("error occur when publish!\n");
  54. }
  55. }
本文导读目录