顺序消息

顺序消息为云消息队列 RocketMQ 版中的高级特性消息,本文为您介绍顺序消息的应用场景、功能原理、使用限制、使用方法和使用建议。

应用场景

在有序事件处理、撮合交易、数据实时增量同步等场景下,异构系统间需要维持强一致的状态同步,上游的事件变更需要按照顺序传递到下游进行处理。在这类场景下使用云消息队列 RocketMQ 版的顺序消息可以有效保证数据传输的顺序性。

典型场景一:撮合交易

交易撮合

以证券、股票交易撮合场景为例,对于出价相同的交易单,坚持按照先出价先交易的原则,下游处理订单的系统需要严格按照出价顺序来处理订单。

典型场景二:数据实时增量同步

图 1. 普通消息普通消息

图 2. 顺序消息顺序消息

以数据库变更增量同步场景为例,上游源端数据库按需执行增删改操作,将二进制操作日志作为消息,通过云消息队列 RocketMQ 版传输到下游搜索系统,下游系统按顺序还原消息数据,实现状态数据按序刷新。如果是普通消息则可能会导致状态混乱,和预期操作结果不符,基于顺序消息可以实现下游状态和上游操作结果一致。

功能原理

什么是顺序消息

顺序消息是云消息队列 RocketMQ 版提供的一种高级消息类型,支持消费者按照发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理。

相比其他类型消息,顺序消息在发送、存储和投递的处理过程中,更多强调多条消息间的先后顺序关系。

云消息队列 RocketMQ 版顺序消息的顺序关系通过消息组(MessageGroup)判定和识别,发送顺序消息时需要为每条消息设置归属的消息组。

重要

只有同一消息组的消息才能保证顺序,不同消息组或未设置消息组的消息之间不涉及顺序性。

基于消息组的顺序判定逻辑,支持按照业务逻辑做细粒度拆分,可以在满足业务局部顺序的前提下提高系统的并行度和吞吐能力。

如何保证消息的顺序性

云消息队列 RocketMQ 版的消息的顺序性分为两部分,生产顺序性和消费顺序性。

  • 生产顺序性云消息队列 RocketMQ 版通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。

    如需保证消息生产的顺序性,则必须满足以下条件:

    • 同一消息组(MessageGroup)

      消息生产顺序性的范围为消息组,生产者发送消息时可以为每条消息设置消息组,只有同一消息组内的消息可以保证顺序性,不同消息组或未设置消息组的消息之间不保证顺序。

    • 单一生产者

      消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的消息组,不同生产者之间产生的消息也无法判定其先后顺序。

    • 串行发送

      云消息队列 RocketMQ 版生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。

    满足以上条件的生产者,将顺序消息发送至云消息队列 RocketMQ 版后,会保证设置了同一消息组的消息,按照发送顺序存储在同一队列中。服务端顺序存储逻辑如下:顺序存储逻辑

    • 相同消息组的消息按照先后顺序被存储在同一个队列。

    • 不同消息组的消息可以混合在同一个队列中,且不保证连续。

    如上图所示,消息组1和消息组4的消息混合存储在队列1中,云消息队列 RocketMQ 版保证消息组1中的消息G1-M1、G1-M2、G1-M3是按发送顺序存储,且消息组4的消息G4-M1、G4-M2也是按顺序存储,但消息组1和消息组4中的消息不涉及顺序关系。

  • 消费顺序性云消息队列 RocketMQ 版通过消费者和服务端的协议保障消息消费严格按照存储的先后顺序来处理。

    如需保证消息消费的顺序性,则必须满足以下条件:

    • 投递顺序

      云消息队列 RocketMQ 版通过客户端SDK和服务端通信协议保障消息按照服务端存储顺序投递,但业务方消费消息时需要严格按照接收—处理—应答的语义处理消息,避免因异步处理导致消息乱序。

      重要

      消费者类型为PushConsumer时,云消息队列 RocketMQ 版保证消息按照存储顺序一条一条投递给消费者,若消费者类型为SimpleConsumer,则消费者有可能一次拉取多条消息。此时,消息消费的顺序性需要由业务方自行保证。消费者类型的具体信息,请参见消费者分类

    • 有限重试

      云消息队列 RocketMQ 版顺序消息投递仅在重试次数限定范围内,即一条消息如果一直重试失败,超过最大重试次数后将不再重试,跳过这条消息消费,不会一直阻塞后续消息处理。

      对于需要严格保证消费顺序的场景,请务必设置合理的重试次数,避免参数不合理导致消息乱序。

生产顺序性和消费顺序性组合

如果消息需要严格按照先进先出(FIFO)的原则处理,即先发送的先消费、后发送的后消费,则必须要同时满足生产顺序性和消费顺序性。但一般业务场景下,同一个生产者可能对接多个下游消费者,不一定所有的消费者业务都需要顺序消费,您可以将生产顺序性和消费顺序性进行差异化组合,应用于不同的业务场景。例如发送顺序消息,但使用非顺序的并发消费方式来提高吞吐能力。更多组合方式如下表所示:

生产顺序

消费顺序

顺序性效果

设置消息组,保证消息顺序发送。

顺序消费

按照消息组粒度,严格保证消息顺序。

同一消息组内的消息的消费顺序和发送顺序完全一致。

设置消息组,保证消息顺序发送。

并发消费

并发消费,尽可能按时间顺序处理。

未设置消息组,消息乱序发送。

顺序消费

按队列存储粒度,严格顺序。

基于云消息队列 RocketMQ 版本身队列的属性,消费顺序和队列存储的顺序一致,但不保证和发送顺序一致。

未设置消息组,消息乱序发送。

并发消费

并发消费,尽可能按照时间顺序处理。

顺序消息生命周期

生命周期

  • 初始化

    消息被生产者构建并完成初始化,待发送到服务端的状态。

  • 待消费

    消息被发送到服务端,对消费者可见,等待消费者消费的状态。

  • 消费中

    消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。

    此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,云消息队列 RocketMQ 版会对消息进行重试处理。具体信息,请参见消费重试

  • 消费提交

    消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。

    云消息队列 RocketMQ 版默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。

  • 消息删除

    云消息队列 RocketMQ 版按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。更多信息,请参见消息存储和清理机制

重要
  • 消息消费失败或消费超时,会触发服务端重试逻辑,重试消息属于新的消息,原消息的生命周期已结束。

  • 顺序消息消费失败进行消费重试时,为保障消息的顺序性,后续消息不可被消费,必须等待前面的消息消费完成后才能被处理。

使用限制

顺序消息仅支持使用MessageTypeFIFO的主题,即顺序消息只能发送至类型为顺序消息的主题中,发送的消息的类型必须和主题的类型一致。

使用示例

和普通消息发送相比,顺序消息发送必须要设置消息组。消息组的粒度建议按照业务场景,尽可能细粒度设计,以便实现业务拆分和并发扩展。

以Java语言为例,收发顺序消息的示例代码如下:

完整的消息收发示例代码请参见RocketMQ 5.x系列SDK(推荐)

示例代码

顺序消息发送

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;


public class ProducerExample {
    public static void main(String[] args) throws ClientException {
        /**
         * 实例接入点,从控制台实例详情页的接入点页签中获取。
         * 如果是在阿里云ECS内网访问,建议填写VPC接入点。
         * 如果是在本地公网访问,或者是线下IDC环境访问,可以使用公网接入点。使用公网接入点访问,必须开启实例的公网访问功能。
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        //消息发送的目标Topic名称,需要提前在控制台创建,如果不创建直接使用会返回报错。
        String topic = "Your Topic";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * 如果是使用公网接入点访问,configuration还需要设置实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。
         * 如果是在阿里云ECS内网访问,无需填写该配置,服务端会根据内网VPC信息智能获取。
         * 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
         */
        // builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
        ClientConfiguration configuration = builder.build();
        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(configuration)
                .build();
         //顺序消息发送。
        Message message = provider.newMessageBuilder()
                .setTopic("topic")
                //设置消息索引键,可根据关键字精确查找某条消息。
                .setKeys("messageKey")
                //设置消息Tag,用于消费端根据指定Tag过滤消息。
                .setTag("messageTag")
                //设置顺序消息的排序分组,该分组尽量保持离散,避免热点排序分组。
                .setMessageGroup("fifoGroup001")
                //消息体。
                .setBody("messageBody".getBytes())
                .build();
        try {
            //发送消息,需要关注发送结果,并捕获失败等异常
            SendReceipt sendReceipt = producer.send(message);
            System.out.println(sendReceipt.getMessageId());
        } catch (ClientException e) {
            e.printStackTrace();
        }
    }
}

PushConsumer消费

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;

public class PushConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);

    private PushConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        /**
         * 实例接入点,从控制台实例详情页的接入点页签中获取。
         * 如果是在阿里云ECS内网访问,建议填写VPC接入点。
         * 如果是在本地公网访问,或者是线下IDC环境访问,可以使用公网接入点。使用公网接入点访问,必须开启实例的公网访问功能。
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        //指定需要订阅哪个目标Topic,Topic需要提前在控制台创建,如果不创建直接使用会返回报错。
        String topic = "Your Topic";
        //为消费者指定所属的消费者分组,Group需要提前在控制台创建,如果不创建直接使用会返回报错。
        String consumerGroup = "Your ConsumerGroup";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * 如果是使用公网接入点访问,configuration还需要设置实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。
         * 如果是在阿里云ECS内网访问,无需填写该配置,服务端会根据内网VPC信息智能获取。
         * 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
         */
        //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));        
        ClientConfiguration clientConfiguration = builder.build();
        //订阅消息的过滤规则,表示订阅所有Tag的消息。
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        //初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
        //消费顺序消息时,需要确保当前消费者分组ConsumerGroup是顺序投递模式,否则仍然按并发乱序投递。
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                //设置消费者分组。
                .setConsumerGroup(consumerGroup)
                //设置预绑定的订阅关系。
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                //设置消费监听器。
                .setMessageListener(messageView -> {
                    //处理消息并返回消费结果。
                    // LOGGER.info("Consume message={}", messageView);
                    System.out.println("Consume Message: " + messageView);
                    return ConsumeResult.SUCCESS;
                })
                .build();
        Thread.sleep(Long.MAX_VALUE);
        //如果不需要再使用PushConsumer,可关闭该进程。
        //pushConsumer.close();
    }
}  

SimpleConsumer消费

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;

public class SimpleConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);

    private SimpleConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException {
        /**
         * 实例接入点,从控制台实例详情页的接入点页签中获取。
         * 如果是在阿里云ECS内网访问,建议填写VPC接入点。
         * 如果是在本地公网访问,或者是线下IDC环境访问,可以使用公网接入点。使用公网接入点访问,必须开启实例的公网访问功能。
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        //指定需要订阅哪个目标Topic,Topic需要提前在控制台创建,如果不创建直接使用会返回报错。
        String topic = "Your Topic";
        //为消费者指定所属的消费者分组,Group需要提前在控制台创建,如果不创建直接使用会返回报错。
        String consumerGroup = "Your ConsumerGroup";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * 如果是使用公网接入点访问,configuration还需要设置实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。
         * 如果是在阿里云ECS内网访问,无需填写该配置,服务端会根据内网VPC信息智能获取。
         * 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
         */
        //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));        
        ClientConfiguration clientConfiguration = builder.build();

        Duration awaitDuration = Duration.ofSeconds(10);
        //订阅消息的过滤规则,表示订阅所有Tag的消息。
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        //初始化SimpleConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
        //消费顺序消息时,需要确保当前消费者分组ConsumerGroup是顺序投递模式,否则仍然按并发乱序投递。
        SimpleConsumer consumer = provider.newSimpleConsumerBuilder().setClientConfiguration(clientConfiguration)
                //设置消费者分组。
                .setConsumerGroup(consumerGroup)
                //设置长轮询超时时间。
                .setAwaitDuration(awaitDuration)
                //设置预绑定的订阅关系。
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)).build();
        //设置本次拉取的最大消息条数。
        int maxMessageNum = 16;
        //设置消息的不可见时间。
        Duration invisibleDuration = Duration.ofSeconds(10);
        //SimpleConsumer需要客户端一直主动循环获取消息,并进行消费处理。
        //如果需要提高消费实时性,建议多线程并发拉取。
        while (true) {
            //需要注意的是,同一个MessageGroup的消息,如果前序消息没有消费完成,再次调用Receive是获取不到后续消息的。
            final List<MessageView> messageViewList = consumer.receive(maxMessageNum, invisibleDuration);
            messageViewList.forEach(messageView -> {
                System.out.println(messageView);
                //消费处理完成后,需要主动调用ACK提交消费结果。
                try {
                    consumer.ack(messageView);
                } catch (ClientException e) {
                    //如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
                    e.printStackTrace();
                }
            });
        }
        // 如果不需要再使用SimpleConsumer,可关闭该进程。
        // consumer.close();
    }
}                                           

获取顺序消息消费重试日志

PushConsumer顺序消费的重试是在消费者客户端进行,服务端无法获取消费重试的详细日志,若消息轨迹中顺序消息的投递结果为失败时,您需要在消费者客户端日志中查看消息重试的最大次数、消费者客户端等信息。

消费者客户端日志查看路径,请参见日志配置

您可以通过搜索以下关键字在客户端日志中快速定位消费失败的相关内容:

Message listener raised an exception while consuming messages
Failed to consume fifo message finally, run out of attempt times

使用建议

串行消费,避免批量消费导致乱序

消息消费建议串行处理,避免一次消费多条消息,否则可能出现乱序情况。

例如:发送顺序为1->2->3->4,消费时批量消费,消费顺序为1->23(批量处理,失败)->23(重试处理)->4,此时可能由于消息3的失败导致消息2被重复处理,最后导致消息消费乱序。

消息组尽可能打散,避免集中导致热点

云消息队列 RocketMQ 版保证相同消息组的消息存储在同一个队列中,如果不同业务场景的消息都集中在少量或一个消息组中,则这些消息存储压力都会集中到服务端的少量队列或一个队列中。容易导致性能热点,且不利于扩展。一般建议的消息组设计会采用订单ID、用户ID作为顺序参考,即同一个终端用户的消息保证顺序,不同用户的消息无需保证顺序。

因此建议将业务以消息组粒度进行拆分,例如,将订单ID、用户ID作为消息组关键字,可实现同一终端用户的消息按照顺序处理,不同用户的消息无需保证顺序。