全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 阿里云办公 培训与认证 物联网
消息队列 MQ

集群方式订阅消息

更新时间:2017-08-24 17:05:13   分享:   

集群订阅即某个消费者集群只消费指定的Topic,而不是消费所有Topic。

  1. #include "ONSFactory.h"
  2. using namespace ons;
  3. // MyMsgListener:创建消费消息的实例
  4. //pushConsumer拉取到消息后,会主动调用该实例的consume 函数
  5. class MyMsgListener : public MessageListener
  6. {
  7. public:
  8. MyMsgListener()
  9. {
  10. }
  11. virtual ~MyMsgListener()
  12. {
  13. }
  14. virtual Action consume(Message &message, ConsumeContext &context)
  15. {
  16. //自定义消息处理细节
  17. return CommitMessage; //CONSUME_SUCCESS;
  18. }
  19. };
  20. int main(int argc, char* argv[])
  21. {
  22. //pushConsumer创建和工作需要的参数,必须输入
  23. ONSFactoryProperty factoryInfo;
  24. factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "XXX");//您在MQ控制台申请的consumerId
  25. factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );//您在MQ控制台申请的msg topic
  26. factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "XXX");//阿里云身份验证,在阿里云服务器管理控制台创建
  27. factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "XXX");//阿里云身份验证,在阿里云服务器管理控制台创建
  28. //create pushConsumer
  29. PushConsumer* pushConsumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);
  30. //指定pushConsumer 订阅的消息topic和tag, 注册消息回调函数
  31. MyMsgListener msglistener;
  32. pushConsumer->subscribe(factoryInfo.getPublishTopics(), "*",&msglistener );
  33. //start pushConsumer
  34. pushConsumer->start();
  35. //NOTE:直到不再接收消息,才能调用shutdown;调用shutdown之后,consumer退出,不能接收到任何消息
  36. //销毁pushConsumer, 在应用退出前,必须销毁Consumer 对象,否则会导致内存泄露等问题
  37. pushConsumer->shutdown();
  38. return 0;
  39. }
本文导读目录
本文导读目录
以上内容是否对您有帮助?