TableStoreReader 是表格存储基于 Java SDK 实现的简单易用、高性能的数据读取工具类,它封装了用于高并发、高吞吐率数据读取的接口,可以实现数据的并发读取,同时支持行级别回调以及自定义配置功能。本文介绍如何使用 TableStoreReader 并发读取数据。
前提条件
为阿里云账号或具有表格存储访问权限的 RAM 用户创建AccessKey。
操作步骤
步骤一:安装 Tablestore SDK
如果您使用的是Maven项目,请在项目的pom.xml文件中添加如下依赖:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>tablestore</artifactId>
<version>5.17.4</version>
</dependency>
更多关于安装Tablestore SDK的信息,请参见安装Tablestore SDK。
步骤二:初始化
初始化 TableStoreReader 前,需要先创建表格存储 Client 连接实例,您可以自定义TableStoreReader 的配置参数和回调函数,初始化示例代码如下所示。
说明
使用多线程时,建议共用一个 TableStoreReader 对象。
public static TableStoreReader createReader() {
// 初始化表格存储 Client
client = new AsyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);
// TableStoreReader 配置
TableStoreReaderConfig config = new TableStoreReaderConfig();
// 线程池
executorService = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024), new ThreadPoolExecutor.CallerRunsPolicy());
// 回调函数
TableStoreCallback<PrimaryKeyWithTable, RowReadResult> callback = new TableStoreCallback<PrimaryKeyWithTable, RowReadResult>() {
@Override
public void onCompleted(PrimaryKeyWithTable primaryKeyWithTable, RowReadResult rowReadResult) {
succeedRows.incrementAndGet();
System.out.println(rowReadResult.getRowResult());
}
@Override
public void onFailed(PrimaryKeyWithTable primaryKeyWithTable, Exception e) {
failedRows.incrementAndGet();
System.out.println("Failed Rows: " + primaryKeyWithTable.getTableName() + " | " + primaryKeyWithTable.getPrimaryKey() + " | " + e.getMessage());
}
};
return new DefaultTableStoreReader(client, config, executorService, callback);
}
如果不需要回调函数,可以在初始化方法中将回调函数参数设置为 null。
return new DefaultTableStoreReader(client, config, executorService, null);
步骤三:查询数据
使用 TableStoreReader 查询数据前,您需要将行数据的主键信息添加到缓冲区中。
PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder() .addPrimaryKeyColumn("id", PrimaryKeyValue.fromString("row1")) .build(); tableStoreReader.addPrimaryKey("test_table", primaryKey);
如果需要获取查询后的行数据,您可以使用
addPrimaryKeyWithFuture
方法。Future<ReaderResult> readerResult = tableStoreReader.addPrimaryKeyWithFuture("test_table", primaryKey);
您也可以指定查询参数,例如最大版本数、数据版本范围、过滤器等。
RowQueryCriteria rowQueryCriteria = new RowQueryCriteria("test_version"); // 设置最大读取版本数 rowQueryCriteria.setMaxVersions(1); // 设置读取数据版本范围 rowQueryCriteria.setTimeRange(new TimeRange(System.currentTimeMillis() - 86400*1000, System.currentTimeMillis())); // 设置返回的属性列 rowQueryCriteria.addColumnsToGet("col1"); // 设置过滤条件 SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("col1", SingleColumnValueFilter.CompareOperator.EQUAL, ColumnValue.fromString("val1")); rowQueryCriteria.setFilter(singleColumnValueFilter); // 添加查询条件 tableStoreReader.setRowQueryCriteria(rowQueryCriteria);
主键信息添加到缓冲区中后,TableStoreReader 按照设置的自动发送时间间隔(默认为 10 秒)将缓冲区中的数据发送到表格存储进行查询,您也可以手动发送缓冲区数据。
同步发送
tableStoreReader.flush();
异步发送
tableStoreReader.send();
步骤四:关闭资源
数据查询完成后,如果不需要进行其它操作,在不影响业务系统运行的情况下,您可以关闭资源。
tableStoreReader.close();
client.shutdown();
executorService.shutdown();
完整示例代码
以下示例代码使用 TableStoreReader 并发查询 test_table 表中的 200 行数据,并在回调函数中打印查询结果。
public class TableStoreReaderExample {
private static final String endpoint = "https://n01k********.cn-hangzhou.ots.aliyuncs.com";
private static final String instanceName = "n01k********";
private static final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
private static final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
private static AsyncClientInterface client;
private static ExecutorService executorService;
private static AtomicLong succeedRows = new AtomicLong();
private static AtomicLong failedRows = new AtomicLong();
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 创建 TableStoreReader
TableStoreReader tableStoreReader = createReader();
// 添加查询数据主键
for(int i=0; i<200; i++) {
PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("id", PrimaryKeyValue.fromString("row" + i))
.build();
tableStoreReader.addPrimaryKey("test_table", primaryKey);
}
// 发送缓冲区中的数据
tableStoreReader.flush();
// 等待回调函数执行完成
Thread.sleep(1000L);
System.out.println("Succeed Rows Count: " + succeedRows.get());
System.out.println("Failed Rows Count: " + failedRows.get());
// 关闭资源
tableStoreReader.close();
client.shutdown();
executorService.shutdown();
}
public static TableStoreReader createReader() {
// 初始化表格存储 Client
client = new AsyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);
// TableStoreReader 参数配置
TableStoreReaderConfig config = new TableStoreReaderConfig();
// 线程池
executorService = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024), new ThreadPoolExecutor.CallerRunsPolicy());
// 回调函数
TableStoreCallback<PrimaryKeyWithTable, RowReadResult> callback = new TableStoreCallback<PrimaryKeyWithTable, RowReadResult>() {
@Override
public void onCompleted(PrimaryKeyWithTable primaryKeyWithTable, RowReadResult rowReadResult) {
succeedRows.incrementAndGet();
System.out.println(rowReadResult.getRowResult());
}
@Override
public void onFailed(PrimaryKeyWithTable primaryKeyWithTable, Exception e) {
failedRows.incrementAndGet();
System.out.println("Failed Rows: " + primaryKeyWithTable.getTableName() + " | " + primaryKeyWithTable.getPrimaryKey() + " | " + e.getMessage());
}
};
return new DefaultTableStoreReader(client, config, executorService, callback);
}
}
该文章对您有帮助吗?