表格存储的 SDK 提供了 BatchGetRow、BatchWriteRow 和 GetRange 等多行操作的接口。

批量读(BatchGetRow)

批量读取一个或多个表中的若干行数据。

BatchGetRow 操作可视为多个 GetRow 操作的集合,各个操作独立执行,独立返回结果,独立计算服务能力单元。

与执行大量的 GetRow 操作相比,使用 BatchGetRow 操作可以有效减少请求的响应时间,提高数据的读取速率。

接口

        """
        说明:批量获取多行数据。
        request = BatchGetRowRequest()
        request.add(TableInBatchGetRowItem(myTable0, primary_keys, column_to_get=None, column_filter=None))
        request.add(TableInBatchGetRowItem(myTable1, primary_keys, column_to_get=None, column_filter=None))
        request.add(TableInBatchGetRowItem(myTable2, primary_keys, column_to_get=None, column_filter=None))
        request.add(TableInBatchGetRowItem(myTable3, primary_keys, column_to_get=None, column_filter=None))
        response = client.batch_get_row(request)
        ``response``为返回的结果,类型为tablestore.metadata.BatchGetRowResponse
        """
        def batch_get_row(self, request): 
			

示例

批量一次读 3 行。

    # 需要返回的列
    columns_to_get = ['name', 'mobile', 'address', 'age']

    # 读3行
    rows_to_get = [] 
    for i in range(0, 3):
        primary_key = [('gid',i), ('uid',i+1)]
        rows_to_get.append(primary_key)

    # 过滤条件:name等于Johb,且 address等于China。
    cond = CompositeColumnCondition(LogicalOperator.AND)
    cond.add_sub_condition(SingleColumnCondition("name", "John", ComparatorType.EQUAL))
    cond.add_sub_condition(SingleColumnCondition("address", 'China', ComparatorType.EQUAL))

    # 构造批量读请求。
    request = BatchGetRowRequest()

    # 增加表:table_name中需要读取的行,最后一个参数1表示最读取最新的一个版本。
    request.add(TableInBatchGetRowItem(table_name, rows_to_get, columns_to_get, cond, 1))

    # 增加表:notExistTable中需要读取的行。
    request.add(TableInBatchGetRowItem('notExistTable', rows_to_get, columns_to_get, cond, 1))

    try:
           result = client.batch_get_row(request) 
        print ('Result status: %s'%(result.is_all_succeed()))

        table_result_0 = result.get_result_by_table(table_name)
        table_result_1 = result.get_result_by_table('notExistTable')
        print ('Check first table\'s result:')
        for item in table_result_0: 
            if item.is_ok:
                print ('Read succeed, PrimaryKey: %s, Attributes: %s' % (item.row.primary_key, item.row.attribute_columns))
            else:
                print ('Read failed, error code: %s, error message: %s' % (item.error_code, item.error_message))
        print ('Check second table\'s result:')
        for item in table_result_1:
            if item.is_ok:
                print ('Read succeed, PrimaryKey: %s, Attributes: %s' % (item.row.primary_key, item.row.attribute_columns))
            else: 
                print ('Read failed, error code: %s, error message: %s' % (item.error_code, item.error_message))
    # 客户端异常,一般为参数错误或者网络异常。
    except OTSClientError as e:
        print "get row failed, http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message())
    # 服务端异常,一般为参数错误或者流控错误。
    except OTSServiceError as e:
        print "get row failed, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % (e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id())
			

详细代码请参见BatchGetRow@GitHub

批量写(BatchWriteRow)

批量插入、修改或删除一个或多个表中的若干行数据。

BatchWriteRow 操作可视为多个 PutRow、UpdateRow、DeleteRow 操作的集合,各个操作独立执行,独立返回结果,独立计算服务能力单元。

接口

        """
        说明:批量修改多行数据。
        request = MiltiTableInBatchWriteRowItem()
        request.add(TableInBatchWriteRowItem(table0, row_items))
        request.add(TableInBatchWriteRowItem(table1, row_items))
        response = client.batch_write_row(request)
        ``response``为返回的结果,类型为tablestore.metadata.BatchWriteRowResponse 
        """
        def batch_write_row(self, request):
			

示例

批量写数据。

    put_row_items = []
    ## 增加PutRow的行。
    for i in range(0, 10):
        primary_key = [('gid',i), ('uid',i+1)]
        attribute_columns = [('name','somebody'+str(i)), ('address','somewhere'+str(i)), ('age',i)]
        row = Row(primary_key, attribute_columns)
        condition = Condition(RowExistenceExpectation.IGNORE)
        item = PutRowItem(row, condition)
        put_row_items.append(item)

    ## 增加UpdateRow的行
    for i in range(10, 20):
        primary_key = [('gid',i), ('uid',i+1)]
        attribute_columns = {'put': [('name','somebody'+str(i)), ('address','somewhere'+str(i)), ('age',i)]}
        row = Row(primary_key, attribute_columns)
        condition = Condition(RowExistenceExpectation.IGNORE, SingleColumnCondition("age", i, ComparatorType.EQUAL))
        item = UpdateRowItem(row, condition)
        put_row_items.append(item)

    ## 增加DeleteRow的行
    delete_row_items = []
    for i in range(10, 20):
        primary_key = [('gid',i), ('uid',i+1)]
        row = Row(primary_key)
        condition = Condition(RowExistenceExpectation.IGNORE)
        item = DeleteRowItem(row, condition)
        delete_row_items.append(item)

    # 构造批量写的请求。
    request = BatchWriteRowRequest()
    request.add(TableInBatchWriteRowItem(table_name, put_row_items))
    request.add(TableInBatchWriteRowItem('notExistTable', delete_row_items))

    # 调用batch_write_row 方法执行批量写,如果请求参数等有错误会抛异常;如果部分行失败,则不会抛异常,但是内部的Item会失败。
    try:
        result = client.batch_write_row(request)
        print ('Result status: %s'%(result.is_all_succeed()))

        ## 检查Put行的结果。
        print ('check first table\'s put results:')
        succ, fail = result.get_put()
        for item in succ:
            print ('Put succeed, consume %s write cu.' % item.consumed.write)
        for item in fail:
            print ('Put failed, error code: %s, error message: %s' % (item.error_code, item.error_message))

        ## 检查Update的行结果
        print ('check first table\'s update results:')
        succ, fail = result.get_update()
        for item in succ:
            print ('Update succeed, consume %s write cu.' % item.consumed.write)
        for item in fail:
            print ('Update failed, error code: %s, error message: %s' % (item.error_code, item.error_message))

        ## 检查Delete行的结果。
        print ('check second table\'s delete results:')
        succ, fail = result.get_delete()
        for item in succ:
            print ('Delete succeed, consume %s write cu.' % item.consumed.write)
        for item in fail:
            print ('Delete failed, error code: %s, error message: %s' % (item.error_code, item.error_message)) 
    # 客户端异常,一般为参数错误或者网络异常。
    except OTSClientError as e:
        print "get row failed, http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message())
    # 服务端异常,一般为参数错误或者流控错误。
    except OTSServiceError as e:
        print "get row failed, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % (e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id())
			

详细代码请参见BatchWriteRow@GitHub

范围读(GetRange)

读取指定主键范围内的数据。

接口

        """
        说明:根据范围条件获取多行数据。
        ``table_name``是对应的表名。
        ``direction``表示范围的方向,字符串格式,取值包括'FORWARD'和'BACKWARD'。
        ``inclusive_start_primary_key``表示范围的起始主键(在范围内)。
        ``exclusive_end_primary_key``表示范围的结束主键(不在范围内)。
        ``columns_to_get``是可选参数,表示要获取的列的名称列表,类型为list;如果不填,表示获取所有列。
        ``limit``是可选参数,表示最多读取多少行;如果不填,则没有限制。
        ``column_filter``是可选参数,表示读取指定条件的行
        ``max_version``是可选参数,表示返回的最大版本数目,与time_range必须存在一个。
        ``time_range``是可选参数,表示返回的版本的范围,于max_version必须存在一个。
        ``start_column``是可选参数,用于宽行读取,表示本次读取的起始列。
        ``end_column``是可选参数,用于宽行读取,表示本次读取的结束列。
        ``token``是可选参数,用于宽行读取,表示本次读取的起始列位置,内容被二进制编码,来源于上次请求的返回结果中。

        返回:符合条件的结果列表。
        ``consumed``表示本次操作消耗的CapacityUnit,是tablestore.metadata.CapacityUnit类的实例。
        ``next_start_primary_key``表示下次get_range操作的起始点的主健列,类型为dict。
        ``row_list``表示本次操作返回的行数据列表,格式为:[Row, ...]。  
        """
   def get_range(self, table_name, direction,
                  inclusive_start_primary_key,
                  exclusive_end_primary_key,
                  columns_to_get=None,
                  limit=None,
                  column_filter=None,
                  max_version=None,
                  time_range=None,
                  start_column=None,
                  end_column=None,
                  token = None):
			

示例

范围读取。

    # 范围查询的起始主键
    inclusive_start_primary_key = [('uid',INF_MIN), ('gid',INF_MIN)]

    # 范围查询的结束主键
    exclusive_end_primary_key = [('uid',INF_MAX), ('gid',INF_MAX)]

    # 查询所有列
    columns_to_get = []

    # 每次最多返回90,如果总共有100个结果,首次查询时指定limit=90,则第一次最多返回90,最少可能返回0个结果,但是next_start_primary_key不为None。
    limit = 90

    # 设置过滤器
    cond = CompositeColumnCondition(LogicalOperator.AND)
    cond.add_sub_condition(SingleColumnCondition("address", 'China', ComparatorType.EQUAL))
    cond.add_sub_condition(SingleColumnCondition("age", 50, ComparatorType.LESS_THAN))

    try:
        # 调用get_range接口
        consumed, next_start_primary_key, row_list, next_token  = client.get_range(
                table_name, Direction.FORWARD,
                inclusive_start_primary_key, exclusive_end_primary_key,
                columns_to_get,
                limit,
                column_filter = cond,
                max_version = 1
                time_range = (1557125059000, 1557129059000) # start_time大于等于1557125059000,end_time小于1557129059000
    )

        all_rows = []
        all_rows.extend(row_list)

        # 当next_start_primary_key 不为空的时候,说明还有数据,继续循环读取。
        while next_start_primary_key is not None:
            inclusive_start_primary_key = next_start_primary_key
            consumed, next_start_primary_key, row_list, next_token = client.get_range(
                table_name, Direction.FORWARD,
                inclusive_start_primary_key, exclusive_end_primary_key,
                columns_to_get, limit,
                column_filter = cond,
                max_version = 1
        )
            all_rows.extend(row_list)

        # 打印主键和属性列
        for row in all_rows:
            print (row.primary_key, row.attribute_columns)
        print ('Total rows: ', len(all_rows))
    # 客户端异常,一般为参数错误或者网络异常。
    except OTSClientError as e:
        print "get row failed, http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message())
    # 服务端异常,一般为参数错误或者流控错误。
    except OTSServiceError as e:
        print "get row failed, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % (e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id())
			

详细代码请参见GetRange@GitHub