使用场景

MQTT协议是基于PUB/SUB的异步通信模式,不适用于服务端发送请求给设备端,同时获取设备端返回结果的场景。

物联网平台基于MQTT协议制定了一套请求和响应的同步机制,无需改动MQTT协议即可实现同步通信。

物联网平台提供API给服务端,设备端只需要按照固定的格式回复PUB消息,服务端使用API,即可同步获取设备端的响应结果,具体参考如下代码示例。

Topic格式约定

  • 请求:/sys/${productKey}/${deviceName}/rrpc/request/${messageId}
  • 响应:/sys/${productKey}/${deviceName}/rrpc/response/${messageId}
  • $表示变量,每个设备不同,messageId为物联网平台生成的消息ID,设备端回复响应时,topic里的这个值要与请求一致。
  • 设备端订阅时,可以使用/sys/${productKey}/${deviceName}/rrpc/request/+, 采用通配符+订阅,回复时,不可以使用通配符。

服务端实现


RRpcRequest rrpcRequest = new RRpcRequest();
rrpcRequest.setProductKey("..."); //设备所属产品的Key
rrpcRequest.setDeviceName("..."); //设备名称
rrpcRequest.setRequestBase64Byte(Base64.encodeBase64String("{\"action\":\"unlock\"}".getBytes())); //发给设备的数据,要求二进制数据做一次Base64编码
rrpcRequest.setTimeout(1000); //超时时间,单位毫秒,如果超过这个时间设备没反应则返回"TIMEOUT"
RRpcResponse rrpcResponse = client.getAcsResponse(rrpcRequest); //得到设备返回的数据信息
System.out.println(rrpcResponse.getPayloadBase64Byte()); //得到的数据是设备返回二进制数据然后再经过Base64编码之后的字符串,需要转换一下才能拿到原始的二进制数据
System.out.println(rrpcResponse.getRrpcCode()); //对应的响应码(UNKNOW/SUCCESS/TIMEOUT/OFFLINE/HALFCONN等)

设备端实现


#define PRODUCT_KEY "..."
#define DEVICE_NAME "..."
#define TOPIC_RRPC_REQ "/sys/"PRODUCT_KEY"/"DEVICE_NAME"/rrpc/request/"
#define TOPIC_RRPC_RSP "/sys/"PRODUCT_KEY"/"DEVICE_NAME"/rrpc/response/"
// 订阅RRPC的Topic
rc = IOT_MQTT_Subscribe(pclient, TOPIC_RRPC_REQ "+", IOTX_MQTT_QOS0, mqtt_rrpc_msg_arrive, NULL);
if (rc < 0) {
IOT_MQTT_Destroy(pclient);
printf("IOT_MQTT_Subscribe() failed, rc = %d\n", rc);
rc = -1;
goto do_exit;
}
// 回复RRPC响应
void mqtt_rrpc_msg_arrive(void *pcontext, void *pclient, iotx_mqtt_event_msg_pt msg)
{
iotx_mqtt_topic_info_pt ptopic_info = (iotx_mqtt_topic_info_pt) msg->msg;
iotx_mqtt_topic_info_t topic_msg;
char msg_pub[MSG_LEN_MAX] = {0};
char topic[TOPIC_LEN_MAX] = {0};
char msg_id[MSG_ID_LEN_MAX] = {0};
// print topic name and topic message
printf("----\n");
printf("Topic: '%.*s' (Length: %d)\n",
ptopic_info->topic_len,
ptopic_info->ptopic,
ptopic_info->topic_len);
printf("Payload: '%.*s' (Length: %d)\n",
ptopic_info->payload_len,
ptopic_info->payload,
ptopic_info->payload_len);
printf("----\n");
if (snprintf(msg_id,
ptopic_info->topic_len - strlen(TOPIC_RRPC_REQ) + 1,
"%s",
ptopic_info->ptopic + strlen(TOPIC_RRPC_REQ))
> sizeof(msg_id)) {
printf("snprintf error!\n");
return;
}
printf("response msg_id = %s\n", msg_id);
if (snprintf(topic, sizeof(topic), "%s%s", TOPIC_RRPC_RSP, msg_id) > sizeof(topic)) {
printf("snprintf error!\n");
return;
}
printf("response topic = %s\n", topic);
sprintf(msg_pub, "rrpc client has received message!\n");
topic_msg.qos = IOTX_MQTT_QOS0;
topic_msg.retain = 0;
topic_msg.dup = 0;
topic_msg.payload = (void *)msg_pub;
topic_msg.payload_len = strlen(msg_pub);
if (IOT_MQTT_Publish(pclient, topic, &topic_msg) < 0) {
printf("error occur when publish!\n");
}
}