顺序消息FIFO( First In First Out)是消息队列RocketMQ版提供的一种严格按照顺序来发布和消费的消息类型。本文提供使用TCP协议下的C/C SDK收发顺序消息的示例代码。
背景信息
顺序消息分为两类:
- 全局顺序:对于指定的一个Topic,所有消息按照严格的先入先出FIFO(First In First Out)的顺序进行发布和消费。
- 分区顺序:对于指定的一个Topic,所有消息根据Sharding Key进行区块分区。同一个分区内的消息按照严格的FIFO顺序进行发布和消费。Sharding Key是顺序消息中用来区分不同分区的关键字段,和普通消息的Key是完全不同的概念。
更多信息,请参见顺序消息。
前提条件
您已完成以下操作:
- 下载C/C++ SDK。更多信息,请参见版本说明。
- 准备环境。请根据您使用的SDK版本做好环境准备工作:
发送顺序消息
发送顺序消息的示例代码如下。
#include "ONSFactory.h"
#include "ONSClientException.h"
#include <iostream>
using namespace ons;
int main()
{
// Producer创建和正常工作的参数,必须输入。
ONSFactoryProperty factoryInfo;
factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId,"XXX");// 在控制台创建的Group ID。
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR,"XXX");// 设置TCP接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看。
factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX");// 在控制台创建的Topic。
factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent,"XXX");// 消息内容。
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey,"XXX");// AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建。
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey,"XXX");// AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建。
// 创建Producer。
OrderProducer *pProducer = ONSFactory::getInstance()->createOrderProducer(factoryInfo);
// 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
pProducer->start();
Message msg(
// Message Topic
factoryInfo.getPublishTopics(),
// Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。
"TagA",
// Message Body,任何二进制形式的数据,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。
factoryInfo.getMessageContent()
);
// 设置代表消息的业务属性,请尽可能全局唯一。
// 以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发。
// 注意:不设置也不会影响消息正常收发。
msg.setKey("ORDERID_100");
// 分区顺序消息中区分不同分区的关键字段。
// 如果是全局顺序消息,该字段可以设置为任意非空字符串。
std::string shardingKey ="abc"; // 带有同一Sharding Key的消息会按照顺序发送。
try
{
// 发送消息,只要不抛异常就是成功。
SendResultONS sendResult = pProducer->send(msg, shardingKey);
std::cout << "send success" << std::endl;
}
catch(ONSClientException & e)
{
// 添加对exception的处理。
}
// 在应用退出前,必须销毁Producer对象,否则会导致内存泄露等问题。
pProducer->shutdown();
return 0;
}
订阅顺序消息
订阅顺序消息的示例代码如下。
#include "ONSFactory.h"
using namespace std;
using namespace ons;
// 创建消费消息的实例。
// pushConsumer拉取到消息后,会主动调用该实例的consumeMessage函数。
class ONSCLIENT_API MyMsgListener : public MessageOrderListener
{
public:
MyMsgListener()
{
}
virtual ~MyMsgListener()
{
}
virtual OrderAction consume(Message &message, ConsumeOrderContext &context)
{
// 根据业务需求,消费消息。
return Success; // CONSUME_SUCCESS;
}
};
int main(int argc, char* argv[])
{
// OrderConsumer创建和工作需要的参数,必须输入以下内容。
ONSFactoryProperty factoryInfo;
factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId,"XXX");// 在控制台创建的Group ID。
factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX");// 在控制台创建的Topic。
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey,"XXX");// AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建。
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey,"XXX");// AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建。
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR,"XXX");// 设置TCP接入域名,进入控制台的实例管理页面的“获取接入点信息”查看。
// 创建orderConsumer。
OrderConsumer* orderConsumer = ONSFactory::getInstance()->createOrderConsumer(factoryInfo);
MyMsgListener msglistener;
// 指定orderConsumer订阅的消息Topic和消息Tag。
orderConsumer->subscribe(factoryInfo.getPublishTopics(), "*",&msglistener );
// 注册消息监听的处理实例,orderConsumer拉取到消息后,会调用该类的consumeMessage函数。
// 启动orderConsumer。
orderConsumer->start();
for(volatile int i = 0; i < 1000000000; i) {
//wait
}
// 销毁orderConsumer,在应用退出前,必须销毁Consumer对象,否则会导致内存泄露等问题。
orderConsumer->shutdown();
return 0;
}
在文档使用中是否遇到以下问题
更多建议
匿名提交