表格存储提供了单行读取、批量读取、范围读取、迭代读取和并行读取的查询方式用于读取数据表中数据。数据写入到数据表后,您可以选择所需数据查询方式进行数据读取。
如果需要了解表格存储各场景的应用案例,请参见快速玩转Tablestore入门与实战。
查询方式
表格存储提供的数据读取接口包括GetRow、BatchGetRow和GetRange。读取数据时,请根据实际查询场景使用相应查询方式读取数据。
当要读取带有自增主键列的表数据时,请确保已获取到包含自增主键列值在内的完整主键。更多信息,请参见主键列自增。如果未记录自增主键列的值,您可以使用范围读取数据按照第一个主键列确定范围读取数据。
查询方式 | 说明 | 适用场景 |
调用GetRow接口读取一行数据。 | 适用于能确定完整主键且要读取行数较少的场景。 | |
调用BatchGetRow接口一次请求读取多行数据或者一次对多张表进行读取。 BatchGetRow操作由多个GetRow子操作组成,构造子操作的过程与使用GetRow接口时相同。 | 适用于能确定完整主键,且要读取行数较多或者要读取多个表中数据的场景。 | |
调用GetRange接口读取一个范围内的数据。 GetRange操作支持按照确定范围进行正序读取和逆序读取,可以设置要读取的行数。如果范围较大,已扫描的行数或者数据量超过一定限制,会停止扫描,并返回已获取的行和下一个主键信息。您可以根据返回的下一个主键信息,继续发起请求,获取范围内剩余的行。 | 适用于能确定完整主键范围或者主键前缀的场景。 重要 如果不能确定主键前缀,您也可以通过设置完整主键范围均为虚拟点INF_MIN和INF_MAX进行全表数据扫描,但是执行此操作会消耗较多计算资源,请谨慎使用。 | |
通过createRangeIterator接口迭代读取数据。 | 适用于能确定完整主键范围或者主键前缀,且需要迭代读取的场景。 | |
TableStoreReader是表格存储Java SDK提供的工具类,封装了BatchGetRow接口,可以实现并发查询表中数据。同时支持多表查询、查询状态统计、行级别回调和自定义配置功能。 | 适用于能确定完整主键,且要读取行数较多或者要读取多个表中数据的场景。 |
前提条件
已初始化OTSClient。具体操作,请参见初始化OTSClient。
已创建数据表并写入数据。
读取单行数据
调用GetRow接口读取一行数据。读取的结果可能有如下两种:
如果该行存在,则返回该行的各主键列以及属性列。
如果该行不存在,则返回中不包含行,并且不会报错。
参数
参数 | 说明 |
tableName | 数据表名称。 |
primaryKey | 行的主键。主键包括主键列名、主键类型和主键值。 重要 设置的主键个数和类型必须和表的主键个数和类型一致。 |
columnsToGet | 读取的列集合,列名可以是主键列或属性列。
说明
|
maxVersions | 最多读取的版本数。 重要 maxVersions与timeRange必须至少设置一个。
|
timeRange | 读取版本号范围或特定版本号的数据。更多信息,请参见TimeRange。 重要 maxVersions与timeRange必须至少设置一个。
timestamp和 时间戳的单位为毫秒,最小值为0,最大值为 |
filter | 使用过滤器,在服务端对读取结果再进行一次过滤,只返回符合过滤器中条件的数据行。更多信息,请参见过滤器。 说明 当columnsToGet和filter同时使用时,执行顺序是先获取columnsToGet指定的列,再在返回的列中进行条件过滤。 |
示例
读取数据时,您可以指定要读取的数据版本、要读取的列、过滤器、正则过滤等。
读取最新版本数据和指定列
以下示例用于读取数据表中的一行数据,设置读取最新版本的数据和读取指定的列。
private static void getRow(SyncClient client, String pkValue) {
//构造主键。
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(pkValue));
PrimaryKey primaryKey = primaryKeyBuilder.build();
//读取一行数据,设置数据表名称。
SingleRowQueryCriteria criteria = new SingleRowQueryCriteria("<TABLE_NAME>", primaryKey);
//设置读取最新版本。
criteria.setMaxVersions(1);
GetRowResponse getRowResponse = client.getRow(new GetRowRequest(criteria));
Row row = getRowResponse.getRow();
System.out.println("读取完毕,结果为: ");
System.out.println(row);
//设置读取某些列。
criteria.addColumnsToGet("Col0");
getRowResponse = client.getRow(new GetRowRequest(criteria));
row = getRowResponse.getRow();
System.out.println("读取完毕,结果为:");
System.out.println(row);
}
读取数据时使用过滤器
以下示例用于读取数据表中的一行数据,设置读取最新版本的数据以及根据Col0列的值过滤数据。
private static void getRow(SyncClient client, String pkValue) {
//构造主键。
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(pkValue));
PrimaryKey primaryKey = primaryKeyBuilder.build();
//读取一行数据,设置数据表名称。
SingleRowQueryCriteria criteria = new SingleRowQueryCriteria("<TABLE_NAME>", primaryKey);
//设置读取最新版本。
criteria.setMaxVersions(1);
//设置过滤器,当Col0列的值为0时,返回该行。
SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("Col0",
SingleColumnValueFilter.CompareOperator.EQUAL, ColumnValue.fromLong(0));
//如果Col0列不存在,则不返回该行。
singleColumnValueFilter.setPassIfMissing(false);
criteria.setFilter(singleColumnValueFilter);
GetRowResponse getRowResponse = client.getRow(new GetRowRequest(criteria));
Row row = getRowResponse.getRow();
System.out.println("读取完毕,结果为: ");
System.out.println(row);
}
读取数据时使用正则过滤
以下示例用于读取数据表一行中Col1列的数据,并对该列的数据执行正则过滤。
private static void getRow(SyncClient client, String pkValue) {
//设置数据表名称。
SingleRowQueryCriteria criteria = new SingleRowQueryCriteria("<TABLE_NAME>");
//构造主键。
PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(pkValue))
.build();
criteria.setPrimaryKey(primaryKey);
// 设置读取最新版本。
criteria.setMaxVersions(1);
// 设置过滤器,当cast<int>(regex(Col1)) > 100时,返回该行。
RegexRule regexRule = new RegexRule("t1:([0-9]+),", RegexRule.CastType.VT_INTEGER);
SingleColumnValueRegexFilter filter = new SingleColumnValueRegexFilter("Col1",
regexRule,SingleColumnValueRegexFilter.CompareOperator.GREATER_THAN, ColumnValue.fromLong(100));
criteria.setFilter(filter);
GetRowResponse getRowResponse = client.getRow(new GetRowRequest(criteria));
Row row = getRowResponse.getRow();
System.out.println("读取完毕,结果为: ");
System.out.println(row);
}
批量读取数据
调用BatchGetRow接口一次请求读取多行数据,也支持一次对多张表进行读取。BatchGetRow由多个GetRow子操作组成。构造子操作的过程与使用GetRow接口时相同。
BatchGetRow的各个子操作独立执行,表格存储会分别返回各个子操作的执行结果。
注意事项
批量读取的所有行采用相同的参数条件,例如
ColumnsToGet=[colA]
,则要读取的所有行都只读取colA列。由于批量读取可能存在部分行失败的情况,失败行的错误信息在返回的BatchGetRowResponse中,但并不抛出异常。因此调用BatchGetRow接口时,需要检查返回值,可通过BatchGetRowResponse的isAllSucceed方法判断是否所有行都获取成功;通过BatchGetRowResponse的getFailedRows方法获取失败行的信息。
BatchGetRow操作单次支持读取的最大行数为100行。
参数
更多信息,请参见读取单行数据参数。
示例
以下示例用于读取10行,设置版本条件、要读取的列、过滤器等。
private static void batchGetRow(SyncClient client) {
//设置数据表名称。
MultiRowQueryCriteria multiRowQueryCriteria = new MultiRowQueryCriteria("<TABLE_NAME>");
//加入10个要读取的行。
for (int i = 0; i < 10; i++) {
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString("pk" + i));
PrimaryKey primaryKey = primaryKeyBuilder.build();
multiRowQueryCriteria.addRow(primaryKey);
}
//添加条件。
multiRowQueryCriteria.setMaxVersions(1);
multiRowQueryCriteria.addColumnsToGet("Col0");
multiRowQueryCriteria.addColumnsToGet("Col1");
SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("Col0",
SingleColumnValueFilter.CompareOperator.EQUAL, ColumnValue.fromLong(0));
singleColumnValueFilter.setPassIfMissing(false);
multiRowQueryCriteria.setFilter(singleColumnValueFilter);
BatchGetRowRequest batchGetRowRequest = new BatchGetRowRequest();
//BatchGetRow支持读取多个表的数据,一个multiRowQueryCriteria对应一个表的查询条件,可以添加多个multiRowQueryCriteria。
batchGetRowRequest.addMultiRowQueryCriteria(multiRowQueryCriteria);
BatchGetRowResponse batchGetRowResponse = client.batchGetRow(batchGetRowRequest);
System.out.println("是否全部成功:" + batchGetRowResponse.isAllSucceed());
System.out.println("读取完毕,结果为: ");
for (BatchGetRowResponse.RowResult rowResult : batchGetRowResponse.getSucceedRows()) {
System.out.println(rowResult.getRow());
}
if (!batchGetRowResponse.isAllSucceed()) {
for (BatchGetRowResponse.RowResult rowResult : batchGetRowResponse.getFailedRows()) {
System.out.println("失败的行:" + batchGetRowRequest.getPrimaryKey(rowResult.getTableName(), rowResult.getIndex()));
System.out.println("失败原因:" + rowResult.getError());
}
/**
* 可以通过createRequestForRetry方法再构造一个请求对失败的行进行重试。此处只给出构造重试请求的部分。
* 推荐的重试方法是使用SDK的自定义重试策略功能,支持对batch操作的部分行错误进行重试。设置重试策略后,调用接口处无需增加重试代码。
*/
BatchGetRowRequest retryRequest = batchGetRowRequest.createRequestForRetry(batchGetRowResponse.getFailedRows());
}
}
详细代码请参见BatchGetRow@GitHub。
范围读取数据
调用GetRange接口读取一个范围内的数据。
GetRange操作支持按照确定范围进行正序读取和逆序读取,可以设置要读取的行数。如果范围较大,已扫描的行数或者数据量超过一定限制,会停止扫描,并返回已获取的行和下一个主键信息。您可以根据返回的下一个主键信息,继续发起请求,获取范围内剩余的行。
表格存储表中的行都是按照主键排序的,而主键是由全部主键列按照顺序组成的,所以不能理解为表格存储会按照某列主键排序,这是常见的误区。
注意事项
GetRange操作遵循最左匹配原则,读取数据时,依次比较第一主键列到第四主键列。例如表的主键包括PK1、PK2、PK3三个主键列,读取数据时,优先比较PK1是否在开始主键与结束主键的范围内,如果PK1在设置的主键范围内,则不会再比较其他的主键,返回在PK1主键范围内的数据;如果PK1在设置的主键边界上,则继续比较PK2是否在开始主键与结束主键的范围内,以此类推。关于范围查询原理的更多信息,请参见GetRange范围查询详解。
GetRange操作可能在如下情况停止执行并返回数据。
扫描的行数据大小之和达到4 MB。
扫描的行数等于5000。
返回的行数等于最大返回行数。
当前剩余的预留读吞吐量已全部使用,余量不足以读取下一条数据。
当使用GetRange扫描的数据量较大时,表格存储每次请求仅会扫描一次(行数大于5000或者大小大于4 MB停止扫描),超过限制的数据不会继续返回,需要通过翻页继续获取后面的数据。
参数
参数 | 说明 |
tableName | 数据表名称。 |
direction | 读取方向。
假设同一表中有两个主键A和B,A小于B,如果正序读取 |
inclusiveStartPrimaryKey | 本次范围读取的起始主键和结束主键,起始主键和结束主键需要是有效的主键或者是由INF_MIN和INF_MAX类型组成的虚拟点,虚拟点的列数必须与主键相同。 其中INF_MIN表示无限小,任何类型的值都比它大;INF_MAX表示无限大,任何类型的值都比它小。
数据表中的行按主键从小到大排序,读取范围是一个左闭右开的区间,正序读取时,返回的是大于等于起始主键且小于结束主键的所有的行。 |
exclusiveEndPrimaryKey | |
limit | 数据的最大返回行数,此值必须大于 0。 表格存储按照正序或者逆序返回指定的最大返回行数后即结束该操作的执行,即使该区间内仍有未返回的数据。此时可以通过返回结果中的nextStartPrimaryKey记录本次读取到的位置,用于下一次读取。 |
columnsToGet | 读取的列集合,列名可以是主键列或属性列。
说明
|
maxVersions | 最多读取的版本数。 重要 maxVersions与timeRange必须至少设置一个。
|
timeRange | 读取版本号范围或特定版本号的数据。更多信息,请参见TimeRange。 重要 maxVersions与timeRange必须至少设置一个。
timestamp和 时间戳的单位为毫秒,最小值为0,最大值为 |
filter | 使用过滤器,在服务端对读取结果再进行一次过滤,只返回符合过滤器中条件的数据行。更多信息,请参见过滤器。 说明 当columnsToGet和filter同时使用时,执行顺序是先获取columnsToGet指定的列,再在返回的列中进行条件过滤。 |
nextStartPrimaryKey | 根据返回结果中的nextStartPrimaryKey判断数据是否全部读取。
|
示例
按照确定范围读取数据
以下示例用于按照确定范围进行正序读取,判断nextStartPrimaryKey是否为空,读取完范围内的全部数据。
private static void getRange(SyncClient client, String startPkValue, String endPkValue) {
//设置数据表名称。
RangeRowQueryCriteria rangeRowQueryCriteria = new RangeRowQueryCriteria("<TABLE_NAME>");
//设置起始主键。
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(startPkValue));
rangeRowQueryCriteria.setInclusiveStartPrimaryKey(primaryKeyBuilder.build());
//设置结束主键。
primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(endPkValue));
rangeRowQueryCriteria.setExclusiveEndPrimaryKey(primaryKeyBuilder.build());
rangeRowQueryCriteria.setMaxVersions(1);
System.out.println("GetRange的结果为:");
while (true) {
GetRangeResponse getRangeResponse = client.getRange(new GetRangeRequest(rangeRowQueryCriteria));
for (Row row : getRangeResponse.getRows()) {
System.out.println(row);
}
//如果NextStartPrimaryKey不为null,则继续读取。
if (getRangeResponse.getNextStartPrimaryKey() != null) {
rangeRowQueryCriteria.setInclusiveStartPrimaryKey(getRangeResponse.getNextStartPrimaryKey());
} else {
break;
}
}
}
按照第一个主键列确定范围读取数据
以下示例用于按照第一个主键列确定范围、第二主键列从最小值(INF_MIN)到最大值(INF_MAX)进行正序读取,判断nextStartPrimaryKey是否为null,读取完范围内的全部数据。
private static void getRange(SyncClient client, String startPkValue, String endPkValue) {
//设置数据表名称。
RangeRowQueryCriteria rangeRowQueryCriteria = new RangeRowQueryCriteria("<TABLE_NAME>");
//设置起始主键,以两个主键为例。
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn("pk1", PrimaryKeyValue.fromString(startPkValue));//确定值。
primaryKeyBuilder.addPrimaryKeyColumn("pk2", PrimaryKeyValue.INF_MIN);//最小值。
rangeRowQueryCriteria.setInclusiveStartPrimaryKey(primaryKeyBuilder.build());
//设置结束主键。
primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn("pk1", PrimaryKeyValue.fromString(endPkValue));//确定值。
primaryKeyBuilder.addPrimaryKeyColumn("pk2", PrimaryKeyValue.INF_MAX);//最大值。
rangeRowQueryCriteria.setExclusiveEndPrimaryKey(primaryKeyBuilder.build());
rangeRowQueryCriteria.setMaxVersions(1);
System.out.println("GetRange的结果为:");
while (true) {
GetRangeResponse getRangeResponse = client.getRange(new GetRangeRequest(rangeRowQueryCriteria));
for (Row row : getRangeResponse.getRows()) {
System.out.println(row);
}
//如果nextStartPrimaryKey不为null,则继续读取。
if (getRangeResponse.getNextStartPrimaryKey() != null) {
rangeRowQueryCriteria.setInclusiveStartPrimaryKey(getRangeResponse.getNextStartPrimaryKey());
} else {
break;
}
}
}
按照确定范围读取数据并对指定列使用正则过滤
以下示例用于读取主键范围为["pk:2020-01-01.log", "pk:2021-01-01.log")
时Col1列的数据,并对该列的数据执行正则过滤。
private static void getRange(SyncClient client) {
//设置数据表名称。
RangeRowQueryCriteria criteria = new RangeRowQueryCriteria("<TABLE_NAME>");
//设置主键范围为["pk:2020-01-01.log", "pk:2021-01-01.log"),读取范围为左闭右开的区间。
PrimaryKey pk0 = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString("2020-01-01.log"))
.build();
PrimaryKey pk1 = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString("2021-01-01.log"))
.build();
criteria.setInclusiveStartPrimaryKey(pk0);
criteria.setExclusiveEndPrimaryKey(pk1);
//设置读取最新版本。
criteria.setMaxVersions(1);
//设置过滤器,当cast<int>(regex(Col1)) > 100时,返回该行。
RegexRule regexRule = new RegexRule("t1:([0-9]+),", RegexRule.CastType.VT_INTEGER);
SingleColumnValueRegexFilter filter = new SingleColumnValueRegexFilter("Col1",
regexRule,SingleColumnValueRegexFilter.CompareOperator.GREATER_THAN,ColumnValue.fromLong(100));
criteria.setFilter(filter);
while (true) {
GetRangeResponse resp = client.getRange(new GetRangeRequest(criteria));
for (Row row : resp.getRows()) {
// do something
System.out.println(row);
}
if (resp.getNextStartPrimaryKey() != null) {
criteria.setInclusiveStartPrimaryKey(resp.getNextStartPrimaryKey());
} else {
break;
}
}
}
详细代码请参见GetRange@GitHub。
迭代读取数据
以下示例用于通过createRangeIterator接口迭代读取exampletable表中的数据。
private static void getRangeByIterator(SyncClient client, String startPkValue, String endPkValue) {
//设置数据表名称。
RangeIteratorParameter rangeIteratorParameter = new RangeIteratorParameter("<TABLE_NAME>");
//设置起始主键。
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(startPkValue));
rangeIteratorParameter.setInclusiveStartPrimaryKey(primaryKeyBuilder.build());
//设置结束主键。
primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(endPkValue));
rangeIteratorParameter.setExclusiveEndPrimaryKey(primaryKeyBuilder.build());
rangeIteratorParameter.setMaxVersions(1);
Iterator<Row> iterator = client.createRangeIterator(rangeIteratorParameter);
System.out.println("使用Iterator进行GetRange的结果为:");
while (iterator.hasNext()) {
Row row = iterator.next();
System.out.println(row);
}
}
详细代码请参见GetRangeByIterator@GitHub。
并发读取数据
TableStoreReader是表格存储Java SDK提供的工具类,封装了BatchGetRow接口,可以实现并发查询表中数据。同时支持多表查询、查询状态统计、行级别回调和自定义配置功能。
表格存储从Java SDK 5.16.1版本开始支持此功能,请确保使用了正确的SDK版本。关于Java SDK历史迭代版本的更多信息,请参见Java SDK历史迭代版本。
快速上手
构造TableStoreReader。
String endpoint = "<ENDPOINT>"; String accessKeyId = System.getenv("OTS_AK_ENV"); String accessKeySecret = System.getenv("OTS_SK_ENV"); String instanceName = "<INSTANCE_NAME>"; AsyncClientInterface client = new AsyncClient(endpoint, accessKeyId, accessKeySecret, instanceName); TableStoreReaderConfig config = new TableStoreReaderConfig(); ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(1024)) TableStoreReader reader = new DefaultTableStoreReader(client, config, executor, null);
构造查询请求。
将要查询的数据缓存到内存中,支持一次添加一条或多条数据。
PrimaryKey pk1 = PrimaryKeyBuilder.createPrimaryKeyBuilder() .addPrimaryKeyColumn("pk1", PrimaryKeyValue.fromLong(0)) .addPrimaryKeyColumn("pk2", PrimaryKeyValue.fromLong(0)) .build(); //新增一列查询表中,pk1的属性列。 Future<ReaderResult> readerResult = reader.addPrimaryKeyWithFuture("<TABLE_NAME1>", pk1); //也可以使用List一次添加多列。 List<PrimaryKey> primaryKeyList = new ArrayList<PrimaryKey>(); Future<ReaderResult> readerResult = reader.addPrimaryKeysWithFuture("<TABLE_NAME2>", primaryKeyList);
查询数据。
将内存中缓存的查询发送出去。您可以通过同步阻塞方式或者异步方式查询数据。
同步阻塞方式
reader.flush();
异步方式
reader.send();
获取查询结果。
//打印成功查询和失败查询的信息。 for (RowReadResult success : readerResult.get().getSucceedRows()) { System.out.println(success.getRowResult()); } for (RowReadResult fail : readerResult.get().getFailedRows()) { System.out.println(fail.getRowResult()); }
关闭TableStoreReader。
reader.close(); //根据实际需要关闭client和executor。 client.shutdown(); executor.shutdown();
配置项
您可以通过修改TableStoreReaderConfig
自定义配置TableStoreReader。
参数 | 说明 |
checkTableMeta | 是否在添加查询行时检查表的结构。默认值为true。 如果在添加查询行时不需要检查表结构,请设置此参数为false。 |
bucketCount | Reader内存中的缓存分桶数。默认值为4。 |
bufferSize | 每个分桶的RingBuffer缓冲区大小,默认值为1024。 |
concurrency | 调用batchGetRow的最大并发度。默认值为10。 |
maxBatchRowsCount | 每次batchGetRow的最大请求行数。默认值为100,最大值为100。 |
defaultMaxVersions | 默认情况下,getRow查询的最大版本数。默认值为1。 |
flushInterval | 自动flush缓存中查询请求的间隔。默认值为10000。单位为毫秒。 |
logInterval | 自动打印任务状态的间隔。默认值为10000。单位为毫秒 |
指定查询条件
您可以指定表级别的查询参数,例如查询的最大版本数,要查询的列、时间范围等。
//查询表的col1列过去60秒的最多10个版本。
//设置数据表名称。
RowQueryCriteria criteria = new RowQueryCriteria("<TABLE_NAME>");
//设置要返回的列。
criteria.addColumnsToGet("col1");
//设置返回的最大版本数。
criteria.setMaxVersions(10);
criteria.setTimeRange(new TimeRange(System.currentTimeMillis() - 60 * 1000, System.currentTimeMillis()));
reader.setRowQueryCriteria(criteria);
完整示例
public class TableStoreReaderDemo {
private static final String endpoint = "<ENDPOINT>";
private static final String accessKeyId = System.getenv("OTS_AK_ENV");
private static final String accessKeySecret = System.getenv("OTS_SK_ENV");
private static final String instanceName = "<INSTANCE_NAME>";
private static AsyncClientInterface client;
private static ExecutorService executor;
private static AtomicLong succeedRows = new AtomicLong();
private static AtomicLong failedRows = new AtomicLong();
public static void main(String[] args) throws ExecutionException, InterruptedException {
/**
* 步骤一:构造TableStoreReader。
*/
//构造AsyncClient。
client = new AsyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);
//构造reader配置类。
TableStoreReaderConfig config = new TableStoreReaderConfig();
{
//以下参数均有默认值,可不配置。
//向reader添加要查询的数据前,会先检查表的结构。
config.setCheckTableMeta(true);
//一次请求的最大请求行数,上限为100。
config.setMaxBatchRowsCount(100);
//默认情况下,获取的columns最大版本数。
config.setDefaultMaxVersions(1);
//发送请求的总并发数。
config.setConcurrency(16);
//内存分桶数。
config.setBucketCount(4);
//将缓存数据全部发送的时间间隔。
config.setFlushInterval(10000);
//日志记录reader状态的时间间隔。
config.setLogInterval(10000);
}
//构造用于发送请求的executor。
ThreadFactory threadFactory = new ThreadFactory() {
private final AtomicInteger counter = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "reader-" + counter.getAndIncrement());
}
};
executor = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue(1024), threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
//构造reader的回调函数。
TableStoreCallback<PrimaryKeyWithTable, RowReadResult> callback = new TableStoreCallback<PrimaryKeyWithTable, RowReadResult>() {
@Override
public void onCompleted(PrimaryKeyWithTable req, RowReadResult res) {
succeedRows.incrementAndGet();
}
@Override
public void onFailed(PrimaryKeyWithTable req, Exception ex) {
failedRows.incrementAndGet();
}
};
TableStoreReader reader = new DefaultTableStoreReader(client, config, executor, callback);
/**
* 步骤二:构造查询请求。
*/
//向内存添加一列要查询的数据。
PrimaryKey pk1 = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk1", PrimaryKeyValue.fromLong(0))
.addPrimaryKeyColumn("pk2", PrimaryKeyValue.fromLong(0))
.build();
reader.addPrimaryKey("<TABLE_NAME1>", pk1);
//向内存添加一列要查询的数据,并获取查询结果Future。
PrimaryKey pk2 = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk1", PrimaryKeyValue.fromLong(0))
.addPrimaryKeyColumn("pk2", PrimaryKeyValue.fromLong(0))
.build();
Future<ReaderResult> readerResult = reader.addPrimaryKeyWithFuture("<TABLE_NAME2>", pk2);
/**
* 步骤三:查询数据。
*/
//异步将内存中的数据发送出去。
reader.send();
/**
* 步骤四:获取查询结果。
*/
//打印成功查询和失败查询的信息。
for (RowReadResult success : readerResult.get().getSucceedRows()) {
System.out.println(success.getRowResult());
}
for (RowReadResult fail : readerResult.get().getFailedRows()) {
System.out.println(fail.getRowResult());
}
/**
* 步骤五:关闭TableStoreReader。
*/
reader.close();
client.shutdown();
executor.shutdown();
}
}