表格存储的 SDK 提供了 BatchGetRow、BatchWriteRow 和 GetRange 等多行操作的接口。

批量读(BatchGetRow)

批量读取一个或多个表中的若干行数据。

BatchGetRow 操作可视为多个 GetRow 操作的集合,各个操作独立执行,独立返回结果,独立计算服务能力单元。

与执行大量的 GetRow 操作相比,使用 BatchGetRow 操作可以有效减少请求的响应时间,提高数据的读取速率。

接口

        """
        说明:批量获取多行数据。
        request = BatchGetRowRequest()
        request.add(TableInBatchGetRowItem(myTable0, primary_keys, column_to_get=None, column_filter=None))
        request.add(TableInBatchGetRowItem(myTable1, primary_keys, column_to_get=None, column_filter=None))
        request.add(TableInBatchGetRowItem(myTable2, primary_keys, column_to_get=None, column_filter=None))
        request.add(TableInBatchGetRowItem(myTable3, primary_keys, column_to_get=None, column_filter=None))
        response = client.batch_get_row(request)
        ``response``为返回的结果,类型为tablestore.metadata.BatchGetRowResponse
        """
		def batch_get_row(self, request): 

示例

批量一次读 3 行。

    # 需要返回的列
    columns_to_get = ['name', 'mobile', 'address', 'age']
	
	# 读3行
    rows_to_get = [] 
    for i in range(0, 3):
        primary_key = [('gid',i), ('uid',i+1)]
        rows_to_get.append(primary_key)
		
	# 过滤条件:name等于Johb,且 address等于China。
    cond = CompositeColumnCondition(LogicalOperator.AND)
    cond.add_sub_condition(SingleColumnCondition("name", "John", ComparatorType.EQUAL))
    cond.add_sub_condition(SingleColumnCondition("address", 'China', ComparatorType.EQUAL))
	
	# 构造批量读请求。
    request = BatchGetRowRequest()
	
	# 增加表:table_name中需要读取的行,最后一个参数1表示最读取最新的一个版本。
    request.add(TableInBatchGetRowItem(table_name, rows_to_get, columns_to_get, cond, 1))
	
	# 增加表:notExistTable中需要读取的行。
    request.add(TableInBatchGetRowItem('notExistTable', rows_to_get, columns_to_get, cond, 1))
	
	try:
   		result = client.batch_get_row(request) 
    	print ('Result status: %s'%(result.is_all_succeed()))

		table_result_0 = result.get_result_by_table(table_name)
    	table_result_1 = result.get_result_by_table('notExistTable')
    	print ('Check first table\'s result:')
    	for item in table_result_0: 
        	if item.is_ok:
            	print ('Read succeed, PrimaryKey: %s, Attributes: %s' % (item.row.primary_key, item.row.attribute_columns))
        	else:
            	print ('Read failed, error code: %s, error message: %s' % (item.error_code, item.error_message))
    	print ('Check second table\'s result:')
    	for item in table_result_1:
        	if item.is_ok:
            	print ('Read succeed, PrimaryKey: %s, Attributes: %s' % (item.row.primary_key, item.row.attribute_columns))
        	else: 
            	print ('Read failed, error code: %s, error message: %s' % (item.error_code, item.error_message))
	# 客户端异常,一般为参数错误或者网络异常。
	except OTSClientError as e:
		print "get row failed, http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message())
	# 服务端异常,一般为参数错误或者流控错误。
	except OTSServiceError as e:
		print "get row failed, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % (e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id())

详细代码请参见BatchGetRow@GitHub

批量写(BatchWriteRow)

批量插入、修改或删除一个或多个表中的若干行数据。

BatchWriteRow 操作可视为多个 PutRow、UpdateRow、DeleteRow 操作的集合,各个操作独立执行,独立返回结果,独立计算服务能力单元。

接口

        """
        说明:批量修改多行数据。
        request = MiltiTableInBatchWriteRowItem()
        request.add(TableInBatchWriteRowItem(table0, row_items))
        request.add(TableInBatchWriteRowItem(table1, row_items))
        response = client.batch_write_row(request)
        ``response``为返回的结果,类型为tablestore.metadata.BatchWriteRowResponse 
        """
		def batch_write_row(self, request):

示例

批量写数据。

    put_row_items = []
	## 增加PutRow的行。
    for i in range(0, 10):
        primary_key = [('gid',i), ('uid',i+1)]
        attribute_columns = [('name','somebody'+str(i)), ('address','somewhere'+str(i)), ('age',i)]
        row = Row(primary_key, attribute_columns)
        condition = Condition(RowExistenceExpectation.IGNORE)
        item = PutRowItem(row, condition)
        put_row_items.append(item)

	## 增加UpdateRow的行
    for i in range(10, 20):
        primary_key = [('gid',i), ('uid',i+1)]
        attribute_columns = {'put': [('name','somebody'+str(i)), ('address','somewhere'+str(i)), ('age',i)]}
        row = Row(primary_key, attribute_columns)
        condition = Condition(RowExistenceExpectation.IGNORE, SingleColumnCondition("age", i, ComparatorType.EQUAL))
        item = UpdateRowItem(row, condition)
        put_row_items.append(item)

	## 增加DeleteRow的行
    delete_row_items = []
    for i in range(10, 20):
        primary_key = [('gid',i), ('uid',i+1)]
        row = Row(primary_key)
        condition = Condition(RowExistenceExpectation.IGNORE)
        item = DeleteRowItem(row, condition)
        delete_row_items.append(item)

	# 构造批量写的请求。
    request = BatchWriteRowRequest()
    request.add(TableInBatchWriteRowItem(table_name, put_row_items))
    request.add(TableInBatchWriteRowItem('notExistTable', delete_row_items))
	
	# 调用batch_write_row 方法执行批量写,如果请求参数等有错误会抛异常;如果部分行失败,则不会抛异常,但是内部的Item会失败。
	try:
    	result = client.batch_write_row(request)
		print ('Result status: %s'%(result.is_all_succeed()))

		## 检查Put行的结果。
    	print ('check first table\'s put results:')
    	succ, fail = result.get_put()
    	for item in succ:
        	print ('Put succeed, consume %s write cu.' % item.consumed.write)
    	for item in fail:
        	print ('Put failed, error code: %s, error message: %s' % (item.error_code, item.error_message))

		## 检查Update的行结果
    	print ('check first table\'s update results:')
    	succ, fail = result.get_update()
    	for item in succ:
        	print ('Update succeed, consume %s write cu.' % item.consumed.write)
    	for item in fail:
        	print ('Update failed, error code: %s, error message: %s' % (item.error_code, item.error_message))

		## 检查Delete行的结果。
    	print ('check second table\'s delete results:')
    	succ, fail = result.get_delete()
    	for item in succ:
        	print ('Delete succeed, consume %s write cu.' % item.consumed.write)
    	for item in fail:
        	print ('Delete failed, error code: %s, error message: %s' % (item.error_code, item.error_message)) 
	# 客户端异常,一般为参数错误或者网络异常。
	except OTSClientError as e:
		print "get row failed, http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message())
	# 服务端异常,一般为参数错误或者流控错误。
	except OTSServiceError as e:
		print "get row failed, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % (e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id())

详细代码请参见BatchWriteRow@GitHub

范围读(GetRange)

读取指定主键范围内的数据。

接口

        """
        说明:根据范围条件获取多行数据。
        ``table_name``是对应的表名。
        ``direction``表示范围的方向,字符串格式,取值包括'FORWARD'和'BACKWARD'。
        ``inclusive_start_primary_key``表示范围的起始主键(在范围内)。
        ``exclusive_end_primary_key``表示范围的结束主键(不在范围内)。
        ``columns_to_get``是可选参数,表示要获取的列的名称列表,类型为list;如果不填,表示获取所有列。
        ``limit``是可选参数,表示最多读取多少行;如果不填,则没有限制。
        ``column_filter``是可选参数,表示读取指定条件的行
        ``max_version``是可选参数,表示返回的最大版本数目,与time_range必须存在一个。
        ``time_range``是可选参数,表示返回的版本的范围,于max_version必须存在一个。
        ``start_column``是可选参数,用于宽行读取,表示本次读取的起始列。
        ``end_column``是可选参数,用于宽行读取,表示本次读取的结束列。
        ``token``是可选参数,用于宽行读取,表示本次读取的起始列位置,内容被二进制编码,来源于上次请求的返回结果中。

        返回:符合条件的结果列表。
        ``consumed``表示本次操作消耗的CapacityUnit,是tablestore.metadata.CapacityUnit类的实例。
        ``next_start_primary_key``表示下次get_range操作的起始点的主健列,类型为dict。
        ``row_list``表示本次操作返回的行数据列表,格式为:[Row, ...]。  
        """
   def get_range(self, table_name, direction,
                  inclusive_start_primary_key,
                  exclusive_end_primary_key,
                  columns_to_get=None,
                  limit=None,
                  column_filter=None,
                  max_version=None,
                  time_range=None,
                  start_column=None,
                  end_column=None,
                  token = None):

示例

范围读取。

	# 范围查询的起始主键
	inclusive_start_primary_key = [('uid',INF_MIN), ('gid',INF_MIN)]
	
	# 范围查询的结束主键
    exclusive_end_primary_key = [('uid',INF_MAX), ('gid',INF_MAX)]
	
	# 查询所有列
    columns_to_get = []
	
	# 每次最多返回90,如果总共有100个结果,首次查询时指定limit=90,则第一次最多返回90,最少可能返回0个结果,但是next_start_primary_key不为None。
    limit = 90

	# 设置过滤器
    cond = CompositeColumnCondition(LogicalOperator.AND)
    cond.add_sub_condition(SingleColumnCondition("address", 'China', ComparatorType.EQUAL))
    cond.add_sub_condition(SingleColumnCondition("age", 50, ComparatorType.LESS_THAN))

	try:
		# 调用get_range接口
    	consumed, next_start_primary_key, row_list, next_token  = client.get_range(
                table_name, Direction.FORWARD,
                inclusive_start_primary_key, exclusive_end_primary_key,
                columns_to_get,
                limit,
                column_filter = cond,
                max_version = 1
    )

    	all_rows = []
    	all_rows.extend(row_list)
		
		# 当next_start_primary_key 不为空的时候,说明还有数据,继续循环读取。
    	while next_start_primary_key is not None:
        	inclusive_start_primary_key = next_start_primary_key
        	consumed, next_start_primary_key, row_list, next_token = client.get_range(
                table_name, Direction.FORWARD,
                inclusive_start_primary_key, exclusive_end_primary_key,
                columns_to_get, limit,
                column_filter = cond,
                max_version = 1
        )
        	all_rows.extend(row_list)

		# 打印主键和属性列
    	for row in all_rows:
        	print (row.primary_key, row.attribute_columns)
    	print ('Total rows: ', len(all_rows))
	# 客户端异常,一般为参数错误或者网络异常。
	except OTSClientError as e:
		print "get row failed, http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message())
	# 服务端异常,一般为参数错误或者流控错误。
	except OTSServiceError as e:
		print "get row failed, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % (e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id())

详细代码请参见GetRange@GitHub