全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 钉钉智能硬件
消息队列 MQ

MQTT 客户端收发 MQTT 消息

更新时间:2017-12-15 20:39:22

本文主要介绍如何使用 MQTT 客户端收发 MQTT 消息,并给出示例代码供前期开发测试参考,包括资源申请、环境准备、示例代码、注意事项等。

注意:

本文给出的实例均基于 Eclipse Paho Java SDK 实现,SDK 下载请参见 MQTT 接入准备。如使用其他第三方的客户端,请适当修改。

1. 资源申请

使用 MQ 提供的 MQTT 服务,首先需要核实应用中使用的 Topic 资源是否已经申请,如果没有,请先去控制台申请 Topic,Group ID 等资源。

申请资源时需要根据需求选择对应的 Region,例如 MQTT 需要使用华北2的接入点,那么 Topic 等资源就在华北2 申请,资源申请具体请参见申请 MQ 资源

注意:MQTT 使用的多级子 Topic 不需要申请,代码里直接使用即可,没有限制。

2. 环境准备

使用 MQTT 协议来收发消息,需要根据应用平台选择合适的客户端。本示例运行在 Java 平台,使用 Eclipse Paho Java SDK 构建。首先引入 Maven 依赖,POM 文件配置如下:

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.eclipse.paho</groupId>
  4. <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  5. <version>1.1.0</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>commons-codec</groupId>
  9. <artifactId>commons-codec</artifactId>
  10. <version>1.10</version>
  11. </dependency>
  12. </dependencies>
  13. <repositories>
  14. <repository>
  15. <id>Eclipse Paho Repo</id>
  16. <url>https://repo.eclipse.org/content/repositories/paho-releases/</url>
  17. </repository>
  18. <repository>
  19. <id>snapshots-repo</id>
  20. <url>https://oss.sonatype.org/content/repositories/snapshots</url>
  21. <releases>
  22. <enabled>false</enabled>
  23. </releases>
  24. <snapshots>
  25. <enabled>true</enabled>
  26. </snapshots>
  27. </repository>
  28. </repositories>

3. MQTT 发送消息

本段示例代码演示如何使用 MQTT 客户端发送普通消息和 P2P 的点对点消息,其中用到的工具 MacSignature 参考下文。

  1. public class MQTTSendMsg {
  2. public static void main(String[] args) throws IOException {
  3. /**
  4. * 设置当前用户私有的 MQTT 的接入点。例如此处示意使用 XXX,实际使用请替换用户自己的接入点。接入点的获取方法是,在控制台申请 MQTT 实例,每个实例都会分配一个接入点域名。
  5. */
  6. final String broker ="tcp://XXXX.mqtt.aliyuncs.com:1883";
  7. /**
  8. * 设置阿里云的 AccessKey,用于鉴权
  9. */
  10. final String acessKey ="XXXXXX";
  11. /**
  12. * 设置阿里云的 SecretKey,用于鉴权
  13. */
  14. final String secretKey ="XXXXXXX";
  15. /**
  16. * 发消息使用的一级 Topic,需要先在 MQ 控制台里申请
  17. */
  18. final String topic ="XXXX";
  19. /**
  20. * MQTT 的 ClientID,一般由两部分组成,GroupID@@@DeviceID
  21. * 其中 GroupID 在 MQ 控制台里申请
  22. * DeviceID 由应用方设置,可能是设备编号等,需要唯一,否则服务端拒绝重复的 ClientID 连接
  23. */
  24. final String clientId ="GID_XXX@@@ClientID_XXXX";
  25. String sign;
  26. MemoryPersistence persistence = new MemoryPersistence();
  27. try {
  28. final MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
  29. final MqttConnectOptions connOpts = new MqttConnectOptions();
  30. System.out.println("Connecting to broker: " + broker);
  31. /**
  32. * 计算签名,将签名作为 MQTT 的 password。
  33. * 签名的计算方法,参考工具类 MacSignature,第一个参数是 ClientID 的前半部分,即 GroupID
  34. * 第二个参数阿里云的 SecretKey
  35. */
  36. sign = MacSignature.macSignature(clientId.split("@@@")[0], secretKey);
  37. connOpts.setUserName(acessKey);
  38. connOpts.setServerURIs(new String[] { broker });
  39. connOpts.setPassword(sign.toCharArray());
  40. connOpts.setCleanSession(true);
  41. connOpts.setKeepAliveInterval(90);
  42. connOpts.setAutomaticReconnect(true);
  43. sampleClient.setCallback(new MqttCallbackExtended() {
  44. public void connectComplete(boolean reconnect, String serverURI) {
  45. System.out.println("connect success");
  46. //连接成功,需要上传客户端所有的订阅关系
  47. }
  48. public void connectionLost(Throwable throwable) {
  49. System.out.println("mqtt connection lost");
  50. }
  51. public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
  52. System.out.println("messageArrived:" + topic + "------" + new String(mqttMessage.getPayload()));
  53. }
  54. public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
  55. System.out.println("deliveryComplete:" + iMqttDeliveryToken.getMessageId());
  56. }
  57. });
  58. sampleClient.connect(connOpts);
  59. for (int i = 0; i < 10; i++) {
  60. try {
  61. String scontent = new Date()+"MQTT Test body" + i;
  62. //此处消息体只需要传入 byte 数组即可,对于其他类型的消息,请自行完成二进制数据的转换
  63. final MqttMessage message = new MqttMessage(scontent.getBytes());
  64. message.setQos(0);
  65. System.out.println(i+" pushed at "+new Date()+" "+ scontent);
  66. /**
  67. *消息发送到某个主题 Topic,所有订阅这个 Topic 的设备都能收到这个消息。
  68. * 遵循 MQTT 的发布订阅规范,Topic 也可以是多级 Topic。此处设置了发送到二级 Topic
  69. */
  70. sampleClient.publish(topic+"/notice/", message);
  71. /**
  72. * 如果发送 P2P 消息,二级 Topic 必须是“p2p”,三级 Topic 是目标的 ClientID
  73. * 此处设置的三级 Topic 需要是接收方的 ClientID
  74. */
  75. String p2pTopic =topic+"/p2p/GID_mqttdelay3@@@DEVICEID_001";
  76. sampleClient.publish(p2pTopic,message);
  77. } catch (Exception e) {
  78. e.printStackTrace();
  79. }
  80. }
  81. } catch (Exception me) {
  82. me.printStackTrace();
  83. }
  84. }
  85. }

4. MQTT 接收消息

本段代码演示如何使用 MQTT 客户端订阅消息,接收普通的消息以及点对点消息。

  1. public class MQTTRecvMsg {
  2. public static void main(String[] args) throws IOException {
  3. /**
  4. * 设置当前用户私有的 MQTT 的接入点。例如此处示意使用 XXX,实际使用请替换用户自己的接入点。接入点的获取方法是,在控制台申请 MQTT 实例,每个实例都会分配一个接入点域名。
  5. */
  6. final String broker ="tcp://XXXX.mqtt.aliyuncs.com:1883";
  7. /**
  8. * 设置阿里云的 AccessKey,用于鉴权
  9. */
  10. final String acessKey ="XXXXXX";
  11. /**
  12. * 设置阿里云的 SecretKey,用于鉴权
  13. */
  14. final String secretKey ="XXXXXXX";
  15. /**
  16. * 发消息使用的一级 Topic,需要先在 MQ 控制台里申请
  17. */
  18. final String topic ="XXXX";
  19. /**
  20. * MQTT 的 ClientID,一般由两部分组成,GroupID@@@DeviceID
  21. * 其中 GroupID 在 MQ 控制台里申请
  22. * DeviceID 由应用方设置,可能是设备编号等,需要唯一,否则服务端拒绝重复的 ClientID 连接
  23. */
  24. final String clientId ="GID_XXXX@@@ClientID_XXXXXX";
  25. String sign;
  26. MemoryPersistence persistence = new MemoryPersistence();
  27. try {
  28. final MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
  29. final MqttConnectOptions connOpts = new MqttConnectOptions();
  30. System.out.println("Connecting to broker: " + broker);
  31. /**
  32. * 计算签名,将签名作为 MQTT 的 password
  33. * 签名的计算方法,参考工具类 MacSignature,第一个参数是 ClientID 的前半部分,即 GroupID
  34. * 第二个参数阿里云的 SecretKey
  35. */
  36. sign = MacSignature.macSignature(clientId.split("@@@")[0], secretKey);
  37. /**
  38. * 设置订阅方订阅的 Topic 集合,此处遵循 MQTT 的订阅规则,可以是一级 Topic,二级 Topic,P2P 消息请订阅/p2p
  39. */
  40. final String[] topicFilters=new String[]{topic+"/notice/",topic+"/p2p"};
  41. final int[]qos={0,0};
  42. connOpts.setUserName(acessKey);
  43. connOpts.setServerURIs(new String[] { broker });
  44. connOpts.setPassword(sign.toCharArray());
  45. connOpts.setCleanSession(true);
  46. connOpts.setKeepAliveInterval(90);
  47. connOpts.setAutomaticReconnect(true);
  48. sampleClient.setCallback(new MqttCallbackExtended() {
  49. public void connectComplete(boolean reconnect, String serverURI) {
  50. System.out.println("connect success");
  51. //连接成功,需要上传客户端所有的订阅关系
  52. sampleClient.subscribe(topicFilters,qos);
  53. }
  54. public void connectionLost(Throwable throwable) {
  55. System.out.println("mqtt connection lost");
  56. }
  57. public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
  58. System.out.println("messageArrived:" + topic + "------" + new String(mqttMessage.getPayload()));
  59. }
  60. public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
  61. System.out.println("deliveryComplete:" + iMqttDeliveryToken.getMessageId());
  62. }
  63. });
  64. //客户端每次上线都必须上传自己所有涉及的订阅关系,否则可能会导致消息接收延迟
  65. sampleClient.connect(connOpts);
  66. //每个客户端最多允许存在30个订阅关系,超出限制可能会丢弃导致收不到部分消息
  67. sampleClient.subscribe(topicFilters,qos);
  68. Thread.sleep(Integer.MAX_VALUE);
  69. } catch (Exception me) {
  70. me.printStackTrace();
  71. }
  72. }
  73. }

上文代码用到的工具类 MacSignature.java 如下:

  1. public class MacSignature {
  2. /**
  3. * @param text 要签名的文本
  4. * @param secretKey 阿里云 MQ SecretKey
  5. * @return 加密后的字符串
  6. * @throws InvalidKeyException
  7. * @throws NoSuchAlgorithmException
  8. */
  9. public static String macSignature(String text, String secretKey) throws InvalidKeyException, NoSuchAlgorithmException {
  10. Charset charset = Charset.forName("UTF-8");
  11. String algorithm = "HmacSHA1";
  12. Mac mac = Mac.getInstance(algorithm);
  13. mac.init(new SecretKeySpec(secretKey.getBytes(charset), algorithm));
  14. byte[] bytes = mac.doFinal(text.getBytes(charset));
  15. return new String(Base64.encodeBase64(bytes), charset);
  16. }
  17. /**
  18. * 发送方签名方法
  19. *
  20. * @param clientId MQTT ClientID
  21. * @param secretKey 阿里云 MQ SecretKey
  22. * @return 加密后的字符串
  23. * @throws NoSuchAlgorithmException
  24. * @throws InvalidKeyException
  25. */
  26. public static String publishSignature(String clientId, String secretKey) throws NoSuchAlgorithmException, InvalidKeyException {
  27. return macSignature(clientId, secretKey);
  28. }
  29. /**
  30. * 订阅方签名方法
  31. *
  32. * @param topics 要订阅的 Topic 集合
  33. * @param clientId MQTT ClientID
  34. * @param secretKey 阿里云 MQ SecretKey
  35. * @return 加密后的字符串
  36. * @throws NoSuchAlgorithmException
  37. * @throws InvalidKeyException
  38. */
  39. public static String subSignature(List<String> topics, String clientId, String secretKey) throws NoSuchAlgorithmException, InvalidKeyException {
  40. Collections.sort(topics); //以字典顺序排序
  41. String topicText = "";
  42. for (String topic : topics) {
  43. topicText += topic + "\n";
  44. }
  45. String text = topicText + clientId;
  46. return macSignature(text, secretKey);
  47. }
  48. /**
  49. * 订阅方签名方法
  50. *
  51. * @param topic 要订阅的 Topic
  52. * @param clientId MQTT ClientID
  53. * @param secretKey 阿里云 MQ SecretKey
  54. * @return 加密后的字符串
  55. * @throws NoSuchAlgorithmException
  56. * @throws InvalidKeyException
  57. */
  58. public static String subSignature(String topic, String clientId, String secretKey) throws NoSuchAlgorithmException, InvalidKeyException {
  59. List<String> topics = new ArrayList<String>();
  60. topics.add(topic);
  61. return subSignature(topics, clientId, secretKey);
  62. }
  63. }
本文导读目录