文档

收发顺序消息

更新时间:
一键部署

顺序消息(FIFO消息)是阿里云云消息队列 RocketMQ 版提供的一种严格按照顺序来发布和消费的消息类型。本文提供使用TCP协议下的社区版C++ SDK收发顺序消息的示例代码供您参考。

顺序消息分为两类:

  • 全局顺序:对于指定的一个Topic,所有消息按照严格的先入先出FIFO(First In First Out)的顺序进行发布和消费。

  • 分区顺序:对于指定的一个Topic,所有消息根据Sharding Key进行区块分区。同一个分区内的消息按照严格的FIFO顺序进行发布和消费。Sharding Key是顺序消息中用来区分不同分区的关键字段,和普通消息的Key是完全不同的概念。

更多信息,请参见顺序消息

前提条件

发送顺序消息

重要

云消息队列 RocketMQ 版服务端判定消息产生的顺序性是参照单一生产者、单一线程并发下消息发送的时序。如果发送方有多个生产者或者有多个线程并发发送消息,则此时只能以到达云消息队列 RocketMQ 版服务端的时序作为消息顺序的依据,和业务侧的发送顺序未必一致。

  1. 将以下代码拷贝到OrderProducerDemo.cpp文件中,修改相应的参数后,使用g++命令进行编译,生成可执行文件,即可直接运行。

    g++ -o order_producer_demo -std=c++11 -lz -lrocketmq OrderProducerDemo.cpp

  2. 发送顺序消息的示例代码如下。

    #include <iostream>
    #include <chrono>
    #include <thread>
    #include "DefaultMQProducer.h"
    
    using namespace std;
    using namespace rocketmq;
    
    class ExampleSelectMessageQueueByHash : public MessageQueueSelector {
    public:
        MQMessageQueue select(const std::vector<MQMessageQueue> &mqs, const MQMessage &msg, void *arg) {
            // 实现自定义分区逻辑,根据业务传入arg参数即分区键,计算路由到哪个队列,这里以arg为int型参数为例。
            int orderId = *static_cast<int *>(arg);
            int index = orderId % mqs.size();
            return mqs[0];
        }
    };
    
    int main() {
        std::cout << "=======Before sending messages=======" << std::endl;
        //您在阿里云消息队列RocketMQ控制台上申请的GID。
        DefaultMQProducer producer("GID_XXXXXXXX");
        //设置TCP协议接入点,从阿里云消息队列RocketMQ控制台的实例详情页面获取。
        producer.setNamesrvAddr("http://MQ_INST_XXXXXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80");
        //请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
    	  //ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET分别为阿里云账号的AccessKey ID和AccessKey Secret,用于身份验证。
        //用户渠道,默认值为:ALIYUN。
        producer.setSessionCredentials(getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN");
    
        //请确保参数设置完成之后启动Producer。
        producer.start();
        auto start = std::chrono::system_clock::now();
        int count = 32;
        //可以设置发送重试的次数,确保发送成功。
        int retryTimes = 1;
        //参考接口MessageQueueSelector,通过设置的自定义参数arg,计算发送到指定的路由队列中,此处的arg便是分区ID。
        ExampleSelectMessageQueueByHash *pSelector = new ExampleSelectMessageQueueByHash();
        for (int i = 0; i < count; ++i) {
            //发送消息时请设置您在阿里云消息队列RocketMQ控制台上申请的Topic。
            MQMessage msg("YOUR ORDERLY TOPIC", "HiTAG", "Hello,CPP SDK, Orderly Message.");
            try {
                SendResult sendResult = producer.send(msg, pSelector, &i, 1, false);
                std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId()
                          << "MessageQueue:" << sendResult.getMessageQueue().toString() << std::endl;
                this_thread::sleep_for(chrono::seconds(1));
            } catch (MQException e) {
                std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
            }
        }
        auto interval = std::chrono::system_clock::now() - start;
        std::cout << "Send " << count << " messages OK, costs "
                  << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;
    
        producer.shutdown();
        std::cout << "=======After sending messages=======" << std::endl;
        return 0;
    }

消费顺序消息

  1. 将以下代码拷贝到OrderConsumerDemo.cpp文件中,修改相应的参数后,使用g++命令进行编译,生成可执行文件,即可直接运行。

    g++ -o order_consumer_demo -std=c++11 -lz -lrocketmq OrderConsumerDemo.cpp

  2. 消费顺序消息的示例代码如下。

    
    #include <iostream>
    #include <thread>
    #include "DefaultMQPushConsumer.h"
    
    using namespace rocketmq;
    
    
    class ExampleOrderlyMessageListener : public MessageListenerOrderly {
    public:
        ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
            for (auto item = msgs.begin(); item != msgs.end(); item++) {
                std::cout << "Received Message Topic:" << item->getTopic() << ", MsgId:" << item->getMsgId() << std::endl;
            }
            return CONSUME_SUCCESS;
        }
    };
    
    int main(int argc, char *argv[]) {
        std::cout << "=======Before consuming messages=======" << std::endl;
        //您在阿里云消息队列RocketMQ控制台上申请的GID。
        DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_XXXXXXXXXXX");
        //设置TCP协议接入点,从阿里云消息队列RocketMQ控制台的实例详情页面获取。
        consumer->setNamesrvAddr("http://MQ_INST_XXXXXXXXXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80");
        //请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
    	  //ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET分别为阿里云账号的AccessKey ID和AccessKey Secret,用于身份验证。
        //用户渠道,默认值为:ALIYUN。
        consumer->setSessionCredentials(getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN");
        auto start = std::chrono::system_clock::now();
    
        //register your own listener here to handle the messages received.
        //请注册自定义侦听函数用来处理接收到的消息,并返回响应的处理结果。
        ExampleOrderlyMessageListener *messageListener = new ExampleOrderlyMessageListener();
        consumer->subscribe("YOUR ORDERLY TOPIC", "*");
        consumer->registerMessageListener(messageListener);
    
        //Start this consumer
        //准备工作完成,必须调用启动函数,才可以正常工作。
        // ********************************************
        // 1.确保订阅关系的设置在启动之前完成。
        // 2.确保相同GID下面的消费者的订阅关系一致。
        // *********************************************
        consumer->start();
    
        //Keep main thread running until process finished.
        //请保持线程常驻,不要执行shutdown操作。
        std::this_thread::sleep_for(std::chrono::seconds (60 ));
        consumer->shutdown();
        std::cout << "=======After consuming messages======" << std::endl;
        return 0;
    }
  • 本页导读 (1)
文档反馈