本文介绍如何通过消息队列RocketMQ版的C/C++ SDK订阅消息。

订阅方式

消息队列RocketMQ版支持以下两种订阅方式:

  • 集群订阅

    同一个Group ID所标识的所有Consumer平均分摊消费消息。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在集群消费模式下每个实例平均分摊,只消费其中的3条消息。设置方式如下所示。

    // 集群订阅方式设置(不设置的情况下,默认为集群订阅方式)
    factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::CLUSTERING);
  • 广播订阅

    同一个Group ID所标识的所有Consumer都会各自消费某条消息一次。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在广播消费模式下每个实例都会各自消费9条消息。设置方式如下所示。

    // 广播订阅方式设置
    factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::BROADCASTING);
说明
  • 请确保同一个Group ID下所有Consumer实例的订阅关系保持一致,更多信息,请参见订阅关系一致
  • 两种不同的订阅方式有着不同的功能限制,例如,广播模式不支持顺序消息、不维护消费进度、不支持重置消费位点等,详情请参见集群消费和广播消费

示例代码

#include "ONSFactory.h"
using namespace ons;

// MyMsgListener:创建消费消息的实例。
// pushConsumer拉取到消息后,会主动调用该实例的consumer函数。
class MyMsgListener : public MessageListener
{

    public:

        MyMsgListener()
        {
        }

        virtual ~MyMsgListener()
        {
        }

        virtual Action consume(Message &message, ConsumeContext &context)
        {
            // 自定义消息处理细节。
            return CommitMessage; //CONSUME_SUCCESS;
        }
};


int main(int argc, char* argv[])
{

    // pushConsumer创建和工作需要的参数,必须输入。
    ONSFactoryProperty factoryInfo;
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId,"XXX");// 您在控制台创建的Group ID。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR,"XXX"); // 设置TCP接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX");// 您在控制台创建的Topic。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey,"XXX");// AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "XXX");// AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建。
      // 集群订阅方式(默认)
      // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::CLUSTERING);
      // 广播订阅方式
      // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::BROADCASTING);

    //create pushConsumer
    PushConsumer* pushConsumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);

    // 指定pushConsumer订阅的消息Topic和Tag,注册消息回调函数。
    MyMsgListener  msglistener;
    pushConsumer->subscribe(factoryInfo.getPublishTopics(), "*",&msglistener );

    //start pushConsumer
    pushConsumer->start();

    // 注意:直到不再接收消息,才能调用shutdown;调用shutdown之后,Consumer退出,不能接收到任何消息。

    // 销毁pushConsumer,在应用退出前,必须销毁Consumer对象,否则会导致内存泄露等问题。
    pushConsumer->shutdown();
    return 0;

}

更多信息

消息队列RocketMQ版消费端流控的最佳实践,请参见消息队列RocletMQ客户端流控设计