全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 阿里云办公 培训与认证 物联网
消息队列 MQ

MQTT客户端发送顺序消息

更新时间:2017-09-14 19:37:28

本文主要介绍如何使用 MQTT 客户端发送顺序消息。

注意:

本文给出的实例均基于 Eclipse Paho Java SDK 实现,其他语言和平台的实现方法类似,请自行对照修改。

1. Topic 申请

MQTT 协议的消息目前支持仅支持顺序发送,即可以保证每个使用 MQTT 协议的客户端的消息发送是顺序的,但并不保证消息接收是顺序的。举例而言,物联网领域里只需要保证每个传感器采集的数据是顺序上传即可。消息的处理方,即服务端应用可以使用 MQ 协议的 SDK 来做顺序消费。

使用 MQTT 发顺序消息,首先需要保证 topic 本身是支持顺序的,即 topic 的申请必须选择分区顺序或者全局顺序类型。更多信息请参考《接入准备》一章。

注意:使用普通 topic 发送,即使程序用法正确,也无法保证顺序性。其次,MQTT服务仅保证消息的发送是顺序的,如果需要顺序消费,请使用 MQ 协议的顺序客户端消费。

2. 顺序发送示例

使用 MQTT 协议来发送顺序消息,需要在建立连接之后,首先发送控制报文标志客户端即将使用顺序模式发消息,然后再进行正常的业务发送,控制报文每次建立连接时都必须且仅仅需要设置一次。具体参考以下 Demo 程序。

  1. public class MQTTSendMsg {
  2. public static void main(String[] args) throws IOException {
  3. /**
  4. * 设置当前用户私有的MQTT的接入点。例如此处示意使用XXX,实际使用请替换用户自己的接入点。接入点的获取方法是,在控制台申请MQTT实例,每个实例都会分配一个接入点域名。
  5. */
  6. final String broker ="tcp://XXXX.mqtt.aliyuncs.com:1883";
  7. /**
  8. * 设置阿里云的AccessKey,用于鉴权
  9. */
  10. final String acessKey ="XXXXXX";
  11. /**
  12. * 设置阿里云的SecretKey,用于鉴权
  13. */
  14. final String secretKey ="XXXXXXX";
  15. /**
  16. * 发消息使用的一级Topic,需要先在MQ控制台里申请
  17. */
  18. final String topic ="XXXX";
  19. /**
  20. * MQTT的ClientID,一般由两部分组成,GroupID@@@DeviceID
  21. * 其中GroupID在MQ控制台里申请
  22. * DeviceID由应用方设置,可能是设备编号等,需要唯一,否则服务端拒绝重复的ClientID连接
  23. */
  24. final String clientId ="GID_XXX@@@ClientID_XXXX";
  25. String sign;
  26. MemoryPersistence persistence = new MemoryPersistence();
  27. try {
  28. final MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
  29. final MqttConnectOptions connOpts = new MqttConnectOptions();
  30. System.out.println("Connecting to broker: " + broker);
  31. /**
  32. * 计算签名,将签名作为MQTT的password。
  33. * 签名的计算方法,参考工具类MacSignature,第一个参数是ClientID的前半部分,即GroupID
  34. * 第二个参数阿里云的SecretKey
  35. */
  36. sign = MacSignature.macSignature(clientId.split("@@@")[0], secretKey);
  37. connOpts.setUserName(acessKey);
  38. connOpts.setServerURIs(new String[] { broker });
  39. connOpts.setPassword(sign.toCharArray());
  40. connOpts.setCleanSession(true);
  41. connOpts.setKeepAliveInterval(90);
  42. connOpts.setAutomaticReconnect(true);
  43. sampleClient.setCallback(new MqttCallbackExtended() {
  44. public void connectComplete(boolean reconnect, String serverURI) {
  45. System.out.println("connect success");
  46. //连接成功,需要上传客户端的顺序控制报文
  47. JSONObject object = new JSONObject();
  48. object.put("order", "true");//设置顺序发送的标记
  49. MqttMessage message = new MqttMessage(object.toJSONString().getBytes());
  50. message.setQos(1);
  51. sampleClient.publish("$SYS/enableOrderMsg", message);
  52. }
  53. public void connectionLost(Throwable throwable) {
  54. System.out.println("mqtt connection lost");
  55. }
  56. public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
  57. System.out.println("messageArrived:" + topic + "------" + new String(mqttMessage.getPayload()));
  58. }
  59. public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
  60. System.out.println("deliveryComplete:" + iMqttDeliveryToken.getMessageId());
  61. }
  62. });
  63. sampleClient.connect(connOpts);
  64. for (int i = 0; i < 10; i++) {
  65. try {
  66. String scontent = new Date()+"MQTT Test body" + i;
  67. //此处消息体只需要传入byte数组即可,对于其他类型的消息,请自行完成二进制数据的转换。
  68. final MqttMessage message = new MqttMessage(scontent.getBytes());
  69. message.setQos(0);
  70. System.out.println(i+" pushed at "+new Date()+" "+ scontent);
  71. /**
  72. *消息发送到某个主题Topic,所有订阅这个Topic的设备都能收到这个消息。
  73. * 遵循MQTT的发布订阅规范,Topic也可以是多级Topic。此处设置了发送到二级Topic。
  74. */
  75. sampleClient.publish(topic+"/notice/", message);
  76. /**
  77. * 如果发送P2P消息,二级Topic必须是“p2p”,三级Topic是目标的ClientID。
  78. * 此处设置的三级Topic需要是接收方的ClientID。
  79. */
  80. String p2pTopic =topic+"/p2p/GID_mqttdelay3@@@DEVICEID_001";
  81. sampleClient.publish(p2pTopic,message);
  82. } catch (Exception e) {
  83. e.printStackTrace();
  84. }
  85. }
  86. } catch (Exception me) {
  87. me.printStackTrace();
  88. }
  89. }
  90. }
本文导读目录