本文提供使用 TCP 协议下的开源 C++ SDK 来收发定时和延时消息的示例代码供您参考。

概念介绍

  • 定时消息:Producer 将消息发送到消息队列 RocketMQ 版服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到 Consumer 进行消费,该消息即定时消息。
  • 延时消息:Producer 将消息发送到消息队列 RocketMQ 版服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到 Consumer 进行消费,该消息即延时消息。

定时消息与延时消息在代码配置上存在一些差异,但是最终达到的效果相同。消息在发送到阿里云 RocketMQ 服务端后并不会立马投递,而是根据消息中的属性延迟固定时间后才投递给消费者。详情请参见定时和延时消息

注意 开源版本的 Apache RocketMQ 支持延时消息,但不支持定时消息,因此没有专门的定时消息接口。阿里云 RocketMQ 的延时消息是通过设置定时时间来实现的。如需使用云上定时消息,请参照以下步骤。

前提条件

安装 CPP 动态库

发送定时消息

  1. 将以下代码拷贝到 DelayProducerDemo.cpp 文件中,修改相应的参数后,使用 g++ 命令进行编译,生成可执行文件,即可直接运行。

    g++ -o delay_producer_demo -std=c++11 -lz -lrocketmq DelayProducerDemo.cpp

  2. 发送定时消息的示例代码如下。
    #include <iostream>
    #include <chrono>
    #include <thread>
    #include "DefaultMQProducer.h"
    
    using namespace std;
    using namespace rocketmq;
    
    int main() {
        std::cout << "=======Before sending messages=======" << std::endl;
        //您在阿里云 RocketMQ 控制台上申请的 GID
        DefaultMQProducer producer("GID_XXXXXXXXXXX");
        //设置 TCP 协议接入点,从阿里云 RocketMQ 控制台的实例详情页面获取
        producer.setNamesrvAddr("http://MQ_INST_XXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80");
        //您在阿里云账号管理控制台中创建的 AccessKeyId 和 AccessKeySecret 用于身份认证,用户渠道,默认值为:ALIYUN
        producer.setSessionCredentials("AK", "SK", "ALIYUN");
    
        //请确保参数设置完成之后启动 Producer
        producer.start();
        auto start = std::chrono::system_clock::now();
        int count = 32;
        for (int i = 0; i < count; ++i) {
            //发送消息时请设置您在阿里云 RocketMQ 控制台上申请的Topic
            MQMessage msg("YOUR DELAY TOPIC", "HiTAG", "Hello,CPP SDK, Delay Message.");
            chrono::system_clock::duration d = chrono::system_clock::now().time_since_epoch();
            chrono::milliseconds mil = chrono::duration_cast<chrono::milliseconds>(d);
            //定时延时消息,单位毫秒(ms),在指定时间戳(当前时间之后)进行投递,例如 2020-03-07 16:21:00 投递
            //如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。
            //设置需要延时或者定时的时间,例如当前时间延迟 10 秒后投递
            long exp = mil.count() + 10000;
            msg.setProperty("__STARTDELIVERTIME", to_string(exp));
            std::cout << "Now: " << mil.count() << " Exp:" << exp << std::endl;
            try {
                SendResult sendResult = producer.send(msg);
                std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId()
                          << std::endl;
                this_thread::sleep_for(chrono::seconds(1));
            } catch (MQException e) {
                std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
            }
        }
        auto interval = std::chrono::system_clock::now() - start;
        std::cout << "Send " << count << " messages OK, costs "
                  << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;
    
        producer.shutdown();
        std::cout << "=======After sending messages=======" << std::endl;
        return 0;
    }

消费定时消息

  1. 将以下代码拷贝到 DelayConsumerDemo.cpp 文件中,修改相应的参数后,使用 g++ 命令进行编译,生成可执行文件,即可直接运行。

    g++ -o delay_consumer_demo -std=c++11 -lz -lrocketmq DelayConsumerDemo.cpp

  2. 消费定时消息的示例代码如下。
    
    #include <iostream>
    #include <thread>
    #include <chrono>
    #include "DefaultMQPushConsumer.h"
    
    using namespace rocketmq;
    using namespace std;
    
    class ExampleDelayMessageListener : public MessageListenerConcurrently {
    public:
        ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
            for (auto item = msgs.begin(); item != msgs.end(); item++) {
                chrono::system_clock::duration d = chrono::system_clock::now().time_since_epoch();
                chrono::milliseconds mil = chrono::duration_cast<chrono::milliseconds>(d);
                std::cout << "Now: " << mil.count() << " Received Message Topic:" << item->getTopic() << ", MsgId:"
                          << item->getMsgId() << " DelayTime:" << item->getProperty("__STARTDELIVERTIME") << std::endl;
            }
            return CONSUME_SUCCESS;
        }
    };
    
    int main(int argc, char *argv[]) {
        std::cout << "=======Before consuming messages=======" << std::endl;
        //您在阿里云 RocketMQ 控制台上申请的 GID
        DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_XXXXXXXXXX");
        //设置 TCP 协议接入点,从阿里云 RocketMQ 控制台的实例详情页面获取
        consumer->setNamesrvAddr("http://MQ_INST_XXXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80");
        //您在阿里云账号管理控制台中创建的 AccessKeyId 和 AccessKeySecret 用于身份认证,用户渠道,默认值为:ALIYUN
        consumer->setSessionCredentials("AK", "SK", "ALIYUN");
        auto start = std::chrono::system_clock::now();
    
        //register your own listener here to handle the messages received.
        //请注册自定义侦听函数用来处理接收到的消息,并返回响应的处理结果。
        ExampleDelayMessageListener *messageListener = new ExampleDelayMessageListener();
        consumer->subscribe("YOUR DELAY TOPIC", "*");
        consumer->registerMessageListener(messageListener);
    
        //Start this consumer
        //准备工作完成,必须调用启动函数,才可以正常工作。
        // ********************************************
        // 1.确保订阅关系的设置在启动之前完成
        // 2.确保相同 GID 下面的消费者的订阅关系一致
        // *********************************************
        consumer->start();
    
        //Keep main thread running until process finished.
        //请保持线程常驻,不要执行 shutdown 操作
        std::this_thread::sleep_for(std::chrono::seconds(600));
        consumer->shutdown();
        std::cout << "=======After consuming messages======" << std::endl;
        return 0;
    }