全部产品
表格存储

多行数据操作

更新时间:2017-06-27 10:21:36   分享:   

表格存储的 SDK 提供了 BatchGetRow、BatchWriteRow 和 GetRange 等多行操作的接口。

批量读(BatchGetRow)

批量读取一个或多个表中的若干行数据。

BatchGetRow 操作可视为多个 GetRow 操作的集合,各个操作独立执行,独立返回结果,独立计算服务能力单元。

与执行大量的 GetRow 操作相比,使用 BatchGetRow 操作可以有效减少请求的响应时间,提高数据的读取速率。

接口

  1. """
  2. 说明:批量获取多行数据。
  3. request = BatchGetRowRequest()
  4. request.add(TableInBatchGetRowItem(myTable0, primary_keys, column_to_get=None, column_filter=None))
  5. request.add(TableInBatchGetRowItem(myTable1, primary_keys, column_to_get=None, column_filter=None))
  6. request.add(TableInBatchGetRowItem(myTable2, primary_keys, column_to_get=None, column_filter=None))
  7. request.add(TableInBatchGetRowItem(myTable3, primary_keys, column_to_get=None, column_filter=None))
  8. response = client.batch_get_row(request)
  9. ``response``为返回的结果,类型为tablestore.metadata.BatchGetRowResponse
  10. """
  11. def batch_get_row(self, request):

示例

批量一次读 3 行。

  1. # 需要返回的列
  2. columns_to_get = ['name', 'mobile', 'address', 'age']
  3. # 读3行
  4. rows_to_get = []
  5. for i in range(0, 3):
  6. primary_key = [('gid',i), ('uid',i+1)]
  7. rows_to_get.append(primary_key)
  8. # 过滤条件:name等于Johb,且 address等于China。
  9. cond = CompositeColumnCondition(LogicalOperator.AND)
  10. cond.add_sub_condition(SingleColumnCondition("name", "John", ComparatorType.EQUAL))
  11. cond.add_sub_condition(SingleColumnCondition("address", 'China', ComparatorType.EQUAL))
  12. # 构造批量读请求。
  13. request = BatchGetRowRequest()
  14. # 增加表:table_name中需要读取的行,最后一个参数1表示最读取最新的一个版本。
  15. request.add(TableInBatchGetRowItem(table_name, rows_to_get, columns_to_get, cond, 1))
  16. # 增加表:notExistTable中需要读取的行。
  17. request.add(TableInBatchGetRowItem('notExistTable', rows_to_get, columns_to_get, cond, 1))
  18. try:
  19. result = client.batch_get_row(request)
  20. print ('Result status: %s'%(result.is_all_succeed()))
  21. table_result_0 = result.get_result_by_table(table_name)
  22. table_result_1 = result.get_result_by_table('notExistTable')
  23. print ('Check first table\'s result:')
  24. for item in table_result_0:
  25. if item.is_ok:
  26. print ('Read succeed, PrimaryKey: %s, Attributes: %s' % (item.row.primary_key, item.row.attribute_columns))
  27. else:
  28. print ('Read failed, error code: %s, error message: %s' % (item.error_code, item.error_message))
  29. print ('Check second table\'s result:')
  30. for item in table_result_1:
  31. if item.is_ok:
  32. print ('Read succeed, PrimaryKey: %s, Attributes: %s' % (item.row.primary_key, item.row.attribute_columns))
  33. else:
  34. print ('Read failed, error code: %s, error message: %s' % (item.error_code, item.error_message))
  35. # 客户端异常,一般为参数错误或者网络异常。
  36. except OTSClientError as e:
  37. print "get row failed, http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message())
  38. # 服务端异常,一般为参数错误或者流控错误。
  39. except OTSServiceError as e:
  40. print "get row failed, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % (e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id())

提示:

详细代码:BatchGetRow@GitHub

批量写(BatchWriteRow)

批量插入、修改或删除一个或多个表中的若干行数据。

BatchWriteRow 操作可视为多个 PutRow、UpdateRow、DeleteRow 操作的集合,各个操作独立执行,独立返回结果,独立计算服务能力单元。

接口

  1. """
  2. 说明:批量修改多行数据。
  3. request = MiltiTableInBatchWriteRowItem()
  4. request.add(TableInBatchWriteRowItem(table0, row_items))
  5. request.add(TableInBatchWriteRowItem(table1, row_items))
  6. response = client.batch_write_row(request)
  7. ``response``为返回的结果,类型为tablestore.metadata.BatchWriteRowResponse
  8. """
  9. def batch_write_row(self, request):

示例

批量写数据。

  1. put_row_items = []
  2. ## 增加PutRow的行。
  3. for i in range(0, 10):
  4. primary_key = [('gid',i), ('uid',i+1)]
  5. attribute_columns = [('name','somebody'+str(i)), ('address','somewhere'+str(i)), ('age',i)]
  6. row = Row(primary_key, attribute_columns)
  7. condition = Condition(RowExistenceExpectation.IGNORE)
  8. item = PutRowItem(row, condition)
  9. put_row_items.append(item)
  10. ## 增加UpdateRow的行
  11. for i in range(10, 20):
  12. primary_key = [('gid',i), ('uid',i+1)]
  13. attribute_columns = {'put': [('name','somebody'+str(i)), ('address','somewhere'+str(i)), ('age',i)]}
  14. row = Row(primary_key, attribute_columns)
  15. condition = Condition(RowExistenceExpectation.IGNORE, SingleColumnCondition("age", i, ComparatorType.EQUAL))
  16. item = UpdateRowItem(row, condition)
  17. put_row_items.append(item)
  18. ## 增加DeleteRow的行
  19. delete_row_items = []
  20. for i in range(10, 20):
  21. primary_key = [('gid',i), ('uid',i+1)]
  22. row = Row(primary_key)
  23. condition = Condition(RowExistenceExpectation.IGNORE)
  24. item = DeleteRowItem(row, condition)
  25. delete_row_items.append(item)
  26. # 构造批量写的请求。
  27. request = BatchWriteRowRequest()
  28. request.add(TableInBatchWriteRowItem(table_name, put_row_items))
  29. request.add(TableInBatchWriteRowItem('notExistTable', delete_row_items))
  30. # 调用batch_write_row 方法执行批量写,如果请求参数等有错误会抛异常;如果部分行失败,则不会抛异常,但是内部的Item会失败。
  31. try:
  32. result = client.batch_write_row(request)
  33. print ('Result status: %s'%(result.is_all_succeed()))
  34. ## 检查Put行的结果。
  35. print ('check first table\'s put results:')
  36. succ, fail = result.get_put()
  37. for item in succ:
  38. print ('Put succeed, consume %s write cu.' % item.consumed.write)
  39. for item in fail:
  40. print ('Put failed, error code: %s, error message: %s' % (item.error_code, item.error_message))
  41. ## 检查Update的行结果
  42. print ('check first table\'s update results:')
  43. succ, fail = result.get_update()
  44. for item in succ:
  45. print ('Update succeed, consume %s write cu.' % item.consumed.write)
  46. for item in fail:
  47. print ('Update failed, error code: %s, error message: %s' % (item.error_code, item.error_message))
  48. ## 检查Delete行的结果。
  49. print ('check second table\'s delete results:')
  50. succ, fail = result.get_delete()
  51. for item in succ:
  52. print ('Delete succeed, consume %s write cu.' % item.consumed.write)
  53. for item in fail:
  54. print ('Delete failed, error code: %s, error message: %s' % (item.error_code, item.error_message))
  55. # 客户端异常,一般为参数错误或者网络异常。
  56. except OTSClientError as e:
  57. print "get row failed, http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message())
  58. # 服务端异常,一般为参数错误或者流控错误。
  59. except OTSServiceError as e:
  60. print "get row failed, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % (e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id())

提示:

详细代码:BatchWriteRow@GitHub

范围读(GetRange)

读取指定主键范围内的数据。

接口

  1. """
  2. 说明:根据范围条件获取多行数据。
  3. ``table_name``是对应的表名。
  4. ``direction``表示范围的方向,字符串格式,取值包括'FORWARD'和'BACKWARD'。
  5. ``inclusive_start_primary_key``表示范围的起始主键(在范围内)。
  6. ``exclusive_end_primary_key``表示范围的结束主键(不在范围内)。
  7. ``columns_to_get``是可选参数,表示要获取的列的名称列表,类型为list;如果不填,表示获取所有列。
  8. ``limit``是可选参数,表示最多读取多少行;如果不填,则没有限制。
  9. ``column_filter``是可选参数,表示读取指定条件的行
  10. ``max_version``是可选参数,表示返回的最大版本数目,与time_range必须存在一个。
  11. ``time_range``是可选参数,表示返回的版本的范围,于max_version必须存在一个。
  12. ``start_column``是可选参数,用于宽行读取,表示本次读取的起始列。
  13. ``end_column``是可选参数,用于宽行读取,表示本次读取的结束列。
  14. ``token``是可选参数,用于宽行读取,表示本次读取的起始列位置,内容被二进制编码,来源于上次请求的返回结果中。
  15. 返回:符合条件的结果列表。
  16. ``consumed``表示本次操作消耗的CapacityUnit,是tablestore.metadata.CapacityUnit类的实例。
  17. ``next_start_primary_key``表示下次get_range操作的起始点的主健列,类型为dict。
  18. ``row_list``表示本次操作返回的行数据列表,格式为:[Row, ...]。
  19. """
  20. def get_range(self, table_name, direction,
  21. inclusive_start_primary_key,
  22. exclusive_end_primary_key,
  23. columns_to_get=None,
  24. limit=None,
  25. column_filter=None,
  26. max_version=None,
  27. time_range=None,
  28. start_column=None,
  29. end_column=None,
  30. token = None):

示例

范围读取。

  1. # 范围查询的起始主键
  2. inclusive_start_primary_key = [('uid',INF_MIN), ('gid',INF_MIN)]
  3. # 范围查询的结束主键
  4. exclusive_end_primary_key = [('uid',INF_MAX), ('gid',INF_MAX)]
  5. # 查询所有列
  6. columns_to_get = []
  7. # 每次最多返回90,如果总共有100个结果,首次查询时指定limit=90,则第一次最多返回90,最少可能返回0个结果,但是next_start_primary_key不为None。
  8. limit = 90
  9. # 设置过滤器
  10. cond = CompositeColumnCondition(LogicalOperator.AND)
  11. cond.add_sub_condition(SingleColumnCondition("address", 'China', ComparatorType.EQUAL))
  12. cond.add_sub_condition(SingleColumnCondition("age", 50, ComparatorType.LESS_THAN))
  13. try:
  14. # 调用get_range接口
  15. consumed, next_start_primary_key, row_list, next_token = client.get_range(
  16. table_name, Direction.FORWARD,
  17. inclusive_start_primary_key, exclusive_end_primary_key,
  18. columns_to_get,
  19. limit,
  20. column_filter = cond,
  21. max_version = 1
  22. )
  23. all_rows = []
  24. all_rows.extend(row_list)
  25. # 当next_start_primary_key 不为空的时候,说明还有数据,继续循环读取。
  26. while next_start_primary_key is not None:
  27. inclusive_start_primary_key = next_start_primary_key
  28. consumed, next_start_primary_key, row_list, next_token = client.get_range(
  29. table_name, Direction.FORWARD,
  30. inclusive_start_primary_key, exclusive_end_primary_key,
  31. columns_to_get, limit,
  32. column_filter = cond,
  33. max_version = 1
  34. )
  35. all_rows.extend(row_list)
  36. # 打印主键和属性列
  37. for row in all_rows:
  38. print (row.primary_key, row.attribute_columns)
  39. print ('Total rows: ', len(all_rows))
  40. # 客户端异常,一般为参数错误或者网络异常。
  41. except OTSClientError as e:
  42. print "get row failed, http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message())
  43. # 服务端异常,一般为参数错误或者流控错误。
  44. except OTSServiceError as e:
  45. print "get row failed, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % (e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id())

提示:

详细代码:GetRange@GitHub

本文导读目录
本文导读目录
以上内容是否对您有帮助?