收发顺序消息

顺序消息FIFO( First In First Out)是云消息队列 RocketMQ 版提供的一种严格按照顺序来发布和消费的消息类型。本文提供使用TCP协议下的C/C++ SDK收发顺序消息的示例代码。

背景信息

顺序消息分为两类:

  • 全局顺序:对于指定的一个Topic,所有消息按照严格的先入先出FIFO(First In First Out)的顺序进行发布和消费。

  • 分区顺序:对于指定的一个Topic,所有消息根据Sharding Key进行区块分区。同一个分区内的消息按照严格的FIFO顺序进行发布和消费。Sharding Key是顺序消息中用来区分不同分区的关键字段,和普通消息的Key是完全不同的概念。

更多信息,请参见顺序消息

前提条件

您已完成以下操作:

  • 下载C/C++ SDK。更多信息,请参见版本说明

  • 准备环境。更多信息,请参见环境准备(v1.x.x)

  • 创建资源。代码中涉及的资源信息,例如实例、Topic和Group ID等,需要在控制台上提前创建。更多信息,请参见创建资源

  • 获取阿里云访问密钥AccessKey ID和AccessKey Secret。更多信息,请参见创建AccessKey

发送顺序消息

重要

云消息队列 RocketMQ 版服务端判定消息产生的顺序性是参照单一生产者、单一线程并发下消息发送的时序。如果发送方有多个生产者或者有多个线程并发发送消息,则此时只能以到达云消息队列 RocketMQ 版服务端的时序作为消息顺序的依据,和业务侧的发送顺序未必一致。

发送顺序消息的示例代码如下。

#include "ONSFactory.h"
#include "ONSClientException.h"
#include <iostream>
using namespace ons;

int main()
{
    //Producer创建和正常工作的参数,必须输入。
    ONSFactoryProperty factoryInfo;
    //您在消息队列RocketMQ版控制台创建的Group ID。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
    //设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX"); 
    //您在消息队列RocketMQ版控制台创建的Topic。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
    //消息内容。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
    //请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
    //AccessKey ID,阿里云身份验证标识。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
		//AccessKey Secret,阿里云身份验证密钥。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));

    //创建Producer。
    OrderProducer *pProducer = ONSFactory::getInstance()->createOrderProducer(factoryInfo);


    //在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
    pProducer->start();

    Message msg(
                //Message Topic
                factoryInfo.getPublishTopics(),
                //Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。
                "TagA",
                //Message Body,任何二进制形式的数据,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。
                factoryInfo.getMessageContent()
    );

    // 设置代表消息的业务属性,请尽可能全局唯一。
    // 以方便您在无法正常收到消息情况下,可通过消息队列RocketMQ版控制台查询消息并补发。
    // 注意:不设置也不会影响消息正常收发。
    msg.setKey("ORDERID_100");
    // 分区顺序消息中区分不同分区的关键字段。
    // 如果是全局顺序消息,该字段可以设置为任意非空字符串。
    std::string shardingKey = "abc";  
    //带有同一Sharding Key的消息会按照顺序发送。
    try
    {
        //发送消息,只要不抛异常就是成功。
        SendResultONS sendResult = pProducer->send(msg, shardingKey);
    std::cout << "send success" << std::endl;
    }
    catch(ONSClientException & e)
    {
        //添加对exception的处理。
    }
    // 在应用退出前,必须销毁Producer对象,否则会导致内存泄露等问题。
    pProducer->shutdown();

    return 0;
}           

订阅顺序消息

订阅顺序消息的示例代码如下。

#include "ONSFactory.h"
using namespace std;
using namespace ons;

//创建消费消息的实例。
//pushConsumer拉取到消息后,会主动调用该实例的consumeMessage函数。
class ONSCLIENT_API MyMsgListener : public MessageOrderListener
{
public:
    MyMsgListener()
    {
    }
    virtual ~MyMsgListener()
    {
    }

    virtual OrderAction consume(Message &message, ConsumeOrderContext &context)
    {
        //根据业务需求,消费消息。
        return Success; //CONSUME_SUCCESS;
    }
};


int main(int argc, char* argv[])
{
    //OrderConsumer创建和工作需要的参数,必须输入。
    ONSFactoryProperty factoryInfo;
    //您在消息队列RocketMQ版控制台创建的Group ID。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "XXX");
    //您在消息队列RocketMQ版控制台创建的Topic。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
    //请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
    //AccessKey ID,阿里云身份验证标识。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
		//AccessKey Secret,阿里云身份验证密钥。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    //设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX");


    //创建orderConsumer。
    OrderConsumer* orderConsumer = ONSFactory::getInstance()->createOrderConsumer(factoryInfo);
    MyMsgListener  msglistener;
    //指定orderConsumer订阅的消息Topic和消息Tag。
    orderConsumer->subscribe(factoryInfo.getPublishTopics(), "*",&msglistener );

    //注册消息监听的处理实例,orderConsumer拉取到消息后,会调用该类的consumeMessage函数。

    //启动orderConsumer。
    orderConsumer->start();

    for(volatile int i = 0; i < 1000000000; ++i) {
        //wait
    }

    //销毁orderConsumer, 在应用退出前,必须销毁Consumer对象,否则会导致内存泄露等问题。
    orderConsumer->shutdown();

   return 0;
}