本文介绍如何通过消息队列 RocketMQ 版的 Java SDK 订阅消息。

订阅方式

消息队列 RocketMQ 版支持以下两种订阅方式:

  • 集群订阅

    同一个 Group ID 所标识的所有 Consumer 平均分摊消费消息。例如某个 Topic 有 9 条消息,一个 Group ID 有 3 个 Consumer 实例,那么在集群消费模式下每个实例平均分摊,只消费其中的 3 条消息。设置方式如下所示。

    // 集群订阅方式设置(不设置的情况下,默认为集群订阅方式)
    properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
  • 广播订阅

    同一个 Group ID 所标识的所有 Consumer 都会各自消费某条消息一次。例如某个 Topic 有 9 条消息,一个 Group ID 有 3 个 Consumer 实例,那么在广播消费模式下每个实例都会各自消费 9 条消息。设置方式如下所示。

    // 广播订阅方式设置
    properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);               
说明
  • 请确保同一个 Group ID 下所有 Consumer 实例的订阅关系保持一致,详情请参见订阅关系一致
  • 两种不同的订阅方式有着不同的功能限制,例如,广播模式不支持顺序消息、不维护消费进度、不支持重置消费位点等,详情请参见集群消费和广播消费

消息获取方式

消息队列 RocketMQ 版支持以下两种消息获取方式:

  • Push:消息由消息队列 RocketMQ 版推送至 Consumer。
  • Pull:消息由 Consumer 主动从消息队列 RocketMQ 版拉取。

Pull Consumer 提供了更多接收消息的选择。相比于 Push Consumer,您可以使用 Pull Consumer 更加自由地控制消息拉取。具体的接口信息请参见接口和参数说明

注意 如需使用 Pull Consumer,请确保您的消息队列 RocketMQ 版实例为企业铂金版。

示例代码

具体的示例代码,请以消息队列 RocketMQ 版代码库为准。以下分别列举 Push 和 Pull 两种消息获取方式的示例代码。

  • Push 方式
    import com.aliyun.openservices.ons.api.Action;
    import com.aliyun.openservices.ons.api.ConsumeContext;
    import com.aliyun.openservices.ons.api.Consumer;
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.MessageListener;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    
    import java.util.Properties;
    
    public class ConsumerTest {
       public static void main(String[] args) {
           Properties properties = new Properties();
            // 您在控制台创建的 Group ID
           properties.put(PropertyKeyConst.GROUP_ID, "XXX");
            // AccessKeyId 阿里云身份验证,在阿里云服务器管理控制台创建
           properties.put(PropertyKeyConst.AccessKey, "XXX");
            // AccesskeySecret 阿里云身份验证,在阿里云服务器管理控制台创建
           properties.put(PropertyKeyConst.SecretKey, "XXX");
            // 设置 TCP 接入域名,进入控制台的实例管理页面的获取接入点信息区域查看
           properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
              // 集群订阅方式 (默认)
              // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
              // 广播订阅方式
              // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
    
           Consumer consumer = ONSFactory.createConsumer(properties);
           consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { //订阅多个 Tag
               public Action consume(Message message, ConsumeContext context) {
                   System.out.println("Receive: " + message);
                   return Action.CommitMessage;
               }
           });
    
            //订阅另外一个 Topic,如需取消订阅该 Topic,请删除该部分的订阅代码,重新启动消费端即可
            consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { //订阅全部 Tag
               public Action consume(Message message, ConsumeContext context) {
                   System.out.println("Receive: " + message);
                   return Action.CommitMessage;
               }
           });
    
           consumer.start();
           System.out.println("Consumer Started");
       }
    }            
  • Pull 方式
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import com.aliyun.openservices.ons.api.PullConsumer;
    import com.aliyun.openservices.ons.api.TopicPartition;
    import java.util.List;
    import java.util.Properties;
    import java.util.Set;
    
    public class PullConsumerClient {
        public static void main(String[] args){
            Properties properties = new Properties();
            properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-xxxxx");
            // AccessKeyId 阿里云身份验证,在阿里云服务器管理控制台创建
            properties.put(PropertyKeyConst.AccessKey, "xxxxxxx");
            // AccessKeySecret 阿里云身份验证,在阿里云服务器管理控制台创建
            properties.put(PropertyKeyConst.SecretKey, "xxxxxxx");
            // 设置 TCP 接入域名,进入控制台的实例管理页面的获取接入点信息区域查看
            properties.put(PropertyKeyConst.NAMESRV_ADDR, "xxxxx");
            PullConsumer consumer = ONSFactory.createPullConsumer(properties);
            // 启动 Consumer
            consumer.start();
            // 获取 topic-xxx 下的所有分区
            Set<TopicPartition> topicPartitions = consumer.topicPartitions("topic-xxx");
            // 指定需要拉取消息的分区
            consumer.assign(topicPartitions);
    
            while (true) {
                // 拉取消息,超时时间为 3000 ms
                List<Message> messages = consumer.poll(3000);
                System.out.printf("Received message: %s %n", messages);
            }
        }
    }

    分区和位点的详细说明请参见名词解释

更多信息

消息队列 RocketMQ 版消费端流控的最佳实践,请参见消息队列 RocketMQ 客户端流控设计