读取数据

表格存储提供了单行读取、批量读取、范围读取、迭代读取和并行读取的查询方式用于读取数据表中数据。数据写入到数据表后,您可以选择所需数据查询方式进行数据读取。

如果需要了解表格存储各场景的应用案例,请参见快速玩转Tablestore入门与实战

查询方式

表格存储提供的数据读取接口包括GetRow、BatchGetRowGetRange。读取数据时,请根据实际查询场景使用相应查询方式读取数据。

重要

当要读取带有自增主键列的表数据时,请确保已获取到包含自增主键列值在内的完整主键。更多信息,请参见主键列自增。如果未记录自增主键列的值,您可以使用范围读取数据按照第一个主键列确定范围读取数据。

查询方式

说明

适用场景

读取单行数据

调用GetRow接口读取一行数据。

适用于能确定完整主键且要读取行数较少的场景。

批量读取数据

调用BatchGetRow接口一次请求读取多行数据或者一次对多张表进行读取。

BatchGetRow操作由多个GetRow子操作组成,构造子操作的过程与使用GetRow接口时相同。

适用于能确定完整主键,且要读取行数较多或者要读取多个表中数据的场景。

范围读取数据

调用GetRange接口读取一个范围内的数据。

GetRange操作支持按照确定范围进行正序读取和逆序读取,可以设置要读取的行数。如果范围较大,已扫描的行数或者数据量超过一定限制,会停止扫描,并返回已获取的行和下一个主键信息。您可以根据返回的下一个主键信息,继续发起请求,获取范围内剩余的行。

适用于能确定完整主键范围或者主键前缀的场景。

重要

如果不能确定主键前缀,您也可以通过设置完整主键范围均为虚拟点INF_MININF_MAX进行全表数据扫描,但是执行此操作会消耗较多计算资源,请谨慎使用。

迭代读取数据

通过createRangeIterator接口迭代读取数据。

适用于能确定完整主键范围或者主键前缀,且需要迭代读取的场景。

并发读取数据

TableStoreReader是表格存储Java SDK提供的工具类,封装了BatchGetRow接口,可以实现并发查询表中数据。同时支持多表查询、查询状态统计、行级别回调和自定义配置功能。

适用于能确定完整主键,且要读取行数较多或者要读取多个表中数据的场景。

前提条件

  • 已初始化OTSClient。具体操作,请参见初始化OTSClient

  • 已创建数据表并写入数据。

读取单行数据

调用GetRow接口读取一行数据。读取的结果可能有如下两种:

  • 如果该行存在,则返回该行的各主键列以及属性列。

  • 如果该行不存在,则返回中不包含行,并且不会报错。

参数

参数

说明

tableName

数据表名称。

primaryKey

行的主键。主键包括主键列名、主键类型和主键值。

重要

设置的主键个数和类型必须和表的主键个数和类型一致。

columnsToGet

读取的列集合,列名可以是主键列或属性列。

  • 如果不设置返回的列名,则返回整行数据。

  • 如果设置了返回的列名,当某行中指定的列均不存在时,则不返回该行,即返回值为null;当某行中存在部分指定的列时,则返回该行且只返回存在的列。

说明
  • 查询一行数据时,默认返回此行所有列的数据。如果需要只返回特定列,可以通过设置columnsToGet参数限制。如果将col0col1加入到columnsToGet中,则只返回col0col1列的值。

  • columnsToGetfilter同时使用时,执行顺序是先获取columnsToGet指定的列,再在返回的列中进行条件过滤。

maxVersions

最多读取的版本数。

重要

maxVersionstimeRange必须至少设置一个。

  • 如果仅设置maxVersions,则最多返回所有版本中从新到旧指定数量版本的数据。

  • 如果仅设置timeRange,则返回该范围内所有数据或指定版本数据。

  • 如果同时设置maxVersionstimeRange,则最多返回版本号范围内从新到旧指定数量版本的数据。

timeRange

读取版本号范围或特定版本号的数据。更多信息,请参见TimeRange

重要

maxVersionstimeRange必须至少设置一个。

  • 如果仅设置maxVersions,则最多返回所有版本中从新到旧指定数量版本的数据。

  • 如果仅设置timeRange,则返回该范围内所有数据或指定版本数据。

  • 如果同时设置maxVersionstimeRange,则最多返回版本号范围内从新到旧指定数量版本的数据。

  • 如果要查询一个范围的数据,则需要设置startend。startend分别表示起始时间戳和结束时间戳,范围为前闭后开区间,即[start, end)

  • 如果要查询特定版本号的数据,则需要设置timestamp。timestamp表示特定的时间戳。

timestamp[start, end)中只需要设置一个。

时间戳的单位为毫秒,最小值为0,最大值为Long.MAX_VALUE

filter

使用过滤器,在服务端对读取结果再进行一次过滤,只返回符合过滤器中条件的数据行。更多信息,请参见过滤器

说明

columnsToGetfilter同时使用时,执行顺序是先获取columnsToGet指定的列,再在返回的列中进行条件过滤。

示例

读取数据时,您可以指定要读取的数据版本、要读取的列、过滤器、正则过滤等。

读取最新版本数据和指定列

以下示例用于读取数据表中的一行数据,设置读取最新版本的数据和读取指定的列。

private static void getRow(SyncClient client, String pkValue) {
    //构造主键。
    PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
    primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(pkValue));
    PrimaryKey primaryKey = primaryKeyBuilder.build();

    //读取一行数据,设置数据表名称。
    SingleRowQueryCriteria criteria = new SingleRowQueryCriteria("<TABLE_NAME>", primaryKey);
    //设置读取最新版本。
    criteria.setMaxVersions(1);
    GetRowResponse getRowResponse = client.getRow(new GetRowRequest(criteria));
    Row row = getRowResponse.getRow();

    System.out.println("读取完毕,结果为: ");
    System.out.println(row);

    //设置读取某些列。
    criteria.addColumnsToGet("Col0");
    getRowResponse = client.getRow(new GetRowRequest(criteria));
    row = getRowResponse.getRow();

    System.out.println("读取完毕,结果为:");
    System.out.println(row);
} 

读取数据时使用过滤器

以下示例用于读取数据表中的一行数据,设置读取最新版本的数据以及根据Col0列的值过滤数据。

private static void getRow(SyncClient client, String pkValue) {
    //构造主键。
    PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
    primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(pkValue));
    PrimaryKey primaryKey = primaryKeyBuilder.build();

    //读取一行数据,设置数据表名称。
    SingleRowQueryCriteria criteria = new SingleRowQueryCriteria("<TABLE_NAME>", primaryKey);
    //设置读取最新版本。
    criteria.setMaxVersions(1);

    //设置过滤器,当Col0列的值为0时,返回该行。
    SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("Col0",
            SingleColumnValueFilter.CompareOperator.EQUAL, ColumnValue.fromLong(0));
    //如果Col0列不存在,则不返回该行。
    singleColumnValueFilter.setPassIfMissing(false);
    criteria.setFilter(singleColumnValueFilter);

    GetRowResponse getRowResponse = client.getRow(new GetRowRequest(criteria));
    Row row = getRowResponse.getRow();

    System.out.println("读取完毕,结果为: ");
    System.out.println(row);
}

读取数据时使用正则过滤

以下示例用于读取数据表一行中Col1列的数据,并对该列的数据执行正则过滤。

private static void getRow(SyncClient client, String pkValue) {
    //设置数据表名称。
    SingleRowQueryCriteria criteria = new SingleRowQueryCriteria("<TABLE_NAME>");
 
    //构造主键。
    PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
        .addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(pkValue))
        .build();
    criteria.setPrimaryKey(primaryKey);
 
    // 设置读取最新版本。
    criteria.setMaxVersions(1);
 
    // 设置过滤器,当cast<int>(regex(Col1)) > 100时,返回该行。
    RegexRule regexRule = new RegexRule("t1:([0-9]+),", RegexRule.CastType.VT_INTEGER);
    SingleColumnValueRegexFilter filter =  new SingleColumnValueRegexFilter("Col1",
        regexRule,SingleColumnValueRegexFilter.CompareOperator.GREATER_THAN, ColumnValue.fromLong(100));
    criteria.setFilter(filter);
 
    GetRowResponse getRowResponse = client.getRow(new GetRowRequest(criteria));
    Row row = getRowResponse.getRow();

    System.out.println("读取完毕,结果为: ");
    System.out.println(row);
}

批量读取数据

调用BatchGetRow接口一次请求读取多行数据,也支持一次对多张表进行读取。BatchGetRow由多个GetRow子操作组成。构造子操作的过程与使用GetRow接口时相同。

BatchGetRow的各个子操作独立执行,表格存储会分别返回各个子操作的执行结果。

注意事项

  • 批量读取的所有行采用相同的参数条件,例如ColumnsToGet=[colA],则要读取的所有行都只读取colA列。

  • 由于批量读取可能存在部分行失败的情况,失败行的错误信息在返回的BatchGetRowResponse中,但并不抛出异常。因此调用BatchGetRow接口时,需要检查返回值,可通过BatchGetRowResponseisAllSucceed方法判断是否所有行都获取成功;通过BatchGetRowResponsegetFailedRows方法获取失败行的信息。

  • BatchGetRow操作单次支持读取的最大行数为100行。

参数

更多信息,请参见读取单行数据参数

示例

以下示例用于读取10行,设置版本条件、要读取的列、过滤器等。

private static void batchGetRow(SyncClient client) {
    //设置数据表名称。
    MultiRowQueryCriteria multiRowQueryCriteria = new MultiRowQueryCriteria("<TABLE_NAME>");
    //加入10个要读取的行。
    for (int i = 0; i < 10; i++) {
        PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
        primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString("pk" + i));
        PrimaryKey primaryKey = primaryKeyBuilder.build();
        multiRowQueryCriteria.addRow(primaryKey);
    }
    //添加条件。
    multiRowQueryCriteria.setMaxVersions(1);
    multiRowQueryCriteria.addColumnsToGet("Col0");
    multiRowQueryCriteria.addColumnsToGet("Col1");
    SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("Col0",
            SingleColumnValueFilter.CompareOperator.EQUAL, ColumnValue.fromLong(0));
    singleColumnValueFilter.setPassIfMissing(false);
    multiRowQueryCriteria.setFilter(singleColumnValueFilter);

    BatchGetRowRequest batchGetRowRequest = new BatchGetRowRequest();
    //BatchGetRow支持读取多个表的数据,一个multiRowQueryCriteria对应一个表的查询条件,可以添加多个multiRowQueryCriteria。
    batchGetRowRequest.addMultiRowQueryCriteria(multiRowQueryCriteria);

    BatchGetRowResponse batchGetRowResponse = client.batchGetRow(batchGetRowRequest);

    System.out.println("是否全部成功:" + batchGetRowResponse.isAllSucceed());
    System.out.println("读取完毕,结果为: ");
    for (BatchGetRowResponse.RowResult rowResult : batchGetRowResponse.getSucceedRows()) {
        System.out.println(rowResult.getRow());
    }
    if (!batchGetRowResponse.isAllSucceed()) {
        for (BatchGetRowResponse.RowResult rowResult : batchGetRowResponse.getFailedRows()) {
            System.out.println("失败的行:" + batchGetRowRequest.getPrimaryKey(rowResult.getTableName(), rowResult.getIndex()));
            System.out.println("失败原因:" + rowResult.getError());
        }

        /**
         * 可以通过createRequestForRetry方法再构造一个请求对失败的行进行重试。此处只给出构造重试请求的部分。
         * 推荐的重试方法是使用SDK的自定义重试策略功能,支持对batch操作的部分行错误进行重试。设置重试策略后,调用接口处无需增加重试代码。
         */
        BatchGetRowRequest retryRequest = batchGetRowRequest.createRequestForRetry(batchGetRowResponse.getFailedRows());
    }
}

详细代码请参见BatchGetRow@GitHub

范围读取数据

调用GetRange接口读取一个范围内的数据。

GetRange操作支持按照确定范围进行正序读取和逆序读取,可以设置要读取的行数。如果范围较大,已扫描的行数或者数据量超过一定限制,会停止扫描,并返回已获取的行和下一个主键信息。您可以根据返回的下一个主键信息,继续发起请求,获取范围内剩余的行。

说明

表格存储表中的行都是按照主键排序的,而主键是由全部主键列按照顺序组成的,所以不能理解为表格存储会按照某列主键排序,这是常见的误区。

注意事项

GetRange操作遵循最左匹配原则,读取数据时,依次比较第一主键列到第四主键列。例如表的主键包括PK1、PK2、PK3三个主键列,读取数据时,优先比较PK1是否在开始主键与结束主键的范围内,如果PK1在设置的主键范围内,则不会再比较其他的主键,返回在PK1主键范围内的数据;如果PK1在设置的主键边界上,则继续比较PK2是否在开始主键与结束主键的范围内,以此类推。关于范围查询原理的更多信息,请参见GetRange范围查询详解

GetRange操作可能在如下情况停止执行并返回数据。

  • 扫描的行数据大小之和达到4 MB。

  • 扫描的行数等于5000。

  • 返回的行数等于最大返回行数。

  • 当前剩余的预留读吞吐量已全部使用,余量不足以读取下一条数据。

当使用GetRange扫描的数据量较大时,表格存储每次请求仅会扫描一次(行数大于5000或者大小大于4 MB停止扫描),超过限制的数据不会继续返回,需要通过翻页继续获取后面的数据。

参数

参数

说明

tableName

数据表名称。

direction

读取方向。

  • 如果值为正序(FORWARD),则起始主键必须小于结束主键,返回的行按照主键由小到大的顺序进行排列。

  • 如果值为逆序(BACKWARD),则起始主键必须大于结束主键,返回的行按照主键由大到小的顺序进行排列。

假设同一表中有两个主键AB,A小于B,如果正序读取[A, B),则按从AB的顺序返回主键大于等于A且小于B的行数据;如果逆序读取[B, A),则按从BA的顺序返回大于A且小于等于B的行数据。

inclusiveStartPrimaryKey

本次范围读取的起始主键和结束主键,起始主键和结束主键需要是有效的主键或者是由INF_MININF_MAX类型组成的虚拟点,虚拟点的列数必须与主键相同。

其中INF_MIN表示无限小,任何类型的值都比它大;INF_MAX表示无限大,任何类型的值都比它小。

  • inclusiveStartPrimaryKey表示起始主键,如果该行存在,则返回结果中一定会包含此行。

  • exclusiveEndPrimaryKey表示结束主键,无论该行是否存在,返回结果中都不会包含此行。

数据表中的行按主键从小到大排序,读取范围是一个左闭右开的区间,正序读取时,返回的是大于等于起始主键且小于结束主键的所有的行。

exclusiveEndPrimaryKey

limit

数据的最大返回行数,此值必须大于 0。

表格存储按照正序或者逆序返回指定的最大返回行数后即结束该操作的执行,即使该区间内仍有未返回的数据。此时可以通过返回结果中的nextStartPrimaryKey记录本次读取到的位置,用于下一次读取。

columnsToGet

读取的列集合,列名可以是主键列或属性列。

  • 如果不设置返回的列名,则返回整行数据。

  • 如果设置了返回的列名,当某行中指定的列均不存在时,则不返回该行,即返回值为null;当某行中存在部分指定的列时,则返回该行且只返回存在的列。

说明
  • 查询一行数据时,默认返回此行所有列的数据。如果需要只返回特定列,可以通过设置columnsToGet参数限制。如果将col0col1加入到columnsToGet中,则只返回col0col1列的值。

  • 如果某行数据的主键属于读取范围,但是该行数据不包含指定返回的列,则返回结果中不包含该行数据。

  • columnsToGetfilter同时使用时,执行顺序是先获取columnsToGet指定的列,再在返回的列中进行条件过滤。

maxVersions

最多读取的版本数。

重要

maxVersionstimeRange必须至少设置一个。

  • 如果仅设置maxVersions,则最多返回所有版本中从新到旧指定数量版本的数据。

  • 如果仅设置timeRange,则返回该范围内所有数据或指定版本数据。

  • 如果同时设置maxVersionstimeRange,则最多返回版本号范围内从新到旧指定数量版本的数据。

timeRange

读取版本号范围或特定版本号的数据。更多信息,请参见TimeRange

重要

maxVersionstimeRange必须至少设置一个。

  • 如果仅设置maxVersions,则最多返回所有版本中从新到旧指定数量版本的数据。

  • 如果仅设置timeRange,则返回该范围内所有数据或指定版本数据。

  • 如果同时设置maxVersionstimeRange,则最多返回版本号范围内从新到旧指定数量版本的数据。

  • 如果要查询一个范围的数据,则需要设置startend。startend分别表示起始时间戳和结束时间戳,范围为前闭后开区间,即[start, end)

  • 如果要查询特定版本号的数据,则需要设置timestamp。timestamp表示特定的时间戳。

timestamp[start, end)中只需要设置一个。

时间戳的单位为毫秒,最小值为0,最大值为Long.MAX_VALUE

filter

使用过滤器,在服务端对读取结果再进行一次过滤,只返回符合过滤器中条件的数据行。更多信息,请参见过滤器

说明

columnsToGetfilter同时使用时,执行顺序是先获取columnsToGet指定的列,再在返回的列中进行条件过滤。

nextStartPrimaryKey

根据返回结果中的nextStartPrimaryKey判断数据是否全部读取。

  • 当返回结果中nextStartPrimaryKey不为空时,可以使用此返回值作为下一次GetRange操作的起始点继续读取数据。

  • 当返回结果中nextStartPrimaryKey为空时,表示读取范围内的数据全部返回。

示例

按照确定范围读取数据

以下示例用于按照确定范围进行正序读取,判断nextStartPrimaryKey是否为空,读取完范围内的全部数据。

private static void getRange(SyncClient client, String startPkValue, String endPkValue) {
    //设置数据表名称。
    RangeRowQueryCriteria rangeRowQueryCriteria = new RangeRowQueryCriteria("<TABLE_NAME>");

    //设置起始主键。
    PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
    primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(startPkValue));
    rangeRowQueryCriteria.setInclusiveStartPrimaryKey(primaryKeyBuilder.build());

    //设置结束主键。
    primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
    primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(endPkValue));
    rangeRowQueryCriteria.setExclusiveEndPrimaryKey(primaryKeyBuilder.build());

    rangeRowQueryCriteria.setMaxVersions(1);

    System.out.println("GetRange的结果为:");
    while (true) {
        GetRangeResponse getRangeResponse = client.getRange(new GetRangeRequest(rangeRowQueryCriteria));
        for (Row row : getRangeResponse.getRows()) {
            System.out.println(row);
        }

        //如果NextStartPrimaryKey不为null,则继续读取。
        if (getRangeResponse.getNextStartPrimaryKey() != null) {
            rangeRowQueryCriteria.setInclusiveStartPrimaryKey(getRangeResponse.getNextStartPrimaryKey());
        } else {
            break;
        }
    }
}         

按照第一个主键列确定范围读取数据

以下示例用于按照第一个主键列确定范围、第二主键列从最小值(INF_MIN)到最大值(INF_MAX)进行正序读取,判断nextStartPrimaryKey是否为null,读取完范围内的全部数据。

private static void getRange(SyncClient client, String startPkValue, String endPkValue) {
    //设置数据表名称。
    RangeRowQueryCriteria rangeRowQueryCriteria = new RangeRowQueryCriteria("<TABLE_NAME>");
    //设置起始主键,以两个主键为例。
    PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
    primaryKeyBuilder.addPrimaryKeyColumn("pk1", PrimaryKeyValue.fromString(startPkValue));//确定值。
    primaryKeyBuilder.addPrimaryKeyColumn("pk2", PrimaryKeyValue.INF_MIN);//最小值。
    rangeRowQueryCriteria.setInclusiveStartPrimaryKey(primaryKeyBuilder.build());

    //设置结束主键。
    primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
    primaryKeyBuilder.addPrimaryKeyColumn("pk1", PrimaryKeyValue.fromString(endPkValue));//确定值。
    primaryKeyBuilder.addPrimaryKeyColumn("pk2", PrimaryKeyValue.INF_MAX);//最大值。
    rangeRowQueryCriteria.setExclusiveEndPrimaryKey(primaryKeyBuilder.build());

    rangeRowQueryCriteria.setMaxVersions(1);

    System.out.println("GetRange的结果为:");
    while (true) {
        GetRangeResponse getRangeResponse = client.getRange(new GetRangeRequest(rangeRowQueryCriteria));
        for (Row row : getRangeResponse.getRows()) {
            System.out.println(row);
        }

        //如果nextStartPrimaryKey不为null,则继续读取。
        if (getRangeResponse.getNextStartPrimaryKey() != null) {
            rangeRowQueryCriteria.setInclusiveStartPrimaryKey(getRangeResponse.getNextStartPrimaryKey());
        } else {
            break;
        }
    }
}    

按照确定范围读取数据并对指定列使用正则过滤

以下示例用于读取主键范围为["pk:2020-01-01.log", "pk:2021-01-01.log")Col1列的数据,并对该列的数据执行正则过滤。

private static void getRange(SyncClient client) {
    //设置数据表名称。
    RangeRowQueryCriteria criteria = new RangeRowQueryCriteria("<TABLE_NAME>");
 
    //设置主键范围为["pk:2020-01-01.log", "pk:2021-01-01.log"),读取范围为左闭右开的区间。
    PrimaryKey pk0 = PrimaryKeyBuilder.createPrimaryKeyBuilder()
        .addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString("2020-01-01.log"))
        .build();
    PrimaryKey pk1 = PrimaryKeyBuilder.createPrimaryKeyBuilder()
        .addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString("2021-01-01.log"))
        .build();
    criteria.setInclusiveStartPrimaryKey(pk0);
    criteria.setExclusiveEndPrimaryKey(pk1);
 
    //设置读取最新版本。
    criteria.setMaxVersions(1);
 
    //设置过滤器,当cast<int>(regex(Col1)) > 100时,返回该行。
    RegexRule regexRule = new RegexRule("t1:([0-9]+),", RegexRule.CastType.VT_INTEGER);
    SingleColumnValueRegexFilter filter =  new SingleColumnValueRegexFilter("Col1",
        regexRule,SingleColumnValueRegexFilter.CompareOperator.GREATER_THAN,ColumnValue.fromLong(100));
    criteria.setFilter(filter);

    while (true) {
        GetRangeResponse resp = client.getRange(new GetRangeRequest(criteria));
        for (Row row : resp.getRows()) {
            // do something
            System.out.println(row);
        }
        if (resp.getNextStartPrimaryKey() != null) {
            criteria.setInclusiveStartPrimaryKey(resp.getNextStartPrimaryKey());
        } else {
            break;
        }
   }
}

详细代码请参见GetRange@GitHub

迭代读取数据

以下示例用于通过createRangeIterator接口迭代读取exampletable表中的数据。

private static void getRangeByIterator(SyncClient client, String startPkValue, String endPkValue) {
    //设置数据表名称。
    RangeIteratorParameter rangeIteratorParameter = new RangeIteratorParameter("<TABLE_NAME>");

    //设置起始主键。
    PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
    primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(startPkValue));
    rangeIteratorParameter.setInclusiveStartPrimaryKey(primaryKeyBuilder.build());

    //设置结束主键。
    primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
    primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(endPkValue));
    rangeIteratorParameter.setExclusiveEndPrimaryKey(primaryKeyBuilder.build());

    rangeIteratorParameter.setMaxVersions(1);

    Iterator<Row> iterator = client.createRangeIterator(rangeIteratorParameter);

    System.out.println("使用Iterator进行GetRange的结果为:");
    while (iterator.hasNext()) {
        Row row = iterator.next();
        System.out.println(row);
    }
}           

详细代码请参见GetRangeByIterator@GitHub

并发读取数据

TableStoreReader是表格存储Java SDK提供的工具类,封装了BatchGetRow接口,可以实现并发查询表中数据。同时支持多表查询、查询状态统计、行级别回调和自定义配置功能。

重要

表格存储从Java SDK 5.16.1版本开始支持此功能,请确保使用了正确的SDK版本。关于Java SDK历史迭代版本的更多信息,请参见Java SDK历史迭代版本

快速上手

  1. 构造TableStoreReader。

    String endpoint = "<ENDPOINT>";
    String accessKeyId = System.getenv("OTS_AK_ENV");
    String accessKeySecret = System.getenv("OTS_SK_ENV");
    String instanceName = "<INSTANCE_NAME>";
    
    AsyncClientInterface client = new AsyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);
    TableStoreReaderConfig config = new TableStoreReaderConfig();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(1024))
    
    TableStoreReader reader = new DefaultTableStoreReader(client, config, executor, null);
  2. 构造查询请求。

    将要查询的数据缓存到内存中,支持一次添加一条或多条数据。

    PrimaryKey pk1 = PrimaryKeyBuilder.createPrimaryKeyBuilder()
            .addPrimaryKeyColumn("pk1", PrimaryKeyValue.fromLong(0))
            .addPrimaryKeyColumn("pk2", PrimaryKeyValue.fromLong(0))
            .build();
    //新增一列查询表中,pk1的属性列。
    Future<ReaderResult> readerResult = reader.addPrimaryKeyWithFuture("<TABLE_NAME1>", pk1);
    //也可以使用List一次添加多列。
    List<PrimaryKey> primaryKeyList = new ArrayList<PrimaryKey>();
    Future<ReaderResult> readerResult = reader.addPrimaryKeysWithFuture("<TABLE_NAME2>", primaryKeyList);
  3. 查询数据。

    将内存中缓存的查询发送出去。您可以通过同步阻塞方式或者异步方式查询数据。

    • 同步阻塞方式

      reader.flush();
    • 异步方式

      reader.send();
  4. 获取查询结果。

    //打印成功查询和失败查询的信息。
    for (RowReadResult success : readerResult.get().getSucceedRows()) {
        System.out.println(success.getRowResult());
    }
    
    for (RowReadResult fail : readerResult.get().getFailedRows()) {
        System.out.println(fail.getRowResult());
    }
  5. 关闭TableStoreReader。

    reader.close();
    //根据实际需要关闭clientexecutor。
    client.shutdown();
    executor.shutdown();

配置项

您可以通过修改TableStoreReaderConfig自定义配置TableStoreReader。

参数

说明

checkTableMeta

是否在添加查询行时检查表的结构。默认值为true。

如果在添加查询行时不需要检查表结构,请设置此参数为false。

bucketCount

Reader内存中的缓存分桶数。默认值为4。

bufferSize

每个分桶的RingBuffer缓冲区大小,默认值为1024。

concurrency

调用batchGetRow的最大并发度。默认值为10。

maxBatchRowsCount

每次batchGetRow的最大请求行数。默认值为100,最大值为100。

defaultMaxVersions

默认情况下,getRow查询的最大版本数。默认值为1。

flushInterval

自动flush缓存中查询请求的间隔。默认值为10000。单位为毫秒。

logInterval

自动打印任务状态的间隔。默认值为10000。单位为毫秒

指定查询条件

您可以指定表级别的查询参数,例如查询的最大版本数,要查询的列、时间范围等。

//查询表的col1列过去60秒的最多10个版本。
//设置数据表名称。
RowQueryCriteria criteria = new RowQueryCriteria("<TABLE_NAME>");
//设置要返回的列。
criteria.addColumnsToGet("col1");
//设置返回的最大版本数。
criteria.setMaxVersions(10);
criteria.setTimeRange(new TimeRange(System.currentTimeMillis() - 60 * 1000, System.currentTimeMillis()));
reader.setRowQueryCriteria(criteria);

完整示例

public class TableStoreReaderDemo {
    private static final String endpoint = "<ENDPOINT>";
    private static final String accessKeyId = System.getenv("OTS_AK_ENV");
    private static final String accessKeySecret = System.getenv("OTS_SK_ENV");
    private static final String instanceName = "<INSTANCE_NAME>";
    private static AsyncClientInterface client;
    private static ExecutorService executor;
    private static AtomicLong succeedRows = new AtomicLong();
    private static AtomicLong failedRows = new AtomicLong();

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        /**
         * 步骤一:构造TableStoreReader。
         */
        //构造AsyncClient。
        client = new AsyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);
        //构造reader配置类。
        TableStoreReaderConfig config = new TableStoreReaderConfig();
        {
            //以下参数均有默认值,可不配置。
            //向reader添加要查询的数据前,会先检查表的结构。
            config.setCheckTableMeta(true);  
            //一次请求的最大请求行数,上限为100。
            config.setMaxBatchRowsCount(100);    
            //默认情况下,获取的columns最大版本数。
            config.setDefaultMaxVersions(1);
            //发送请求的总并发数。
            config.setConcurrency(16); 
            //内存分桶数。
            config.setBucketCount(4);      
            //将缓存数据全部发送的时间间隔。
            config.setFlushInterval(10000);      
            //日志记录reader状态的时间间隔。
            config.setLogInterval(10000);                   
        }
        //构造用于发送请求的executor。
        ThreadFactory threadFactory = new ThreadFactory() {
            private final AtomicInteger counter = new AtomicInteger(1);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "reader-" + counter.getAndIncrement());
            }
        };
        executor = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue(1024), threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());

        //构造reader的回调函数。
        TableStoreCallback<PrimaryKeyWithTable, RowReadResult> callback = new TableStoreCallback<PrimaryKeyWithTable, RowReadResult>() {
            @Override
            public void onCompleted(PrimaryKeyWithTable req, RowReadResult res) {
                succeedRows.incrementAndGet();
            }

            @Override
            public void onFailed(PrimaryKeyWithTable req, Exception ex) {
                failedRows.incrementAndGet();
            }
        };
        TableStoreReader reader = new DefaultTableStoreReader(client, config, executor, callback);
        /**
         * 步骤二:构造查询请求。
         */
        //向内存添加一列要查询的数据。
        PrimaryKey pk1 = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                .addPrimaryKeyColumn("pk1", PrimaryKeyValue.fromLong(0))
                .addPrimaryKeyColumn("pk2", PrimaryKeyValue.fromLong(0))
                .build();
        reader.addPrimaryKey("<TABLE_NAME1>", pk1);

        //向内存添加一列要查询的数据,并获取查询结果Future。
        PrimaryKey pk2 = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                .addPrimaryKeyColumn("pk1", PrimaryKeyValue.fromLong(0))
                .addPrimaryKeyColumn("pk2", PrimaryKeyValue.fromLong(0))
                .build();
        Future<ReaderResult> readerResult = reader.addPrimaryKeyWithFuture("<TABLE_NAME2>", pk2);
        /**
         * 步骤三:查询数据。
         */
        //异步将内存中的数据发送出去。
        reader.send();
        /**
         * 步骤四:获取查询结果。
         */
        //打印成功查询和失败查询的信息。
        for (RowReadResult success : readerResult.get().getSucceedRows()) {
            System.out.println(success.getRowResult());
        }
        for (RowReadResult fail : readerResult.get().getFailedRows()) {
            System.out.println(fail.getRowResult());
        }
        /**
         * 步骤五:关闭TableStoreReader。
         */
        reader.close();
        client.shutdown();
        executor.shutdown();
    }
}

相关文档

  • 如果要使用索引技术加速数据查询,您可以通过二级索引或者多元索引功能实现。更多信息,请参见二级索引或者多元索引

  • 如果要可视化展示表中数据,您可以通过对接DataV或者Grafana工具实现。更多信息,请参见数据可视化

  • 如果要下载表中数据到本地,您可以通过DataX、表格存储命令行CLI工具实现。更多信息,请参见将表格存储数据下载到本地文件

  • 如果要计算与分析表中数据,您可以通过表格存储SQL查询实现。更多信息,请参见SQL查询

    说明

    您还可以通过MaxCompute、Spark、Hive或者HadoopMR、函数计算、Flink等计算引擎实现表中数据的计算与分析。具体操作,请参见计算与分析