表格存储的SDK提供了BatchGetRow、BatchWriteRow和GetRange等多行操作的接口。

批量读(BatchGetRow)

批量读取一个或多个表中的若干行数据。

BatchGetRow操作可视为多个GetRow操作的集合,各个操作独立执行,独立返回结果,独立计算服务能力单元。

与执行大量的GetRow操作相比,使用BatchGetRow操作可以有效减少请求的响应时间,提高数据的读取速率。

  • 接口
            """
            说明:批量获取多行数据。
            request = BatchGetRowRequest()
            request.add(TableInBatchGetRowItem(myTable0, primary_keys, column_to_get=None, column_filter=None))
            request.add(TableInBatchGetRowItem(myTable1, primary_keys, column_to_get=None, column_filter=None))
            request.add(TableInBatchGetRowItem(myTable2, primary_keys, column_to_get=None, column_filter=None))
            request.add(TableInBatchGetRowItem(myTable3, primary_keys, column_to_get=None, column_filter=None))
            response = client.batch_get_row(request)
            ``response``为返回的结果,类型为tablestore.metadata.BatchGetRowResponse。
            """
            def batch_get_row(self, request): 
    					
  • 示例

    批量一次读3行。

        # 设置需要返回的列。
        columns_to_get = ['name', 'mobile', 'address', 'age']
    
        # 读取3行。
        rows_to_get = [] 
        for i in range(0, 3):
            primary_key = [('gid',i), ('uid',i+1)]
            rows_to_get.append(primary_key)
    
        # 过滤条件为name等于John,且address等于China。
        cond = CompositeColumnCondition(LogicalOperator.AND)
        cond.add_sub_condition(SingleColumnCondition("name", "John", ComparatorType.EQUAL))
        cond.add_sub_condition(SingleColumnCondition("address", 'China', ComparatorType.EQUAL))
    
        # 构造批量读请求。
        request = BatchGetRowRequest()
    
        # 增加表table_name中需要读取的行,最后一个参数1表示读取最新的一个版本。
        request.add(TableInBatchGetRowItem(table_name, rows_to_get, columns_to_get, cond, 1))
    
        # 增加表notExistTable中需要读取的行。
        request.add(TableInBatchGetRowItem('notExistTable', rows_to_get, columns_to_get, cond, 1))
    
        try:
               result = client.batch_get_row(request) 
            print ('Result status: %s'%(result.is_all_succeed()))
    
            table_result_0 = result.get_result_by_table(table_name)
            table_result_1 = result.get_result_by_table('notExistTable')
            print ('Check first table\'s result:')
            for item in table_result_0: 
                if item.is_ok:
                    print ('Read succeed, PrimaryKey: %s, Attributes: %s' % (item.row.primary_key, item.row.attribute_columns))
                else:
                    print ('Read failed, error code: %s, error message: %s' % (item.error_code, item.error_message))
            print ('Check second table\'s result:')
            for item in table_result_1:
                if item.is_ok:
                    print ('Read succeed, PrimaryKey: %s, Attributes: %s' % (item.row.primary_key, item.row.attribute_columns))
                else: 
                    print ('Read failed, error code: %s, error message: %s' % (item.error_code, item.error_message))
        # 客户端异常,一般为参数错误或者网络异常。
        except OTSClientError as e:
            print "get row failed, http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message())
        # 服务端异常,一般为参数错误或者流控错误。
        except OTSServiceError as e:
            print "get row failed, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % (e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id())
    					

    代码详情请参见BatchGetRow@GitHub

批量写(BatchWriteRow)

批量插入、修改或删除一个或多个表中的若干行数据。

BatchWriteRow操作可视为多个PutRow、UpdateRow、DeleteRow操作的集合,各个操作独立执行,独立返回结果,独立计算服务能力单元。

  • 接口
            """
            说明:批量修改多行数据。
            request = MiltiTableInBatchWriteRowItem()
            request.add(TableInBatchWriteRowItem(table0, row_items))
            request.add(TableInBatchWriteRowItem(table1, row_items))
            response = client.batch_write_row(request)
            ``response``为返回的结果,类型为tablestore.metadata.BatchWriteRowResponse。
            """
            def batch_write_row(self, request):
    					
  • 示例

    批量写数据。

        put_row_items = []
        # 增加PutRow的行。
        for i in range(0, 10):
            primary_key = [('gid',i), ('uid',i+1)]
            attribute_columns = [('name','somebody'+str(i)), ('address','somewhere'+str(i)), ('age',i)]
            row = Row(primary_key, attribute_columns)
            condition = Condition(RowExistenceExpectation.IGNORE)
            item = PutRowItem(row, condition)
            put_row_items.append(item)
    
        # 增加UpdateRow的行。
        for i in range(10, 20):
            primary_key = [('gid',i), ('uid',i+1)]
            attribute_columns = {'put': [('name','somebody'+str(i)), ('address','somewhere'+str(i)), ('age',i)]}
            row = Row(primary_key, attribute_columns)
            condition = Condition(RowExistenceExpectation.IGNORE, SingleColumnCondition("age", i, ComparatorType.EQUAL))
            item = UpdateRowItem(row, condition)
            put_row_items.append(item)
    
        # 增加DeleteRow的行。
        delete_row_items = []
        for i in range(10, 20):
            primary_key = [('gid',i), ('uid',i+1)]
            row = Row(primary_key)
            condition = Condition(RowExistenceExpectation.IGNORE)
            item = DeleteRowItem(row, condition)
            delete_row_items.append(item)
    
        # 构造批量写请求。
        request = BatchWriteRowRequest()
        request.add(TableInBatchWriteRowItem(table_name, put_row_items))
        request.add(TableInBatchWriteRowItem('notExistTable', delete_row_items))
    
        # 调用batch_write_row方法执行批量写,如果请求参数等错误会抛异常;如果部分行失败,则不会抛异常,但是内部的Item会失败。
        try:
            result = client.batch_write_row(request)
            print ('Result status: %s'%(result.is_all_succeed()))
    
            # 检查Put行的结果。
            print ('check first table\'s put results:')
            succ, fail = result.get_put()
            for item in succ:
                print ('Put succeed, consume %s write cu.' % item.consumed.write)
            for item in fail:
                print ('Put failed, error code: %s, error message: %s' % (item.error_code, item.error_message))
    
            # 检查Update行的结果。
            print ('check first table\'s update results:')
            succ, fail = result.get_update()
            for item in succ:
                print ('Update succeed, consume %s write cu.' % item.consumed.write)
            for item in fail:
                print ('Update failed, error code: %s, error message: %s' % (item.error_code, item.error_message))
    
            # 检查Delete行的结果。
            print ('check second table\'s delete results:')
            succ, fail = result.get_delete()
            for item in succ:
                print ('Delete succeed, consume %s write cu.' % item.consumed.write)
            for item in fail:
                print ('Delete failed, error code: %s, error message: %s' % (item.error_code, item.error_message)) 
        # 客户端异常,一般为参数错误或者网络异常。
        except OTSClientError as e:
            print "get row failed, http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message())
        # 服务端异常,一般为参数错误或者流控错误。
        except OTSServiceError as e:
            print "get row failed, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % (e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id())
    					

    代码详情请参见BatchWriteRow@GitHub

范围读(GetRange)

读取指定主键范围内的数据。

  • 接口
            """
            说明:根据范围条件获取多行数据。
            ``table_name``是对应的表名。
            ``direction``表示范围的方向,字符串格式,取值包括'FORWARD'和'BACKWARD'。
            ``inclusive_start_primary_key``表示范围的起始主键(在范围内)。
            ``exclusive_end_primary_key``表示范围的结束主键(不在范围内)。
            ``columns_to_get``是可选参数,表示要获取的列的名称列表,类型为list;如果不填,表示获取所有列。
            ``limit``是可选参数,表示最多读取多少行;如果不填,则没有限制。
            ``column_filter``是可选参数,表示读取指定条件的行。
            ``max_version``是可选参数,表示返回的最大版本数目,与time_range必须存在一个。
            ``time_range``是可选参数,表示返回的版本的范围,于max_version必须存在一个。
            ``start_column``是可选参数,用于宽行读取,表示本次读取的起始列。
            ``end_column``是可选参数,用于宽行读取,表示本次读取的结束列。
            ``token``是可选参数,用于宽行读取,表示本次读取的起始列位置,内容被二进制编码,来源于上次请求的返回结果中。
    
            返回:符合条件的结果列表。
            ``consumed``表示本次操作消耗的CapacityUnit,是tablestore.metadata.CapacityUnit类的实例。
            ``next_start_primary_key``表示下次get_range操作的起始点的主健列,类型为dict。
            ``row_list``表示本次操作返回的行数据列表,格式为:[Row, ...]。  
            """
       def get_range(self, table_name, direction,
                      inclusive_start_primary_key,
                      exclusive_end_primary_key,
                      columns_to_get=None,
                      limit=None,
                      column_filter=None,
                      max_version=None,
                      time_range=None,
                      start_column=None,
                      end_column=None,
                      token = None):
    					
  • 示例

    范围读取。

        # 设置范围查询的起始主键。
        inclusive_start_primary_key = [('uid',INF_MIN), ('gid',INF_MIN)]
    
        # 设置范围查询的结束主键。
        exclusive_end_primary_key = [('uid',INF_MAX), ('gid',INF_MAX)]
    
        # 查询所有列。
        columns_to_get = []
    
        # 每次最多返回90行,如果总共有100个结果,首次查询时指定limit=90,则第一次最多返回90,最少可能返回0个结果,但是next_start_primary_key不为None。
        limit = 90
    
        # 设置过滤器。
        cond = CompositeColumnCondition(LogicalOperator.AND)
        cond.add_sub_condition(SingleColumnCondition("address", 'China', ComparatorType.EQUAL))
        cond.add_sub_condition(SingleColumnCondition("age", 50, ComparatorType.LESS_THAN))
    
        try:
            # 调用get_range接口。
            consumed, next_start_primary_key, row_list, next_token  = client.get_range(
                    table_name, Direction.FORWARD,
                    inclusive_start_primary_key, exclusive_end_primary_key,
                    columns_to_get,
                    limit,
                    column_filter = cond,
                    max_version = 1
                    time_range = (1557125059000, 1557129059000) # start_time大于等于1557125059000,end_time小于1557129059000。
        )
    
            all_rows = []
            all_rows.extend(row_list)
    
            # 当next_start_primary_key不为空时,说明还有数据,继续循环读取。
            while next_start_primary_key is not None:
                inclusive_start_primary_key = next_start_primary_key
                consumed, next_start_primary_key, row_list, next_token = client.get_range(
                    table_name, Direction.FORWARD,
                    inclusive_start_primary_key, exclusive_end_primary_key,
                    columns_to_get, limit,
                    column_filter = cond,
                    max_version = 1
            )
                all_rows.extend(row_list)
    
            # 打印主键和属性列。
            for row in all_rows:
                print (row.primary_key, row.attribute_columns)
            print ('Total rows: ', len(all_rows))
        # 客户端异常,一般为参数错误或者网络异常。
        except OTSClientError as e:
            print "get row failed, http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message())
        # 服务端异常,一般为参数错误或者流控错误。
        except OTSServiceError as e:
            print "get row failed, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % (e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id())
    					

    代码详情请参见GetRange@GitHub