消息队列 RocketMQ 版提供 Java SDK 实现消息发送与订阅,订阅方可通过 Push 或 Pull 的方式从消息队列 RocketMQ 版获取消息。本文介绍消息发送和订阅的接口和参数说明。

背景信息

消息队列 RocketMQ 版支持以下两种消息获取方式:

  • Push:消息由消息队列 RocketMQ 版推送至 Consumer。
  • Pull:消息由 Consumer 主动从消息队列 RocketMQ 版拉取。

Pull Consumer 提供了更多接收消息的选择。相比于 Push Consumer,您可以使用 Pull Consumer 更加自由地控制消息拉取。

注意 如需使用 Pull Consumer,请确保您的消息队列 RocketMQ 版实例为企业铂金版。

通用参数

参数名 参数说明
NAMESRV_ADDR 设置 TCP 协议接入点,从消息队列 RocketMQ 版控制台的实例详情页面获取。
AccessKey 您在阿里云账号管理控制台中创建的 AccessKeyId,用于身份认证。
SecretKey 您在阿里云账号管理控制台中创建的 AccessKeySecret,用于身份认证。
OnsChannel 用户渠道,默认值为:ALIYUN,聚石塔用户为:CLOUD。

消息发送接口

messagesendinterface

消息发送参数

参数名 参数说明
SendMsgTimeoutMillis 设置消息发送的超时时间,默认值:3000;单位:毫秒。
CheckImmunityTimeInSeconds(事务消息) 设置事务消息第一次回查的最快时间,单位:秒。
shardingKey(顺序消息) 顺序消息中用来计算不同分区的值。

消息订阅接口

图 1. Push 方式接口
consumeinterface
图 2. Pull 方式接口
pull_consumer

Pull 方式的接口说明如下所示。

public interface PullConsumer extends Admin {

    /**
     * 获取某个 Topic 下的分区信息,返回结果是该 Topic 下的所有分区。注意该接口必须在 Pull Consumer Start 以后才能被调用。
     */
    Set<TopicPartition> topicPartitions(String topic);

    /**
     * 指定需要拉取消息的分区,此接口不会参与 Rebalance,用户需要自己保证所有分区得到消费。此外,若多次调用此接口会替换掉原来订阅的分区,并不会增量增加订阅分区的数量
     */
    void assign(Collection<TopicPartition> topicPartitions);

    /**
     * 拉取消息,单次最多拉取maxBatchMessageCount个消息,超时时间用户可以指定,单位为毫秒。
     */
    List<Message> poll(long timeout);

    /**
     * 将指定的分区的消费位点重置到某个指定位置。该位置必须在分区的最小位点和最大位点之间。注意该接口必须在 Pull Consumer Start 以后才能被调用,并且指定的分区必须为订阅的分区。
     */
    void seek(TopicPartition topicPartition, long offset);

    /**
     * 将指定分区的消费位点重置到该分区的最小位点。注意该接口必须在 Pull Consumer Start 以后才能被调用,并且指定的分区必须为订阅的分区。
     */
    void seekToBeginning(TopicPartition topicPartition);

    /**
     * 将指定分区的消费位点重置到该分区的最大位点。注意该接口必须在 Pull Consumer Start 以后才能被调用,并且指定的分区必须为订阅的分区。
     */
    void seekToEnd(TopicPartition topicPartition);

    /**
     * 暂停指定分区的消费
     */
    void pause(Collection<TopicPartition> topicPartitions);

    /**
     * 恢复指定分区的消费
     */
    void resume(Collection<TopicPartition> topicPartitions);

    /**
     * 查找指定分区对应时间戳的位点。该时间戳为消息存储到服务器的时间戳。该位点是指定分区中第一个存储时间戳大于或等于该时间戳消息对应的位点。
     */
    Long offsetForTimestamp(TopicPartition topicPartition, Long timestamp);

    /**
     * 获取指定分区的最新消费位点
     */
    Long committed(TopicPartition topicPartition);

    /**
     * 手动提交消费位点,该消费位点首先同步到本地,再由异步线程提交到服务器。
     */
    void commitSync();
    
    interface TopicPartitionChangeListener {
        /**
         * 该方法在某个 Topic 下分区发生变化时被调用,例如在服务端扩缩容时造成 Topic 的分区数量发生变化时回调该方法
         */
        void onChanged(Set<TopicPartition> topicPartitions);
    }
    
    /**
     * 注册监听某个 Topic 分区变化的 TopicPartitionChangeListener。例如注册后在服务端扩缩容时造成 Topic 的分区数量发生变化时回调 listener 的 onChanged 方法,默认最多 5 秒左右延迟。
     */
    void registerTopicPartitionChangedListener(String topic, TopicPartitionChangeListener callback);
}

消息订阅参数

表 1. 通用参数
参数 说明
GROUP_ID 您在消息队列 RocketMQ 版控制台上创建的 Group ID,详情请参见名词解释
MessageModel 设置 Consumer 实例的消费模式,取值说明如下:
  • CLUSTERING(默认值):表示集群消费
  • BROADCASTING:表示广播消费
ConsumeThreadNums 设置 Consumer 实例的消费线程数,默认值:20。
MaxReconsumeTimes 设置消息消费失败的最大重试次数,默认值:16。
ConsumeTimeout 设置每条消息消费的最大超时时间,超过设置时间则被视为消费失败,等下次重新投递再次消费。每个业务需要设置一个合理的值,默认值:15,单位:分钟 。
suspendTimeMillis(顺序消息) 只适用于顺序消息,设置消息消费失败的重试间隔时间。
maxCachedMessageAmount 客户端本地的最大缓存消息数据,默认值:1000,单位:条。
maxCachedMessageSizeInMiB 客户端本地的最大缓存消息大小,取值范围:16 MB ~ 2 GB,默认值:512 MB。
表 2. Pull 方式特有参数
参数 说明
maxCachedMessageSizeInMiB Consumer 单个分区允许在客户端中缓存的最大消息容量,默认值:100 MiB,取值范围:16 MiB ~ 2048 MiB。
说明 请合理取值,设置过大可能会引起客户端 OOM。
autoCommit 是否允许消费位点自动提交,默认为 true。
autoCommitIntervalMillis 消费位点自动提交间隔,默认值:5,单位:秒。
pollTimeoutMillis 每次拉取消息超时时间,默认值:5,单位:秒。

分区和位点的详细说明请参见名词解释

更多信息

消息收发示例代码详情请参见以下文档: