订阅消息

本文介绍如何通过云消息队列 RocketMQ 版的Java SDK订阅消息。

订阅方式

云消息队列 RocketMQ 版支持以下两种订阅方式:

  • 集群订阅

    同一个Group ID所标识的所有Consumer平均分摊消费消息。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在集群消费模式下每个实例平均分摊,只消费其中的3条消息。设置方式如下所示。

    // 集群订阅方式设置(不设置的情况下,默认为集群订阅方式)。
    properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
  • 广播订阅

    同一个Group ID所标识的所有Consumer都会各自消费某条消息一次。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在广播消费模式下每个实例都会各自消费9条消息。设置方式如下所示。

    // 广播订阅方式设置。
    properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);               
说明
  • 请确保同一个Group ID下所有Consumer实例的订阅关系保持一致,更多信息,请参见订阅关系一致

  • 两种不同的订阅方式有着不同的功能限制,例如,广播模式不支持顺序消息、不维护消费进度、不支持重置消费位点等,更多信息,请参见集群消费和广播消费

消息获取方式

云消息队列 RocketMQ 版支持以下两种消息获取方式:

  • Push:消息由云消息队列 RocketMQ 版推送至Consumer。Push方式下,云消息队列 RocketMQ 版还支持批量消费功能,可以将批量消息统一推送至Consumer进行消费。更多信息,请参见批量消费

  • Pull:消息由Consumer主动从云消息队列 RocketMQ 版拉取。

Pull Consumer提供了更多接收消息的选择。相比于Push Consumer,您可以使用Pull Consumer更加自由地控制消息拉取。具体的接口信息请参见接口和参数说明

重要
  • 如需使用Pull Consumer,请确保您的实例为企业铂金版。

  • Pull Consumer不支持公网访问,仅支持VPC网络访问。

示例代码

具体的示例代码,请以云消息队列 RocketMQ 版代码库为准。以下分别列举Push和Pull两种消息获取方式的示例代码。

  • Push方式

    import com.aliyun.openservices.ons.api.Action;
    import com.aliyun.openservices.ons.api.ConsumeContext;
    import com.aliyun.openservices.ons.api.Consumer;
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.MessageListener;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    
    import java.util.Properties;
    
    public class ConsumerTest {
       public static void main(String[] args) {
           Properties properties = new Properties();
           // 您在消息队列RocketMQ版控制台创建的Group ID。
           properties.put(PropertyKeyConst.GROUP_ID, "XXX");
           // 请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
           // AccessKey ID,阿里云身份验证标识。
           properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
           // AccessKey Secret,阿里云身份验证密钥。
           properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
           // 设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
           properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
              // 集群订阅方式(默认)。
              // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
              // 广播订阅方式。
              // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
    
           Consumer consumer = ONSFactory.createConsumer(properties);
            //订阅多个Tag。
           consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { 
               public Action consume(Message message, ConsumeContext context) {
                   System.out.println("Receive: " + message);
                   return Action.CommitMessage;
               }
           });
    
            // 订阅另外一个Topic,如需取消订阅该Topic,请删除该部分的订阅代码,重新启动消费端即可。
            //订阅全部Tag。
            consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { 
               public Action consume(Message message, ConsumeContext context) {
                   System.out.println("Receive: " + message);
                   return Action.CommitMessage;
               }
           });
    
           consumer.start();
           System.out.println("Consumer Started");
       }
    }            
  • Push方式(批量消费)

    重要

    配置云消息队列 RocketMQ 版的批量消费功能,请升级TCP Java SDK到1.8.7.3或以上版本,详细版本说明和获取方式,请参见Java SDK版本说明

    import com.aliyun.openservices.ons.api.Action;
    import com.aliyun.openservices.ons.api.ConsumeContext;
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.batch.BatchConsumer;
    import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
    import java.util.List;
    import java.util.Properties;
    
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import com.aliyun.openservices.tcp.example.MqConfig;
    
    public class SimpleBatchConsumer {
    
        public static void main(String[] args) {
            Properties consumerProperties = new Properties();
            // 您在消息队列RocketMQ版控制台创建的Group ID。
            consumerProperties.setProperty(PropertyKeyConst.GROUP_ID, MqConfig.GROUP_ID);
            // 请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
            // AccessKey ID,阿里云身份验证标识。
            properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
            // AccessKey Secret,阿里云身份验证密钥。
            properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
            // 设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
            consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MqConfig.NAMESRV_ADDR);
    
            // 设置批量消费最大消息数量,当指定Topic的消息数量已经攒够128条,SDK立即执行回调进行消费。默认值:32,取值范围:1~1024。
            consumerProperties.setProperty(PropertyKeyConst.ConsumeMessageBatchMaxSize, String.valueOf(128));
            // 设置批量消费最大等待时长,当等待时间达到10秒,SDK立即执行回调进行消费。默认值:0,取值范围:0~450,单位:秒。
            consumerProperties.setProperty(PropertyKeyConst.BatchConsumeMaxAwaitDurationInSeconds, String.valueOf(10));
    
            BatchConsumer batchConsumer = ONSFactory.createBatchConsumer(consumerProperties);
            batchConsumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new BatchMessageListener() {
    
                 @Override
                public Action consume(final List<Message> messages, ConsumeContext context) {
                    System.out.printf("Batch-size: %d\n", messages.size());
                    // 批量消息处理。
                    return Action.CommitMessage;
                }
            });
            //启动batchConsumer。
            batchConsumer.start();
            System.out.println("Consumer start success.");
    
            //等待固定时间防止进程退出。
            try {
                Thread.sleep(200000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }         
  • Pull方式

    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import com.aliyun.openservices.ons.api.PullConsumer;
    import com.aliyun.openservices.ons.api.TopicPartition;
    import java.util.List;
    import java.util.Properties;
    import java.util.Set;
    
    public class PullConsumerClient {
        public static void main(String[] args){
            Properties properties = new Properties();
            // 您在消息队列RocketMQ版控制台创建的Group ID。
            properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-xxxxx");
            // 请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
            // AccessKey ID,阿里云身份验证标识。
            properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
            // AccessKey Secret,阿里云身份验证密钥。
            properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
            // 设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
            properties.put(PropertyKeyConst.NAMESRV_ADDR, "xxxxx");
            PullConsumer consumer = ONSFactory.createPullConsumer(properties);
            // 启动Consumer。
            consumer.start();
            // 获取topic-xxx下的所有分区。
            Set<TopicPartition> topicPartitions = consumer.topicPartitions("topic-xxx");
            // 指定需要拉取消息的分区。
            consumer.assign(topicPartitions);
    
            while (true) {
                // 拉取消息,超时时间为3000 ms。
                List<Message> messages = consumer.poll(3000);
                System.out.printf("Received message: %s %n", messages);
            }
        }
    }

    分区和位点的详细说明,请参见基本概念

更多信息

云消息队列 RocketMQ 版消费端流控的最佳实践,请参见消息队列RocketMQ客户端流控设计