全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 阿里云办公 培训与认证 物联网
消息队列 MQ

MQ 客户端收发 MQTT 消息

更新时间:2017-09-01 15:15:04   分享:   

除了使用 MQTT 客户端收发 MQTT 消息,MQ 也支持使用 MQ 客户端来收发 MQTT 消息,实现 MQ 私有协议和 MQTT 协议之间的数据互通。

MQTT 是面向移动端的消息协议,一般单个客户端的处理能力都比较弱。因此 MQTT 协议适用于大量在线客户端,但是每个客户端消息很少的场景。而 MQ 是面向服务端的消息引擎,单个客户端处理能力强,TPS 高,适用于服务端进行大批量处理分析的场景。因此,一般推荐在移动端设备上使用 MQTT,而在服务端应用则使用 MQ 。

例如,业务上有千万数量级的传感器在使用 MQTT 客户端上传数据,此时服务端就可以用 MQ 客户端搭建,用少量的机器来完成这些传感器数据的处理。

MQ 消息和 MQTT 消息的对应关系如下图所示:

mq2mqtt

使用 MQ 客户端的相关说明,请参见 MQ TCP Java SDK 接入

1. MQ 客户端发送 MQTT 消息

本小节介绍如何使用 MQ 的 SDK 向 MQTT 设备发送消息。

使用 MQ 客户端向 MQTT 设备发消息的方式和普通 MQ 客户端发消息没有区别,只是要根据需求,将 MQTT 的二级 Topic 设置到 MQ 的消息属性中,同时指定消息的 Tag 为 MQ2MQTT 即可。

其中,涉及到 MQTT 的相关特性需要设置到消息的属性中,具体列表如下:

属性 key value 默认值 说明
QoS qoslevel 0,1,2 1 该消息的 QoS 级别
CleanSession cleansessionflag true/false true 该消息的 cleanSession 标签,仅 P2P 消息支持设置
subTopic mqttSecondTopic 字符串 该消息的子 topic,如果不设默认为空

示例代码:

  1. public class ONSSendMsg {
  2. public static void main(String[] args) throws InterruptedException {
  3. /**
  4. * 设置阿里云的AccessKey,用于鉴权
  5. */
  6. final String acessKey ="XXXXXX";
  7. /**
  8. * 设置阿里云的SecretKey,用于鉴权
  9. */
  10. final String secretKey ="XXXXXXX";
  11. /**
  12. * 发消息使用的一级Topic,需要先在MQ控制台里申请
  13. */
  14. final String topic ="MQTTTestTopic";
  15. /**
  16. * ProducerID,需要先在MQ控制台里申请
  17. */
  18. final String producerId ="PID_MQTTTestTopic";
  19. Properties properties =new Properties();
  20. properties.put(PropertyKeyConst.ProducerId, producerId);
  21. properties.put(PropertyKeyConst.AccessKey, acessKey);
  22. properties.put(PropertyKeyConst.SecretKey, secretKey);
  23. Producer producer = ONSFactory.createProducer(properties);
  24. producer.start();
  25. byte[] body=new byte[1024];
  26. final Message msg = new Message(
  27. topic,//MQ消息的Topic,需要事先申请
  28. "MQ2MQTT",//MQ Tag,通过MQ向MQTT客户端发消息时,必须指定MQ2MQTT作为Tag,其他Tag或者不设都将导致MQTT客户端收不到消息
  29. body);//消息体,和MQTT的body对应
  30. /**
  31. * 使用MQ客户端给MQTT设备发送P2P消息时,需要在MQ消息中设置mqttSecondTopic属性
  32. * 设置的值是“/p2p/”+目标ClientID
  33. */
  34. String targetClientID="GID_MQTTTestTopic@@@DeviceID_0001";
  35. msg.putUserProperties("mqttSecondTopic", "/p2p/"+targetClientID);
  36. //发送消息,只要不抛异常就是成功。
  37. SendResult sendResult = producer.send(msg);
  38. System.out.println(sendResult);
  39. /**
  40. * 如果仅仅发送Pub/Sub消息,则只需要设置实际MQTT订阅的Topic即可,支持设置二级Topic
  41. */
  42. msg.putUserProperties("mqttSecondTopic", "/notice/");
  43. SendResult result =producer.send(msg);
  44. producer.shutdown();
  45. System.exit(0);
  46. }
  47. }

2. MQ 客户端接收 MQTT 消息

本小节介绍如何使用 MQ 的 SDK 接收来自 MQTT 设备发送的消息。使用 MQ 客户端接收 MQTT 设备的消息和普通 MQ 客户端收消息没有区别,MQ 客户端只需要订阅 MQTT 的一级 Topic 即可。

  1. public class ONSRecvMsg {
  2. public static void main(String[] args) throws InterruptedException {
  3. /**
  4. * 设置阿里云的AccessKey,用于鉴权
  5. */
  6. final String acessKey ="XXXXXX";
  7. /**
  8. * 设置阿里云的SecretKey,用于鉴权
  9. */
  10. final String secretKey ="XXXXXXX";
  11. /**
  12. * 收消息使用的一级Topic,需要先在MQ控制台里申请
  13. */
  14. final String topic ="MQTTTestTopic";
  15. /**
  16. * ConsumerID ,需要先在MQ控制台里申请
  17. */
  18. final String consumerID ="CID_MQTTTestTopic";
  19. Properties properties =new Properties();
  20. properties.put(PropertyKeyConst.ConsumerId, consumerID);
  21. properties.put(PropertyKeyConst.AccessKey, acessKey);
  22. properties.put(PropertyKeyConst.SecretKey, secretKey);
  23. Consumer consumer =ONSFactory.createConsumer(properties);
  24. /**
  25. * 此处MQ客户端只需要订阅MQTT的一级Topic即可
  26. */
  27. consumer.subscribe(topic, "*", new MessageListener() {
  28. public Action consume(Message message, ConsumeContext consumeContext) {
  29. System.out.println("recv msg:"+message);
  30. return Action.CommitMessage;
  31. }
  32. });
  33. consumer.start();
  34. System.out.println("[Case Normal Consumer Init] Ok");
  35. Thread.sleep(Integer.MAX_VALUE);
  36. consumer.shutdown();
  37. System.exit(0);
  38. }
本文导读目录
本文导读目录
以上内容是否对您有帮助?