使用 TableStoreReader 并发读取数据

TableStoreReader 是表格存储基于 Java SDK 实现的简单易用、高性能的数据读取工具类,它封装了用于高并发、高吞吐率数据读取的接口,可以实现数据的并发读取,同时支持行级别回调以及自定义配置功能。本文介绍如何使用 TableStoreReader 并发读取数据。

前提条件

为阿里云账号或具有表格存储访问权限的 RAM 用户创建AccessKey

操作步骤

步骤一:安装 Tablestore SDK

如果您使用的是Maven项目,请在项目的pom.xml文件中添加如下依赖:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>tablestore</artifactId>
    <version>5.17.4</version>
</dependency>                 

更多关于安装Tablestore SDK的信息,请参见安装Tablestore SDK

步骤二:初始化

初始化 TableStoreReader 前,需要先创建表格存储 Client 连接实例,您可以自定义TableStoreReader 的配置参数和回调函数,初始化示例代码如下所示。

说明

使用多线程时,建议共用一个 TableStoreReader 对象。

public static TableStoreReader createReader() {
        // 初始化表格存储 Client
        client = new AsyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);

        // TableStoreReader 配置
        TableStoreReaderConfig config = new TableStoreReaderConfig();

        // 线程池
        executorService = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024), new ThreadPoolExecutor.CallerRunsPolicy());

        // 回调函数
        TableStoreCallback<PrimaryKeyWithTable, RowReadResult> callback = new TableStoreCallback<PrimaryKeyWithTable, RowReadResult>() {
            @Override
            public void onCompleted(PrimaryKeyWithTable primaryKeyWithTable, RowReadResult rowReadResult) {
                succeedRows.incrementAndGet();
                System.out.println(rowReadResult.getRowResult());
            }

            @Override
            public void onFailed(PrimaryKeyWithTable primaryKeyWithTable, Exception e) {
                failedRows.incrementAndGet();
                System.out.println("Failed Rows: " + primaryKeyWithTable.getTableName() + " | " + primaryKeyWithTable.getPrimaryKey() + " | " + e.getMessage());
            }
        };

        return new DefaultTableStoreReader(client, config, executorService, callback);
    }

TableStoreReaderConfig 参数说明

名称

类型

说明

checkTableMeta

boolean

是否开启 Schema 检查,默认值为 true。开启后,数据写入缓冲区之前,TableStoreReader将会进行以下检查。

  • 数据表是否存在。

  • 查询数据的主键 Schema 是否与表的主键相同。

bufferSize

int

缓冲区队列大小,必须为 2 的指数倍,默认值为 1024。

concurrency

int

将缓冲区数据发送到表格存储时的最大请求并发数,默认值为 10。

maxBatchRowsCount

int

一次批量请求读取的最大行数,默认值为 100,最大值为 100。

defaultMaxVersions

int

读取的数据版本数量,默认值为 1,即只读取最新版本的数据。

flushInterval

int

自动发送缓冲区数据到表格存储的时间间隔,默认值为 10000,单位为毫秒。

logInterval

int

发送缓冲区数据到表格存储时,打印任务状态的时间间隔,默认值为 10000,单位为毫秒。

bucketCount

int

桶数量,每个桶相当于一个缓冲区,默认值为 4。

参数设置示例代码如下。

// 设置是否开启 Schema 检查
config.setCheckTableMeta(false);
// 设置缓冲区队列大小
config.setBufferSize(1024);
// 设置并发数
config.setConcurrency(10);
// 设置批量请求最大行数
config.setMaxBatchRowsCount(100);
// 设置读取数据版本数量
config.setDefaultMaxVersions(1);
// 设置 flush 时间间隔
config.setFlushInterval(10000);
// 设置 log 时间间隔
config.setLogInterval(10000);
// 设置 bucket 数量
config.setBucketCount(4);
  • 如果不需要回调函数,可以在初始化方法中将回调函数参数设置为 null。

    return new DefaultTableStoreReader(client, config, executorService, null);

步骤三:查询数据

  1. 使用 TableStoreReader 查询数据前,您需要将行数据的主键信息添加到缓冲区中。

    PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
            .addPrimaryKeyColumn("id", PrimaryKeyValue.fromString("row1"))
            .build();
    tableStoreReader.addPrimaryKey("test_table", primaryKey);
    • 如果需要获取查询后的行数据,您可以使用 addPrimaryKeyWithFuture 方法。

      Future<ReaderResult> readerResult = tableStoreReader.addPrimaryKeyWithFuture("test_table", primaryKey);
    • 您也可以指定查询参数,例如最大版本数、数据版本范围、过滤器等。

      RowQueryCriteria rowQueryCriteria = new RowQueryCriteria("test_version");
      // 设置最大读取版本数
      rowQueryCriteria.setMaxVersions(1);
      // 设置读取数据版本范围
      rowQueryCriteria.setTimeRange(new TimeRange(System.currentTimeMillis() - 86400*1000, System.currentTimeMillis()));
      // 设置返回的属性列
      rowQueryCriteria.addColumnsToGet("col1");
      // 设置过滤条件
      SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("col1", SingleColumnValueFilter.CompareOperator.EQUAL, ColumnValue.fromString("val1"));
      rowQueryCriteria.setFilter(singleColumnValueFilter);
      // 添加查询条件
      tableStoreReader.setRowQueryCriteria(rowQueryCriteria);
  2. 主键信息添加到缓冲区中后,TableStoreReader 按照设置的自动发送时间间隔(默认为 10 秒)将缓冲区中的数据发送到表格存储进行查询,您也可以手动发送缓冲区数据。

    • 同步发送

      tableStoreReader.flush();
    • 异步发送

      tableStoreReader.send();

步骤四:关闭资源

数据查询完成后,如果不需要进行其它操作,在不影响业务系统运行的情况下,您可以关闭资源。

tableStoreReader.close();
client.shutdown();
executorService.shutdown();

完整示例代码

以下示例代码使用 TableStoreReader 并发查询 test_table 表中的 200 行数据,并在回调函数中打印查询结果。

public class TableStoreReaderExample {
    private static final String endpoint = "https://n01k********.cn-hangzhou.ots.aliyuncs.com";
    private static final String instanceName = "n01k********";
    private static final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
    private static final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
    private static AsyncClientInterface client;
    private static ExecutorService executorService;
    private static AtomicLong succeedRows = new AtomicLong();
    private static AtomicLong failedRows = new AtomicLong();

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 创建 TableStoreReader
        TableStoreReader tableStoreReader = createReader();

        // 添加查询数据主键
        for(int i=0; i<200; i++) {
            PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                    .addPrimaryKeyColumn("id", PrimaryKeyValue.fromString("row" + i))
                    .build();
            tableStoreReader.addPrimaryKey("test_table", primaryKey);
        }

        // 发送缓冲区中的数据
        tableStoreReader.flush();

        // 等待回调函数执行完成
        Thread.sleep(1000L);

        System.out.println("Succeed Rows Count: " + succeedRows.get());
        System.out.println("Failed Rows Count: " + failedRows.get());

        // 关闭资源
        tableStoreReader.close();
        client.shutdown();
        executorService.shutdown();
    }

    public static TableStoreReader createReader() {
        // 初始化表格存储 Client
        client = new AsyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);

        // TableStoreReader 参数配置
        TableStoreReaderConfig config = new TableStoreReaderConfig();

        // 线程池
        executorService = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024), new ThreadPoolExecutor.CallerRunsPolicy());

        // 回调函数
        TableStoreCallback<PrimaryKeyWithTable, RowReadResult> callback = new TableStoreCallback<PrimaryKeyWithTable, RowReadResult>() {
            @Override
            public void onCompleted(PrimaryKeyWithTable primaryKeyWithTable, RowReadResult rowReadResult) {
                succeedRows.incrementAndGet();
                System.out.println(rowReadResult.getRowResult());
            }

            @Override
            public void onFailed(PrimaryKeyWithTable primaryKeyWithTable, Exception e) {
                failedRows.incrementAndGet();
                System.out.println("Failed Rows: " + primaryKeyWithTable.getTableName() + " | " + primaryKeyWithTable.getPrimaryKey() + " | " + e.getMessage());
            }
        };

        return new DefaultTableStoreReader(client, config, executorService, callback);
    }
}