初始化TableStoreWriter时,您可以按需自定义TableStoreWriter的配置以及回调函数逻辑。本文介绍TableStoreWriter支持自定义的配置和Callback示例。
配置项
初始化TableStoreWriter时,您可以通过修改WriterConfig
自定义TableStoreWriter的配置。
配置示例
WriterConfig的配置示例如下:
WriterConfig config = new WriterConfig();
config.setBucketCount(3);
config.setBufferSize(1024);
config.setEnableSchemaCheck(true);
config.setDispatchMode(DispatchMode.HASH_PARTITION_KEY);
config.setBatchRequestType(BatchRequestType.BATCH_WRITE_ROW);
config.setConcurrency(10);
config.setWriteMode(WriteMode.PARALLEL);
config.setAllowDuplicatedRowInBatchRequest(true);
config.setMaxBatchSize(4 * 1024 * 1024);
config.setMaxBatchRowsCount(200);
config.setCallbackThreadCount(16);
config.setCallbackThreadPoolQueueSize(1024);
config.setMaxColumnsCount(128);
config.setMaxAttrColumnSize(2 * 1024 * 1024);
config.setMaxPKColumnSize(1024);
config.setFlushInterval(10000);
config.setLogInterval(10000);
config.setClientMaxConnections(300);
config.setWriterRetryStrategy(WriterRetryStrategy.CERTAIN_ERROR_CODE_NOT_RETRY);
配置说明
参数 | 类型 | 说明 |
bucketCount | 整型 | Writer内部的分桶数。默认值为3。一个分桶相当于一个缓冲空间,用于缓存部分数据。 此参数可用于提升按序写并发,当未达机器瓶颈时,分桶数与写入速率正相关。 当分桶内的写入模式为并发写时,保持默认配置即可。 |
bufferSize | 整型 | 内存中缓冲队列的大小。默认值为1024行。此参数值必须为2的指数倍。 |
enableSchemaCheck | 布尔值 | 在数据写入到缓冲区时,是否进行schema检查。取值范围如下:
|
dispatchMode | 当数据写入到缓冲区时,将数据分发到分桶内的模式。当分桶数大于等于2时,此参数才有效。取值范围如下:
| |
batchRequestType | Writer将缓冲区数据发送到表格存储时,构建的请求类型。取值范围如下:
| |
concurrency | 整型 | Writer将缓冲区数据发送到表格存储时的最大请求并发数。默认值为10。 |
writeMode | Writer将缓冲区数据写入到表格存储时,每个分桶内数据写入到表格存储中的模式。取值范围如下:
| |
allowDuplicatedRowInBatchRequest | 布尔值 | 构建批量请求将数据写入表格存储时,是否允许有主键相同的行。默认值为true。 当数据表上存在二级索引时,表格存储会忽略此参数的配置,不允许有主键相同的行。此时TableStoreWriter在构建请求时会将主键相同的行加入到不同请求中。 |
maxBatchSize | 整型 | 一次批量请求写入表格存储的最大数据量。默认值为4 MB。单位为字节。 |
maxBatchRowsCount | 整型 | 一次批量请求写入表格存储的最大行数。默认值为200。最大值为200。 |
callbackThreadCount | 整型 | Writer内部Callback运行的线程池线程数。默认值为处理器个数。 |
callbackThreadPoolQueueSize | 整型 | Writer内部Callback运行的线程池队列大小。默认值为1024。 |
maxColumnsCount | 整型 | 当写入数据到缓冲区时,一行最大的列数限制。默认值为128列。 |
maxAttrColumnSize | 整型 | 当写入数据到缓冲区时,单一属性列值的最大大小,默认值为2 MB。单位为字节。 |
maxPKColumnSize | 整型 | 当写入数据到缓冲区时,单一主键列值的最大大小。默认值为1 KB。单位为字节。 |
flushInterval | 整型 | Writer将缓冲区数据发送到表格存储时,自动flush缓冲区的时间间隔。默认值为10000。单位为毫秒。 |
logInterval | 整型 | Writer将缓冲区数据发送到表格存储时,自动打印任务状态的时间间隔。默认值为10000。单位为毫秒。 |
clientMaxConnections | 整型 | 内部构建Client时使用的最大连接数配置。默认值为300。 |
writerRetryStrategy | 内部构建Client时使用的重试策略。取值范围如下:
|
Callback
TableStoreWriter通过Callback来反馈写入行数据的成功或者失败信息。如果行数据写入成功,则Writer会调用onCompleted()
函数,如果行数据写入失败,则Writer会根据异常的类别调用对应的onFailed()
函数。
以下示例用于统计成功和失败的行数据数量。
private static AtomicLong succeedRows = new AtomicLong();
private static AtomicLong failedRows = new AtomicLong();
TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
@Override
public void onCompleted(RowChange rowChange, RowWriteResult cc) {
//统计成功行数。
succeedRows.incrementAndGet();
}
@Override
public void onFailed(RowChange rowChange, Exception ex) {
//统计失败行数。
failedRows.incrementAndGet();
}
};