全部产品

并发导出数据

当使用场景中不关心整个结果集的顺序时,可以使用并发导出数据(ParallelScan)功能以更快的速度将匹配的数据全部返回。

前提条件

  • 已初始化Client,详情请参见初始化

  • 已创建数据表并写入数据。

  • 已在数据表上创建多元索引,详情请参见创建多元索引

参数

参数

说明

TableName

数据表名称。

IndexName

多元索引名称。

ScanQuery

Query

多元索引的查询语句。支持精确查询、模糊查询、范围查询、地理位置查询、嵌套查询等,功能和Search接口一致。

Limit

扫描数据时一次能返回的数据行数。

MaxParallel

最大并发数。请求支持的最大并发数由用户数据量决定。数据量越大,支持的并发数越多,每次任务前可以通过ComputeSplits API进行获取。

CurrentParallelID

当前并发ID。取值范围为[0, MaxParallel)。

Token

用于翻页功能。ParallelScan请求结果中有下一次进行翻页的Token,使用该Token可以接着上一次的结果继续读取数据。

AliveTime

ParallelScan的当前任务有效时间,也是Token的有效时间。建议使用默认值。如果在有效时间内没有发起下一次请求,则不能继续读取数据。持续发起请求会刷新Token有效时间。

ColumnsToGet

ParallelScan目前仅可以扫描多元索引中的数据,需要在创建多元索引时设置附加存储(即Store=true)。

SessionId

本次并发扫描数据任务的SessionId。创建Session可以通过ComputeSplits API来创建,同时获得本次任务支持的最大并发数。

示例

单并发扫描数据和多线程并发扫描数据的代码示例如下:

  • 单并发扫描数据

    /**
     * ParallelScan单并发扫描
    数据。
     */
    func ParallelScanSingleConcurrency(client *tablestore.TableStoreClient, tableName string, indexName string) {
    	computeSplitsResp, err := computeSplits(client, tableName, indexName)
    	if err != nil {
    		fmt.Printf("%#v", err)
    		return
    	}
    
    	query := search.NewScanQuery().SetQuery(&search.MatchAllQuery{}).SetLimit(2)
    
    	req := &tablestore.ParallelScanRequest{}
    	req.SetTableName(tableName).
    		SetIndexName(indexName).
    		SetColumnsToGet(&tablestore.ColumnsToGet{ReturnAllFromIndex: false}).
    		SetScanQuery(query).
    		SetSessionId(computeSplitsResp.SessionId)
    
    	res, err := client.ParallelScan(req)
    	if err != nil {
    		fmt.Printf("%#v", err)
    		return
    	}
    
    	total := len(res.Rows)
    	for res.NextToken != nil {
    		req.SetScanQuery(query.SetToken(res.NextToken))
    		res, err = client.ParallelScan(req)
    		if err != nil {
    			fmt.Printf("%#v", err)
    			return
    		}
    
    		total += len(res.Rows) //process rows each loop
    	}
    	fmt.Println("total: ", total)
    }
    
  • 多线程并发扫描数据

    /**
     * ParallelScan多并发扫描
    数据。
     */
    func ParallelScanMultiConcurrency(client *tablestore.TableStoreClient, tableName string, indexName string) {
    	computeSplitsResp, err := computeSplits(client, tableName, indexName)
    	if err != nil {
    		fmt.Printf("%#v", err)
    		return
    	}
    
    	var wg sync.WaitGroup
    	wg.Add(int(computeSplitsResp.SplitsSize))
    
    	for i := int32(0); i < computeSplitsResp.SplitsSize; i++ {
    		current := i
    		go func() {
    			defer wg.Done()
    			query := search.NewScanQuery().
    				SetQuery(&search.MatchAllQuery{}).
    				SetCurrentParallelID(current).
    				SetMaxParallel(computeSplitsResp.SplitsSize).
    				SetLimit(2)
    
    			req := &tablestore.ParallelScanRequest{}
    			req.SetTableName(tableName).
    				SetIndexName(indexName).
    				SetColumnsToGet(&tablestore.ColumnsToGet{ReturnAllFromIndex: false}).
    				SetScanQuery(query).
    				SetSessionId(computeSplitsResp.SessionId)
    
    			res, err := client.ParallelScan(req)
    			if err != nil {
    				fmt.Printf("%#v", err)
    				return
    			}
    
    			total := len(res.Rows)
    			for res.NextToken != nil {
    				req.SetScanQuery(query.SetToken(res.NextToken))
    				res, err = client.ParallelScan(req)
    				if err != nil {
    					fmt.Printf("%#v", err)
    					return
    				}
    
    				total += len(res.Rows) //process rows each loop
    			}
    			fmt.Println("total: ", total)
    		}()
    	}
    	wg.Wait()
    }