使用TableStoreWriter并发写入数据

通过本文您可以了解在表格存储中如何通过TableStoreWriter实现高并发数据写入。

背景介绍

在日志、物联网(例如轨迹追踪或溯源)等场景中,系统会在短时间内产生大量的数据,并将数据写入到数据库中。因此数据库需要提供高并发、高吞吐率的写入性能,满足每秒上万行甚至上百万行的写入吞吐率,而表格存储的 BatchWriteRow 接口限制单次只能写入200行数据。

TableStoreWriter是表格存储基于Java SDK实现的简单易用、高性能的数据导入工具类,它封装了用于高并发、高吞吐率数据导入的接口,可以实现高并发写入数据,同时支持行级别回调以及自定义配置功能。更多信息,请参见附录 1:TableStoreWriter 实现原理

说明

TableStoreWriter只适用于宽表模型

适用场景

如果您的业务场景具备以下特点,则可以使用TableStoreWriter进行数据写入,典型的应用场景有日志存储、即时通讯(IM)系统、分布式队列等。

  • 高并发,对吞吐率要求很高

  • 对单条数据的写入延迟没有要求

  • 写入可异步化(可采用生产者消费者模型)

  • 同一条数据可重复写入

准备工作

操作步骤

步骤 1:安装 Tablestore SDK

如果您使用的是Maven项目,请在项目的pom.xml文件中添加如下依赖:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>tablestore</artifactId>
    <version>5.17.4</version>
</dependency>                 

更多关于安装Tablestore SDK的信息,请参见安装Tablestore SDK

步骤 2:初始化 TableStoreWriter

初始化TableStoreWriter时,您需要配置实例和表的信息、身份认证信息,也可以自定义TableStoreWriter的配置参数和回调函数。使用多线程时,建议共用一个TableStoreWriter对象,TableStoreWriter的初始化示例代码如下所示。

TableStoreWriter支持的参数配置和回调函数,请参见附录 2:TableStoreWriter 配置参数附录 3:TableStoreWriter 回调函数
private static TableStoreWriter createTablesStoreWriter() {
    
    /**
     * 一般情况下保持默认配置即可,您也可以按需自定义 TableStoreWriter 配置。
     * 更多参数说明请参见“配置 TableStoreWriter”文档。
     * */
    WriterConfig config = new WriterConfig();
    // 配置一次批量导入的行数上限,默认值为 200。
    config.setMaxBatchRowsCount(200); 
    // 配置最大并发数,默认值为 10。建议保持默认。                         
    config.setConcurrency(10);    
            
    /**
     * 自定义行级别 Callback。
     * 该示例通过成功、失败计数,简单展示回调能力。
     * */
    TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
        @Override
        public void onCompleted(RowChange rowChange, RowWriteResult cc) {
            succeedRows.incrementAndGet();
        }

        @Override
        public void onFailed(RowChange rowChange, Exception ex) {
            failedRows.incrementAndGet();
        }
    };

    /** 配置访问凭证。 **/
    ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);

    /**
     * 推荐使用内部构建的线程池与 Client,方便用户使用,隔离初始化和释放的逻辑。
     * */
    DefaultTableStoreWriter writer = new DefaultTableStoreWriter(
        endpoint, credentials, instanceName, tableName, config, resultCallback);

    return writer;
}

步骤 3:写入数据

您可以根据不同的增删改操作构造RowChange,然后将RowChange添加到TableStoreWriter中。

单行写入数据

以下示例代码使用单行写入方式写入1000行数据。

public void writeSingleRowWithFuture(TableStoreWriter writer) {
    System.out.println("=========================================================[Start]");
    System.out.println("Write Single Row With Future");
    int rowsCount = 1000;
    int columnsCount = 10;
    String strValue = "1234567890";
    AtomicLong rowIndex = new AtomicLong(-1);

    List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
    for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {

        PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                .addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
                .addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
                .addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
                .build();

        RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
        for (int j = 0; j < columnsCount; j++) {
            rowChange.put("column_" + j, ColumnValue.fromString(strValue));
        }
        rowChange.put("index", ColumnValue.fromLong(index));
        Future<WriterResult> future = writer.addRowChangeWithFuture(rowChange);
        futures.add(future);
    }

    System.out.println("Write thread finished.");
    // 对缓冲区中的数据进行 flush。TableStoreWriter也会根据flushInterval和maxBatchSize的配置决定缓冲区的flush时机。其中flushInterval是根据时间定期进行flush,maxBatchSize是根据缓冲区的数据量决定是否进行flush。
    writer.flush();
    
    // 打印Future过程。
    // printFutureResult(futures);

    System.out.println("=========================================================[Finish]");
}

批量写入数据

以下示例代码使用批量写入方式写入1000行数据。

public void writeRowListWithFuture(TableStoreWriter writer) {
    System.out.println("=========================================================[Start]");
    System.out.println("Write Row List With Future");

    int rowsCount = 1000;
    int columnsCount = 10;
    String strValue = "1234567890";
    AtomicLong rowIndex = new AtomicLong(-1);

    List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
    List<RowChange> rowChanges = new LinkedList<RowChange>();
    for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {

        PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                .addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
                .addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
                .addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
                .build();

        RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
        for (int j = 0; j < columnsCount; j++) {
            rowChange.put("column_" + j, ColumnValue.fromString(strValue));
        }
        rowChange.put("index", ColumnValue.fromLong(index));
        rowChanges.add(rowChange);
        if (Math.random() > 0.995 || index == rowsCount - 1) {
            Future<WriterResult> future = writer.addRowChangeWithFuture(rowChanges);
            futures.add(future);
            rowChanges.clear();
        }
    }

    System.out.println("Write thread finished.");
    // 对缓冲区中的数据进行 flush。TableStoreWriter也会根据flushInterval和maxBatchSize的配置决定缓冲区的flush时机。其中flushInterval是根据时间定期进行flush,maxBatchSize是根据缓冲区的数据量决定是否进行flush。
    writer.flush();
    
    // 打印Future过程。
    // printFutureResult(futures);
    
    System.out.println("=========================================================[Finish]");
}

步骤 4:关闭 TableStoreWriter

建议您在退出应用程序前手动关闭TableStoreWriter。在TableStoreWriter关闭前,系统会先flush掉缓冲区中的所有数据。

说明

如果在TableStoreWriter关闭过程中或者关闭之后仍然调用addRowChange方法向缓冲区中写入数据,该部分数据不保证会写入表格存储。

// 主动关闭Writer,内部等候所有队列数据写入后,自动关闭client与内部的线程池。
writer.close();

完整示例代码

以下示例用于创建一张新的数据表,并通过并发写入的方式将数据写入到数据表中。

import com.alicloud.openservices.tablestore.DefaultTableStoreWriter;
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.TableStoreCallback;
import com.alicloud.openservices.tablestore.TableStoreWriter;
import com.alicloud.openservices.tablestore.core.auth.DefaultCredentials;
import com.alicloud.openservices.tablestore.core.auth.ServiceCredentials;
import com.alicloud.openservices.tablestore.model.*;
import com.alicloud.openservices.tablestore.writer.RowWriteResult;
import com.alicloud.openservices.tablestore.writer.WriterConfig;
import com.alicloud.openservices.tablestore.writer.WriterResult;

import com.aliyuncs.exceptions.ClientException;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.commons.codec.digest.DigestUtils.md5Hex;

public class TableStoreWriterDemo {

    // yourInstanceName 填写您的实例名称
    private static String instanceName = "yourInstanceName";
    // yourEndpoint 填写您的实例访问地址
    private static String endpoint = "yourEndpoint";
    // 获取环境变量里的 AccessKey ID 和 AccessKey Secret
    private static String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
    private static String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
    private static String tableName = "<TABLE_NAME>";

    private static AtomicLong succeedRows = new AtomicLong();
    private static AtomicLong failedRows = new AtomicLong();

    public static void main(String[] args) throws ClientException {
        TableStoreWriterDemo sample = new TableStoreWriterDemo();

        /**
         * 使用Writer前确保表已存在。
         * 1、writer会校验表的存在性.
         * 2、校验写入数据是否与表的字段、类型一致。
         * */
        sample.tryCreateTable();

        /**
         * 初始化建议使用。
         * DefaultTableStoreWriter(
         *      String endpoint,                                                   // 实例的服务地址。
         *      ServiceCredentials credentials,                                    // 认证信息:含 AK,也支持 token
         *      String instanceName,                                               // 实例名。
         *      String tableName,                                                  // 表名:一个 writer 仅针对一个表。
         *      WriterConfig config,                                               // writer 的配置。
         *      TableStoreCallback<RowChange, RowWriteResult> resultCallback       // 行级别回调。
         * )
         * */
        TableStoreWriter writer = sample.createTablesStoreWriter();

        /**
         * Future使用:单行写
         * */
        sample.writeSingleRowWithFuture(writer);
        /**
         * Future使用:批量写
         * */   
        //sample.writeRowListWithFuture(writer);

        System.out.println("Count by TablestoreCallback: failedRow=" + failedRows.get() + ", succeedRow=" + succeedRows.get());
        System.out.println("Count by WriterStatics: " + writer.getWriterStatistics());

        /**
         * 您需要主动关闭Writer,内部等候所有队列数据写入后,自动关闭 client 与内部的线程池。
         * */
        writer.close();
    }

    private static TableStoreWriter createTablesStoreWriter() {

        WriterConfig config = new WriterConfig();
        // 配置一次批量导入的行数上限,默认值为 200。如果希望一次写入超过 200 行数据,请调大该值。
        config.setMaxBatchRowsCount(200); 
        // 配置最大并发数,默认值为 10。建议保持默认即可。                         
        config.setConcurrency(10);                                   

        /**
         * 自定义的行级别 Callback。
         * 该示例通过成功、失败计数,简单展示回调能力。
         * */
        TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
            @Override
            public void onCompleted(RowChange rowChange, RowWriteResult cc) {
                succeedRows.incrementAndGet();
            }

            @Override
            public void onFailed(RowChange rowChange, Exception ex) {
                failedRows.incrementAndGet();
            }
        };

        ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);


        /**
         * 推荐使用内部构建的线程池与 client,方便用户使用,隔离初始化和释放的逻辑。
         * */
        DefaultTableStoreWriter writer = new DefaultTableStoreWriter(
                endpoint, credentials, instanceName, tableName, config, resultCallback);

        return writer;
    }


    private static void tryCreateTable() throws ClientException {
        SyncClient ots = new SyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);

        try {
            ots.deleteTable(new DeleteTableRequest(tableName));
        } catch (Exception e) {
        }

        TableMeta tableMeta = new TableMeta(tableName);
        tableMeta.addPrimaryKeyColumn("pk_0", PrimaryKeyType.STRING);
        tableMeta.addPrimaryKeyColumn("pk_1", PrimaryKeyType.STRING);
        tableMeta.addPrimaryKeyColumn("pk_2", PrimaryKeyType.INTEGER);
        TableOptions tableOptions = new TableOptions(-1, 1);
        CreateTableRequest request = new CreateTableRequest(
                tableMeta, tableOptions, new ReservedThroughput(new CapacityUnit(0, 0)));

        try {
            CreateTableResponse res = ots.createTable(request);
        } catch (Exception e) {
            throw new ClientException(e);
        } finally {
            ots.shutdown();
        }
    }

    public static void writeSingleRowWithFuture(TableStoreWriter writer) {
        System.out.println("=========================================================[Start]");
        System.out.println("Write Single Row With Future");
        int rowsCount = 1000;
        int columnsCount = 10;
        String strValue = "1234567890";
        AtomicLong rowIndex = new AtomicLong(-1);

        List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
        for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {

            PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                .addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
                .addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
                .addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
                .build();

            RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
            for (int j = 0; j < columnsCount; j++) {
                rowChange.put("column_" + j, ColumnValue.fromString(strValue));
            }
            rowChange.put("index", ColumnValue.fromLong(index));
            Future<WriterResult> future = writer.addRowChangeWithFuture(rowChange);
            futures.add(future);
        }

        System.out.println("Write thread finished.");
        writer.flush();
        // 打印future过程。
        // printFutureResult(futures);

        System.out.println("=========================================================[Finish]");
    }
    
    public void writeRowListWithFuture(TableStoreWriter writer) {
        System.out.println("=========================================================[Start]");
        System.out.println("Write Row List With Future");

        int rowsCount = 1000;
        int columnsCount = 10;
        String strValue = "1234567890";
        AtomicLong rowIndex = new AtomicLong(-1);

        List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
        List<RowChange> rowChanges = new LinkedList<RowChange>();
        for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {

            PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                    .addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
                    .addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
                    .addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
                    .build();

            RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
            for (int j = 0; j < columnsCount; j++) {
                rowChange.put("column_" + j, ColumnValue.fromString(strValue));
            }
            rowChange.put("index", ColumnValue.fromLong(index));
            rowChanges.add(rowChange);
            if (Math.random() > 0.995 || index == rowsCount - 1) {
                Future<WriterResult> future = writer.addRowChangeWithFuture(rowChanges);
                futures.add(future);
                rowChanges.clear();
            }
    }

    System.out.println("Write thread finished.");
    writer.flush();
    // 打印future过程。
    // printFutureResult(futures);
    System.out.println("=========================================================[Finish]");
    }


    private static void printFutureResult(List<Future<WriterResult>> futures) {
        int totalRow = 0;

        for (int index = 0; index < futures.size(); index++) {
            try {
                WriterResult result = futures.get(index).get();
                totalRow += result.getTotalCount();
                System.out.println(String.format(
                        "Future[%d] finished:\tfailed: %d\tsucceed: %d\tfutureBatch: %d\ttotalFinished: %d",
                        index, result.getFailedRows().size(), result.getSucceedRows().size(),
                        result.getTotalCount(), totalRow));

            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
}

执行结果如下所示。

=========================================================[Start]
Write Single Row With Future
Write thread finished.
=========================================================[Finish]
Count by TablestoreCallback: failedRow=0, succeedRow=1000
Count by WriterStatics: WriterStatistics: {
    totalRequestCount=6,
    totalRowsCount=1000,
    totalSucceedRowsCount=1000,
    totalFailedRowsCount=0,
    totalSingleRowRequestCount=0,
}

常见问题

使用Java SDK写入数据时报错:The count of attribute columns exceeds the maximum:128

附录

附录 1:TableStoreWriter 实现原理

TableStoreWriter是基于SDK处理层接口重新封装的工具类,它依赖了以下接口。

  • TableStoreWriter初始化依赖AsyncClient异步接口。

  • TableStoreWriter导入数据使用BatchWriteRow接口。

  • TableStoreWriter异常重试依赖RetryStrategy接口。

代码分层架构如下图所示。

image

TableStoreWriter在接口性能和易用性上做了优化,具备以下特性:

  • 使用异步接口:使用更少的线程,提供更高的并发。

  • 自动数据聚合:在内存中使用缓冲队列,让一次发给表格存储的批量写请求尽量大,提高写入吞吐率。

  • 采用生产者消费者模式:更易于异步化和数据聚集。

  • 使用高性能数据交换队列:选用Disruptor RingBuffer,采用多生产者单消费者的模型,提供更好的性能。

  • 屏蔽复杂请求封装:屏蔽调用BatchWriteRow接口细节,通过预检查自动过滤脏数据(例如主键格式与表预定义的不符、行大小超限、行列数超限等),自动处理请求限制(例如一次批量的行数限制、一次批量的大小限制等)。

  • 行级别Callback:相对于表格存储Java SDK提供的请求级别的Callback,TableStoreWriter提供行级别的Callback,让业务逻辑可以实现行级别数据处理。

  • 行级别重试:请求级别重试失败会根据特定的错误码进行行级别重试,最大程度保证数据写入成功率。

TableStoreWriter的实现和封装细节如下图所示。

image

附录 2:TableStoreWriter 配置参数

初始化TableStoreWriter时,您可以通过修改 WriterConfig 自定义TableStoreWriter的配置参数。

参数

类型

说明

bucketCount

整型

Writer内部的分桶数,默认值为3。一个分桶相当于一个缓冲空间,用于缓存部分数据,此参数可用于提升按序写并发,当未达机器瓶颈时,分桶数与写入速率正相关。

说明

当分桶内的写入模式为并发写时,保持默认配置即可。

bufferSize

整型

内存中缓冲队列的大小,默认值为1024。参数值必须为2的指数倍。

enableSchemaCheck

布尔值

数据写入到缓冲区时,是否进行schema检查,默认值为true。

  • 开启schema检查时,在行数据写入缓冲区前,TableStoreWriter会对该行数据进行如下检查。如果行数据未通过上述检查,则TableStoreWriter会判定行数据为脏数据,不会写入到缓冲区中。

    • 该行的主键的schema是否与表定义的相同。

    • 该行的主键列或属性列的值大小是否超过限制。

    • 该行的属性列的个数是否超过限制。

    • 属性列中是否有列名与主键列相同。

    • 该行的大小是否超过一次批量请求导入的最大数据量限制。

  • 不开启schema检查(设置为false)时,如果缓冲区中的部分行数据为脏数据,则TableStoreWriter将行数据写入到表格存储时,对应行数据会写入失败。

dispatchMode

DispatchMode

数据写入到缓冲区时,将数据分发到分桶内的模式。取值范围如下:

  • HASH_PARTITION_KEY:默认值,基于分区键哈希值做分桶进行分发,保证同分区的数据处于一桶内按序写入。

  • HASH_PRIMARY_KEY:基于主键哈希值做分桶进行分发,保证同主键的数据处于一个桶内按序写入。

  • ROUND_ROBIN:循环遍历每个分桶进行分发。数据随机分散在不同分桶中。

重要

当分桶数大于等于2时,此参数才有效。

batchRequestType

BatchRequestType

Writer将缓冲区数据发送到表格存储时,构建的请求类型。取值范围如下:

  • BATCH_WRITE_ROW:默认值,构建BatchWriteRowRequest。

  • BULK_IMPORT:构建BulkImportRequest。

concurrency

整型

Writer将缓冲区数据发送到表格存储时的最大请求并发数,默认值为10。

writeMode

WriteMode

Writer将缓冲区数据写入到表格存储时,每个分桶内数据写入到表格存储中的模式。取值范围如下:

  • PARALLEL:默认值,并发写。不同桶间并发,同一个桶内也会并行请求。

  • SEQUENTIAL:串行写。不同桶间并发,同一个桶内串行请求。

allowDuplicatedRowInBatchRequest

布尔值

构建批量请求将数据写入表格存储时,是否允许有主键相同的行,默认值为true。

重要

当数据表上存在二级索引时,表格存储会忽略此参数的配置,不允许有主键相同的行。此时TableStoreWriter在构建请求时会将主键相同的行加入到不同请求中。

maxBatchSize

整型

一次批量请求写入表格存储的最大数据量,默认值为4 MB,单位为字节。

maxBatchRowsCount

整型

一次批量请求写入表格存储的最大行数,默认值为200,最大值为200。

callbackThreadCount

整型

Writer内部Callback运行的线程池线程数,默认值为处理器个数。

callbackThreadPoolQueueSize

整型

Writer内部Callback运行的线程池队列大小,默认值为1024。

maxColumnsCount

整型

写入数据到缓冲区时,一行数据的最大列数,默认值为128。

maxAttrColumnSize

整型

写入数据到缓冲区时,单一属性列值的最大大小,默认值为2 MB,单位为字节。

maxPKColumnSize

整型

写入数据到缓冲区时,单一主键列值的最大大小,默认值为1 KB,单位为字节。

flushInterval

整型

Writer将缓冲区数据发送到表格存储时,自动flush缓冲区的时间间隔,默认值为10000,单位为毫秒。

logInterval

整型

Writer将缓冲区数据发送到表格存储时,自动打印任务状态的时间间隔,默认值为10000,单位为毫秒。

clientMaxConnections

整型

内部构建Client时使用的最大连接数配置,默认值为300。

writerRetryStrategy

WriterRetryStrategy

内部构建Client时使用的重试策略,取值范围如下:

  • CERTAIN_ERROR_CODE_NOT_RETRY:默认值,给定的错误码不做重试,其它错误均会重试。不做重试的错误码包括:

    • OTSParameterInvalid

    • OTSConditionCheckFail

    • OTSRequestBodyTooLarge

    • OTSInvalidPK

    • OTSOutOfColumnCountLimit

    • OTSOutOfRowSizeLimit

  • CERTAIN_ERROR_CODE_RETRY:只对给定的错误码进行重试,其他错误均不重试。进行重试的错误码包括:

    • OTSInternalServerError

    • OTSRequestTimeout

    • OTSPartitionUnavailable

    • OTSTableNotReady

    • OTSRowOperationConflict

    • OTSTimeout

    • OTSServerUnavailable

    • OTSServerBusy

配置示例

WriterConfig config = new WriterConfig();
config.setBucketCount(3);
config.setBufferSize(1024);
config.setEnableSchemaCheck(true);
config.setDispatchMode(DispatchMode.HASH_PARTITION_KEY);
config.setBatchRequestType(BatchRequestType.BATCH_WRITE_ROW);
config.setConcurrency(10);
config.setWriteMode(WriteMode.PARALLEL);
config.setAllowDuplicatedRowInBatchRequest(true);
config.setMaxBatchSize(4 * 1024 * 1024);
config.setMaxBatchRowsCount(200);
config.setCallbackThreadCount(16);
config.setCallbackThreadPoolQueueSize(1024);
config.setMaxColumnsCount(128);
config.setMaxAttrColumnSize(2 * 1024 * 1024);
config.setMaxPKColumnSize(1024);
config.setFlushInterval(10000);
config.setLogInterval(10000);
config.setClientMaxConnections(300);
config.setWriterRetryStrategy(WriterRetryStrategy.CERTAIN_ERROR_CODE_NOT_RETRY);

附录 3:TableStoreWriter 回调函数

TableStoreWriter通过Callback来反馈写入行数据的成功或者失败信息。如果行数据写入成功,则Writer会调用 onCompleted 函数,如果行数据写入失败,则Writer会根据异常的类别调用对应的 onFailed 函数。

以下示例用于统计成功和失败的行数据数量。

private static AtomicLong succeedRows = new AtomicLong();
private static AtomicLong failedRows = new AtomicLong();
TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
    @Override
    public void onCompleted(RowChange rowChange, RowWriteResult cc) {
        //统计成功行数。
        succeedRows.incrementAndGet();
    }

    @Override
    public void onFailed(RowChange rowChange, Exception ex) {
        //统计失败行数。
        failedRows.incrementAndGet();
    }
};