全部产品
弹性计算 会员服务 网络 安全 移动云 数加·大数据分析及展现 数加·大数据应用 管理与监控 云通信 阿里云办公 培训与认证 智能硬件
存储与CDN 数据库 域名与网站(万网) 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 更多
消息队列 MQ

订阅消息

更新时间:2018-06-19 15:39:39

本文介绍如何通过 MQ SDK 使用 Java 语言进行消息订阅。

说明:

  • 请确保同一个 Consumer ID 下所有 Consumer 实例的订阅关系保持一致,详见订阅关系一致
  • 关于 TCP 接入点域名,请参见TCP 接入说明

订阅方式

MQ 支持以下两种订阅方式:

  • 集群订阅:同一个 Consumer ID 所标识的所有 Consumer 平均分摊消费消息。 例如某个 Topic 有 9 条消息,一个 Consumer ID 有 3 个 Consumer 实例,那么在集群消费模式下每个实例平均分摊,只消费其中的 3 条消息。

    1. // 集群订阅方式设置(不设置的情况下,默认为集群订阅方式)
    2. properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
  • 广播订阅:同一个 Consumer ID 所标识的所有 Consumer 都会各自消费某条消息一次。 例如某个 Topic 有 9 条消息,一个 Consumer ID 有 3 个 Consumer 实例,那么在广播消费模式下每个实例都会各自消费 9 条消息。

    1. // 广播订阅方式设置
    2. properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);

示例代码

  1. import com.aliyun.openservices.ons.api.Action;
  2. import com.aliyun.openservices.ons.api.ConsumeContext;
  3. import com.aliyun.openservices.ons.api.Consumer;
  4. import com.aliyun.openservices.ons.api.Message;
  5. import com.aliyun.openservices.ons.api.MessageListener;
  6. import com.aliyun.openservices.ons.api.ONSFactory;
  7. import com.aliyun.openservices.ons.api.PropertyKeyConst;
  8. import java.util.Properties;
  9. public class ConsumerTest {
  10. public static void main(String[] args) {
  11. Properties properties = new Properties();
  12. // 您在控制台创建的 Consumer ID
  13. properties.put(PropertyKeyConst.ConsumerId, "XXX");
  14. // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
  15. properties.put(PropertyKeyConst.AccessKey, "XXX");
  16. // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
  17. properties.put(PropertyKeyConst.SecretKey, "XXX");
  18. // 设置 TCP 接入域名(此处以公共云生产环境为例)
  19. properties.put(PropertyKeyConst.ONSAddr,
  20. "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal");
  21. // 集群订阅方式 (默认)
  22. // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
  23. // 广播订阅方式
  24. // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
  25. Consumer consumer = ONSFactory.createConsumer(properties);
  26. consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { //订阅多个 Tag
  27. public Action consume(Message message, ConsumeContext context) {
  28. System.out.println("Receive: " + message);
  29. return Action.CommitMessage;
  30. }
  31. });
  32. //订阅另外一个 Topic
  33. consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { //订阅全部 Tag
  34. public Action consume(Message message, ConsumeContext context) {
  35. System.out.println("Receive: " + message);
  36. return Action.CommitMessage;
  37. }
  38. });
  39. consumer.start();
  40. System.out.println("Consumer Started");
  41. }
  42. }

注意:

  • 在广播消费模式下,您无法在控制台设置消息堆积报警,也无法进行消息堆积查询。 因此,您也可以创建多个 Consumer ID 来达到广播模式的效果,请参见集群消费和广播消费中《使用集群模式模拟广播》一节。
  • 关于 MQ 消费端流控的最佳实践,请参见MQ 客户端流控设计
本文导读目录