当使用场景中不关心整个结果集的顺序时,可以使用并发导出数据功能以更快的速度将命中的数据全部返回。
说明 5.6.0及其以上版本的SDK开始支持ParallelScan功能。
前提条件
参数
参数 | 说明 | |
---|---|---|
tableName | 数据表名称。 | |
indexName | 多元索引名称。 | |
scanQuery | query | 多元索引的查询语句。支持精确查询、模糊查询、范围查询、地理位置查询、嵌套查询等,功能和Search接口一致。 |
limit | 扫描数据时一次能返回的数据行数。 | |
maxParallel | 最大并发数。请求支持的最大并发数由用户数据量决定。数据量越大,支持的并发数越多,每次任务前可以通过ComputeSplits API进行获取。 | |
currentParallelId | 当前并发ID。取值范围为[0, maxParallel)。 | |
token | 用于翻页功能。ParallelScan请求结果中有下一次进行翻页的token,使用该token可以接着上一次的结果继续读取数据。 | |
aliveTime | ParallelScan的当前任务有效时间,也是token的有效时间。默认值为60,建议使用默认值,单位为秒。如果在有效时间内没有发起下一次请求,则不能继续读取数据。持续发起请求会刷新token有效时间。
说明 由于服务端采用异步方式清理过期任务,因此当前任务只保证在设置的有效时间内不会过期,但不能保证有效时间之后一定过期。
|
|
columnsToGet | ParallelScan目前仅可以扫描多元索引中的数据,需要在创建多元索引时设置附加存储(即store=true)。 | |
sessionId | 本次并发扫描数据任务的sessionId。创建Session可以通过ComputeSplits API来创建,同时获得本次任务支持的最大并发数。 |
示例
单并发扫描数据和多线程并发扫描数据的代码示例如下:
- 单并发扫描数据
相对于多并发扫描数据,单并发扫描数据的代码更简单,单并发代码无需关心currentParallelId和maxParallel参数。单并发使用方式的整体吞吐比Search接口方式高,但是比多线程多并发使用方式的吞吐低,多线程多并发方式请参见最下方的“多线程并发扫描数据”示例代码。
import java.util.ArrayList; import java.util.Arrays; import java.util.List; import com.alicloud.openservices.tablestore.SyncClient; import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest; import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse; import com.alicloud.openservices.tablestore.model.Row; import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions; import com.alicloud.openservices.tablestore.model.iterator.RowIterator; import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest; import com.alicloud.openservices.tablestore.model.search.ParallelScanResponse; import com.alicloud.openservices.tablestore.model.search.ScanQuery; import com.alicloud.openservices.tablestore.model.search.SearchRequest.ColumnsToGet; import com.alicloud.openservices.tablestore.model.search.query.MatchAllQuery; import com.alicloud.openservices.tablestore.model.search.query.Query; import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders; public class Test { public static List<Row> scanQuery(final SyncClient client) { String tableName = "<TableName>"; String indexName = "<IndexName>"; //获取sessionId和本次请求支持的最大并发数。 ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest(); computeSplitsRequest.setTableName(tableName); computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName)); ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest); byte[] sessionId = computeSplitsResponse.getSessionId(); int splitsSize = computeSplitsResponse.getSplitsSize(); /* * 创建并发扫描数据请求。 */ ParallelScanRequest parallelScanRequest = new ParallelScanRequest(); parallelScanRequest.setTableName(tableName); parallelScanRequest.setIndexName(indexName); ScanQuery scanQuery = new ScanQuery(); //该query决定了扫描出的数据范围,可用于构建嵌套的复杂的query。 Query query = new MatchAllQuery(); scanQuery.setQuery(query); //设置单次请求返回的数据行数。 scanQuery.setLimit(2000); parallelScanRequest.setScanQuery(scanQuery); ColumnsToGet columnsToGet = new ColumnsToGet(); columnsToGet.setColumns(Arrays.asList("col_1", "col_2")); parallelScanRequest.setColumnsToGet(columnsToGet); parallelScanRequest.setSessionId(sessionId); /* * 使用builder模式创建并发扫描数据请求,功能与前面一致。 */ ParallelScanRequest parallelScanRequestByBuilder = ParallelScanRequest.newBuilder() .tableName(tableName) .indexName(indexName) .scanQuery(ScanQuery.newBuilder() .query(QueryBuilders.matchAll()) .limit(2000) .build()) .addColumnsToGet("col_1", "col_2") .sessionId(sessionId) .build(); List<Row> result = new ArrayList<>(); /* * 使用原生的API扫描数据。 */ { ParallelScanResponse parallelScanResponse = client.parallelScan(parallelScanRequest); //下次请求的ScanQuery的token。 byte[] nextToken = parallelScanResponse.getNextToken(); //获取数据。 List<Row> rows = parallelScanResponse.getRows(); result.addAll(rows); while (nextToken != null) { //设置token。 parallelScanRequest.getScanQuery().setToken(nextToken); //继续扫描数据。 parallelScanResponse = client.parallelScan(parallelScanRequest); //获取数据。 rows = parallelScanResponse.getRows(); result.addAll(rows); nextToken = parallelScanResponse.getNextToken(); } } /* * 推荐方式。 * 使用iterator方式扫描所有匹配数据。使用方式上更简单,速度和前面方法一致。 */ { RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder); while (iterator.hasNext()) { Row row = iterator.next(); result.add(row); //获取具体的值。 String col_1 = row.getLatestColumn("col_1").getValue().asString(); long col_2 = row.getLatestColumn("col_2").getValue().asLong(); } } /* * 关于失败重试的问题,如果本函数的外部调用者有重试机制或者不需要考虑失败重试问题,可以忽略此部分内容。 * 为了保证可用性,遇到任何异常均推荐进行任务级别的重试,重新开始一个新的ParallelScan任务。 * 异常分为如下两种: * 1、服务端Session异常OTSSessionExpired。 * 2、调用者客户端网络等异常。 */ try { //正常处理逻辑。 { RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder); while (iterator.hasNext()) { Row row = iterator.next(); //处理row,内存足够大时可直接放到list中。 result.add(row); } } } catch (Exception ex) { //重试。 { result.clear(); RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder); while (iterator.hasNext()) { Row row = iterator.next(); //处理row,内存足够大时可直接放到list中。 result.add(row); } } } return result; } }
- 多线程并发扫描数据
import com.alicloud.openservices.tablestore.SyncClient; import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest; import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse; import com.alicloud.openservices.tablestore.model.Row; import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions; import com.alicloud.openservices.tablestore.model.iterator.RowIterator; import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest; import com.alicloud.openservices.tablestore.model.search.ScanQuery; import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicLong; public class Test { public static void scanQueryWithMultiThread(final SyncClient client, String tableName, String indexName) throws InterruptedException { // 获取机器的CPU数量。 final int cpuProcessors = Runtime.getRuntime().availableProcessors(); // 指定客户端多线程的并发数量。建议和客户端的CPU核数一致,避免客户端压力太大,影响查询性能。 final Semaphore semaphore = new Semaphore(cpuProcessors); // 获取sessionId和本次请求支持的最大并发数。 ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest(); computeSplitsRequest.setTableName(tableName); computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName)); ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest); final byte[] sessionId = computeSplitsResponse.getSessionId(); final int maxParallel = computeSplitsResponse.getSplitsSize(); // 业务统计行数使用。 AtomicLong rowCount = new AtomicLong(0); /* * 为了使用一个函数实现多线程功能,此处构建一个内部类继承Thread来使用多线程。 * 您也可以构建一个正常的外部类,使代码更有条理。 */ final class ThreadForScanQuery extends Thread { private final int currentParallelId; private ThreadForScanQuery(int currentParallelId) { this.currentParallelId = currentParallelId; this.setName("ThreadForScanQuery:" + maxParallel + "-" + currentParallelId); // 设置线程名称。 } @Override public void run() { System.out.println("start thread:" + this.getName()); try { // 正常处理逻辑。 { ParallelScanRequest parallelScanRequest = ParallelScanRequest.newBuilder() .tableName(tableName) .indexName(indexName) .scanQuery(ScanQuery.newBuilder() .query(QueryBuilders.range("col_long").lessThan(10_0000)) // 此处的query决定了获取什么数据。 .limit(2000) .currentParallelId(currentParallelId) .maxParallel(maxParallel) .build()) .addColumnsToGet("col_long", "col_keyword", "col_bool") // 设置要返回的多元索引中的部分字段,或者使用下行注释的内容获取多元索引中全部数据。 //.returnAllColumnsFromIndex(true) .sessionId(sessionId) .build(); // 使用Iterator形式获取所有数据。 RowIterator ltr = client.createParallelScanIterator(parallelScanRequest); long count = 0; while (ltr.hasNext()) { Row row = ltr.next(); // 增加自定义的处理逻辑,此处代码以统计行数为例介绍。 count++; } rowCount.addAndGet(count); System.out.println("thread[" + this.getName() + "] finished. this thread get rows:" + count); } } catch (Exception ex) { // 如果有异常,此处需要考虑重试正常处理逻辑。 } finally { semaphore.release(); } } } // 多个线程同时执行,currentParallelId取值范围为[0, maxParallel)。 List<ThreadForScanQuery> threadList = new ArrayList<ThreadForScanQuery>(); for (int currentParallelId = 0; currentParallelId < maxParallel; currentParallelId++) { ThreadForScanQuery thread = new ThreadForScanQuery(currentParallelId); threadList.add(thread); } // 同时启动。 for (ThreadForScanQuery thread : threadList) { // 利用semaphore限制同时启动的线程数量,避免客户端瓶颈。 semaphore.acquire(); thread.start(); } // 主线程阻塞等待所有线程完成任务。 for (ThreadForScanQuery thread : threadList) { thread.join(); } System.out.println("all thread finished! total rows:" + rowCount.get()); } }