Java High-Level SDK

介绍

Java 的 High-Level sdk 一般 称为 client-library,主要分为 Producer 和 Consumer,下面会介绍 Producer 和 Consumer 的相关参数和一些常见的用法。

身份认证

AccessKey(简称AK)是阿里云提供给阿里云用户的访问密钥,用于访问阿里云OpenAPI时的身份验证。AccessKey包括AccessKey IDAccessKey Secret,需妥善保管。AK如果泄露,会威胁该账号下所有资源的安全。访问阿里云OpenAPI时,如果在代码中硬编码明文AK,容易因代码仓库权限管理不当造成AK泄露。

Alibaba Cloud Credentials是阿里云为阿里云开发者用户提供的身份凭证管理工具。配置了Credentials默认凭据链后,访问阿里云OpenAPI时,您无需在代码中硬编码明文AK,可有效保证您账号下云资源的安全。

前提条件

配置方案

本文示例的全部通采用配置环境变量方式获取 AK 信息,更多方式请访问管理访问凭据

重要

使用配置文件的方案时,请确保您系统中不存在环境变量ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET。否则,配置文件将不生效。

阿里云SDK支持通过定义ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET环境变量来创建默认的访问凭证。调用接口时,程序直接访问凭证,读取您的访问密钥(即AccessKey)并自动完成鉴权。

配置方法

配置环境变量ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET

LinuxmacOS系统配置方法

执行以下命令:

export ALIBABA_CLOUD_ACCESS_KEY_ID=<access_key_id>
export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<access_key_secret>

<access_key_id>需替换为已准备好的AccessKey ID,<access_key_secret>替换为AccessKey Secret。

Windows系统配置方法

  1. 新建环境变量文件,添加环境变量ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET,并写入已准备好的AccessKey IDAccessKey Secret。

  2. 重启Windows系统。

代码示例

EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();

Producer 介绍

限制说明

  • Producer 是线程安全的,理论上同一个进程内一个 topic 只需要有一个 Producer 即可

参数介绍

Producer 的参数都通过 ProducerConfig 来设置,如果要设置参数maxAsyncThreadNum,需要调用 ProducerConfig 的 setMaxAsyncThreadNum。

参数名称

类型

是否必须

默认值

描述

maxAsyncThreadNum

16

发送数据的线程池大小

userAgent

String

dcl-xxx

maxRetryCount

int

1

最大重试次数

maxRetryIntervalMs

int

1000

可重试错误的重试间隔,不包含限流报错

maxRetryIntervalMsForLimit

int

100

写数据被限流后的重试间隔

ProducerInterceptor

Object

-

写入数据的时候可以添加拦截器做额外的处理,例如:添加额外的 attribute 信息

HttpConfig

Object

-

Http 相关默认值较多,建议直接查看代码

maxAsyncBufferRecords

int

INT_MAX

异步发送时,最大攒批的数据条数,一般通过 size 控制,所以这里默认值为 INT_MAX

maxAsyncBufferTimeMs

long

10000

异步发送时,最长缓存时间

maxAsyncBufferSize

long

4 * 1024 * 1024

异步发送时,最大攒批 size

maxAsyncQueueSize

long

16

异步发送时,攒批完成正在发送的请求数,超过会阻塞发送接口,主要是防止 OOM

useTTFormat

enableHeartbeat

bool

false

是否发送心跳包,一般情况下都不需要开启

heartbeatGenerator

Object

DefaultBlobHeartbeatGenerator

如果开启发送心跳,如果用户设置了那么会优先使用用户设置的heartbeatGenerator,没有设置默认会使用DefaultBlobHeartbeatGenerator,

Producer 示例

相关依赖

<!-- 零信任凭证相关 -->
<dependency>
	<groupId>com.aliyun</groupId>
	<artifactId>credentials-java</artifactId>
	<version>1.0.2</version>
</dependency>

<dependency>
	<groupId>com.aliyun.datahub</groupId>
	<artifactId>datahub-client-library</artifactId>
	<version>1.4.11</version>
</dependency>

异步写入(推荐)

异步写入的好处是不需要用户攒批,并且攒批的方式可以通过参数进行设置,可以参考上面参数介绍进行调优。

public static void main(String[] args) throws InterruptedException {
	// 通过环境变量获取AK信息
	EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();

	String endpoint ="https://dh-cn-hangzhou.aliyuncs.com";
	String projectName = "test_project";
	String topicName = "test_topic";

	// 初始化Producer,这里直接使用默认配置
	ProducerConfig config = new ProducerConfig(endpoint, provider);
	DatahubProducer producer = new DatahubProducer(projectName, topicName, config);

	RecordSchema schema = producer.getTopicSchema();
	// 如果开启了多version schema,这里也可以获取指定version的schema
	// RecordSchema schema = producer.getTopicSchema(3);

	// 对于异步写入,可以根据需要来选择是否注册回调函数
	WriteCallback callback = new WriteCallback() {
		@Override
		public void onSuccess(String shardId, List<RecordEntry> records, long elapsedTimeMs, long sendTimeMs) {
			System.out.println("write success");
		}

		@Override
		public void onFailure(String shardId, List<RecordEntry> records, long elapsedTimeMs, DatahubClientException e) {
			System.out.println("write failed");
		}
	};

	for (int i = 0; i < 10000; ++i) {
		try {
            // generate data by schema
            TupleRecordData data = new TupleRecordData(schema);
            data.setField("field1", "hello");
            data.setField("field2", 1234);
            RecordEntry recordEntry = new RecordEntry();
            recordEntry.setRecordData(data);

            producer.sendAsync(recordEntry, callback);
            // 如果不需要关心数据是否发送成功,那么就不需要注册回调,直接发送
            // producer.sendAsync(recordEntry, null);
        } catch (DatahubClientException e) {
            // TODO 处理异常,一般是不可重试错误或者超过重试次数;
            Thread.sleep(1000);
        }
	}

	// 保证退出前,数据全部被发送完
	producer.flush(true);
	producer.close();
}

Hash 写入

如果数据有保序的需求,那么需要根据一些信息进行 hash,相同 hash 值的数据会写入到同一个 shard,单个 shard 的数据是可以保证顺序的,一般 hash 写入建议使用异步的方式写入。

public static void main(String[] args) throws InterruptedException {
    // 通过环境变量获取AK信息
    EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();

    String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
    String projectName = "test_project";
    String topicName = "test_topic";

    // 初始化Producer,这里直接使用默认配置
    ProducerConfig config = new ProducerConfig(endpoint, provider);
    DatahubProducer producer = new DatahubProducer(projectName, topicName, config);


    RecordSchema schema = producer.getTopicSchema();
    // 如果开启了多version schema,这里也可以获取指定version的schema
    // RecordSchema schema = producer.getTopicSchema(3);

    // 对于异步写入,可以注册回调函数
    WriteCallback callback = new WriteCallback() {
        @Override
        public void onSuccess(String shardId, List<RecordEntry> records, long elapsedTimeMs, long sendTimeMs) {
            System.out.println("write success");
        }

        @Override
        public void onFailure(String shardId, List<RecordEntry> records, long elapsedTimeMs, DatahubClientException e) {
            System.out.println("write failed");
        }
    };

    for (int i = 0; i < 10000; ++i) {
        try {
            // generate data by schema
            TupleRecordData data = new TupleRecordData(schema);
            data.setField("field1", "hello");
            data.setField("field2", 1234);
            RecordEntry recordEntry = new RecordEntry();
            recordEntry.setRecordData(data);
            // 给每条数据设置hash的内容
            recordEntry.setHashKey("test" + i);

            producer.sendAsync(recordEntry, callback, DefaultRecordPartitioner.INSTANCE);
            // 如果不需要关心数据是否发送成功,那么就不需要注册回调,直接发送
            // producer.sendAsync(recordEntry, null, DefaultRecordPartitioner.INSTANCE);
        } catch (DatahubClientException e) {
            // TODO 处理异常,一般是不可重试错误或者超过重试次数;
            Thread.sleep(1000);
        }
    }

    // 保证退出前,数据全部被发送完
    producer.flush(true);
    producer.close();
}

同步写入

如果想自己控制攒批的方式,那可以采用同步写入的方式。

public static void main(String[] args) throws InterruptedException {
    // 通过环境变量获取AK信息
    EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();

    String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
    String projectName = "test_project";
    String topicName = "test_topic";

    // 初始化Producer,这里直接使用默认配置
    ProducerConfig config = new ProducerConfig(endpoint, provider);
    DatahubProducer producer = new DatahubProducer(projectName, topicName, config);


    RecordSchema schema = producer.getTopicSchema();
    // 如果开启了多version schema,这里也可以获取指定version的schema
    // RecordSchema schema = producer.getTopicSchema(3);

    List<RecordEntry> recordEntryList = new ArrayList<>();
    for (int i = 0; i < 1000; ++i) {
        // generate data by schema
        TupleRecordData data = new TupleRecordData(schema);
        data.setField("field1", "hello");
        data.setField("field2", 1234);
        RecordEntry recordEntry = new RecordEntry();
        recordEntry.setRecordData(data);
        recordEntryList.add(recordEntry);
    }

    // 写入失败会抛异常,一般是不可重试错误,或者是可重试错误超过了重试次数
    try {
        String shardId = producer.send(recordEntryList);
        System.out.println("write success, shardId: " + shardId);
    } catch (DatahubClientException e) {
        // TODO 处理异常,一般是不可重试错误或者超过重试次数;
    }

    producer.close();
}

Consumer 介绍

Consumer 用于数据的消费,可以自动分配 shard,一般称为协同消费,具体介绍可以参考 协同消费

Consumer 实际请求是批量读取,缓存到本地,然后接口层面是单条数据返回的。

点位维护

Consumer 可以自动维护点位信息,在启动时会自动获取服务端保存的点位,然后从上次保存的点位开始继续消费,在消费的过程中,会周期性的(默认是 10 秒)把客户端的数据点位提交到服务端。具体实现逻辑如下

每一条数据的点位都会对应一个 RecordKey 对象,消费完一条数据后,可以对 RecordKey 进行 ack 操作,ack 后表示这条数据已经消费完,可以更新点位,也可以选择自动 ack。客户端读到数据以后,会把每一条数据的对应的RecordKey按顺序维护到一个 queue 中。后台有一个提交点位到服务端的周期任务,每次会检查队列,如果队首RecordKey 已经 ack,那么会被弹出队列,一直到队首RecordKey 没有被 ack 为止,那么当前队首的点位上一个点位就是本次需要提交到服务端的点位。

常见问题说明

1、 如果客户端消费了某条数据,但是点位没有来得及提交到服务端,这条数据是否会重复消费?

会,但是一般只会发生在异常退出的情况下,正常的调用 close 退出,是可以保证当前点位被提交的。

2、如果有三条数据,点位分别是 1~3,2 因为某些原因没有 ack,但是 1 和 3 已经 ack,这个时候点位会更新到多少?

点位更新到 1,1 已经 ack,2 此时位于队首不会被弹出,所以此时 点位会一直卡在 1。

3、如果某一条数据已经通过 read 读出来,但是一直没有 ack,这个时候 Consumer 是否会再次 read 到这条数据?

不会,并且位点也会一直卡住不更新,所以用户必须保证每一条 read 的数据一定被 ack。如果某一条数据超过一定时间(默认是 60s),继续调用 read 会抛出异常。

限制说明

  • Consumer 是线程安全的,一个进程每个 topic 只需要有一个 Consumer 对象即可

  • Consumer 数量一般不要超过 shard 数量,如果 Consumer 多于 shard 数,那么会有 Consumer 因为分配不到 shard 一直在空跑,但是有其他 Consumer 退出以后,空跑的 Consumer 就因为可以拿到 shard 开始正常运行。

  • 指定 shard 列表消费,使用同一个订阅 id 的不同 Consumer 不能消费同一个 shard

参数介绍

参数名称

类型

是否必须

默认值

描述

maxAsyncThreadNum

16

读取数据的线程池大小

userAgent

String

dcl-xxx

maxRetryCount

int

1

最大重试次数

maxRetryIntervalMs

int

1000

可重试错误的重试间隔,不包含限流报错

maxRetryIntervalMsForLimit

int

100

读数据被限流后的重试间隔

ProducerInterceptor

Object

-

读取数据的时候可以添加拦截器做额外的处理,例如:过滤掉一些敏感信息

HttpConfig

Object

-

Http 相关默认值较多,建议直接查看代码

balanceRead

bool

false

true 表示在当前 consumer 所消费的 shard 中依次发送读请求;false 表示在当前 consumer 所消费的 shard 中选择点位最老的 shard 发送读请求,主要是防止在数据倾斜场景下 shard 点位差距过大

autoCommit

bool

true

是否自动 ack 数据:true 表示数据 read 到以后自动 ack;false 表示数据 read 到以后,需要手动调用一下 RecordEntry.getKey().ack(),否则点位不会往前推动

sessionTimeoutMs

long

60000

consumer 会话最大时间,consumer 需要和服务端一直发送心跳来保证活跃,超过这个时间没有发送心跳,会被服务端视为退出 consumer group,这个 consumer 的 shard 会被分配给其他 consumer

heartbeatRetryCount

int

1

Consumer 发送心跳来保证活跃,心跳发送失败时的重试次数。

fetchNumber

int

500

单次请求读取的最大数据条数

maxBufferRecords

int

500

本地缓存数据条数,不足会向服务端发起请求,设置过大有可能会导致 OOM。

Consumer 示例

协同消费(推荐)

协同消费就是 Consumer Group,服务端会给每个节点动态分配需要消费的 shard,用户只需要关心数据处理即可,不需要关心点位维护和 shard 分配等事项。

自动 ack 消费

每条数据 read 到以后,就会自动 ack 确认,即表示可以更新点位,有一定可能造成数据丢失。

public static void main(String[] args) throws InterruptedException {
    // 通过环境变量获取AK信息
    EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();

    String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
    String projectName = "test_project";
    String topicName = "test_topic";
    String subId = "1747966903774M787N";

    ConsumerConfig config = new ConsumerConfig(endpoint, provider);
    DatahubConsumer consumer = new DatahubConsumer(projectName, topicName, subId, config);

    while (true) {
        RecordEntry recordEntry = null;
        try {
            recordEntry = consumer.read(5000);
            if (recordEntry != null) {
                TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
                // handle data
                System.out.println("read record: " + data.getField("field1") + ", " + data.getField("field2"));
            }
        } catch (DatahubClientException e) {
            // TODO 处理异常,一般是不可重试错误或者超过重试次数;
        }
    }
}

手动 ack 消费

如果每一条数据都要求必须消费完了,才能提交点位,那么推荐关闭 autoCommit,每条数据手动 ack。

public static void main(String[] args) throws InterruptedException {
  // 通过环境变量获取AK信息
  EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();

  String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
  String projectName = "test_project";
  String topicName = "test_topic";
  String subId = "1747966903774M787N";

  ConsumerConfig config = new ConsumerConfig(endpoint, provider);
  // 设置数据消费成功后手动ack
  config.setAutoCommit(false);
  DatahubConsumer consumer = new DatahubConsumer(projectName, topicName, subId, config);

  while (true) {
    RecordEntry recordEntry = null;
    try {
      recordEntry = consumer.read(5000);
      if (recordEntry != null) {
        TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
        // handle data
        System.out.println("read record: " + data.getField("field1") + ", " + data.getField("field2"));
      }
    } catch (DatahubClientException e) {
      // TODO 处理异常,一般是不可重试错误或者超过重试次数;
    } finally {
      if (recordEntry != null) {
        // 处理完每条数据都要进行ack,否则点位无法推进
        recordEntry.getKey().ack();
      }
    }
  }
}

指定 shard 消费

指定 shard 消费,是需要用户自己维护 shard 的分配,使用同一个订阅 id 的不同 Consumer 不能消费同一个 shard ,否则会无法消费。这里只给出一个自动 ack 的示例,手动 ack 可以参考上面的示例。

public static void main(String[] args) throws InterruptedException {
    // 通过环境变量获取AK信息
    EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();

    String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
    String projectName = "test_project";
    String topicName = "test_topic";
    String subId = "1747966903774M787N";
    List<String> shardIds = Arrays.asList("0", "1");

    ConsumerConfig config = new ConsumerConfig(endpoint, provider);
    // 客户端指定好需要消费的shard列表
    DatahubConsumer consumer = new DatahubConsumer(projectName, topicName, subId, shardIds, config);

    while (true) {
        RecordEntry recordEntry = null;
        try {
            recordEntry = consumer.read(5000);
            if (recordEntry != null) {
                TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
                // handle data
                System.out.println("read record: " + data.getField("field1") + ", " + data.getField("field2"));
            }
        } catch (DatahubClientException e) {
            // TODO 处理异常,一般是不可重试错误或者超过重试次数;
        }
    }
}