并发导出数据

当使用场景中不关心整个结果集的顺序时,您可以使用并发导出数据功能以更快的速度将命中的数据全部返回。

重要

表格存储Java SDK5.6.0版本开始支持并发导出数据功能。使用并发导出数据功能时,请确保获取了正确的Java SDK版本。关于Java SDK历史迭代版本的更多信息,请参见Java SDK历史迭代版本

背景

多元索引中提供了Search接口,Search接口支持全功能集,包括所有的查询功能以及排序、统计聚合等分析能力。使用Search接口查询数据的结果会按照指定的顺序返回。

但是在有些场景中,例如对接计算系统Spark、Presto等或者一些圈选场景,只需要使用完整的查询能力将命中的数据以更快的速度全部返回,不关心整个结果集的顺序。为了支持此类需求,多元索引中提供了ParallelScan接口。

ParallelScan接口相对于Search接口,保留了所有的查询功能,但是舍弃了排序、统计聚合等分析功能,从而带来了5倍以上的性能提升,因此使用ParallelScan接口可以实现1分钟内上亿级别数据行的导出能力,导出能力可以水平扩展,不存在上限。

ParallelScan接口的实现中,单次请求的limit限制更宽松。目前Search接口的limit最大允许100行,但是ParallelScan接口的limit最大允许2000行。同时支持多个线程一起并发请求,因此导出速度会极快。

场景

  • 如果请求关心排序、统计聚合,或者是终端客户的直接查询请求,请使用Search接口实现。

  • 如果请求不关心排序,只关心把所有符合条件的数据尽快返回,或者是计算系统(Spark、Presto等)拉取数据,请使用ParallelScan接口实现。

特点

ParallelScan接口相对于Search接口的典型特征如下:

  • 结果稳定性

    ParallelScan任务是有状态的。在一个Session请求中获取到的结果集是确定的,由发起第一次请求时的数据状态决定。如果发起第一次请求后插入了新的数据或者修改了原有的数据不会对结果集造成影响。

  • 新增会话(Session)概念

    重要

    在某些不易获取sessionId的场景中,ParallelScan接口也支持不携带sessionId发起请求,但是不使用sessionId可能会有极小的概率导致获取到的结果集中有重复数据。

    ParallelScan系列接口中新增了Session概念。使用sessionId能够保证获取到的结果集是稳定的,具体流程如下:

    1. 通过ComputeSplits接口获取最大并发数和当前sessionId。

    2. 通过发起多个并发ParallelScan请求读取数据,请求中需要指定当前的sessionId和当前并发ID。

    如果在ParallelScan任务执行的过程中发生网络异常、线程异常、动态Schema修改、索引切换等情况,则会导致ParallelScan不能继续扫描数据,服务端会返回“OTSSessionExpired”错误码,此时需要重新开始一个新的ParallelScan任务,从最开始重新拉取数据。

    同一个sessionIdScanQuery相同的多个并发任务视为一个ParallelScan任务。一个ParallelScan任务的生命周期定义为开始时间是第一次发出ParallelScan请求,结束时间是翻页扫描完所有数据或者请求的token失效。

  • 最大并发数

    ParallelScan支持的单请求的最大并发数由ComputeSplits的返回值确定。数据越多,支持的并发数越大。

    单请求指同一个查询语句,例如查询city="杭州"的结果,如果使用Search接口查询,则Search请求的返回值中会包括所有city="杭州"的结果;如果使用ParallelScan接口查询且并发数为2,则每个ParallelScan请求返回50%的结果,然后将两个并发的结果集合并在一起才是完整的结果集。

  • 性能

    ParallelScan接口单并发扫描数据的性能是Search接口的5倍。当增大并发数时,性能可以继续线性提高,例如8并发时仍可以继续提高4倍性能。

  • 成本

    由于ParallelScan请求对资源的消耗更少,价格会更便宜,所以对于大数据量的导出类需求,强烈建议使用ParallelScan接口。

注意事项

  • 同时存在的ParallelScan任务数量有一定的限制,当前为10个,后续会根据客户需求继续调整。

  • 只支持返回多元索引中已存在的列,但是不能返回多元索引中日期类型和嵌套类型的列。

    目前已支持返回数组和地理位置的列,但是返回的字段值会被格式化,可能和写入数据表的值不一致。例如对于数组类型,写入数据表的值为"[1,2, 3, 4]",则通过ParallelScan接口导出的值为"[1,2,3,4]";对于地理位置类型,写入数据表的值为10,50,则通过ParallelScan接口导出的值为10.0,50.0

    实际使用时,ReturnType可以配置为RETURN_ALL_INDEX或者RETURN_SPECIFIED,不能配置为RETURN_ALL。

  • 每次最大支持返回2000行数据(即limit默认值为2000)。当超过2000后,limit的变化对性能基本无影响。

接口

多并发数据导出功能涉及如下两个接口。

  • ComputeSplits:获取当前ParallelScan单个请求的最大并发数。

  • ParallelScan:执行具体的数据导出功能。

使用

您可以使用如下语言的SDK并发导出数据。

参数

参数

说明

tableName

数据表名称。

indexName

多元索引名称。

scanQuery

query

多元索引的查询语句。支持精确查询、模糊查询、范围查询、地理位置查询、嵌套查询等,功能和Search接口一致。

limit

扫描数据时一次能返回的数据行数。

maxParallel

最大并发数。请求支持的最大并发数由用户数据量决定。数据量越大,支持的并发数越多,每次任务前可以通过ComputeSplits API进行获取。

currentParallelId

当前并发ID。取值范围为[0, maxParallel)。

token

用于翻页功能。ParallelScan请求结果中有下一次进行翻页的token,使用该token可以接着上一次的结果继续读取数据。

aliveTime

ParallelScan的当前任务有效时间,也是token的有效时间。默认值为60,建议使用默认值,单位为秒。如果在有效时间内没有发起下一次请求,则不能继续读取数据。持续发起请求会刷新token有效时间。

说明

动态修改schema中的切换索引、服务端单台机器故障、服务端负载均衡等均会导致Session提前过期,此时需要重新创建Session。

columnsToGet

指定分组结果中需要返回的列名,可以通过将列名加入Columns来实现。

如果需要返回多元索引中的所有列,则可以使用更简洁的ReturnAllFromIndex实现。

重要

此处不能使用ReturnAll。

sessionId

本次并发扫描数据任务的sessionId。创建Session可以通过ComputeSplits API来创建,同时获得本次任务支持的最大并发数。

示例

请根据实际选择单并发扫描数据和多线程并发扫描数据。

单并发扫描数据

相对于多并发扫描数据,单并发扫描数据的代码更简单,单并发代码无需关心currentParallelIdmaxParallel参数。单并发使用方式的整体吞吐比Search接口方式高,但是比多线程多并发使用方式的吞吐低。

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest;
import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse;
import com.alicloud.openservices.tablestore.model.Row;
import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions;
import com.alicloud.openservices.tablestore.model.iterator.RowIterator;
import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest;
import com.alicloud.openservices.tablestore.model.search.ParallelScanResponse;
import com.alicloud.openservices.tablestore.model.search.ScanQuery;
import com.alicloud.openservices.tablestore.model.search.SearchRequest.ColumnsToGet;
import com.alicloud.openservices.tablestore.model.search.query.MatchAllQuery;
import com.alicloud.openservices.tablestore.model.search.query.Query;
import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders;

public class Test {

    public static List<Row> scanQuery(final SyncClient client) {
        String tableName = "<TableName>";
        String indexName = "<IndexName>";
        //获取sessionId和本次请求支持的最大并发数。
        ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest();
        computeSplitsRequest.setTableName(tableName);
        computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName));
        ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest);
        byte[] sessionId = computeSplitsResponse.getSessionId();
        int splitsSize = computeSplitsResponse.getSplitsSize();
        /*
         *  创建并发扫描数据请求。
         */
        ParallelScanRequest parallelScanRequest = new ParallelScanRequest();
        parallelScanRequest.setTableName(tableName);
        parallelScanRequest.setIndexName(indexName);
        ScanQuery scanQuery = new ScanQuery();
        //该query决定了扫描出的数据范围,可用于构建嵌套的复杂的query。
        Query query = new MatchAllQuery();
        scanQuery.setQuery(query);

        //设置单次请求返回的数据行数。
        scanQuery.setLimit(2000);
        parallelScanRequest.setScanQuery(scanQuery);
        ColumnsToGet columnsToGet = new ColumnsToGet();
        columnsToGet.setColumns(Arrays.asList("col_1", "col_2"));
        parallelScanRequest.setColumnsToGet(columnsToGet);
        parallelScanRequest.setSessionId(sessionId);

        /*
         * 使用builder模式创建并发扫描数据请求,功能与前面一致。
         */
        ParallelScanRequest parallelScanRequestByBuilder = ParallelScanRequest.newBuilder()
            .tableName(tableName)
            .indexName(indexName)
            .scanQuery(ScanQuery.newBuilder()
                .query(QueryBuilders.matchAll())
                .limit(2000)
                .build())
            .addColumnsToGet("col_1", "col_2")
            .sessionId(sessionId)
            .build();
        List<Row> result = new ArrayList<>();

        /*
         * 使用原生的API扫描数据。
         */
        {
            ParallelScanResponse parallelScanResponse = client.parallelScan(parallelScanRequest);
            //下次请求的ScanQuery的token。
            byte[] nextToken = parallelScanResponse.getNextToken();
            //获取数据。
            List<Row> rows = parallelScanResponse.getRows();
            result.addAll(rows);
            while (nextToken != null) {
                //设置token。
                parallelScanRequest.getScanQuery().setToken(nextToken);
                //继续扫描数据。
                parallelScanResponse = client.parallelScan(parallelScanRequest);
                //获取数据。
                rows = parallelScanResponse.getRows();
                result.addAll(rows);
                nextToken = parallelScanResponse.getNextToken();
            }
        }

        /*
         *  推荐方式。
         *  使用iterator方式扫描所有匹配数据。使用方式上更简单,速度和前面方法一致。
         */
        {
            RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
            while (iterator.hasNext()) {
                Row row = iterator.next();
                result.add(row);
                //获取具体的值。
                String col_1 = row.getLatestColumn("col_1").getValue().asString();
                long col_2 = row.getLatestColumn("col_2").getValue().asLong();
            }
        }

        /*
         * 关于失败重试的问题,如果本函数的外部调用者有重试机制或者不需要考虑失败重试问题,可以忽略此部分内容。
         * 为了保证可用性,遇到任何异常均推荐进行任务级别的重试,重新开始一个新的ParallelScan任务。
         * 异常分为如下两种:
         * 1、服务端Session异常OTSSessionExpired。
         * 2、调用者客户端网络等异常。
         */
        try {
            //正常处理逻辑。
            {
                RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
                while (iterator.hasNext()) {
                    Row row = iterator.next();
                    //处理row,内存足够大时可直接放到list中。
                    result.add(row);
                }
            }
        } catch (Exception ex) {
            //重试。
            {
                result.clear();
                RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
                while (iterator.hasNext()) {
                    Row row = iterator.next();
                    //处理row,内存足够大时可直接放到list中。
                    result.add(row);
                }
            }
        }
        return result;
    }
}

多线程并发扫描数据

import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest;
import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse;
import com.alicloud.openservices.tablestore.model.Row;
import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions;
import com.alicloud.openservices.tablestore.model.iterator.RowIterator;
import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest;
import com.alicloud.openservices.tablestore.model.search.ScanQuery;
import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;

public class Test {

    public static void scanQueryWithMultiThread(final SyncClient client, String tableName, String indexName) throws InterruptedException {
        // 获取机器的CPU数量。
        final int cpuProcessors = Runtime.getRuntime().availableProcessors();
        // 指定客户端多线程的并发数量。建议和客户端的CPU核数一致,避免客户端压力太大,影响查询性能。
        final Semaphore semaphore = new Semaphore(cpuProcessors);

        // 获取sessionId和本次请求支持的最大并发数。
        ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest();
        computeSplitsRequest.setTableName(tableName);
        computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName));
        ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest);
        final byte[] sessionId = computeSplitsResponse.getSessionId();
        final int maxParallel = computeSplitsResponse.getSplitsSize();

        // 业务统计行数使用。
        AtomicLong rowCount = new AtomicLong(0);
        /*
         *  为了使用一个函数实现多线程功能,此处构建一个内部类继承Thread来使用多线程。
         *  您也可以构建一个正常的外部类,使代码更有条理。
         */
        final class ThreadForScanQuery extends Thread {
            private final int currentParallelId;

            private ThreadForScanQuery(int currentParallelId) {
                this.currentParallelId = currentParallelId;
                this.setName("ThreadForScanQuery:" + maxParallel + "-" + currentParallelId);  // 设置线程名称。
            }

            @Override
            public void run() {
                System.out.println("start thread:" + this.getName());
                try {
                    // 正常处理逻辑。
                    {
                        ParallelScanRequest parallelScanRequest = ParallelScanRequest.newBuilder()
                                .tableName(tableName)
                                .indexName(indexName)
                                .scanQuery(ScanQuery.newBuilder()
                                        .query(QueryBuilders.range("col_long").lessThan(10_0000)) // 此处的query决定了获取什么数据。
                                        .limit(2000)
                                        .currentParallelId(currentParallelId)
                                        .maxParallel(maxParallel)
                                        .build())
                                .addColumnsToGet("col_long", "col_keyword", "col_bool")  // 设置要返回的多元索引中的部分字段,或者使用下行注释的内容获取多元索引中全部数据。
                                //.returnAllColumnsFromIndex(true)
                                .sessionId(sessionId)
                                .build();
                        // 使用Iterator形式获取所有数据。
                        RowIterator ltr = client.createParallelScanIterator(parallelScanRequest);
                        long count = 0;
                        while (ltr.hasNext()) {
                            Row row = ltr.next();
                            // 增加自定义的处理逻辑,此处代码以统计行数为例介绍。
                            count++;
                        }
                        rowCount.addAndGet(count);
                        System.out.println("thread[" + this.getName() + "] finished. this thread get rows:" + count);
                    }
                } catch (Exception ex) {
                    // 如果有异常,此处需要考虑重试正常处理逻辑。
                } finally {
                    semaphore.release();
                }
            }
        }

        // 多个线程同时执行,currentParallelId取值范围为[0, maxParallel)。
        List<ThreadForScanQuery> threadList = new ArrayList<ThreadForScanQuery>();
        for (int currentParallelId = 0; currentParallelId < maxParallel; currentParallelId++) {
            ThreadForScanQuery thread = new ThreadForScanQuery(currentParallelId);
            threadList.add(thread);
        }

        // 同时启动。
        for (ThreadForScanQuery thread : threadList) {
            // 利用semaphore限制同时启动的线程数量,避免客户端瓶颈。
            semaphore.acquire();
            thread.start();
        }

        // 主线程阻塞等待所有线程完成任务。
        for (ThreadForScanQuery thread : threadList) {
            thread.join();
        }
        System.out.println("all thread finished! total rows:" + rowCount.get());
    }
}