顺序消息FIFO( First In First Out)是消息队列RocketMQ版提供的一种严格按照顺序来发布和消费的消息类型。本文提供使用TCP协议下的C/C SDK收发顺序消息的示例代码。

背景信息

顺序消息分为两类:

  • 全局顺序:对于指定的一个Topic,所有消息按照严格的先入先出FIFO(First In First Out)的顺序进行发布和消费。
  • 分区顺序:对于指定的一个Topic,所有消息根据Sharding Key进行区块分区。同一个分区内的消息按照严格的FIFO顺序进行发布和消费。Sharding Key是顺序消息中用来区分不同分区的关键字段,和普通消息的Key是完全不同的概念。

更多信息,请参见顺序消息

前提条件

您已完成以下操作:

发送顺序消息

发送顺序消息的示例代码如下。

#include "ONSFactory.h"
#include "ONSClientException.h"
#include <iostream>
using namespace ons;

int main()
{
    // Producer创建和正常工作的参数,必须输入。
    ONSFactoryProperty factoryInfo;
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId,"XXX");// 在控制台创建的Group ID。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR,"XXX");// 设置TCP接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX");// 在控制台创建的Topic。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent,"XXX");// 消息内容。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey,"XXX");// AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey,"XXX");// AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建。

    // 创建Producer。
    OrderProducer *pProducer = ONSFactory::getInstance()->createOrderProducer(factoryInfo);


    // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
    pProducer->start();

    Message msg(
                // Message Topic
                factoryInfo.getPublishTopics(),
                // Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。
                "TagA",
                // Message Body,任何二进制形式的数据,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。
                factoryInfo.getMessageContent()
    );

    // 设置代表消息的业务属性,请尽可能全局唯一。
    // 以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发。
    // 注意:不设置也不会影响消息正常收发。
    msg.setKey("ORDERID_100");
    // 分区顺序消息中区分不同分区的关键字段。
    // 如果是全局顺序消息,该字段可以设置为任意非空字符串。
  std::string shardingKey ="abc";  // 带有同一Sharding Key的消息会按照顺序发送。
    try
    {
        // 发送消息,只要不抛异常就是成功。
        SendResultONS sendResult = pProducer->send(msg, shardingKey);
    std::cout << "send success" << std::endl;
    }
    catch(ONSClientException & e)
    {
    // 添加对exception的处理。
    }
    // 在应用退出前,必须销毁Producer对象,否则会导致内存泄露等问题。
    pProducer->shutdown();

    return 0;
}        

订阅顺序消息

订阅顺序消息的示例代码如下。

#include "ONSFactory.h"
using namespace std;
using namespace ons;

// 创建消费消息的实例。
// pushConsumer拉取到消息后,会主动调用该实例的consumeMessage函数。
class ONSCLIENT_API MyMsgListener : public MessageOrderListener
{
public:
    MyMsgListener()
    {
    }
    virtual ~MyMsgListener()
    {
    }

    virtual OrderAction consume(Message &message, ConsumeOrderContext &context)
    {
        // 根据业务需求,消费消息。
        return Success; // CONSUME_SUCCESS;
    }
};


int main(int argc, char* argv[])
{
    // OrderConsumer创建和工作需要的参数,必须输入以下内容。
    ONSFactoryProperty factoryInfo;
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId,"XXX");// 在控制台创建的Group ID。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX");// 在控制台创建的Topic。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey,"XXX");// AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey,"XXX");// AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR,"XXX");// 设置TCP接入域名,进入控制台的实例管理页面的“获取接入点信息”查看。


    // 创建orderConsumer。
    OrderConsumer* orderConsumer = ONSFactory::getInstance()->createOrderConsumer(factoryInfo);
    MyMsgListener  msglistener;
    // 指定orderConsumer订阅的消息Topic和消息Tag。
    orderConsumer->subscribe(factoryInfo.getPublishTopics(), "*",&msglistener );

    // 注册消息监听的处理实例,orderConsumer拉取到消息后,会调用该类的consumeMessage函数。

    // 启动orderConsumer。
    orderConsumer->start();

    for(volatile int i = 0; i < 1000000000;   i) {
        //wait
    }

    // 销毁orderConsumer,在应用退出前,必须销毁Consumer对象,否则会导致内存泄露等问题。
    orderConsumer->shutdown();

   return 0;
}