通过本文您可以了解在表格存储中如何通过TableStoreWriter实现高并发数据写入。
背景介绍
在日志、物联网(例如轨迹追踪或溯源)等场景中,系统会在短时间内产生大量的数据,并将数据写入到数据库中。因此数据库需要提供高并发、高吞吐率的写入性能,满足每秒上万行甚至上百万行的写入吞吐率,而表格存储的 BatchWriteRow 接口限制单次只能写入200行数据。
TableStoreWriter是表格存储基于Java SDK实现的简单易用、高性能的数据导入工具类,它封装了用于高并发、高吞吐率数据导入的接口,可以实现高并发写入数据,同时支持行级别回调以及自定义配置功能。更多信息,请参见附录 1:TableStoreWriter 实现原理。
TableStoreWriter只适用于宽表模型。
适用场景
如果您的业务场景具备以下特点,则可以使用TableStoreWriter进行数据写入,典型的应用场景有日志存储、即时通讯(IM)系统、分布式队列等。
高并发,对吞吐率要求很高
对单条数据的写入延迟没有要求
写入可异步化(可采用生产者消费者模型)
同一条数据可重复写入
准备工作
开通表格存储服务并创建实例。具体操作,请参见开通服务和创建实例。
获取阿里云账号或者RAM用户的AccessKey。具体操作,请参见创建AccessKey。
操作步骤
步骤 1:安装 Tablestore SDK
如果您使用的是Maven项目,请在项目的pom.xml文件中添加如下依赖:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>tablestore</artifactId>
<version>5.17.4</version>
</dependency>
更多关于安装Tablestore SDK的信息,请参见安装Tablestore SDK。
步骤 2:初始化 TableStoreWriter
初始化TableStoreWriter时,您需要配置实例和表的信息、身份认证信息,也可以自定义TableStoreWriter的配置参数和回调函数。使用多线程时,建议共用一个TableStoreWriter对象,TableStoreWriter的初始化示例代码如下所示。
TableStoreWriter支持的参数配置和回调函数,请参见附录 2:TableStoreWriter 配置参数和附录 3:TableStoreWriter 回调函数。
private static TableStoreWriter createTablesStoreWriter() {
/**
* 一般情况下保持默认配置即可,您也可以按需自定义 TableStoreWriter 配置。
* 更多参数说明请参见“配置 TableStoreWriter”文档。
* */
WriterConfig config = new WriterConfig();
// 配置一次批量导入的行数上限,默认值为 200。
config.setMaxBatchRowsCount(200);
// 配置最大并发数,默认值为 10。建议保持默认。
config.setConcurrency(10);
/**
* 自定义行级别 Callback。
* 该示例通过成功、失败计数,简单展示回调能力。
* */
TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
@Override
public void onCompleted(RowChange rowChange, RowWriteResult cc) {
succeedRows.incrementAndGet();
}
@Override
public void onFailed(RowChange rowChange, Exception ex) {
failedRows.incrementAndGet();
}
};
/** 配置访问凭证。 **/
ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);
/**
* 推荐使用内部构建的线程池与 Client,方便用户使用,隔离初始化和释放的逻辑。
* */
DefaultTableStoreWriter writer = new DefaultTableStoreWriter(
endpoint, credentials, instanceName, tableName, config, resultCallback);
return writer;
}
步骤 3:写入数据
您可以根据不同的增删改操作构造RowChange,然后将RowChange添加到TableStoreWriter中。
单行写入数据
以下示例代码使用单行写入方式写入1000行数据。
public void writeSingleRowWithFuture(TableStoreWriter writer) {
System.out.println("=========================================================[Start]");
System.out.println("Write Single Row With Future");
int rowsCount = 1000;
int columnsCount = 10;
String strValue = "1234567890";
AtomicLong rowIndex = new AtomicLong(-1);
List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {
PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
.addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
.addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
.build();
RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
for (int j = 0; j < columnsCount; j++) {
rowChange.put("column_" + j, ColumnValue.fromString(strValue));
}
rowChange.put("index", ColumnValue.fromLong(index));
Future<WriterResult> future = writer.addRowChangeWithFuture(rowChange);
futures.add(future);
}
System.out.println("Write thread finished.");
// 对缓冲区中的数据进行 flush。TableStoreWriter也会根据flushInterval和maxBatchSize的配置决定缓冲区的flush时机。其中flushInterval是根据时间定期进行flush,maxBatchSize是根据缓冲区的数据量决定是否进行flush。
writer.flush();
// 打印Future过程。
// printFutureResult(futures);
System.out.println("=========================================================[Finish]");
}
批量写入数据
以下示例代码使用批量写入方式写入1000行数据。
public void writeRowListWithFuture(TableStoreWriter writer) {
System.out.println("=========================================================[Start]");
System.out.println("Write Row List With Future");
int rowsCount = 1000;
int columnsCount = 10;
String strValue = "1234567890";
AtomicLong rowIndex = new AtomicLong(-1);
List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
List<RowChange> rowChanges = new LinkedList<RowChange>();
for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {
PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
.addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
.addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
.build();
RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
for (int j = 0; j < columnsCount; j++) {
rowChange.put("column_" + j, ColumnValue.fromString(strValue));
}
rowChange.put("index", ColumnValue.fromLong(index));
rowChanges.add(rowChange);
if (Math.random() > 0.995 || index == rowsCount - 1) {
Future<WriterResult> future = writer.addRowChangeWithFuture(rowChanges);
futures.add(future);
rowChanges.clear();
}
}
System.out.println("Write thread finished.");
// 对缓冲区中的数据进行 flush。TableStoreWriter也会根据flushInterval和maxBatchSize的配置决定缓冲区的flush时机。其中flushInterval是根据时间定期进行flush,maxBatchSize是根据缓冲区的数据量决定是否进行flush。
writer.flush();
// 打印Future过程。
// printFutureResult(futures);
System.out.println("=========================================================[Finish]");
}
步骤 4:关闭 TableStoreWriter
建议您在退出应用程序前手动关闭TableStoreWriter。在TableStoreWriter关闭前,系统会先flush掉缓冲区中的所有数据。
如果在TableStoreWriter关闭过程中或者关闭之后仍然调用addRowChange方法向缓冲区中写入数据,该部分数据不保证会写入表格存储。
// 主动关闭Writer,内部等候所有队列数据写入后,自动关闭client与内部的线程池。
writer.close();
完整示例代码
以下示例用于创建一张新的数据表,并通过并发写入的方式将数据写入到数据表中。
import com.alicloud.openservices.tablestore.DefaultTableStoreWriter;
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.TableStoreCallback;
import com.alicloud.openservices.tablestore.TableStoreWriter;
import com.alicloud.openservices.tablestore.core.auth.DefaultCredentials;
import com.alicloud.openservices.tablestore.core.auth.ServiceCredentials;
import com.alicloud.openservices.tablestore.model.*;
import com.alicloud.openservices.tablestore.writer.RowWriteResult;
import com.alicloud.openservices.tablestore.writer.WriterConfig;
import com.alicloud.openservices.tablestore.writer.WriterResult;
import com.aliyuncs.exceptions.ClientException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.commons.codec.digest.DigestUtils.md5Hex;
public class TableStoreWriterDemo {
// yourInstanceName 填写您的实例名称
private static String instanceName = "yourInstanceName";
// yourEndpoint 填写您的实例访问地址
private static String endpoint = "yourEndpoint";
// 获取环境变量里的 AccessKey ID 和 AccessKey Secret
private static String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
private static String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
private static String tableName = "<TABLE_NAME>";
private static AtomicLong succeedRows = new AtomicLong();
private static AtomicLong failedRows = new AtomicLong();
public static void main(String[] args) throws ClientException {
TableStoreWriterDemo sample = new TableStoreWriterDemo();
/**
* 使用Writer前确保表已存在。
* 1、writer会校验表的存在性.
* 2、校验写入数据是否与表的字段、类型一致。
* */
sample.tryCreateTable();
/**
* 初始化建议使用。
* DefaultTableStoreWriter(
* String endpoint, // 实例的服务地址。
* ServiceCredentials credentials, // 认证信息:含 AK,也支持 token
* String instanceName, // 实例名。
* String tableName, // 表名:一个 writer 仅针对一个表。
* WriterConfig config, // writer 的配置。
* TableStoreCallback<RowChange, RowWriteResult> resultCallback // 行级别回调。
* )
* */
TableStoreWriter writer = sample.createTablesStoreWriter();
/**
* Future使用:单行写
* */
sample.writeSingleRowWithFuture(writer);
/**
* Future使用:批量写
* */
//sample.writeRowListWithFuture(writer);
System.out.println("Count by TablestoreCallback: failedRow=" + failedRows.get() + ", succeedRow=" + succeedRows.get());
System.out.println("Count by WriterStatics: " + writer.getWriterStatistics());
/**
* 您需要主动关闭Writer,内部等候所有队列数据写入后,自动关闭 client 与内部的线程池。
* */
writer.close();
}
private static TableStoreWriter createTablesStoreWriter() {
WriterConfig config = new WriterConfig();
// 配置一次批量导入的行数上限,默认值为 200。如果希望一次写入超过 200 行数据,请调大该值。
config.setMaxBatchRowsCount(200);
// 配置最大并发数,默认值为 10。建议保持默认即可。
config.setConcurrency(10);
/**
* 自定义的行级别 Callback。
* 该示例通过成功、失败计数,简单展示回调能力。
* */
TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
@Override
public void onCompleted(RowChange rowChange, RowWriteResult cc) {
succeedRows.incrementAndGet();
}
@Override
public void onFailed(RowChange rowChange, Exception ex) {
failedRows.incrementAndGet();
}
};
ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);
/**
* 推荐使用内部构建的线程池与 client,方便用户使用,隔离初始化和释放的逻辑。
* */
DefaultTableStoreWriter writer = new DefaultTableStoreWriter(
endpoint, credentials, instanceName, tableName, config, resultCallback);
return writer;
}
private static void tryCreateTable() throws ClientException {
SyncClient ots = new SyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);
try {
ots.deleteTable(new DeleteTableRequest(tableName));
} catch (Exception e) {
}
TableMeta tableMeta = new TableMeta(tableName);
tableMeta.addPrimaryKeyColumn("pk_0", PrimaryKeyType.STRING);
tableMeta.addPrimaryKeyColumn("pk_1", PrimaryKeyType.STRING);
tableMeta.addPrimaryKeyColumn("pk_2", PrimaryKeyType.INTEGER);
TableOptions tableOptions = new TableOptions(-1, 1);
CreateTableRequest request = new CreateTableRequest(
tableMeta, tableOptions, new ReservedThroughput(new CapacityUnit(0, 0)));
try {
CreateTableResponse res = ots.createTable(request);
} catch (Exception e) {
throw new ClientException(e);
} finally {
ots.shutdown();
}
}
public static void writeSingleRowWithFuture(TableStoreWriter writer) {
System.out.println("=========================================================[Start]");
System.out.println("Write Single Row With Future");
int rowsCount = 1000;
int columnsCount = 10;
String strValue = "1234567890";
AtomicLong rowIndex = new AtomicLong(-1);
List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {
PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
.addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
.addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
.build();
RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
for (int j = 0; j < columnsCount; j++) {
rowChange.put("column_" + j, ColumnValue.fromString(strValue));
}
rowChange.put("index", ColumnValue.fromLong(index));
Future<WriterResult> future = writer.addRowChangeWithFuture(rowChange);
futures.add(future);
}
System.out.println("Write thread finished.");
writer.flush();
// 打印future过程。
// printFutureResult(futures);
System.out.println("=========================================================[Finish]");
}
public void writeRowListWithFuture(TableStoreWriter writer) {
System.out.println("=========================================================[Start]");
System.out.println("Write Row List With Future");
int rowsCount = 1000;
int columnsCount = 10;
String strValue = "1234567890";
AtomicLong rowIndex = new AtomicLong(-1);
List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
List<RowChange> rowChanges = new LinkedList<RowChange>();
for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {
PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
.addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
.addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
.build();
RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
for (int j = 0; j < columnsCount; j++) {
rowChange.put("column_" + j, ColumnValue.fromString(strValue));
}
rowChange.put("index", ColumnValue.fromLong(index));
rowChanges.add(rowChange);
if (Math.random() > 0.995 || index == rowsCount - 1) {
Future<WriterResult> future = writer.addRowChangeWithFuture(rowChanges);
futures.add(future);
rowChanges.clear();
}
}
System.out.println("Write thread finished.");
writer.flush();
// 打印future过程。
// printFutureResult(futures);
System.out.println("=========================================================[Finish]");
}
private static void printFutureResult(List<Future<WriterResult>> futures) {
int totalRow = 0;
for (int index = 0; index < futures.size(); index++) {
try {
WriterResult result = futures.get(index).get();
totalRow += result.getTotalCount();
System.out.println(String.format(
"Future[%d] finished:\tfailed: %d\tsucceed: %d\tfutureBatch: %d\ttotalFinished: %d",
index, result.getFailedRows().size(), result.getSucceedRows().size(),
result.getTotalCount(), totalRow));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
}
执行结果如下所示。
=========================================================[Start]
Write Single Row With Future
Write thread finished.
=========================================================[Finish]
Count by TablestoreCallback: failedRow=0, succeedRow=1000
Count by WriterStatics: WriterStatistics: {
totalRequestCount=6,
totalRowsCount=1000,
totalSucceedRowsCount=1000,
totalFailedRowsCount=0,
totalSingleRowRequestCount=0,
}
常见问题
使用Java SDK写入数据时报错:The count of attribute columns exceeds the maximum:128
附录
附录 1:TableStoreWriter 实现原理
TableStoreWriter是基于SDK处理层接口重新封装的工具类,它依赖了以下接口。
TableStoreWriter初始化依赖AsyncClient异步接口。
TableStoreWriter导入数据使用BatchWriteRow接口。
TableStoreWriter异常重试依赖RetryStrategy接口。
代码分层架构如下图所示。
TableStoreWriter在接口性能和易用性上做了优化,具备以下特性:
使用异步接口:使用更少的线程,提供更高的并发。
自动数据聚合:在内存中使用缓冲队列,让一次发给表格存储的批量写请求尽量大,提高写入吞吐率。
采用生产者消费者模式:更易于异步化和数据聚集。
使用高性能数据交换队列:选用Disruptor RingBuffer,采用多生产者单消费者的模型,提供更好的性能。
屏蔽复杂请求封装:屏蔽调用BatchWriteRow接口细节,通过预检查自动过滤脏数据(例如主键格式与表预定义的不符、行大小超限、行列数超限等),自动处理请求限制(例如一次批量的行数限制、一次批量的大小限制等)。
行级别Callback:相对于表格存储Java SDK提供的请求级别的Callback,TableStoreWriter提供行级别的Callback,让业务逻辑可以实现行级别数据处理。
行级别重试:请求级别重试失败会根据特定的错误码进行行级别重试,最大程度保证数据写入成功率。
TableStoreWriter的实现和封装细节如下图所示。
附录 2:TableStoreWriter 配置参数
初始化TableStoreWriter时,您可以通过修改 WriterConfig
自定义TableStoreWriter的配置参数。
参数 | 类型 | 说明 |
bucketCount | 整型 | Writer内部的分桶数,默认值为3。一个分桶相当于一个缓冲空间,用于缓存部分数据,此参数可用于提升按序写并发,当未达机器瓶颈时,分桶数与写入速率正相关。 说明 当分桶内的写入模式为并发写时,保持默认配置即可。 |
bufferSize | 整型 | 内存中缓冲队列的大小,默认值为1024。参数值必须为2的指数倍。 |
enableSchemaCheck | 布尔值 | 数据写入到缓冲区时,是否进行schema检查,默认值为true。
|
dispatchMode | 数据写入到缓冲区时,将数据分发到分桶内的模式。取值范围如下:
重要 当分桶数大于等于2时,此参数才有效。 | |
batchRequestType | Writer将缓冲区数据发送到表格存储时,构建的请求类型。取值范围如下:
| |
concurrency | 整型 | Writer将缓冲区数据发送到表格存储时的最大请求并发数,默认值为10。 |
writeMode | Writer将缓冲区数据写入到表格存储时,每个分桶内数据写入到表格存储中的模式。取值范围如下:
| |
allowDuplicatedRowInBatchRequest | 布尔值 | 构建批量请求将数据写入表格存储时,是否允许有主键相同的行,默认值为true。 重要 当数据表上存在二级索引时,表格存储会忽略此参数的配置,不允许有主键相同的行。此时TableStoreWriter在构建请求时会将主键相同的行加入到不同请求中。 |
maxBatchSize | 整型 | 一次批量请求写入表格存储的最大数据量,默认值为4 MB,单位为字节。 |
maxBatchRowsCount | 整型 | 一次批量请求写入表格存储的最大行数,默认值为200,最大值为200。 |
callbackThreadCount | 整型 | Writer内部Callback运行的线程池线程数,默认值为处理器个数。 |
callbackThreadPoolQueueSize | 整型 | Writer内部Callback运行的线程池队列大小,默认值为1024。 |
maxColumnsCount | 整型 | 写入数据到缓冲区时,一行数据的最大列数,默认值为128。 |
maxAttrColumnSize | 整型 | 写入数据到缓冲区时,单一属性列值的最大大小,默认值为2 MB,单位为字节。 |
maxPKColumnSize | 整型 | 写入数据到缓冲区时,单一主键列值的最大大小,默认值为1 KB,单位为字节。 |
flushInterval | 整型 | Writer将缓冲区数据发送到表格存储时,自动flush缓冲区的时间间隔,默认值为10000,单位为毫秒。 |
logInterval | 整型 | Writer将缓冲区数据发送到表格存储时,自动打印任务状态的时间间隔,默认值为10000,单位为毫秒。 |
clientMaxConnections | 整型 | 内部构建Client时使用的最大连接数配置,默认值为300。 |
writerRetryStrategy | 内部构建Client时使用的重试策略,取值范围如下:
|
配置示例
WriterConfig config = new WriterConfig();
config.setBucketCount(3);
config.setBufferSize(1024);
config.setEnableSchemaCheck(true);
config.setDispatchMode(DispatchMode.HASH_PARTITION_KEY);
config.setBatchRequestType(BatchRequestType.BATCH_WRITE_ROW);
config.setConcurrency(10);
config.setWriteMode(WriteMode.PARALLEL);
config.setAllowDuplicatedRowInBatchRequest(true);
config.setMaxBatchSize(4 * 1024 * 1024);
config.setMaxBatchRowsCount(200);
config.setCallbackThreadCount(16);
config.setCallbackThreadPoolQueueSize(1024);
config.setMaxColumnsCount(128);
config.setMaxAttrColumnSize(2 * 1024 * 1024);
config.setMaxPKColumnSize(1024);
config.setFlushInterval(10000);
config.setLogInterval(10000);
config.setClientMaxConnections(300);
config.setWriterRetryStrategy(WriterRetryStrategy.CERTAIN_ERROR_CODE_NOT_RETRY);
附录 3:TableStoreWriter 回调函数
TableStoreWriter通过Callback来反馈写入行数据的成功或者失败信息。如果行数据写入成功,则Writer会调用 onCompleted
函数,如果行数据写入失败,则Writer会根据异常的类别调用对应的 onFailed
函数。
以下示例用于统计成功和失败的行数据数量。
private static AtomicLong succeedRows = new AtomicLong();
private static AtomicLong failedRows = new AtomicLong();
TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
@Override
public void onCompleted(RowChange rowChange, RowWriteResult cc) {
//统计成功行数。
succeedRows.incrementAndGet();
}
@Override
public void onFailed(RowChange rowChange, Exception ex) {
//统计失败行数。
failedRows.incrementAndGet();
}
};