全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 阿里云办公 培训与认证 物联网
消息队列 MQ

订阅消息

更新时间:2017-08-24 16:49:33

本文介绍如何通过 MQ SDK 进行消息订阅。

请确保同一个 Consumer ID 下所有 Consumer 实例的订阅关系保持一致,具体请参考订阅关系一致文档。

TCP 接入点域名,请前往查看。

MQ 支持两种订阅方式。

  • 集群订阅:同一个 Consumer ID 所标识的所有 Consumer 平均分摊消费消息。例如某个 Topic 有 9 条消息,一个 Consumer ID 有 3 个 Consumer 实例,那么在集群消费模式下每个实例平均分摊,只消费其中的 3 条消息。

    1. // 集群订阅方式设置(不设置的情况下,默认为集群订阅方式)
    2. properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
  • 广播订阅:同一个 Consumer ID 所标识的所有 Consumer 都会各自消费某条消息一次。例如某个 Topic 有 9 条消息,一个 Consumer ID 有 3 个 Consumer 实例,那么在广播消费模式下每个实例都会各自消费 9 条消息。

    1. // 广播订阅方式设置
    2. properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);

示例代码

  1. public class ConsumerTest {
  2. public static void main(String[] args) {
  3. Properties properties = new Properties();
  4. // 您在控制台创建的 Consumer ID
  5. properties.put(PropertyKeyConst.ConsumerId, "XXX");
  6. // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
  7. properties.put(PropertyKeyConst.AccessKey, "XXX");
  8. // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
  9. properties.put(PropertyKeyConst.SecretKey, "XXX");
  10. // 设置 TCP 接入域名(此处以公共云生产环境为例)
  11. properties.put(PropertyKeyConst.ONSAddr,
  12. "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal");
  13. // 集群订阅方式 (默认)
  14. // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
  15. // 广播订阅方式
  16. // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
  17. Consumer consumer = ONSFactory.createConsumer(properties);
  18. consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { //订阅多个Tag
  19. public Action consume(Message message, ConsumeContext context) {
  20. System.out.println("Receive: " + message);
  21. return Action.CommitMessage;
  22. }
  23. });
  24. //订阅另外一个Topic
  25. consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { //订阅全部Tag
  26. public Action consume(Message message, ConsumeContext context) {
  27. System.out.println("Receive: " + message);
  28. return Action.CommitMessage;
  29. }
  30. });
  31. consumer.start();
  32. System.out.println("Consumer Started");
  33. }
  34. }

注意:广播消费模式下,控制台无法设置消息堆积报警,无法进行消息堆积查询。因此,也可以创建多个 Consumer ID 来达到广播模式的效果。详情请参考文档多个 Consumer ID 模式

本文导读目录