全部产品
弹性计算 会员服务 网络 安全 移动云 数加·大数据分析及展现 数加·大数据应用 管理与监控 云通信 阿里云办公 培训与认证 智能硬件
存储与CDN 数据库 域名与网站(万网) 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 更多
消息队列 MQ

MQ 客户端收发 MQTT 消息

更新时间:2018-01-23 21:00:47

除了使用 MQTT 客户端收发 MQTT 消息,MQ 也支持使用 MQ 客户端来收发 MQTT 消息,实现 MQ 私有协议和 MQTT 协议之间的数据互通。

MQTT 是面向移动端的消息协议,一般单个客户端的处理能力都比较弱。因此 MQTT 协议适用于大量在线客户端,但是每个客户端消息很少的场景。而 MQ 是面向服务端的消息引擎,单个客户端处理能力强,TPS 高,适用于服务端进行大批量处理分析的场景。因此,一般推荐在移动端设备上使用 MQTT,而在服务端应用则使用 MQ 。

例如,业务上有千万数量级的传感器在使用 MQTT 客户端上传数据,此时服务端就可以用 MQ 客户端搭建,用少量的机器来完成这些传感器数据的处理。

MQ 消息和 MQTT 消息的对应关系如下图所示:

mq2mqtt

使用 MQ 客户端的相关说明,请参见 MQ TCP Java SDK 接入

1. MQ 客户端发送 MQTT 消息

本小节介绍如何使用 MQ 的 SDK 向 MQTT 设备发送消息。

使用 MQ 客户端向 MQTT 设备发消息的方式和普通 MQ 客户端发消息没有区别,只是要根据需求,将 MQTT 的二级 Topic 设置到 MQ 的消息属性中,同时指定消息的 Tag 为 MQ2MQTT 即可。

其中,涉及到 MQTT 的相关特性需要设置到消息的属性中,具体列表如下:

属性 key value 默认值 说明
QoS qoslevel 0,1,2 1 该消息的 QoS 级别
CleanSession cleansessionflag true/false true 该消息的 cleanSession 标签,仅 P2P 消息支持设置
subTopic mqttSecondTopic 字符串 该消息的子 topic,如果不设默认为空
mqttRealTopic mqttRealTopic 字符串 客户端收到消息时显示的topic名称,该属性一般用于P2P消息,即客户端收到P2P消息时可以显示成预期的业务topic

示例代码:

public class ONSSendMsg {
    public static void main(String[] args) throws InterruptedException {
        /**
         * 设置阿里云的 AccessKey,用于鉴权
         */
        final String acessKey ="XXXXXX";
        /**
         * 设置阿里云的 SecretKey,用于鉴权
         */
        final String secretKey ="XXXXXXX";
        /**
         * 发消息使用的一级 Topic,需要先在 MQ 控制台里创建
         */
        final String topic ="MQTTTestTopic";
        /**
         * ProducerID,需要先在 MQ 控制台里创建
         */
        final String producerId ="PID_MQTTTestTopic";
        Properties properties =new Properties();
        properties.put(PropertyKeyConst.ProducerId, producerId);
        properties.put(PropertyKeyConst.AccessKey, acessKey);
        properties.put(PropertyKeyConst.SecretKey, secretKey);
        Producer producer = ONSFactory.createProducer(properties);
        producer.start();
        byte[] body=new byte[1024];
        final Message msg = new Message(
                topic,//MQ 消息的 Topic,需要事先创建
                "MQ2MQTT",//MQ Tag,通过 MQ 向 MQTT 客户端发消息时,必须指定 MQ2MQTT 作为 Tag,其他 Tag 或者不设都将导致 MQTT 客户端收不到消息
                body);//消息体,和 MQTT 的 body 对应
        /**
         * 使用 MQ 客户端给 MQTT 设备发送 P2P 消息时,需要在 MQ 消息中设置 mqttSecondTopic 属性
         * 设置的值是“/p2p/”+目标 ClientID
         */
        String targetClientID="GID_MQTTTestTopic@@@DeviceID_0001";
        msg.putUserProperties("mqttSecondTopic", "/p2p/"+targetClientID);
        //发送消息,只要不抛异常就是成功。
        SendResult sendResult = producer.send(msg);
        System.out.println(sendResult);
        /**
         * 如果仅仅发送 Pub/Sub 消息,则只需要设置实际 MQTT 订阅的 Topic 即可,支持设置二级 Topic
         */
        msg.putUserProperties("mqttSecondTopic", "/notice/");
        SendResult result =producer.send(msg);
        producer.shutdown();
        System.exit(0);
    }
}

2. MQ 客户端接收 MQTT 消息

本小节介绍如何使用 MQ 的 SDK 接收来自 MQTT 设备发送的消息。使用 MQ 客户端接收 MQTT 设备的消息和普通 MQ 客户端收消息没有区别,MQ 客户端只需要订阅 MQTT 的一级 Topic 即可。

public class ONSRecvMsg {
    public static void main(String[] args) throws InterruptedException {
        /**
         * 设置阿里云的 AccessKey,用于鉴权
         */
        final String acessKey ="XXXXXX";
        /**
         * 设置阿里云的 SecretKey,用于鉴权
         */
        final String secretKey ="XXXXXXX";
        /**
         * 收消息使用的一级 Topic,需要先在 MQ 控制台里创建
         */
        final String topic ="MQTTTestTopic";
        /**
         * ConsumerID ,需要先在 MQ 控制台里创建
         */
        final String consumerID ="CID_MQTTTestTopic";
        Properties properties =new Properties();
        properties.put(PropertyKeyConst.ConsumerId, consumerID);
        properties.put(PropertyKeyConst.AccessKey, acessKey);
        properties.put(PropertyKeyConst.SecretKey, secretKey);
        Consumer consumer =ONSFactory.createConsumer(properties);
        /**
         * 此处 MQ 客户端只需要订阅 MQTT 的一级 Topic 即可
         */
        consumer.subscribe(topic, "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext consumeContext) {
                System.out.println("recv msg:"+message);
                return Action.CommitMessage;
            }
        });
        consumer.start();
        System.out.println("[Case Normal Consumer Init]   Ok");
        Thread.sleep(Integer.MAX_VALUE);
        consumer.shutdown();
        System.exit(0);
    }
本文导读目录