Java SDK版本说明

本文介绍TCP协议下Java SDK 2.x.x.Final的版本说明,包括使用限制、版本的基本信息、环境要求以及和历史版本相比各功能特性的变更内容。

使用限制

  • 目前仅华东1(杭州)、华北1(青岛)、华北2(北京)、华北3(张家口)、华北5(呼和浩特)、华南1(深圳)、西南1(成都)、中国香港、德国(法兰克福)和印度尼西亚(雅加达)地域支持将客户端升级为2.x.x.Final版本,其他地域请勿将SDK升级到Java SDK 2.x.x Final版本,否则将无法访问云消息队列 RocketMQ 版服务。

  • Java SDK 2.x.x.Final仅支持通过VPC网络访问云消息队列 RocketMQ 版,不支持经典网络访问。

    若您使用存量云消息队列 RocketMQ 版实例并通过经典网络访问,请勿将Java SDK升级到V2.x.x.Final版本,否则将无法访问云消息队列 RocketMQ 版实例。

  • Java SDK 2.x.x.Final仅支持有命名空间的实例,若您使用的实例无命名空间,请勿将客户端版本升级到Java SDK 2.x.x.Final。

    5.x版本实例默认都有命名空间,4.x版本实例可在云消息队列 RocketMQ 版控制台实例详情页面的基础信息区域查看是否有命名空间。

版本信息

发布时间

版本号

下载链接

2023-02-23

2.0.5.Final

ons-client-2.0.5.Final

2022-08-17

2.0.3.Final

ons-client-2.0.3.Final

2022-06-16

2.0.2.Final

ons-client-2.0.2.Final

2021-11-29

2.0.1.Final

ons-client-2.0.1.Final

2021-10-18

2.0.0.Final

ons-client-2.0.0.Final

环境要求

使用V2.x.x版本Java SDK,请确保您使用的JDK版本为Java 8及以上。

V2.0.5功能变更

功能优化

日志增加异步支持。

问题修复

  • 修复批量消费等待时间无法指定的问题。

  • 修复部分安全漏洞。

V2.0.3功能变更

问题修复

修复高版本JDK中,消费线程池无法调节至32个线程以上的问题。

V2.0.2功能变更

问题修复

修复消息发送时,有小概率触发死锁的问题。

V2.0.1功能变更

消息轨迹

补充消息轨迹数据。

V2.0.0功能变更

顺序消息

顺序消息的最大重试次数MaxReconsumeTimes参数的默认值从Integer.MAX变更为16次。超过最大重试次数消息还未被消费成功将直接被投递至死信队列。您可以通过自定义MaxReconsumeTimes参数值修改顺序消息的最大重试次数。

事务消息

若生产者发送消息时LocalTransactionExecutor类为null,则系统会抛出异常,历史版本则不会抛出异常。

广播消费

广播消费模式下,支持使用offsetStore接口的方式定制消费者启动时的消费位点。若未设置,默认和历史版本一致直接从最新消费位点开始消费。

示例代码如下:

public class BroadcastingConsumerExample {
    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();

        properties.put(PropertyKeyConst.GROUP_ID, "MyGroupId");
        properties.put(PropertyKeyConst.AccessKey, "MyAccessKey");
        properties.put(PropertyKeyConst.SecretKey, "MySecretKey");
        // 设置TCP协议的接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXXX");
        // 从OffsetStore读取消费位点的功能仅支持广播消费模式。
        properties.put(PropertyKeyConst.MessageModel, MessageModel.BROADCASTING);

        Consumer consumer = ONSFactory.createConsumer(properties);
        // 参数1表示1秒钟会调用一次AbstractOffsetStore方法来进行位点的持久化。
        OffsetStore offsetStore = new AbstractOffsetStore(1) {
            @Override
            public Map<TopicPartition, Long> loadOffset() {
            // 实现从外部存储读取位点的逻辑。
            }

            @Override
            public void persistOffset(Map<TopicPartition, Long> offsetTable) {
            // 持久化位点到外部存储的逻辑。
            }
        };
        offsetStore.start();
        consumer.setOffsetStore(offsetStore);

        consumer.subscribe("testBroadcastingTopic", "TagA", new MessageListener() {
            @Override
            public Action consume(Message message, ConsumeContext context) {
                // 实现消息消费相关逻辑。
                return Action.CommitMessage;
            }
        });

        consumer.start();
        Thread.sleep(100000);
        consumer.shutdown();
        offsetStore.shutdown();
    }
}

Push消费

  • 暂不支持普通消息的批量消费功能。

  • 如果设置的消费线程数不在合法区间[1,1000]内,系统会在创建消费者时抛出异常,而不是在启动消费者时抛出异常。

  • 新增消费速度限流功能。为了避免消息洪峰可能对消费端应用产生冲击,您可通过该功能限制消息的消费速度,保护消费端应用。自定义消费速度的示例代码如下:

    说明

    顺序消息的消息重试不受限流控制。

    public class RateLimitConsumerExample {
        public static void main(String[] args) throws InterruptedException {
            Properties properties = new Properties();
    
            properties.put(PropertyKeyConst.GROUP_ID, "MyGroupId");
            properties.put(PropertyKeyConst.AccessKey, "MyAccessKey");
            properties.put(PropertyKeyConst.SecretKey, "MySecretKey");
            // 设置TCP协议的接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
            properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
    
            Consumer consumer = ONSFactory.createConsumer(properties);
    
            // 对testTopicA进行消费限流,限制消费端每秒钟消费10条消息。
            consumer.rateLimit("testTopicA", 10);
            consumer.subscribe("testTopic", "TagA", new MessageListener() {
                @Override
                public Action consume(Message message, ConsumeContext context) {
                    // 实现消息消费相关逻辑。
                    return Action.CommitMessage;
                }
            });
    
            consumer.start();
            Thread.sleep(100000);
            consumer.shutdown();
        }
    }

Pull消费

暂不支持Pull消费方式。

日志配置

  • 日志默认路径由~/logs/ons.log变更为~/logs/ons/ons-client.log

  • 日志级别与logback完全对齐。除已有的ERRORWARNINFODEBUG级别之外,增加对OFFTRACEALL级别的支持。

  • 所有日志相关参数的添加方式除了-D方式以外,增加对环境变量的支持。

客户端创建

设置非法的接入点时,系统会在创建发送者或消费者时抛出异常,而不是在启动发送者或消费者时抛出异常。

消息轨迹

使用最新版本SDK收发消息,查询消息轨迹时,新增返回如下轨迹参数。

参数

说明

AccessKey

您的阿里云账号或RAM用户的AccessKey ID,用于标识用户。当您通过SDK或API调用云消息队列 RocketMQ 版资源时,需要使用AccessKey ID进行身份验证。

到达Server

消息到达消息队列RocketMQ版服务端的时间。

预设DeliverAt

定时消息的预计投递时间。

实际AvailableAt

定时消息定时结束的时间。即消息可被消费者消费的开始时间。

Available Time

消息可被消费者消费的开始时间。

提交/回滚时间

事务消息提交或回滚的时间。

到达消费端

消息到达消费者客户端的时间。

等待处理耗时

消息到达消费者客户端,等待线程池分配线程和分配处理资源的耗时。

V2.0.0问题修复

修复RAM角色实现跨云账号STS授权场景下,updateCredential方法调用频率较高时,三元组(AccessKey IDAccessKey SecretSTS Token)更新不具备原子性而导致的鉴权失败问题。

STS授权的SDK示例代码,请参见STS Token配置示例