本文提供使用TCP协议下的社区版C++ SDK来收发定时和延时消息的示例代码供您参考。
背景信息
- 定时消息:Producer将消息发送到云消息队列 RocketMQ 版服务端,但并不期望立马投递这条消息,而是推迟到在当前时间点之后的某一个时间投递到Consumer进行消费,该消息即定时消息。
- 延时消息:Producer将消息发送到云消息队列 RocketMQ 版服务端,但并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。
更多信息,请参见定时和延时消息。
重要
社区版的Apache RocketMQ和阿里云云消息队列 RocketMQ 版配置方式不同,实现效果也有所差异。社区版的Apache RocketMQ支持延时消息,但不支持定时消息,因此没有专门的定时消息接口。阿里云云消息队列 RocketMQ 版不仅同时支持配置延时消息和定时消息,并且定时和延时时间可以精确到秒级、拥有更高的并发性。建议您优先使用云上定时延时的方式,使用方法,请参考以下步骤。
前提条件
获取阿里云访问密钥AccessKey ID和AccessKey Secret。更多信息,请参见创建AccessKey。
发送定时消息
将以下代码拷贝到DelayProducerDemo.cpp文件中,修改相应的参数后,使用g++命令进行编译,生成可执行文件,即可直接运行。
g++ -o delay_producer_demo -std=c++11 -lz -lrocketmq DelayProducerDemo.cpp
发送定时消息的示例代码如下。
#include <iostream> #include <chrono> #include <thread> #include "DefaultMQProducer.h" using namespace std; using namespace rocketmq; int main() { std::cout << "=======Before sending messages=======" << std::endl; //您在阿里云消息队列RocketMQ版上申请的GID。 DefaultMQProducer producer("GID_XXXXXXXXXXX"); //设置TCP协议接入点,从阿里云消息队列RocketMQ版控制台的实例详情页面获取。 producer.setNamesrvAddr("http://MQ_INST_XXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80"); //请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。 //ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET分别为阿里云账号的AccessKey ID和AccessKey Secret,用于身份验证。 //用户渠道,默认值为:ALIYUN。 producer.setSessionCredentials(getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN"); //请确保参数设置完成之后启动Producer。 producer.start(); auto start = std::chrono::system_clock::now(); int count = 32; for (int i = 0; i < count; ++i) { //发送消息时请设置您在阿里云消息队列RocketMQ版控制台上申请的Topic。 MQMessage msg("YOUR DELAY TOPIC", "HiTAG", "Hello,CPP SDK, Delay Message."); chrono::system_clock::duration d = chrono::system_clock::now().time_since_epoch(); chrono::milliseconds mil = chrono::duration_cast<chrono::milliseconds>(d); //定时延时消息,单位毫秒(ms),在指定时间戳(当前时间之后)进行投递,例如2020-03-07 16:21:00投递。 //如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。 //设置需要延时或者定时的时间,例如当前时间延迟10秒后投递。 long exp = mil.count() + 10000; msg.setProperty("__STARTDELIVERTIME", to_string(exp)); std::cout << "Now: " << mil.count() << " Exp:" << exp << std::endl; try { SendResult sendResult = producer.send(msg); std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId() << std::endl; this_thread::sleep_for(chrono::seconds(1)); } catch (MQException e) { std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl; } } auto interval = std::chrono::system_clock::now() - start; std::cout << "Send " << count << " messages OK, costs " << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl; producer.shutdown(); std::cout << "=======After sending messages=======" << std::endl; return 0; }
消费定时消息
将以下代码拷贝到DelayConsumerDemo.cpp文件中,修改相应的参数后,使用g++命令进行编译,生成可执行文件,即可直接运行。
g++ -o delay_consumer_demo -std=c++11 -lz -lrocketmq DelayConsumerDemo.cpp
消费定时消息的示例代码如下。
#include <iostream> #include <thread> #include <chrono> #include "DefaultMQPushConsumer.h" using namespace rocketmq; using namespace std; class ExampleDelayMessageListener : public MessageListenerConcurrently { public: ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) { for (auto item = msgs.begin(); item != msgs.end(); item++) { chrono::system_clock::duration d = chrono::system_clock::now().time_since_epoch(); chrono::milliseconds mil = chrono::duration_cast<chrono::milliseconds>(d); std::cout << "Now: " << mil.count() << " Received Message Topic:" << item->getTopic() << ", MsgId:" << item->getMsgId() << " DelayTime:" << item->getProperty("__STARTDELIVERTIME") << std::endl; } return CONSUME_SUCCESS; } }; int main(int argc, char *argv[]) { std::cout << "=======Before consuming messages=======" << std::endl; //您在阿里云消息队列RocketMQ版控制台上申请的GID。 DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_XXXXXXXXXX"); //设置TCP协议接入点,从阿里云消息队列RocketMQ版控制台的实例详情页面获取。 consumer->setNamesrvAddr("http://MQ_INST_XXXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80"); //请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。 //ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET分别为阿里云账号的AccessKey ID和AccessKey Secret,用于身份验证。 //用户渠道,默认值为:ALIYUN。 consumer->setSessionCredentials(getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN"); auto start = std::chrono::system_clock::now(); //register your own listener here to handle the messages received. //请注册自定义侦听函数用来处理接收到的消息,并返回响应的处理结果。 ExampleDelayMessageListener *messageListener = new ExampleDelayMessageListener(); consumer->subscribe("YOUR DELAY TOPIC", "*"); consumer->registerMessageListener(messageListener); //Start this consumer //准备工作完成,必须调用启动函数,才可以正常工作。 // ******************************************** // 1.确保订阅关系的设置在启动之前完成。 // 2.确保相同GID下面的消费者的订阅关系一致。 // ********************************************* consumer->start(); //Keep main thread running until process finished. //请保持线程常驻,不要执行shutdown操作。 std::this_thread::sleep_for(std::chrono::seconds(600)); consumer->shutdown(); std::cout << "=======After consuming messages======" << std::endl; return 0; }
文档内容是否对您有帮助?