全部产品
弹性计算 会员服务 网络 安全 移动云 数加·大数据分析及展现 数加·大数据应用 管理与监控 云通信 阿里云办公 培训与认证 智能硬件
存储与CDN 数据库 域名与网站(万网) 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 更多
消息队列 MQ

订阅消息

更新时间:2018-06-26 14:12:07

本文介绍如何通过 MQ SDK 使用 C/C++ 语言进行消息订阅。

说明:

  • 请确保同一个 Consumer ID 下所有 Consumer 实例的订阅关系保持一致,详见订阅关系一致
  • 关于 TCP 接入点域名,请参见 TCP 接入说明

订阅方式

MQ 支持以下两种订阅方式:

  • 集群订阅:

    用于实现集群消费。集群消费是指同一个 Consumer ID 所标识的所有 Consumer 平均分摊消费消息。 例如某个 Topic 有 9 条消息,一个 Consumer ID 有 3 个 Consumer 实例,那么在集群消费模式下每个实例平均分摊,只消费其中的 3 条消息。

    1. // 集群订阅方式设置(不设置的情况下,默认为集群订阅方式)
    2. factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::CLUSTERING);
  • 广播订阅:

    用于实现广播消费。广播消费是指同一个 Consumer ID 所标识的所有 Consumer 都会各自消费某条消息一次。 例如某个 Topic 有 9 条消息,一个 Consumer ID 有 3 个 Consumer 实例,那么在广播消费模式下每个实例都会各自消费 9 条消息。

    1. // 广播订阅方式设置
    2. factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::BROADCASTING);

示例代码

  1. #include "ONSFactory.h"
  2. using namespace ons;
  3. // MyMsgListener:创建消费消息的实例
  4. //pushConsumer 拉取到消息后,会主动调用该实例的 consumer 函数
  5. class MyMsgListener : public MessageListener
  6. {
  7. public:
  8. MyMsgListener()
  9. {
  10. }
  11. virtual ~MyMsgListener()
  12. {
  13. }
  14. virtual Action consume(Message &message, ConsumeContext &context)
  15. {
  16. //自定义消息处理细节
  17. return CommitMessage; //CONSUME_SUCCESS;
  18. }
  19. };
  20. int main(int argc, char* argv[])
  21. {
  22. //pushConsumer 创建和工作需要的参数,必须输入
  23. ONSFactoryProperty factoryInfo;
  24. factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "XXX");//您在 MQ 控制台创建的 consumer ID
  25. factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );//您在 MQ 控制台创建的 msg topic
  26. factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "XXX");//阿里云身份验证,在阿里云服务器管理控制台创建
  27. factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "XXX");//阿里云身份验证,在阿里云服务器管理控制台创建
  28. // 集群订阅方式 (默认)
  29. // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::CLUSTERING);
  30. // 广播订阅方式
  31. // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::BROADCASTING);
  32. //create pushConsumer
  33. PushConsumer* pushConsumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);
  34. //指定 pushConsumer 订阅的消息 topic 和 tag,注册消息回调函数
  35. MyMsgListener msglistener;
  36. pushConsumer->subscribe(factoryInfo.getPublishTopics(), "*",&msglistener );
  37. //start pushConsumer
  38. pushConsumer->start();
  39. //注意:直到不再接收消息,才能调用 shutdown;调用 shutdown 之后,consumer 退出,不能接收到任何消息
  40. //销毁 pushConsumer,在应用退出前,必须销毁 Consumer 对象,否则会导致内存泄露等问题
  41. pushConsumer->shutdown();
  42. return 0;
  43. }
本文导读目录