TableStoreWriter配置项

初始化TableStoreWriter时,您可以按需自定义TableStoreWriter的配置以及回调函数逻辑。本文介绍TableStoreWriter支持自定义的配置和Callback示例。

配置项

初始化TableStoreWriter时,您可以通过修改WriterConfig自定义TableStoreWriter的配置。

配置示例

WriterConfig的配置示例如下:

WriterConfig config = new WriterConfig();
config.setBucketCount(3);
config.setBufferSize(1024);
config.setEnableSchemaCheck(true);
config.setDispatchMode(DispatchMode.HASH_PARTITION_KEY);
config.setBatchRequestType(BatchRequestType.BATCH_WRITE_ROW);
config.setConcurrency(10);
config.setWriteMode(WriteMode.PARALLEL);
config.setAllowDuplicatedRowInBatchRequest(true);
config.setMaxBatchSize(4 * 1024 * 1024);
config.setMaxBatchRowsCount(200);
config.setCallbackThreadCount(16);
config.setCallbackThreadPoolQueueSize(1024);
config.setMaxColumnsCount(128);
config.setMaxAttrColumnSize(2 * 1024 * 1024);
config.setMaxPKColumnSize(1024);
config.setFlushInterval(10000);
config.setLogInterval(10000);
config.setClientMaxConnections(300);
config.setWriterRetryStrategy(WriterRetryStrategy.CERTAIN_ERROR_CODE_NOT_RETRY);

配置说明

参数

类型

说明

bucketCount

整型

Writer内部的分桶数。默认值为3。一个分桶相当于一个缓冲空间,用于缓存部分数据。

此参数可用于提升按序写并发,当未达机器瓶颈时,分桶数与写入速率正相关。

当分桶内的写入模式为并发写时,保持默认配置即可。

bufferSize

整型

内存中缓冲队列的大小。默认值为1024行。此参数值必须为2的指数倍。

enableSchemaCheck

布尔值

在数据写入到缓冲区时,是否进行schema检查。取值范围如下:

  • true(默认值):开启schema检查。在行数据写入缓冲区前,TableStoreWriter会对该行数据进行如下检查:

    • 该行的主键的schema是否与表定义的相同。

    • 该行的主键列或属性列的值大小是否超过限制。

    • 该行的属性列的个数是否超过限制。

    • 属性列中是否有列名与主键列相同。

    • 该行的大小是否超过一次批量请求导入的最大数据量限制。

    如果行数据未通过上述检查,则TableStoreWriter会判定行数据为脏数据,不会写入到缓冲区中。

  • false:在行数据写入缓冲区时不检查schema。

    如果缓冲区中的部分行数据为脏数据,则TableStoreWriter将行数据写入到表格存储时,对应行数据会写入失败。

dispatchMode

DispatchMode

当数据写入到缓冲区时,将数据分发到分桶内的模式。当分桶数大于等于2时,此参数才有效。取值范围如下:

  • HASH_PARTITION_KEY(默认值):基于分区键哈希值做分桶进行分发,保证同分区的数据处于一桶内按序写入。

  • HASH_PRIMARY_KEY:基于主键哈希值做分桶进行分发,保证同主键的数据处于一个桶内按序写入。

  • ROUND_ROBIN:循环遍历每个分桶进行分发。数据随机分散在不同分桶中。

batchRequestType

BatchRequestType

Writer将缓冲区数据发送到表格存储时,构建的请求类型。取值范围如下:

  • BATCH_WRITE_ROW(默认值):构建BatchWriteRowRequest。

  • BULK_IMPORT:构建BulkImportRequest。

concurrency

整型

Writer将缓冲区数据发送到表格存储时的最大请求并发数。默认值为10。

writeMode

WriteMode

Writer将缓冲区数据写入到表格存储时,每个分桶内数据写入到表格存储中的模式。取值范围如下:

  • PARALLEL(默认值):并发写。不同桶间并发,同一个桶内也会并行请求。

  • SEQUENTIAL:串行写。不同桶间并发,同一个桶内串行请求。

allowDuplicatedRowInBatchRequest

布尔值

构建批量请求将数据写入表格存储时,是否允许有主键相同的行。默认值为true。

当数据表上存在二级索引时,表格存储会忽略此参数的配置,不允许有主键相同的行。此时TableStoreWriter在构建请求时会将主键相同的行加入到不同请求中。

maxBatchSize

整型

一次批量请求写入表格存储的最大数据量。默认值为4 MB。单位为字节。

maxBatchRowsCount

整型

一次批量请求写入表格存储的最大行数。默认值为200。最大值为200。

callbackThreadCount

整型

Writer内部Callback运行的线程池线程数。默认值为处理器个数。

callbackThreadPoolQueueSize

整型

Writer内部Callback运行的线程池队列大小。默认值为1024。

maxColumnsCount

整型

当写入数据到缓冲区时,一行最大的列数限制。默认值为128列。

maxAttrColumnSize

整型

当写入数据到缓冲区时,单一属性列值的最大大小,默认值为2 MB。单位为字节。

maxPKColumnSize

整型

当写入数据到缓冲区时,单一主键列值的最大大小。默认值为1 KB。单位为字节。

flushInterval

整型

Writer将缓冲区数据发送到表格存储时,自动flush缓冲区的时间间隔。默认值为10000。单位为毫秒。

logInterval

整型

Writer将缓冲区数据发送到表格存储时,自动打印任务状态的时间间隔。默认值为10000。单位为毫秒。

clientMaxConnections

整型

内部构建Client时使用的最大连接数配置。默认值为300。

writerRetryStrategy

WriterRetryStrategy

内部构建Client时使用的重试策略。取值范围如下:

  • CERTAIN_ERROR_CODE_NOT_RETRY(默认值):给定的错误码不做重试,其他错误均会重试。

    不做重试的错误码包括OTSParameterInvalidOTSConditionCheckFailOTSRequestBodyTooLargeOTSInvalidPKOTSOutOfColumnCountLimitOTSOutOfRowSizeLimit

  • CERTAIN_ERROR_CODE_RETRY:只对给定的错误码进行重试,其他错误均不重试。

    进行重试的错误码包括OTSInternalServerErrorOTSRequestTimeoutOTSPartitionUnavailableOTSTableNotReadyOTSRowOperationConflictOTSTimeoutOTSServerUnavailableOTSServerBusy

Callback

TableStoreWriter通过Callback来反馈写入行数据的成功或者失败信息。如果行数据写入成功,则Writer会调用onCompleted()函数,如果行数据写入失败,则Writer会根据异常的类别调用对应的onFailed()函数。

以下示例用于统计成功和失败的行数据数量。

private static AtomicLong succeedRows = new AtomicLong();
private static AtomicLong failedRows = new AtomicLong();
TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
    @Override
    public void onCompleted(RowChange rowChange, RowWriteResult cc) {
        //统计成功行数。
        succeedRows.incrementAndGet();
    }

    @Override
    public void onFailed(RowChange rowChange, Exception ex) {
        //统计失败行数。
        failedRows.incrementAndGet();
    }
};