全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 阿里云办公 培训与认证 物联网
消息队列 MQ

MQTT 客户端收发 MQTT 消息

更新时间:2017-09-13 18:08:01

本文主要介绍如何使用 MQTT 客户端收发 MQTT 消息,并给出示例代码供前期开发测试参考,包括资源申请、环境准备、示例代码、注意事项等。

注意:

本文给出的实例均基于 Eclipse Paho Java SDK 实现,SDK 下载请参见 MQTT 接入准备。如使用其他第三方的客户端,请适当修改。

1. 资源申请

使用 MQ 提供的 MQTT 服务,首先需要核实应用中使用的 Topic 资源是否已经申请,如果没有,请先去控制台申请 Topic,Group ID 等资源。

申请资源时需要根据需求选择对应的 Region,例如 MQTT 需要使用华北2的接入点,那么 Topic 等资源就在华北2 申请,资源申请具体请参见申请 MQ 资源

注意:MQTT 使用的多级子 Topic 不需要申请,代码里直接使用即可,没有限制。

2. 环境准备

使用 MQTT 协议来收发消息,需要根据应用平台选择合适的客户端。本示例运行在 Java 平台,使用 Eclipse Paho Java SDK 构建。首先引入 Maven 依赖,POM 文件配置如下:

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.eclipse.paho</groupId>
  4. <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  5. <version>1.1.0</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>commons-codec</groupId>
  9. <artifactId>commons-codec</artifactId>
  10. <version>1.10</version>
  11. </dependency>
  12. </dependencies>
  13. <repositories>
  14. <repository>
  15. <id>Eclipse Paho Repo</id>
  16. <url>https://repo.eclipse.org/content/repositories/paho-releases/</url>
  17. </repository>
  18. <repository>
  19. <id>snapshots-repo</id>
  20. <url>https://oss.sonatype.org/content/repositories/snapshots</url>
  21. <releases>
  22. <enabled>false</enabled>
  23. </releases>
  24. <snapshots>
  25. <enabled>true</enabled>
  26. </snapshots>
  27. </repository>
  28. </repositories>

3. MQTT 发送消息

本段示例代码演示如何使用 MQTT 客户端发送普通消息和 P2P 的点对点消息,其中用到的工具 MacSignature 参考下文。

  1. public class MQTTSendMsg {
  2. public static void main(String[] args) throws IOException {
  3. /**
  4. * 设置当前用户私有的MQTT的接入点。例如此处示意使用XXX,实际使用请替换用户自己的接入点。接入点的获取方法是,在控制台申请MQTT实例,每个实例都会分配一个接入点域名。
  5. */
  6. final String broker ="tcp://XXXX.mqtt.aliyuncs.com:1883";
  7. /**
  8. * 设置阿里云的AccessKey,用于鉴权
  9. */
  10. final String acessKey ="XXXXXX";
  11. /**
  12. * 设置阿里云的SecretKey,用于鉴权
  13. */
  14. final String secretKey ="XXXXXXX";
  15. /**
  16. * 发消息使用的一级Topic,需要先在MQ控制台里申请
  17. */
  18. final String topic ="XXXX";
  19. /**
  20. * MQTT的ClientID,一般由两部分组成,GroupID@@@DeviceID
  21. * 其中GroupID在MQ控制台里申请
  22. * DeviceID由应用方设置,可能是设备编号等,需要唯一,否则服务端拒绝重复的ClientID连接
  23. */
  24. final String clientId ="GID_XXX@@@ClientID_XXXX";
  25. String sign;
  26. MemoryPersistence persistence = new MemoryPersistence();
  27. try {
  28. final MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
  29. final MqttConnectOptions connOpts = new MqttConnectOptions();
  30. System.out.println("Connecting to broker: " + broker);
  31. /**
  32. * 计算签名,将签名作为MQTT的password。
  33. * 签名的计算方法,参考工具类MacSignature,第一个参数是ClientID的前半部分,即GroupID
  34. * 第二个参数阿里云的SecretKey
  35. */
  36. sign = MacSignature.macSignature(clientId.split("@@@")[0], secretKey);
  37. connOpts.setUserName(acessKey);
  38. connOpts.setServerURIs(new String[] { broker });
  39. connOpts.setPassword(sign.toCharArray());
  40. connOpts.setCleanSession(true);
  41. connOpts.setKeepAliveInterval(90);
  42. connOpts.setAutomaticReconnect(true);
  43. sampleClient.setCallback(new MqttCallbackExtended() {
  44. public void connectComplete(boolean reconnect, String serverURI) {
  45. System.out.println("connect success");
  46. //连接成功,需要上传客户端所有的订阅关系
  47. }
  48. public void connectionLost(Throwable throwable) {
  49. System.out.println("mqtt connection lost");
  50. }
  51. public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
  52. System.out.println("messageArrived:" + topic + "------" + new String(mqttMessage.getPayload()));
  53. }
  54. public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
  55. System.out.println("deliveryComplete:" + iMqttDeliveryToken.getMessageId());
  56. }
  57. });
  58. sampleClient.connect(connOpts);
  59. for (int i = 0; i < 10; i++) {
  60. try {
  61. String scontent = new Date()+"MQTT Test body" + i;
  62. //此处消息体只需要传入byte数组即可,对于其他类型的消息,请自行完成二进制数据的转换
  63. final MqttMessage message = new MqttMessage(scontent.getBytes());
  64. message.setQos(0);
  65. System.out.println(i+" pushed at "+new Date()+" "+ scontent);
  66. /**
  67. *消息发送到某个主题Topic,所有订阅这个Topic的设备都能收到这个消息。
  68. * 遵循MQTT的发布订阅规范,Topic也可以是多级Topic。此处设置了发送到二级Topic
  69. */
  70. sampleClient.publish(topic+"/notice/", message);
  71. /**
  72. * 如果发送P2P消息,二级Topic必须是“p2p”,三级Topic是目标的ClientID
  73. * 此处设置的三级Topic需要是接收方的ClientID
  74. */
  75. String p2pTopic =topic+"/p2p/GID_mqttdelay3@@@DEVICEID_001";
  76. sampleClient.publish(p2pTopic,message);
  77. } catch (Exception e) {
  78. e.printStackTrace();
  79. }
  80. }
  81. } catch (Exception me) {
  82. me.printStackTrace();
  83. }
  84. }
  85. }

4. MQTT 接收消息

本段代码演示如何使用 MQTT 客户端订阅消息,接收普通的消息以及点对点消息。

  1. public class MQTTRecvMsg {
  2. public static void main(String[] args) throws IOException {
  3. /**
  4. * 设置当前用户私有的MQTT的接入点。例如此处示意使用XXX,实际使用请替换用户自己的接入点。接入点的获取方法是,在控制台申请MQTT实例,每个实例都会分配一个接入点域名。
  5. */
  6. final String broker ="tcp://XXXX.mqtt.aliyuncs.com:1883";
  7. /**
  8. * 设置阿里云的AccessKey,用于鉴权
  9. */
  10. final String acessKey ="XXXXXX";
  11. /**
  12. * 设置阿里云的SecretKey,用于鉴权
  13. */
  14. final String secretKey ="XXXXXXX";
  15. /**
  16. * 发消息使用的一级Topic,需要先在MQ控制台里申请
  17. */
  18. final String topic ="XXXX";
  19. /**
  20. * MQTT的ClientID,一般由两部分组成,GroupID@@@DeviceID
  21. * 其中GroupID在MQ控制台里申请
  22. * DeviceID由应用方设置,可能是设备编号等,需要唯一,否则服务端拒绝重复的ClientID连接
  23. */
  24. final String clientId ="GID_XXXX@@@ClientID_XXXXXX";
  25. String sign;
  26. MemoryPersistence persistence = new MemoryPersistence();
  27. try {
  28. final MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
  29. final MqttConnectOptions connOpts = new MqttConnectOptions();
  30. System.out.println("Connecting to broker: " + broker);
  31. /**
  32. * 计算签名,将签名作为MQTT的password
  33. * 签名的计算方法,参考工具类MacSignature,第一个参数是ClientID的前半部分,即GroupID
  34. * 第二个参数阿里云的SecretKey
  35. */
  36. sign = MacSignature.macSignature(clientId.split("@@@")[0], secretKey);
  37. /**
  38. * 设置订阅方订阅的Topic集合,此处遵循MQTT的订阅规则,可以是一级Topic,二级Topic,P2P消息请订阅/p2p
  39. */
  40. final String[] topicFilters=new String[]{topic+"/notice/",topic+"/p2p"};
  41. final int[]qos={0,0};
  42. connOpts.setUserName(acessKey);
  43. connOpts.setServerURIs(new String[] { broker });
  44. connOpts.setPassword(sign.toCharArray());
  45. connOpts.setCleanSession(true);
  46. connOpts.setKeepAliveInterval(90);
  47. connOpts.setAutomaticReconnect(true);
  48. sampleClient.setCallback(new MqttCallbackExtended() {
  49. public void connectComplete(boolean reconnect, String serverURI) {
  50. System.out.println("connect success");
  51. //连接成功,需要上传客户端所有的订阅关系
  52. sampleClient.subscribe(topicFilters,qos);
  53. }
  54. public void connectionLost(Throwable throwable) {
  55. System.out.println("mqtt connection lost");
  56. }
  57. public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
  58. System.out.println("messageArrived:" + topic + "------" + new String(mqttMessage.getPayload()));
  59. }
  60. public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
  61. System.out.println("deliveryComplete:" + iMqttDeliveryToken.getMessageId());
  62. }
  63. });
  64. //客户端每次上线都必须上传自己所有涉及的订阅关系,否则可能会导致消息接收延迟
  65. sampleClient.connect(connOpts);
  66. //每个客户端最多允许存在30个订阅关系,超出限制可能会丢弃导致收不到部分消息
  67. sampleClient.subscribe(topicFilters,qos);
  68. Thread.sleep(Integer.MAX_VALUE);
  69. } catch (Exception me) {
  70. me.printStackTrace();
  71. }
  72. }
  73. }

上文代码用到的工具类 MacSignature.java 如下:

  1. public class MacSignature {
  2. /**
  3. * @param text 要签名的文本
  4. * @param secretKey 阿里云MQ SecretKey
  5. * @return 加密后的字符串
  6. * @throws InvalidKeyException
  7. * @throws NoSuchAlgorithmException
  8. */
  9. public static String macSignature(String text, String secretKey) throws InvalidKeyException, NoSuchAlgorithmException {
  10. Charset charset = Charset.forName("UTF-8");
  11. String algorithm = "HmacSHA1";
  12. Mac mac = Mac.getInstance(algorithm);
  13. mac.init(new SecretKeySpec(secretKey.getBytes(charset), algorithm));
  14. byte[] bytes = mac.doFinal(text.getBytes(charset));
  15. return new String(Base64.encodeBase64(bytes), charset);
  16. }
  17. /**
  18. * 发送方签名方法
  19. *
  20. * @param clientId MQTT ClientID
  21. * @param secretKey 阿里云MQ SecretKey
  22. * @return 加密后的字符串
  23. * @throws NoSuchAlgorithmException
  24. * @throws InvalidKeyException
  25. */
  26. public static String publishSignature(String clientId, String secretKey) throws NoSuchAlgorithmException, InvalidKeyException {
  27. return macSignature(clientId, secretKey);
  28. }
  29. /**
  30. * 订阅方签名方法
  31. *
  32. * @param topics 要订阅的Topic集合
  33. * @param clientId MQTT ClientID
  34. * @param secretKey 阿里云MQ SecretKey
  35. * @return 加密后的字符串
  36. * @throws NoSuchAlgorithmException
  37. * @throws InvalidKeyException
  38. */
  39. public static String subSignature(List<String> topics, String clientId, String secretKey) throws NoSuchAlgorithmException, InvalidKeyException {
  40. Collections.sort(topics); //以字典顺序排序
  41. String topicText = "";
  42. for (String topic : topics) {
  43. topicText += topic + "\n";
  44. }
  45. String text = topicText + clientId;
  46. return macSignature(text, secretKey);
  47. }
  48. /**
  49. * 订阅方签名方法
  50. *
  51. * @param topic 要订阅的Topic
  52. * @param clientId MQTT ClientID
  53. * @param secretKey 阿里云MQ SecretKey
  54. * @return 加密后的字符串
  55. * @throws NoSuchAlgorithmException
  56. * @throws InvalidKeyException
  57. */
  58. public static String subSignature(String topic, String clientId, String secretKey) throws NoSuchAlgorithmException, InvalidKeyException {
  59. List<String> topics = new ArrayList<String>();
  60. topics.add(topic);
  61. return subSignature(topics, clientId, secretKey);
  62. }
  63. }
本文导读目录