本次最佳实践,将结合Java代码对消息队列RocketMQ版(简称RocketMQ)的使用原理进行分析。

背景信息

RocketMQ 是企业级互联网架构的核心产品,具备低延迟、高并发、高可用、高可靠,可支撑万亿级数据洪峰的分布式消息中间件。可通过RocketMQ控制台创建RocketMQ实例,无需安装包,省去繁杂的手续。对RocketMQ消息服务消息可视化可以按Topic、MessageID或Topic不同维度查询发送的消息、按消息轨迹功能展示发送和消费关系、消息是否成功消费等信息。其中资源报表可以快速的统计RocketMQ在一定时间段内发送和订阅消息的TPS数。

本次最佳实践的内容主要包含 :
  • 消息同步和异步发送的Java示例代码及原理分析。
  • 针对同步和异步发送的区别选择适用的消息发送方式满足需求。
  • 对消息发送可以分Topic,更细粒化标签tag消息进行归类。
  • 通过Topic和Tag选择过滤消费消息。
  • 对消息发送失败有进行消息重试处理
  • 结合Java代码对集群和广播订阅消息消费原理进行详述。

生产者发送消息

  • 同步发送应用于重要通知邮件、报名短信通知、营销短信系统等。
  • 异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

同步发送消息

同步发送原理:

同步发送是指消息发送方发出一条消息后,会在收到服务端返回响应之后才发下一条消息的通讯方式。

  1. pom.xml文件导入依赖包。
    <dependency>
        <groupId>com.aliyun.openservices</groupId>
        <artifactId>ons-client</artifactId>
        <version>1.8.2.Final</version>
    </dependency>
  2. 配置文件application.properties连接mq的参数值。
    # POC2专有云MQ配置
    mq.accessKey=lB2en******
    mq.secretKey=cweVrg5********
    mq.onsAddr=http://MQ_INST_1870773007797099_Bb48AjKM.cn-******-poc2-d01.mq.namesrv.cloud.poc2.com:9876
    mq.normalTopic=pdsa_topic
    mq.producerId=GID_pdsa_mq
    mq.consumerId=CID_consumer
    mq.sendMsgTimeoutMillis=3000
    mq.tag=TagA
  3. 同步发送示例代码,针对性适配后面MQ性能压测场景代码,内容包含发送每条消息数据大小50Kb,Topic和Tag消息更细粒化分类,消息发送失败进行重试处理:
    package com.aliware.edas.com.aliware.edas.rocketmq;
    
    import com.aliware.edas.com.aliware.edas.mq.ProducerEntry;
    import com.aliyun.openservices.ons.api.*;
    import org.springframework.cloud.context.config.annotation.RefreshScope;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    import java.util.Properties;
    
    /**
     * @author yourInstanceId
     */
    @Component("simpleMQProduce")
    @RefreshScope
    public class SimpleMQProduce extends ProducerEntry {
    
        StringBuilder content = new StringBuilder();
    
        public void sendMsg() {
            for (int i = 0; i < 6400; i++) {
                content.append(String.valueOf("A"));
            }
            Properties properties = new Properties();
            properties.setProperty(PropertyKeyConst.GROUP_ID, this.getProcucerId());
            // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
            properties.put(PropertyKeyConst.AccessKey, this.getAccessKey());
            // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
            properties.put(PropertyKeyConst.SecretKey, this.getSecretKey());
            //设置发送超时时间,单位毫秒
            properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.getSendMsgTimeoutMillis());
            // 设置 TCP 接入域名,到控制台的实例基本信息中查看
            properties.put(PropertyKeyConst.NAMESRV_ADDR, this.getOnsAddr());
    
            Producer producer = ONSFactory.createProducer(properties);
            // 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
            producer.start();
    
            Message msg = new Message(
                    // Message 所属的 Topic
                    this.getTopic(),
                    // Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在 MQ 服务器过滤
                    this.getTag(),
                    // Message Body 可以是任何二进制形式的数据, MQ 不做任何干预,
                    // 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
                    (content.toString()).getBytes());
            // 设置代表消息的业务关键属性,请尽可能全局唯一。
            // 以方便您在无法正常收到消息情况下,可通过阿里云服务器管理控制台查询消息并补发
            // 注意:不设置也不会影响消息正常收发
            msg.setKey("ORDERID_" + 1);
    
            try {
                SendResult sendResult = producer.send(msg);
                // 同步发送消息,只要不抛异常就是成功
                if (sendResult != null) {
                    System.out.println(new Date() + "消息长度:" + content.length() + "--发送消息内容:" + content.substring(0, 50) + "****" + content.substring(content.length()-50, content.length()));
                }
            }
            catch (Exception e) {
                // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
                System.out.println(new Date() + "消息长度:" + content.length() + "--发送消息内容:" + content.substring(0, 50) + "****" + content.substring(content.length()-50, content.length()));
                e.printStackTrace();
            }
    
            // 在应用退出前,销毁 Producer 对象
            // 注意:如果不销毁也没有问题
            producer.shutdown();
        }
    
    }
                            

异步发送消息

异步发送原理:

异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。消息队列RocketMQ版的异步发送,需要用户实现异步发送回调接口。

  1. 在pom.xml文件导入依赖包。
    <dependency>
         <groupId>com.aliyun.openservices</groupId>
         <artifactId>ons-client</artifactId>
         <version>1.8.2.Final</version>
    </dependency>
  2. 配置文件application.properties连接mq的参数值。
    # POC2专有云MQ配置
    mq.accessKey=lB2eniM******
    mq.secretKey=cweVrg******
    mq.onsAddr=http://MQ_INST_1870773007797099_Bb48AjKM.cn-*********-poc2-d01.mq.namesrv.cloud.poc2.com:9876
    mq.normalTopic=pdsa_topic
    mq.producerId=GID_pdsa_mq
    mq.consumerId=CID_consumer
    mq.sendMsgTimeoutMillis=3000
    mq.tag=TagA
  3. 异步发送示例代码,针对性适配后面MQ性能压测场景代码,内容包含发送每条消息数据大小50Kb ,Topic和Tag消息更细粒化分类,消息发送失败进行重试处理。
    package com.aliware.edas.com.aliware.edas.rocketmq;
    
    import com.aliware.edas.com.aliware.edas.mq.ProducerEntry;
    import com.aliyun.openservices.ons.api.*;
    import org.springframework.cloud.context.config.annotation.RefreshScope;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    import java.util.Properties;
    
    /**
     * @author liuhuihui
     */
    @Component("asyncSimpleMQProduce")
    @RefreshScope
    public class AsyncSimpleMQProduce extends ProducerEntry {
    
        StringBuilder content = new StringBuilder();
    
        public void sendMsg() {
            for (int i = 0; i < 6400; i++) {
                content.append(String.valueOf("A"));
            }
            Properties properties = new Properties();
            properties.setProperty(PropertyKeyConst.GROUP_ID, this.getProcucerId());
            // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
            properties.put(PropertyKeyConst.AccessKey, this.getAccessKey());
            // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
            properties.put(PropertyKeyConst.SecretKey, this.getSecretKey());
            //设置发送超时时间,单位毫秒
            properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.getSendMsgTimeoutMillis());
            // 设置 TCP 接入域名,到控制台的实例基本信息中查看
            properties.put(PropertyKeyConst.NAMESRV_ADDR, this.getOnsAddr());
    
            Producer producer = ONSFactory.createProducer(properties);
            // 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
            producer.start();
    
            Message msg = new Message(
                    // Message 所属的 Topic
                    this.getTopic(),
                    // Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在 MQ 服务器过滤
                    this.getTag(),
                    // Message Body 可以是任何二进制形式的数据, MQ 不做任何干预,
                    // 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
                    (content.toString()).getBytes());
            // 设置代表消息的业务关键属性,请尽可能全局唯一。
            // 以方便您在无法正常收到消息情况下,可通过阿里云服务器管理控制台查询消息并补发
            // 注意:不设置也不会影响消息正常收发
            msg.setKey("ORDERID_" + 1);
    
            while (true) {
                producer.sendAsync(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.println(new Date() + "消息长度:" + content.length() + "--发送消息内容:" + content.substring(0, 50) + "****" + content.substring(content.length()-50, content.length()));
                    }
    
                    @Override
                    public void onException(OnExceptionContext context) {
                        System.out.println("发送失败!");
                    }
                });
            }
    
            // 在应用退出前,销毁 Producer 对象
            // 注意:如果不销毁也没有问题
    //        producer.shutdown();
        }
    
    }
                            

消费者订阅消息

集群订阅

集群订阅原理:

同一个Group ID所标识的所有Consumer平均分摊消费消息。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在集群消费模式下每个实例平均分摊,只消费其中的3条消息。

  1. 在pom.xml文件导入依赖包。
    <dependency>
        <groupId>com.aliyun.openservices</groupId>
        <artifactId>ons-client</artifactId>
        <version>1.8.2.Final</version>
    </dependency>
  2. 配置文件application.properties连接mq的参数值。
  3. 集群订阅示例代码,适配后面MQ性能压测场景代码。
    @Component("simpleMQConsumer")
    @RefreshScope
    public classSimpleMQConsumerextendsProducerEntry{
    public void receive(){
        Properties properties=new Properties();
        //您在控制台创建的GroupID
        properties.put(PropertyKeyConst.GROUP_ID,this.getProcucerId());
        //AccessKey阿里云身份验证,在阿里云服务器管理控制台创建
        properties.put(PropertyKeyConst.AccessKey,this.getAccessKey());
        //SecretKey阿里云身份验证,在阿里云服务器管理控制台创建
        properties.put(PropertyKeyConst.SecretKey,this.getSecretKey());
        //设置TCP接入域名,到控制台的实例基本信息中查看
        properties.put(PropertyKeyConst.NAMESRV_ADDR,this.getOnsAddr());
        //集群订阅方式(默认)
        properties.put(PropertyKeyConst.MessageModel,PropertyValueConst.CLUSTERING);
        Consumer consumer=ONSFactory.createConsumer(properties);
        //订阅另外一个Topic
        consumer.subscribe(this.getTopic(),"*",new MessageListener(){//订阅全部Tag 
            @Override
            public Action consume(Message message,ConsumeContext context){
            System.out.println("Receive:"+message);
            return Action.CommitMessage;
            }
        });
        consumer.start();
        System.out.println("Consumer Started");
        }
    }

广播订阅

广播订阅原理:

同一个Group ID所标识的所有Consumer都会各自消费某条消息一次。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在广播消费模式下每个实例都会各自消费9条消息。

  1. 在pom.xml文件导入依赖包。
    <dependency>
        <groupId>com.aliyun.openservices</groupId>
        <artifactId>ons-client</artifactId>
        <version>1.8.2.Final</version>
    </dependency>
  2. 广播消费示例代码。
    @Component("simpleMQConsumer")
    @RefreshScope
    public classSimpleMQConsumerextendsProducerEntry{
    public void receive(){
        Properties properties=new Properties();
        //您在控制台创建的GroupID
        properties.put(PropertyKeyConst.GROUP_ID,this.getProcucerId());
        //AccessKey阿里云身份验证,在阿里云服务器管理控制台创建
        properties.put(PropertyKeyConst.AccessKey,this.getAccessKey());
        //SecretKey阿里云身份验证,在阿里云服务器管理控制台创建
        properties.put(PropertyKeyConst.SecretKey,this.getSecretKey());
        //设置TCP接入域名,到控制台的实例基本信息中查看
        properties.put(PropertyKeyConst.NAMESRV_ADDR,this.getOnsAddr());
        //广播订阅方式(默认)
        properties.put(PropertyKeyConst.MessageModel,PropertyValueConst.CLUSTERING);
        Consumer consumer=ONSFactory.createConsumer(properties);
        consumer.subscribe(this.getTopic(),"*",new MessageListener(){//订阅全部Tag 
            @Override
            public Action consume(Message message,ConsumeContext context){
            System.out.println("Receive:"+message);
            return Action.CommitMessage;
            }
        });
        consumer.start();
        System.out.println("Consumer Started");
        }
    }

RocketMQ测试分析

客户场景,信息共享交换平台:

  • 交换平台需支持每秒万级别数据传输。
  • 实现跨路段、跨部门、跨行业、跨区域信息即时共享,做到高可靠、低延迟。
客户场景

客户现场展示场景设计思路:

  1. 针对性的编写一套Java代码来支撑本次MQ性能POC验证。
  2. 选择合适规格的单台ECS,在单个TOPIC下运行多线程代码实现和MQ的订阅发送,统计1分钟内订阅和发送的数据交换TPS情况。
  3. 考虑POC也要符合客户实际生产场景中MQ使用逻辑,ECS应运行4个独立的JAR包,两对JAR包交叉经过MQ进行数据交换。
  4. 消息体内的内容应打印到屏幕,通过消息轨迹验证消息的被消费情况。
  5. 验证结果:客户指定场景下8线程异步,1分钟TPS在10K以上。

创建资源

创建RocketMQ实例

  1. 登录Apsara Stack控制台。
  2. 在左侧导航栏中单击中间件产品 > 消息队列访问管理控制台页面。
    控制台
  3. 消息队列页面,选择区域与部门后,单击MQ,进入MQ控制台。
    MQ控制台
  4. 单击左侧导航栏概览后,在概览页面中,单击创建实例
  5. 创建实例对话框,选择实例类型,并输入实例名和描述,然后单击确认
    创建实例
    说明 创建完实例后,单击左侧导航栏实例详情,可以查看创建实例的Topic数上限、消息发送TPS上限、消息订阅TPS上限和TCP协议接入地址等。

创建Topic

Topic 是消息队列 RocketMQ 版里对消息的一级归类,例如可以创建 Topic_Trade 这一 Topic 来识别交易类消息,消息生产者将消息发送到 Topic_Trade,而消息消费者则通过订阅该 Topic 来获取和消费消息。

创建Topic要注意以下几点:

  • Topic 不能跨实例使用,例如在实例 A 中创建的 Topic A 不能在实例 B 中使用。
  • Topic 名称必须在同一实例中是唯一的。
  • 您可创建不同的 Topic 来发送不同类型的消息,例如用 Topic A 发送普通消息,Topic B 发送事务消息,Topic C 发送定时/延时消息。
  1. 在控制台左侧导航栏,单击 Topic 管理
  2. Topic 管理页面上方选择刚创建的实例,单击创建 Topic 按钮。
    创建topic
  3. 创建 Topic 弹窗中的 Topic 一栏,输入 Topic 名称,选择该 Topic 对应的消息类型,输入该 Topic 的备注内容,然后单击确定

创建Group ID

创建完实例和Topic 后,您需要为消息的消费者(或生产者)创建客户端ID ,即Group ID作为标识。
  • Group ID必须在同一实例中是唯一的。
  • Group ID和Topic的关系是N:N,即一个消费者可以订阅多个Topic,同一个Topic 也可以被多个消费者订阅;一个生产者可以向多个Topic 发送消息,同一个Topic 也可以接收来自多个生产者的消息。
  1. 在控制台左侧导航栏,单击 Group 管理
  2. Group 管理页面上方选择刚创建的实例,然后选择TCP协议 > 创建Group ID 。本文以 TCP 协议为例。
    创建Group ID
  3. 创建 Group ID 对话框中,输入 Group ID 和描述,然后单击确认

场景落地

  1. 准备p1,c1,p2,c2双发送双订阅应用小程序,其中p1、p2小程序请参见生产者发送消息中的生产者异步发送消息代码,c1、c2小程序请参见消费者订阅消息中的消费者集群订阅消息代码。

    p1-8081.jar

    c1-8083.jar

    p2-8082.jar

    c2-8084.jar

  2. 通过XShell连接到专有云ops1环境,把准备的4个jar包上传至一台16c32gECS服务器上。
    场景落地2
  3. 在该目录下编写启动4个jar包的start.sh脚本。
    #!/bin/bash
    nohup java -jar p1-8081.jar &
    nohup java -jar p2-8081.jar &
    nohup java -jar c1-8081.jar &
    nohup java -jar c2-8081.jar &
  4. 编写停用4个jar包的stop.sh脚本。
    #!/bin/bash
    process=jar
    PID=$(ps -ef|grep $process |grep -v
    grep|awk '{print $2}')
    #echo $PID
    if [ ! -n "${PID[0]}" ];then      
    echo "\"$process\" process not find"       
    ps -ef|grep $process
    else       
    kill -9 $PID       
    echo "kill $process success"       
    ps -ef|grep $process
    fi
  5. 编写请求消息队列2个发送和2个订阅接口的curl脚本。
    #!/bin/bash
    curl
    http://192.168.**.***:8081/echo-sync-final-send
    curl
    http://192.168.**.***:8082/echo-sync-final-send
    curl
    http://192.168.**.***:8083/echo-final-mq
    curl
    http://192.168.**.***:8084/echo-final-mq
  6. 执行./start.sh启动4个jar包,通过tail -f nohup.out查看启动日志,每个jar包启动完成日志如下。
    场景落地6
  7. 通过ps -ef | grep jar确认4个jar是否在ECS服务器运行,有如下图则表示4个应用小程序运行正常。
    场景落地7
  8. 执行./curl.sh请求2个消息发送和2个消息订阅接口,请求之后我们查看一下程序后台日志发现代码打印的消息发送和消息订阅的日志不停的在刷。
    1. 截取单条消息发送日志如下:场景落地8
    2. 截取单条消息订阅日志如下:场景落地8.1

消息查询

如遇消息消费有问题,则可通过查询具体发送的消息内容来排查问题。消息队列 RocketMQ 版提供了三种消息查询的方式,分别是按 Message ID、Message Key 以及 Topic 查询。

查询方式说明

三种查询方式的特点和对比如下表所述。

表 1. 查询方式对比
查询方式 查询条件 查询类别 说明
按Message ID查询 Topic+Message ID 精确查询 根据Topic和Message ID可以精确定位任意一条消息,获取消息的属性。
按Message Key查询 Topic+Message Key 模糊查询 根据Topic和Message Key可以匹配到包含指定Key的最近64条消息。
注意 建议消息生产方为每条消息设置尽可能唯一的Key,以确保相同的Key的消息不会超过64条,否则消息会漏查。
按Topic查询 Topic+时间段 范围查询 根据Topic和时间范围,批量获取符合条件的所有消息,查询量大,不易匹配。

推荐查询过程

推荐按照以下流程查询消息。

查询流程

查询步骤

  1. 登录MQ控制台
  2. 在左侧导航栏,单击消息轨迹
  3. 消息轨迹页面,您可单击以下任一页签,然后按页面提示输入相应信息,再单击搜索按钮来查询消息。
    • 按 Message ID 查询

      按 Message ID 查询消息属于精确查询,您输入 Topic 和 Message ID 即可精确查询到任意一条消息。因此,为了尽可能精确地查询,建议在发送消息成功后将 Message ID 信息打印到日志中,方便问题排查。

      按ID查询
    • 按 Message Key 查询

      消息队列 RocketMQ 版根据您设置的 Message Key 建立消息的索引信息,当您输入 Key 进行查询时,消息队列 RocketMQ 版根据该索引即可匹配相关的消息返回。

      按KEY查询
    • 按 Topic 查询

      按 Topic 查询一般用在 Message ID 和 Message Key 都无法获得的情况下,根据 Topic 和消息的发送时间范围,批量获取该时间范围内的所有消息,然后再找到关心的数据。

      按topic查询
  4. 操作栏中单击消息详情,可查看到轨迹的简要信息,主要是消息本身的属性以及接收状态的信息。
    消息详情
  5. 在展开的区域中,单击查看轨迹即可查看完整的链路图。下图示例为在 TCP 协议下,按 Message ID 查询普通消息的轨迹。
    对于 Message Key 和 Topic 查询方式,如果匹配到多条轨迹,可以进行上下翻页,查看比对轨迹数据。轨迹

查询结果说明

您可以在控制台的消息查询页面看到查询到的消息。直接显示的信息包含 Message ID、Tag、Key 和存储时间。此外,您还可以在每一行消息操作列下载消息内容、查询信息轨迹以及查看消息详情。

投递状态是消息队列 RocketMQ 版根据各个 Group ID 的消费进度计算出的结果,投递状态的信息如下表所示。

表 2. 消息投递状态
投递状态 可能的原因
已订阅,并且消息至少已被消费一次 该Group ID已经正常消费过这条消息。
已订阅,但消息被过滤表达式过滤,请查看Tag 该消息的Tag不符合消费方的订阅关系,消息被过滤,可以在控制台进入Group 管理,然后单击该Group ID的操作列的消费者状态查询订阅关系。
已订阅,但消息未被消费 该Group ID订阅了该消息,但还未消费,有可能是消费过慢,或者消费出现异常导致阻塞。
已订阅,但是Group ID当前不在线,请通过消息轨迹功能进行精确查询 该Group ID订阅了该消息,但是不在线,请检查消费者端应用不在线的原因。
未知异常 出现未收录异常,请联系技术支持。

消费验证

消息队列 RocketMQ 版提供了消费验证功能,该功能可以将指定消息推送给指定的在线客户端,以检测客户端消费该消息的逻辑和结果是否符合预期。
说明 消费验证功能仅仅是用于验证客户端的消费逻辑是否正常,并不会影响正常的收消息流程,因此消息的消费状态等信息在消费验证后并不会改变。

查看消息生产数据

可供查看的消息生产数据是某个Topic在一个时间段内从Broker接收的消息的总量或者TPS。

  1. 登录MQ控制台。
  2. 在左侧导航栏,单击资源报表
  3. 资源报表页面,单击消息生产页签。
  4. Topic一栏,选择Topic,并指定其他信息,然后单击搜索
    字段说明:
    • 采集类型:分为总量和TPS。总量提供该周期内Topic接收的消息总量;TPS提供每个周期内该Topic接收消息的平均TPS。
    • 采集周期:包括1分钟、10分钟后、30分钟、1小时。采集周期决定了数据采集的时间间隔,周期越短,采集点越密集,消息消费数据越详细。
    • 时间范围:RocketMQ最多可以提供最近三天之内的消息的生产查询。
    查询结果以图表的形式显示。查询3查询4

查看消息消费数据

可供查看的消息消费数据是某个Topic在一个时间段内投递给某个Group ID的消息的总量或TPS。

  1. 登录MQ控制台。
  2. 在左侧导航栏,单击资源报表
  3. 资源报表页面,单击消息消费页签。
  4. Group IDTopic栏,分别选择您要查询的Group ID和Topic。
  5. 指定其他信息,然后单击搜索
    查询字段说明:
    • 采集类型:分为总量和TPS。总量提供每个周期内该Topic投递给该Group ID的消息总量,TPS提供每个周期内该Topic投递给该Group ID消息的平均TPS。
    • 采集周期:包括1分钟、10分钟后、30分钟、1小时。采集周期决定了数据采集的时间间隔,周期越短,采集点越密集,消息消费数据越详细。
    • 时间范围:RocketMQ最多可以提供最近三天之内的消息的消费查询。
    查询结果以图表的形式显示。查询结果查询结果2