当使用场景中不关心整个结果集的顺序时,可以使用并发导出数据功能以更快的速度将命中的数据全部返回。

说明 5.6.0及其以上版本的SDK开始支持ParallelScan功能。

前提条件

  • 已初始化OTSClient。具体操作,请参见初始化
  • 已创建数据表并写入数据。
  • 已在数据表上创建多元索引。具体操作,请参见创建多元索引

参数

参数 说明
tableName 数据表名称。
indexName 多元索引名称。
scanQuery query 多元索引的查询语句。支持精确查询、模糊查询、范围查询、地理位置查询、嵌套查询等,功能和Search接口一致。
limit 扫描数据时一次能返回的数据行数。
maxParallel 最大并发数。请求支持的最大并发数由用户数据量决定。数据量越大,支持的并发数越多,每次任务前可以通过ComputeSplits API进行获取。
currentParallelId 当前并发ID。取值范围为[0, maxParallel)。
token 用于翻页功能。ParallelScan请求结果中有下一次进行翻页的token,使用该token可以接着上一次的结果继续读取数据。
aliveTime ParallelScan的当前任务有效时间,也是token的有效时间。默认值为60,建议使用默认值,单位为秒。如果在有效时间内没有发起下一次请求,则不能继续读取数据。持续发起请求会刷新token有效时间。
说明 由于服务端采用异步方式清理过期任务,因此当前任务只保证在设置的有效时间内不会过期,但不能保证有效时间之后一定过期。
columnsToGet ParallelScan目前仅可以扫描多元索引中的数据,需要在创建多元索引时设置附加存储(即store=true)。
sessionId 本次并发扫描数据任务的sessionId。创建Session可以通过ComputeSplits API来创建,同时获得本次任务支持的最大并发数。

示例

单并发扫描数据和多线程并发扫描数据的代码示例如下:

  • 单并发扫描数据

    相对于多并发扫描数据,单并发扫描数据的代码更简单,单并发代码无需关心currentParallelId和maxParallel参数。单并发使用方式的整体吞吐比Search接口方式高,但是比多线程多并发使用方式的吞吐低,多线程多并发方式请参见最下方的“多线程并发扫描数据”示例代码。

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    
    import com.alicloud.openservices.tablestore.SyncClient;
    import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest;
    import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse;
    import com.alicloud.openservices.tablestore.model.Row;
    import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions;
    import com.alicloud.openservices.tablestore.model.iterator.RowIterator;
    import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest;
    import com.alicloud.openservices.tablestore.model.search.ParallelScanResponse;
    import com.alicloud.openservices.tablestore.model.search.ScanQuery;
    import com.alicloud.openservices.tablestore.model.search.SearchRequest.ColumnsToGet;
    import com.alicloud.openservices.tablestore.model.search.query.MatchAllQuery;
    import com.alicloud.openservices.tablestore.model.search.query.Query;
    import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders;
    
    public class Test {
    
        public static List<Row> scanQuery(final SyncClient client) {
            String tableName = "<TableName>";
            String indexName = "<IndexName>";
            //获取sessionId和本次请求支持的最大并发数。
            ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest();
            computeSplitsRequest.setTableName(tableName);
            computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName));
            ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest);
            byte[] sessionId = computeSplitsResponse.getSessionId();
            int splitsSize = computeSplitsResponse.getSplitsSize();
            /*
             *  创建并发扫描数据请求。
             */
            ParallelScanRequest parallelScanRequest = new ParallelScanRequest();
            parallelScanRequest.setTableName(tableName);
            parallelScanRequest.setIndexName(indexName);
            ScanQuery scanQuery = new ScanQuery();
            //该query决定了扫描出的数据范围,可用于构建嵌套的复杂的query。
            Query query = new MatchAllQuery();
            scanQuery.setQuery(query);
    
            //设置单次请求返回的数据行数。
            scanQuery.setLimit(2000);
            parallelScanRequest.setScanQuery(scanQuery);
            ColumnsToGet columnsToGet = new ColumnsToGet();
            columnsToGet.setColumns(Arrays.asList("col_1", "col_2"));
            parallelScanRequest.setColumnsToGet(columnsToGet);
            parallelScanRequest.setSessionId(sessionId);
    
            /*
             * 使用builder模式创建并发扫描数据请求,功能与前面一致。
             */
            ParallelScanRequest parallelScanRequestByBuilder = ParallelScanRequest.newBuilder()
                .tableName(tableName)
                .indexName(indexName)
                .scanQuery(ScanQuery.newBuilder()
                    .query(QueryBuilders.matchAll())
                    .limit(2000)
                    .build())
                .addColumnsToGet("col_1", "col_2")
                .sessionId(sessionId)
                .build();
            List<Row> result = new ArrayList<>();
    
            /*
             * 使用原生的API扫描数据。
             */
            {
                ParallelScanResponse parallelScanResponse = client.parallelScan(parallelScanRequest);
                //下次请求的ScanQuery的token。
                byte[] nextToken = parallelScanResponse.getNextToken();
                //获取数据。
                List<Row> rows = parallelScanResponse.getRows();
                result.addAll(rows);
                while (nextToken != null) {
                    //设置token。
                    parallelScanRequest.getScanQuery().setToken(nextToken);
                    //继续扫描数据。
                    parallelScanResponse = client.parallelScan(parallelScanRequest);
                    //获取数据。
                    rows = parallelScanResponse.getRows();
                    result.addAll(rows);
                    nextToken = parallelScanResponse.getNextToken();
                }
            }
    
            /*
             *  推荐方式。
             *  使用iterator方式扫描所有匹配数据。使用方式上更简单,速度和前面方法一致。
             */
            {
                RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
                while (iterator.hasNext()) {
                    Row row = iterator.next();
                    result.add(row);
                    //获取具体的值。
                    String col_1 = row.getLatestColumn("col_1").getValue().asString();
                    long col_2 = row.getLatestColumn("col_2").getValue().asLong();
                }
            }
    
            /*
             * 关于失败重试的问题,如果本函数的外部调用者有重试机制或者不需要考虑失败重试问题,可以忽略此部分内容。
             * 为了保证可用性,遇到任何异常均推荐进行任务级别的重试,重新开始一个新的ParallelScan任务。
             * 异常分为如下两种:
             * 1、服务端Session异常OTSSessionExpired。
             * 2、调用者客户端网络等异常。
             */
            try {
                //正常处理逻辑。
                {
                    RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
                    while (iterator.hasNext()) {
                        Row row = iterator.next();
                        //处理row,内存足够大时可直接放到list中。
                        result.add(row);
                    }
                }
            } catch (Exception ex) {
                //重试。
                {
                    result.clear();
                    RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
                    while (iterator.hasNext()) {
                        Row row = iterator.next();
                        //处理row,内存足够大时可直接放到list中。
                        result.add(row);
                    }
                }
            }
            return result;
        }
    }
  • 多线程并发扫描数据
    import com.alicloud.openservices.tablestore.SyncClient;
    import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest;
    import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse;
    import com.alicloud.openservices.tablestore.model.Row;
    import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions;
    import com.alicloud.openservices.tablestore.model.iterator.RowIterator;
    import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest;
    import com.alicloud.openservices.tablestore.model.search.ScanQuery;
    import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.atomic.AtomicLong;
    
    public class Test {
    
        public static void scanQueryWithMultiThread(final SyncClient client, String tableName, String indexName) throws InterruptedException {
            // 获取机器的CPU数量。
            final int cpuProcessors = Runtime.getRuntime().availableProcessors();
            // 指定客户端多线程的并发数量。建议和客户端的CPU核数一致,避免客户端压力太大,影响查询性能。
            final Semaphore semaphore = new Semaphore(cpuProcessors);
    
            // 获取sessionId和本次请求支持的最大并发数。
            ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest();
            computeSplitsRequest.setTableName(tableName);
            computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName));
            ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest);
            final byte[] sessionId = computeSplitsResponse.getSessionId();
            final int maxParallel = computeSplitsResponse.getSplitsSize();
    
            // 业务统计行数使用。
            AtomicLong rowCount = new AtomicLong(0);
            /*
             *  为了使用一个函数实现多线程功能,此处构建一个内部类继承Thread来使用多线程。
             *  您也可以构建一个正常的外部类,使代码更有条理。
             */
            final class ThreadForScanQuery extends Thread {
                private final int currentParallelId;
    
                private ThreadForScanQuery(int currentParallelId) {
                    this.currentParallelId = currentParallelId;
                    this.setName("ThreadForScanQuery:" + maxParallel + "-" + currentParallelId);  // 设置线程名称。
                }
    
                @Override
                public void run() {
                    System.out.println("start thread:" + this.getName());
                    try {
                        // 正常处理逻辑。
                        {
                            ParallelScanRequest parallelScanRequest = ParallelScanRequest.newBuilder()
                                    .tableName(tableName)
                                    .indexName(indexName)
                                    .scanQuery(ScanQuery.newBuilder()
                                            .query(QueryBuilders.range("col_long").lessThan(10_0000)) // 此处的query决定了获取什么数据。
                                            .limit(2000)
                                            .currentParallelId(currentParallelId)
                                            .maxParallel(maxParallel)
                                            .build())
                                    .addColumnsToGet("col_long", "col_keyword", "col_bool")  // 设置要返回的多元索引中的部分字段,或者使用下行注释的内容获取多元索引中全部数据。
                                    //.returnAllColumnsFromIndex(true)
                                    .sessionId(sessionId)
                                    .build();
                            // 使用Iterator形式获取所有数据。
                            RowIterator ltr = client.createParallelScanIterator(parallelScanRequest);
                            long count = 0;
                            while (ltr.hasNext()) {
                                Row row = ltr.next();
                                // 增加自定义的处理逻辑,此处代码以统计行数为例介绍。
                                count++;
                            }
                            rowCount.addAndGet(count);
                            System.out.println("thread[" + this.getName() + "] finished. this thread get rows:" + count);
                        }
                    } catch (Exception ex) {
                        // 如果有异常,此处需要考虑重试正常处理逻辑。
                    } finally {
                        semaphore.release();
                    }
                }
            }
    
            // 多个线程同时执行,currentParallelId取值范围为[0, maxParallel)。
            List<ThreadForScanQuery> threadList = new ArrayList<ThreadForScanQuery>();
            for (int currentParallelId = 0; currentParallelId < maxParallel; currentParallelId++) {
                ThreadForScanQuery thread = new ThreadForScanQuery(currentParallelId);
                threadList.add(thread);
            }
    
            // 同时启动。
            for (ThreadForScanQuery thread : threadList) {
                // 利用semaphore限制同时启动的线程数量,避免客户端瓶颈。
                semaphore.acquire();
                thread.start();
            }
    
            // 主线程阻塞等待所有线程完成任务。
            for (ThreadForScanQuery thread : threadList) {
                thread.join();
            }
            System.out.println("all thread finished! total rows:" + rowCount.get());
        }
    }