当使用场景中不关心整个结果集的顺序时,可以使用并发导出数据(ParallelScan)功能以更快的速度将匹配的数据全部返回。
前提条件
参数
参数 | 说明 | |
---|---|---|
TableName | 数据表名称。 | |
IndexName | 多元索引名称。 | |
ScanQuery | Query | 多元索引的查询语句。支持精确查询、模糊查询、范围查询、地理位置查询、嵌套查询等,功能和Search接口一致。 |
Limit | 扫描数据时一次能返回的数据行数。 | |
MaxParallel | 最大并发数。请求支持的最大并发数由用户数据量决定。数据量越大,支持的并发数越多,每次任务前可以通过ComputeSplits API进行获取。 | |
CurrentParallelID | 当前并发ID。取值范围为[0, MaxParallel)。 | |
Token | 用于翻页功能。ParallelScan请求结果中有下一次进行翻页的Token,使用该Token可以接着上一次的结果继续读取数据。 | |
AliveTime | ParallelScan的当前任务有效时间,也是Token的有效时间。默认值为60,建议使用默认值,单位为秒。如果在有效时间内没有发起下一次请求,则不能继续读取数据。持续发起请求会刷新Token有效时间。 说明 由于服务端采用异步方式清理过期任务,因此当前任务只保证在设置的有效时间内不会过期,但不能保证有效时间之后一定过期。 | |
ColumnsToGet | ParallelScan目前仅可以扫描多元索引中的数据,需要在创建多元索引时设置附加存储(即Store=true)。 | |
SessionId | 本次并发扫描数据任务的SessionId。您可以通过ComputeSplits API创建Session,同时获得本次任务支持的最大并发数。 |
示例
单并发扫描数据和多线程并发扫描数据的代码示例如下:
单并发扫描数据
/** * ParallelScan单并发扫描数据。 */ func ParallelScanSingleConcurrency(client *tablestore.TableStoreClient, tableName string, indexName string) { computeSplitsResp, err := computeSplits(client, tableName, indexName) if err != nil { fmt.Printf("%#v", err) return } query := search.NewScanQuery().SetQuery(&search.MatchAllQuery{}).SetLimit(2) req := &tablestore.ParallelScanRequest{} req.SetTableName(tableName). SetIndexName(indexName). SetColumnsToGet(&tablestore.ColumnsToGet{ReturnAllFromIndex: false}). SetScanQuery(query). SetSessionId(computeSplitsResp.SessionId) res, err := client.ParallelScan(req) if err != nil { fmt.Printf("%#v", err) return } total := len(res.Rows) for res.NextToken != nil { req.SetScanQuery(query.SetToken(res.NextToken)) res, err = client.ParallelScan(req) if err != nil { fmt.Printf("%#v", err) return } total += len(res.Rows) //process rows each loop } fmt.Println("total: ", total) }
多线程并发扫描数据
/** * ParallelScan多并发扫描数据。 */ func ParallelScanMultiConcurrency(client *tablestore.TableStoreClient, tableName string, indexName string) { computeSplitsResp, err := computeSplits(client, tableName, indexName) if err != nil { fmt.Printf("%#v", err) return } var wg sync.WaitGroup wg.Add(int(computeSplitsResp.SplitsSize)) for i := int32(0); i < computeSplitsResp.SplitsSize; i++ { current := i go func() { defer wg.Done() query := search.NewScanQuery(). SetQuery(&search.MatchAllQuery{}). SetCurrentParallelID(current). SetMaxParallel(computeSplitsResp.SplitsSize). SetLimit(2) req := &tablestore.ParallelScanRequest{} req.SetTableName(tableName). SetIndexName(indexName). SetColumnsToGet(&tablestore.ColumnsToGet{ReturnAllFromIndex: false}). SetScanQuery(query). SetSessionId(computeSplitsResp.SessionId) res, err := client.ParallelScan(req) if err != nil { fmt.Printf("%#v", err) return } total := len(res.Rows) for res.NextToken != nil { req.SetScanQuery(query.SetToken(res.NextToken)) res, err = client.ParallelScan(req) if err != nil { fmt.Printf("%#v", err) return } total += len(res.Rows) //process rows each loop } fmt.Println("total: ", total) }() } wg.Wait() }
反馈
- 本页导读 (1)
文档反馈