云消息队列 RocketMQ 版提供Java SDK实现消息发送与订阅,订阅方可通过Push或Pull的方式从云消息队列 RocketMQ 版获取消息。本文介绍消息发送和订阅的接口和参数说明。
背景信息
云消息队列 RocketMQ 版支持以下两种消息获取方式:
Push:消息由云消息队列 RocketMQ 版推送至Consumer。Push方式下,云消息队列 RocketMQ 版还支持批量消费功能,可以将消息统一批量推送至Consumer进行消费。更多信息,请参见批量消费。
Pull:消息由Consumer主动从云消息队列 RocketMQ 版拉取。
Pull Consumer提供了更多接收消息的选择。相比于Push Consumer,您可以使用Pull Consumer更加自由地控制消息拉取。
如需使用Pull Consumer,请确保您的云消息队列 RocketMQ 版实例为企业铂金版。
通用参数
参数名 | 参数说明 |
NAMESRV_ADDR | 设置TCP协议接入点,从云消息队列 RocketMQ 版控制台的实例详情页面获取。 |
AccessKey | AccessKey ID,阿里云身份验证标识。获取方式,请参见创建AccessKey。 |
SecretKey | AccessKey Secret,阿里云身份验证密钥。获取方式,请参见创建AccessKey。 |
OnsChannel | 用户渠道,默认值为:ALIYUN,聚石塔用户为:CLOUD。 |
消息发送接口
消息发送参数
参数名 | 参数说明 |
SendMsgTimeoutMillis | 设置消息发送的超时时间,单位:毫秒。 |
CheckImmunityTimeInSeconds(事务消息) | 设置事务消息第一次回查的最快时间,单位:秒。 |
shardingKey(顺序消息) | 顺序消息中用来计算不同分区的值。 |
消息订阅接口
Pull方式的接口说明如下所示。
public interface PullConsumer extends Admin {
/**
* 获取某个Topic下的分区信息,返回结果是该Topic下的所有分区。注意该接口必须在Pull Consumer Start以后才能被调用。
*/
Set<TopicPartition> topicPartitions(String topic);
/**
* 指定需要拉取消息的分区,此接口不会参与Rebalance,您需要自己保证所有分区得到消费。此外,若多次调用此接口会替换掉原来订阅的分区,并不会增量增加订阅分区的数量。
*/
void assign(Collection<TopicPartition> topicPartitions);
/**
* 拉取消息,单次最多拉取maxBatchMessageCount个消息,超时时间您可以自定义,单位为毫秒。
*/
List<Message> poll(long timeout);
/**
* 将指定的分区的消费位点重置到某个指定位置。该位置必须在分区的最小位点和最大位点之间。注意该接口必须在Pull Consumer Start以后才能被调用,并且指定的分区必须为订阅的分区。
*/
void seek(TopicPartition topicPartition, long offset);
/**
* 将指定分区的消费位点重置到该分区的最小位点。注意该接口必须在Pull Consumer Start以后才能被调用,并且指定的分区必须为订阅的分区。
*/
void seekToBeginning(TopicPartition topicPartition);
/**
* 将指定分区的消费位点重置到该分区的最大位点。注意该接口必须在Pull Consumer Start以后才能被调用,并且指定的分区必须为订阅的分区。
*/
void seekToEnd(TopicPartition topicPartition);
/**
* 暂停指定分区的消费。
*/
void pause(Collection<TopicPartition> topicPartitions);
/**
* 恢复指定分区的消费。
*/
void resume(Collection<TopicPartition> topicPartitions);
/**
* 查找指定分区对应时间戳的位点。该时间戳为消息存储到服务器的时间戳。该位点是指定分区中第一个存储时间戳大于或等于该时间戳消息对应的位点。
*/
Long offsetForTimestamp(TopicPartition topicPartition, Long timestamp);
/**
* 获取指定分区的最新消费位点。
*/
Long committed(TopicPartition topicPartition);
/**
* 手动提交消费位点,该消费位点首先同步到本地,再由异步线程提交到服务器。
*/
void commitSync();
interface TopicPartitionChangeListener {
/**
* 该方法在某个Topic下分区发生变化时被调用,例如在服务端扩缩容时造成Topic的分区数量发生变化时回调该方法。
*/
void onChanged(Set<TopicPartition> topicPartitions);
}
/**
* 注册监听某个Topic分区变化的TopicPartitionChangeListener。例如注册后在服务端扩缩容时造成Topic的分区数量发生变化时回调listener的onChanged方法,默认最多5秒左右延迟。
*/
void registerTopicPartitionChangedListener(String topic, TopicPartitionChangeListener callback);
}
消息订阅参数
参数 | 说明 |
GROUP_ID | 您在云消息队列 RocketMQ 版控制台上创建的Group ID,更多信息,请参见基本概念。 |
MessageModel | 设置Consumer实例的消费模式,取值说明如下:
|
ConsumeThreadNums | 设置Consumer实例的消费线程数,默认值:20。 |
MaxReconsumeTimes | 设置消息消费失败的最大重试次数,默认值:16。 |
ConsumeTimeout | 设置每条消息消费的最大超时时间,超过设置时间则被视为消费失败,等下次重新投递再次消费。每个业务需要设置一个合理的值,默认值:15,单位:分钟。 |
suspendTimeMillis(顺序消息) | 只适用于顺序消息,设置消息消费失败的重试间隔时间。 |
maxCachedMessageAmount | 消费者客户端本地的最大缓存消息数量,取值范围为[100,50000],默认值:5000,单位:条。 该参数在客户端级别生效,限定额会平均分配到订阅的Topic上。例如最大缓存数为1000条,某消费者客户端订阅了2个Topic,则每个Topic将限制缓存500条消息。 考虑到消费者客户端批量拉取消息的场景,实际最大缓存量会略大于限制值。 建议取值:若某个消费者客户端每秒能处理N条消息,则该参数建议设置为2 * N条。 重要 请合理取值,设置过大可能会引起客户端OOM。 |
maxCachedMessageSizeInMiB | 客户端本地的最大缓存消息大小,取值范围:16 MB~2048 MB,默认值:512 MB。 |
参数 | 说明 |
ConsumeMessageBatchMaxSize | 批量消费的最大消息数量,缓存的消息数量达到设置的参数值,云消息队列 RocketMQ 版会将缓存的消息统一推送给消费者进行批量消费。默认值:32,取值范围:1~1024。 |
BatchConsumeMaxAwaitDurationInSeconds | 批量消费的等待时长,等待时长达到参数设置的值,云消息队列 RocketMQ 版会将缓存的消息统一推送给消费者进行批量消费。默认为:0,取值范围:0~450,单位:秒。 |
参数 | 说明 |
maxCachedMessageSizeInMiB | Consumer单个分区允许在客户端中缓存的最大消息容量,默认值:100 MiB,取值范围:16 MiB~2048 MiB。 重要 请合理取值,设置过大可能会引起客户端OOM。 |
autoCommit | 是否允许消费位点自动提交,默认为true。 |
autoCommitIntervalMillis | 消费位点自动提交间隔,默认值:5,单位:秒。 |
pollTimeoutMillis | 每次拉取消息超时时间,默认值:5,单位:秒。 |
分区和位点的详细说明,请参见基本概念。
更多信息
消息收发示例代码的更多信息,请参见以下文档: