This topic walks you through using the TableStoreWriter interface of Tablestore to write data concurrently and go beyond the 200-row limit of a single batch write request.
Usage notes
When you use multiple threads, we recommend that they share a single TableStoreWriter object.
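The following is a minimal sketch of this pattern, assuming a writer created as described in Step 2 below; the thread count, row count, and the buildRowChange helper are placeholders for illustration and are not part of the official example.
// Minimal sketch: several threads share the same TableStoreWriter instance.
// buildRowChange(threadId, i) is a hypothetical helper that constructs a RowChange.
ExecutorService pool = Executors.newFixedThreadPool(4);
for (int t = 0; t < 4; t++) {
    final int threadId = t;
    pool.submit(() -> {
        for (int i = 0; i < 250; i++) {
            // All threads call the same writer; TableStoreWriter buffers and batches the rows internally.
            writer.addRowChange(buildRowChange(threadId, i));
        }
    });
}
pool.shutdown();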
Prerequisites
A Tablestore instance is created. For more information, see Create an instance.
An AccessKey pair is created for your Alibaba Cloud account or a RAM user. For more information, see Create an AccessKey pair.
Procedure
Step 1: Import the TableStoreWriter utility class
TableStoreWriter is a utility class provided by the Tablestore SDK for Java. You can import it by installing the Tablestore SDK for Java. For more information, see Install the Tablestore SDK for Java.
Step 2: Initialize TableStoreWriter
When you initialize TableStoreWriter, you must configure the instance and table information, the access credentials, and the TableStoreWriter settings (for example, the maximum concurrency and the maximum number of rows per batch request).
For more information about the TableStoreWriter configuration items, see TableStoreWriter configuration items.
The following sample code shows how to initialize TableStoreWriter:
private static TableStoreWriter createTablesStoreWriter() {
/**
 * In most cases, the default configuration is sufficient. You can also customize the TableStoreWriter configuration as needed.
 * For more information about the parameters, see the "Configure TableStoreWriter" topic.
 */
WriterConfig config = new WriterConfig();
// Maximum number of rows in a single batch import request. Default value: 200.
config.setMaxBatchRowsCount(200);
// Maximum concurrency. Default value: 10. We recommend that you keep the default value.
config.setConcurrency(10);
/**
 * Custom row-level callback.
 * This example simply counts succeeded and failed rows to demonstrate the callback capability.
 */
TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
@Override
public void onCompleted(RowChange rowChange, RowWriteResult cc) {
succeedRows.incrementAndGet();
}
@Override
public void onFailed(RowChange rowChange, Exception ex) {
failedRows.incrementAndGet();
}
};
/** Configure the access credentials. */
ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);
/**
 * We recommend using the internally constructed thread pool and client, which are easy to use and keep the initialization and release logic isolated.
 */
DefaultTableStoreWriter writer = new DefaultTableStoreWriter(
endpoint, credentials, instanceName, tableName, config, resultCallback);
return writer;
}
Step 3: Construct and execute write requests
Construct a RowChange object based on the operation that you want to perform (put, update, or delete), and then add the RowChange to TableStoreWriter.
In practice, you need to use only one of the two write methods: single-row write or batch write.
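For reference, the following minimal sketch shows the three kinds of RowChange; the examples in this topic use only RowUpdateChange, and pk stands for a PrimaryKey built in the same way as in the code below.
// Minimal sketch of the three RowChange types; pk is a PrimaryKey built as in the examples below.
RowPutChange putChange = new RowPutChange(tableName, pk);          // write a row, overwriting existing data
putChange.addColumn("col_name", ColumnValue.fromString("value"));
RowUpdateChange updateChange = new RowUpdateChange(tableName, pk); // update or add columns of a row
updateChange.put("col_name", ColumnValue.fromString("value"));
RowDeleteChange deleteChange = new RowDeleteChange(tableName, pk); // delete an entire row
writer.addRowChange(updateChange);                                 // add any RowChange to the writer buffer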
Write single rows
The following sample code concurrently writes 1,000 rows to a data table by using the single-row write method.
public void writeSingleRowWithFuture(TableStoreWriter writer) {
System.out.println("=========================================================[Start]");
System.out.println("Write Single Row With Future");
int rowsCount = 1000;
int columnsCount = 10;
String strValue = "1234567890";
AtomicLong rowIndex = new AtomicLong(-1);
List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {
PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
.addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
.addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
.build();
RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
for (int j = 0; j < columnsCount; j++) {
rowChange.put("column_" + j, ColumnValue.fromString(strValue));
}
rowChange.put("index", ColumnValue.fromLong(index));
Future<WriterResult> future = writer.addRowChangeWithFuture(rowChange);
futures.add(future);
}
System.out.println("Write thread finished.");
// Flush the data in the buffer. TableStoreWriter also flushes the buffer automatically based on the flushInterval and maxBatchSize settings: flushInterval triggers a flush periodically by time, and maxBatchSize triggers a flush based on the amount of data in the buffer.
writer.flush();
// Print the Future results.
// printFutureResult(futures);
System.out.println("=========================================================[Finish]");
}
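The flushInterval and maxBatchSize settings mentioned in the comment above belong to WriterConfig and can be tuned when you build the configuration in Step 2. The following sketch assumes the setFlushInterval and setMaxBatchSize setters listed among the TableStoreWriter configuration items; the values are examples only.
WriterConfig config = new WriterConfig();
// Flush the buffer periodically, in milliseconds (example value).
config.setFlushInterval(10 * 1000);
// Flush the buffer when the accumulated data reaches this size, in bytes (example value).
config.setMaxBatchSize(4 * 1024 * 1024);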
Write rows in batches
The following sample code concurrently writes 1,000 rows to a data table by using the batch write method.
public void writeRowListWithFuture(TableStoreWriter writer) {
System.out.println("=========================================================[Start]");
System.out.println("Write Row List With Future");
int rowsCount = 1000;
int columnsCount = 10;
String strValue = "1234567890";
AtomicLong rowIndex = new AtomicLong(-1);
List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
List<RowChange> rowChanges = new LinkedList<RowChange>();
for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {
PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
.addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
.addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
.build();
RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
for (int j = 0; j < columnsCount; j++) {
rowChange.put("column_" + j, ColumnValue.fromString(strValue));
}
rowChange.put("index", ColumnValue.fromLong(index));
rowChanges.add(rowChange);
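// On average, group about 200 rows into one list (Math.random() > 0.995) and submit them as one batch; the last iteration submits the final partial batch.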
if (Math.random() > 0.995 || index == rowsCount - 1) {
Future<WriterResult> future = writer.addRowChangeWithFuture(rowChanges);
futures.add(future);
rowChanges.clear();
}
}
System.out.println("Write thread finished.");
// Flush the data in the buffer. TableStoreWriter also flushes the buffer automatically based on the flushInterval and maxBatchSize settings: flushInterval triggers a flush periodically by time, and maxBatchSize triggers a flush based on the amount of data in the buffer.
writer.flush();
// Print the Future results.
// printFutureResult(futures);
System.out.println("=========================================================[Finish]");
}
Step 4: Close TableStoreWriter
Before you exit the application, you must manually close TableStoreWriter. When TableStoreWriter is closed, it first flushes all data in the buffer.
If you call addRowChange to write data to the buffer while TableStoreWriter is being closed or after it has been closed, that data is not guaranteed to be written to Tablestore.
// Actively close the writer. It waits until all queued data is written, and then automatically shuts down the client and the internal thread pool.
writer.close();
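A common pattern, shown here as a sketch rather than as part of the official example, is to guard the writer with try...finally so that it is always closed before the application exits:
TableStoreWriter writer = createTablesStoreWriter();
try {
    // ... add RowChange objects to the writer ...
    writer.flush();
} finally {
    // close() flushes the remaining buffered data and releases the client and the internal thread pool.
    writer.close();
}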
Complete sample code
The following sample code creates a new data table and concurrently writes data to it.
import com.alicloud.openservices.tablestore.DefaultTableStoreWriter;
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.TableStoreCallback;
import com.alicloud.openservices.tablestore.TableStoreWriter;
import com.alicloud.openservices.tablestore.core.auth.DefaultCredentials;
import com.alicloud.openservices.tablestore.core.auth.ServiceCredentials;
import com.alicloud.openservices.tablestore.model.*;
import com.alicloud.openservices.tablestore.writer.RowWriteResult;
import com.alicloud.openservices.tablestore.writer.WriterConfig;
import com.alicloud.openservices.tablestore.writer.WriterResult;
import com.aliyuncs.exceptions.ClientException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.commons.codec.digest.DigestUtils.md5Hex;
public class TableStoreWriterDemo {
private static String endpoint = "<ENDPOINT>";
private static String instanceName = "<INSTANCE_NAME>";
private static String accessKeyId = System.getenv("OTS_AK_ENV");
private static String accessKeySecret = System.getenv("OTS_SK_ENV");
private static String tableName = "<TABLE_NAME>";
private static AtomicLong succeedRows = new AtomicLong();
private static AtomicLong failedRows = new AtomicLong();
public static void main(String[] args) throws ClientException {
TableStoreWriterDemo sample = new TableStoreWriterDemo();
/**
 * Make sure that the table exists before you use the writer.
 * 1. The writer verifies that the table exists.
 * 2. The writer verifies that the data to write matches the fields and types of the table.
 */
sample.tryCreateTable();
/**
 * Recommended constructor for initialization:
 * DefaultTableStoreWriter(
 *     String endpoint,                                              // Endpoint of the instance.
 *     ServiceCredentials credentials,                               // Credentials: AccessKey pair; tokens are also supported.
 *     String instanceName,                                          // Instance name.
 *     String tableName,                                             // Table name: one writer writes to only one table.
 *     WriterConfig config,                                          // Writer configuration.
 *     TableStoreCallback<RowChange, RowWriteResult> resultCallback  // Row-level callback.
 * )
 */
TableStoreWriter writer = sample.createTablesStoreWriter();
/**
 * Future-based usage: single-row write.
 */
sample.writeSingleRowWithFuture(writer);
/**
 * Future-based usage: batch write.
 */
//sample.writeRowListWithFuture(writer);
System.out.println("Count by TablestoreCallback: failedRow=" + failedRows.get() + ", succeedRow=" + succeedRows.get());
System.out.println("Count by WriterStatics: " + writer.getWriterStatistics());
/**
 * You must actively close the writer. It waits until all queued data is written, and then automatically shuts down the client and the internal thread pool.
 */
writer.close();
}
private static TableStoreWriter createTablesStoreWriter() {
WriterConfig config = new WriterConfig();
// Maximum number of rows in a single batch import request. Default value: 200. Increase this value if you want to write more than 200 rows in a single request.
config.setMaxBatchRowsCount(200);
// Maximum concurrency. Default value: 10. We recommend that you keep the default value.
config.setConcurrency(10);
/**
 * Custom row-level callback.
 * This example simply counts succeeded and failed rows to demonstrate the callback capability.
 */
TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
@Override
public void onCompleted(RowChange rowChange, RowWriteResult cc) {
succeedRows.incrementAndGet();
}
@Override
public void onFailed(RowChange rowChange, Exception ex) {
failedRows.incrementAndGet();
}
};
ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);
/**
 * We recommend using the internally constructed thread pool and client, which are easy to use and keep the initialization and release logic isolated.
 */
DefaultTableStoreWriter writer = new DefaultTableStoreWriter(
endpoint, credentials, instanceName, tableName, config, resultCallback);
return writer;
}
private static void tryCreateTable() throws ClientException {
SyncClient ots = new SyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);
try {
ots.deleteTable(new DeleteTableRequest(tableName));
} catch (Exception e) {
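// The sample deletes the table if it already exists; ignore the error that is reported when the table does not exist.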
}
TableMeta tableMeta = new TableMeta(tableName);
tableMeta.addPrimaryKeyColumn("pk_0", PrimaryKeyType.STRING);
tableMeta.addPrimaryKeyColumn("pk_1", PrimaryKeyType.STRING);
tableMeta.addPrimaryKeyColumn("pk_2", PrimaryKeyType.INTEGER);
TableOptions tableOptions = new TableOptions(-1, 1);
CreateTableRequest request = new CreateTableRequest(
tableMeta, tableOptions, new ReservedThroughput(new CapacityUnit(0, 0)));
try {
CreateTableResponse res = ots.createTable(request);
} catch (Exception e) {
throw new ClientException(e);
} finally {
ots.shutdown();
}
}
public static void writeSingleRowWithFuture(TableStoreWriter writer) {
System.out.println("=========================================================[Start]");
System.out.println("Write Single Row With Future");
int rowsCount = 1000;
int columnsCount = 10;
String strValue = "1234567890";
AtomicLong rowIndex = new AtomicLong(-1);
List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {
PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
.addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
.addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
.build();
RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
for (int j = 0; j < columnsCount; j++) {
rowChange.put("column_" + j, ColumnValue.fromString(strValue));
}
rowChange.put("index", ColumnValue.fromLong(index));
Future<WriterResult> future = writer.addRowChangeWithFuture(rowChange);
futures.add(future);
}
System.out.println("Write thread finished.");
writer.flush();
// Print the Future results.
// printFutureResult(futures);
System.out.println("=========================================================[Finish]");
}
public void writeRowListWithFuture(TableStoreWriter writer) {
System.out.println("=========================================================[Start]");
System.out.println("Write Row List With Future");
int rowsCount = 1000;
int columnsCount = 10;
String strValue = "1234567890";
AtomicLong rowIndex = new AtomicLong(-1);
List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
List<RowChange> rowChanges = new LinkedList<RowChange>();
for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {
PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
.addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
.addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
.build();
RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
for (int j = 0; j < columnsCount; j++) {
rowChange.put("column_" + j, ColumnValue.fromString(strValue));
}
rowChange.put("index", ColumnValue.fromLong(index));
rowChanges.add(rowChange);
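// On average, group about 200 rows into one list (Math.random() > 0.995) and submit them as one batch; the last iteration submits the final partial batch.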
if (Math.random() > 0.995 || index == rowsCount - 1) {
Future<WriterResult> future = writer.addRowChangeWithFuture(rowChanges);
futures.add(future);
rowChanges.clear();
}
}
System.out.println("Write thread finished.");
writer.flush();
// Print the Future results.
// printFutureResult(futures);
System.out.println("=========================================================[Finish]");
}
private static void printFutureResult(List<Future<WriterResult>> futures) {
int totalRow = 0;
for (int index = 0; index < futures.size(); index++) {
try {
WriterResult result = futures.get(index).get();
totalRow += result.getTotalCount();
System.out.println(String.format(
"Future[%d] finished:\tfailed: %d\tsucceed: %d\tfutureBatch: %d\ttotalFinished: %d",
index, result.getFailedRows().size(), result.getSucceedRows().size(),
result.getTotalCount(), totalRow));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
}
The following shows sample output:
=========================================================[Start]
Write Single Row With Future
Write thread finished.
=========================================================[Finish]
Count by TablestoreCallback: failedRow=0, succeedRow=1000
Count by WriterStatics: WriterStatistics: {
totalRequestCount=6,
totalRowsCount=1000,
totalSucceedRowsCount=1000,
totalFailedRowsCount=0,
totalSingleRowRequestCount=0,
}
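In this sample output, TableStoreWriter merged the 1,000 single-row writes into 6 batch requests (totalRequestCount=6), and all rows were written successfully.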
Billing
Writing data by using TableStoreWriter incurs data write and data storage fees. For more information, see Billing overview.
FAQ
An error is reported when data is written by using the SDK for Java: The count of attribute columns exceeds the maximum:128
References
For more information about the application scenarios and architecture of the TableStoreWriter utility class, see the TableStoreWriter overview.
To write time series data with high concurrency, you can use the time series writer. For more information, see Use the time series writer to write time series data.