本文介绍TCP协议下Java SDK 2.x.x.Final的版本说明,包括注意事项、版本的基本信息、环境要求以及和历史版本相比各功能特性的变更内容。

注意事项

目前仅华东1(杭州)、华东2(上海)、华北1(青岛)、华北2(北京)、华北3(张家口)、华北5(呼和浩特)、华南1(深圳)、西南1(成都)、德国(法兰克福)和印度尼西亚(雅加达)地域支持将客户端升级为2.x.x.Final版本,其他地域请勿将SDK升级到Java SDK 2.x.x Final版本,否则将无法访问消息队列RocketMQ版服务。如有特殊需求,请提交工单咨询。

版本信息

发布时间 版本号 下载链接
2022-08-17 2.0.3.Final ons-client-2.0.3.Final
2022-06-16 2.0.2.Final ons-client-2.0.2.Final
2021-11-29 2.0.1.Final ons-client-2.0.1.Final
2021-10-18 2.0.0.Final ons-client-2.0.0.Final

环境要求

使用V2.x.x版本Java SDK,请确保您使用的JDK版本为Java 8及以上。

V2.0.3功能变更

问题修复

修复高版本JDK中,消费线程池无法调节至32个线程以上的问题。

V2.0.2功能变更

问题修复

修复消息发送时,有小概率触发死锁的问题。

V2.0.1功能变更

消息轨迹

补充消息轨迹数据。

V2.0.0功能变更

顺序消息

顺序消息的最大重试次数MaxReconsumeTimes参数的默认值从Integer.MAX变更为16次。超过最大重试次数消息还未被消费成功将直接被投递至死信队列。您可以通过自定义MaxReconsumeTimes参数值修改顺序消息的最大重试次数。

事务消息

若生产者发送消息时LocalTransactionExecuter类为null,则系统会抛出异常,历史版本则不会抛出异常。

广播消费

广播消费模式下,支持使用offsetStore接口的方式定制消费者启动时的消费位点。若未设置,默认和历史版本一致直接从最新消费位点开始消费。

示例代码如下:
public class BroadcastingConsumerExample {
    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();

        properties.put(PropertyKeyConst.GROUP_ID, "MyGroupId");
        properties.put(PropertyKeyConst.AccessKey, "MyAccessKey");
        properties.put(PropertyKeyConst.SecretKey, "MySecretKey");
        // 设置TCP协议的接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXXX");
        // 从OffsetStore读取消费位点的功能仅支持广播消费模式。
        properties.put(PropertyKeyConst.MessageModel, MessageModel.BROADCASTING);

        Consumer consumer = ONSFactory.createConsumer(properties);
        // 参数1表示1秒钟会调用一次AbstractOffsetStore方法来进行位点的持久化。
        OffsetStore offsetStore = new AbstractOffsetStore(1) {
            @Override
            public Map<TopicPartition, Long> loadOffset() {
            // 实现从外部存储读取位点的逻辑。
            }

            @Override
            public void persistOffset(Map<TopicPartition, Long> offsetTable) {
            // 持久化位点到外部存储的逻辑。
            }
        };
        offsetStore.start();
        consumer.setOffsetStore(offsetStore);

        consumer.subscribe("testBroadcastingTopic", "TagA", new MessageListener() {
            @Override
            public Action consume(Message message, ConsumeContext context) {
                // 实现消息消费相关逻辑。
                return Action.CommitMessage;
            }
        });

        consumer.start();
        Thread.sleep(100000);
        consumer.shutdown();
        offsetStore.shutdown();
    }
}

Push消费

  • 暂不支持普通消息的批量消费功能。
  • 如果设置的消费线程数不在合法区间[1,1000]内,系统会在创建消费者时抛出异常,而不是在启动消费者时抛出异常。
  • 新增消费速度限流功能。为了避免消息洪峰可能对消费端应用产生应冲击,您可通过该功能限制消息的消费速度,保护消费端应用。自定义消费速度的示例代码如下:
    说明 顺序消息的消息重试不受限流控制。
    public class RateLimitConsumerExample {
        public static void main(String[] args) throws InterruptedException {
            Properties properties = new Properties();
    
            properties.put(PropertyKeyConst.GROUP_ID, "MyGroupId");
            properties.put(PropertyKeyConst.AccessKey, "MyAccessKey");
            properties.put(PropertyKeyConst.SecretKey, "MySecretKey");
            // 设置TCP协议的接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
            properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
    
            Consumer consumer = ONSFactory.createConsumer(properties);
    
            // 对testTopicA进行消费限流,限制消费端每秒钟消费10条消息。
            consumer.rateLimit("testTopicA", 10);
            consumer.subscribe("testTopic", "TagA", new MessageListener() {
                @Override
                public Action consume(Message message, ConsumeContext context) {
                    // 实现消息消费相关逻辑。
                    return Action.CommitMessage;
                }
            });
    
            consumer.start();
            Thread.sleep(100000);
            consumer.shutdown();
        }
    }

Pull消费

暂不支持Pull消费方式。

日志配置

  • 日志默认路径由~/logs/ons.log变更为~/logs/ons/ons-client.log
  • 日志级别与logback完全对齐。除已有的ERRORWARNINFODEBUG级别之外,增加对OFFTRACEALL级别的支持。
  • 所有日志相关参数的添加方式除了-D方式以外,增加对环境变量的支持。

客户端创建

设置非法的接入点时,系统会在创建发送者或消费者时抛出异常,而不是在启动发送者或消费者时抛出异常。

消息轨迹

使用最新版本SDK收发消息,查询消息轨迹时,新增返回如下轨迹参数。
参数 说明
AccessKey 您的阿里云账号或RAM用户的AccessKey ID,用于标识用户。当您通过SDK或API调用消息队列RocketMQ版资源时,需要使用AccessKey ID进行身份验证。
到达Server 消息到达消息队列RocketMQ版服务端的时间。
预设DeliverAt 定时消息的预计投递时间。
实际AvailableAt 定时消息定时结束的时间。即消息可被消费者消费的开始时间。
Available Time 消息可被消费者消费的开始时间。
提交/回滚时间 事务消息提交或回滚的时间。
到达消费端 消息到达消费者客户端的时间。
等待处理耗时 消息到达消费者客户端,等待线程池分配线程和分配处理资源的耗时。

V2.0.0问题修复

修复RAM角色实现跨云账号STS授权场景下,updateCredential方法调用频率较高时,三元组(AccessKey IDAccessKey SecretSTS Token)更新不具备原子性而导致的鉴权失败问题。

STS授权的SDK示例代码,请参见STS Token配置示例