本文介绍如何通过消息队列RocketMQ版的Java SDK订阅消息。

订阅方式

消息队列RocketMQ版支持以下两种订阅方式:

  • 集群订阅

    同一个Group ID所标识的所有Consumer平均分摊消费消息。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在集群消费模式下每个实例平均分摊,只消费其中的3条消息。设置方式如下所示。

    // 集群订阅方式设置(不设置的情况下,默认为集群订阅方式)。
    properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
  • 广播订阅

    同一个Group ID所标识的所有Consumer都会各自消费某条消息一次。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在广播消费模式下每个实例都会各自消费9条消息。设置方式如下所示。

    // 广播订阅方式设置。
    properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);               
说明
  • 请确保同一个Group ID下所有Consumer实例的订阅关系保持一致,更多信息,请参见订阅关系一致
  • 两种不同的订阅方式有着不同的功能限制,例如,广播模式不支持顺序消息、不维护消费进度、不支持重置消费位点等,详情请参见集群消费和广播消费

消息获取方式

消息队列RocketMQ版支持以下两种消息获取方式:

  • Push:消息由消息队列RocketMQ版推送至Consumer。
  • Pull:消息由Consumer主动从消息队列RocketMQ版拉取。

Pull Consumer提供了更多接收消息的选择。相比于Push Consumer,您可以使用Pull Consumer更加自由地控制消息拉取。具体的接口信息请参见接口和参数说明

注意 如需使用Pull Consumer,请确保您的消息队列RocketMQ版实例为企业铂金版。

示例代码

具体的示例代码,请以消息队列RocketMQ版代码库为准。以下分别列举Push和Pull两种消息获取方式的示例代码。

  • Push方式
    import com.aliyun.openservices.ons.api.Action;
    import com.aliyun.openservices.ons.api.ConsumeContext;
    import com.aliyun.openservices.ons.api.Consumer;
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.MessageListener;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    
    import java.util.Properties;
    
    public class ConsumerTest {
       public static void main(String[] args) {
           Properties properties = new Properties();
            // 您在控制台创建的Group ID。
           properties.put(PropertyKeyConst.GROUP_ID, "XXX");
            // AccessKey ID阿里云身份验证,在阿里云RAM控制台创建。
           properties.put(PropertyKeyConst.AccessKey, "XXX");
            // Accesskey Secret阿里云身份验证,在阿里云服RAM控制台创建。
           properties.put(PropertyKeyConst.SecretKey, "XXX");
            // 设置TCP接入域名,进入控制台的实例详情页面的TCP协议客户端接入点区域查看。
           properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
              // 集群订阅方式(默认)。
              // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
              // 广播订阅方式。
              // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
    
           Consumer consumer = ONSFactory.createConsumer(properties);
           consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { //订阅多个Tag。
               public Action consume(Message message, ConsumeContext context) {
                   System.out.println("Receive: " + message);
                   return Action.CommitMessage;
               }
           });
    
            //订阅另外一个Topic,如需取消订阅该Topic,请删除该部分的订阅代码,重新启动消费端即可。
            consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { //订阅全部Tag。
               public Action consume(Message message, ConsumeContext context) {
                   System.out.println("Receive: " + message);
                   return Action.CommitMessage;
               }
           });
    
           consumer.start();
           System.out.println("Consumer Started");
       }
    }            
  • Pull方式
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import com.aliyun.openservices.ons.api.PullConsumer;
    import com.aliyun.openservices.ons.api.TopicPartition;
    import java.util.List;
    import java.util.Properties;
    import java.util.Set;
    
    public class PullConsumerClient {
        public static void main(String[] args){
            Properties properties = new Properties();
            properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-xxxxx");
            // AccessKey ID阿里云身份验证,在阿里云RAM控制台创建。
            properties.put(PropertyKeyConst.AccessKey, "xxxxxxx");
            // AccessKey Secret阿里云身份验证,在阿里云RAM控制台创建。
            properties.put(PropertyKeyConst.SecretKey, "xxxxxxx");
            // 设置TCP接入域名,进入消息队列RocketMQ版控制台的实例详情页面的TCP协议客户端接入点区域查看。
            properties.put(PropertyKeyConst.NAMESRV_ADDR, "xxxxx");
            PullConsumer consumer = ONSFactory.createPullConsumer(properties);
            // 启动Consumer。
            consumer.start();
            // 获取topic-xxx下的所有分区。
            Set<TopicPartition> topicPartitions = consumer.topicPartitions("topic-xxx");
            // 指定需要拉取消息的分区。
            consumer.assign(topicPartitions);
    
            while (true) {
                // 拉取消息,超时时间为3000 ms。
                List<Message> messages = consumer.poll(3000);
                System.out.printf("Received message: %s %n", messages);
            }
        }
    }

    分区和位点的详细说明请参见名词解释

更多信息

消息队列RocketMQ版消费端流控的最佳实践,请参见消息队列RocketMQ客户端流控设计