写入数据

表格存储提供了单行插入、单行更新和批量写入的写入方式用于写入数据到数据表。当要写入数据到数据表时,您需要指定完整主键以及要增删改的属性列。在高并发应用中写入数据时,您可以配置行存在性条件或者列条件实现按照指定条件更新数据。

如果需要了解表格存储各场景的应用案例,请参见快速玩转Tablestore入门与实战

写入方式

表格存储提供的数据写入接口包括PutRow、UpdateRowBatchWriteRow。写入数据时,请根据实际场景选择相应的写入方式。

写入方式

说明

适用场景

插入单行数据

调用PutRow接口新写入一行数据。如果该行已存在,则表格存储会先删除原行数据(原行的所有列以及所有版本的数据),再写入新行数据。

适用于要写入数据较少的场景。

更新单行数据

调用UpdateRow接口更新一行数据,支持增加和删除一行中的属性列,删除属性列指定版本的数据,或者更新已存在的属性列值。如果更新的行不存在,则新增一行数据。

适用于更新已写入数据的场景,例如删除属性列、删除某个数据版本、修改属性列值等。

批量写入数据

调用BatchWriteRow接口在一次请求中进行批量写入操作或者一次对多张表进行写入。

BatchWriteRow操作由多个PutRow、UpdateRow、DeleteRow子操作组成,构造子操作的过程与使用PutRow接口、UpdateRow接口和DeleteRow接口时相同。

适用于要写入、删除或者更新大量数据以及要同时进行增删改数据的场景。

前提条件

  • 已初始化Client。具体操作,请参见初始化OTSClient

  • 已创建数据表并写入数据。

插入单行数据

调用PutRow接口新写入一行数据。如果该行已存在,则先删除原行数据(原行的所有列以及所有版本的数据),再写入新行数据。

接口

"""
说明:写入一行数据。返回本次操作消耗的CapacityUnit。
``table_name``是对应的表名。
``row``是行数据,包括主键和属性列。
``condition``表示执行操作前做条件检查,满足条件才执行,是tablestore.metadata.Condition类的实例。
目前支持两种条件检测,一是对行的存在性进行检查,检查条件包括:'IGNORE','EXPECT_EXIST'和'EXPECT_NOT_EXIST';二是对属性列值的条件检测。
``return_type``表示返回类型,是tablestore.metadata.ReturnType类的实例,目前仅支持返回PrimaryKey,一般用于主键列自增中。
返回:本次操作消耗的CapacityUnit和需要返回的行数据。
consumed表示消耗的CapacityUnit,是tablestore.metadata.CapacityUnit类的实例。
return_row表示返回的行数据,可能包括主键、属性列。
示例:
    primary_key = [('gid',1), ('uid',101)]
    attribute_columns = [('name','张三'), ('mobile',111111111), ('address','中国A地'), ('age',20)]
    row = Row(primary_key, attribute_columns)
    condition = Condition('EXPECT_NOT_EXIST')
    consumed, return_row = client.put_row('myTable', row, condition)
"""
def put_row(self, table_name, row, condition = None, return_type = None, transaction_id = None):

参数

参数

说明

table_name

数据表名称。

row

行数据,包括以下配置:

  • primary_key:行的主键。主键包括主键列名、主键类型和主键值。

    重要
    • 设置的主键个数和类型必须和数据表的主键个数和类型一致。

    • 当主键为自增列时,只需将自增列的值设置为占位符。更多信息,请参见主键列自增

  • attribute_columns:行的属性列。每一项的顺序是属性名、属性值ColumnValue、属性类型ColumnType(可选)、时间戳(可选)。

    • 属性名即属性列的名称,属性类型即属性列的数据类型。更多信息,请参见命名规则和数据类型

      属性类型可以是INTEGER、STRING(UTF-8编码字符串)、BINARY、BOOLEAN、DOUBLE五种,分别用ColumnType.INTEGER、ColumnType.STRING、ColumnType.BINARY、ColumnType.BOOLEAN、ColumnType.DOUBLE表示,其中BINARY不可省略,其他类型都可以省略。

    • 时间戳即数据的版本号。更多信息,请参见数据版本和生命周期

      数据的版本号可以由系统自动生成或者自定义,如果不设置此参数,则默认由系统自动生成。

      • 当由系统自动生成数据的版本号时,系统默认将当前时间的毫秒单位时间戳(从1970-01-01 00:00:00 UTC计算起的毫秒数)作为数据的版本号。

      • 当自定义数据的版本号时,版本号需要为64位的毫秒单位时间戳且在有效版本范围内。

condition

使用条件更新,可以设置原行的存在性条件或者原行中某列的列值条件。更多信息,请参见条件更新

说明
  • RowExistenceExpectation.IGNORE表示无论此行是否存在均会插入新数据,如果之前行已存在,则写入数据时会覆盖原有数据。

  • RowExistenceExpectation.EXPECT_EXIST表示只有此行存在时才会插入新数据,写入数据时会覆盖原有数据。

  • RowExistenceExpectation.EXPECT_NOT_EXIST表示只有此行不存在时才会插入数据。

return_type

表示返回数据的类型,是tablestore.metadata.ReturnType类的实例,目前仅支持返回PrimaryKey,一般用于主键列自增中。

transaction_id

局部事务ID。使用局部事务功能删除数据时必须设置此参数。

示例

插入一行数据。

说明

如下示例中属性列age的版本为1498184687000,此值是20170623日,如果当前时间-max_time_deviation(max_time_deviation由创建数据表时指定)大于1498184687000时,则PutRow时会被禁止。

# 设置数据表名称。
table_name = '<TABLE_NAME>'

# 主键的第一个主键列是gid,值是整数1,第二个主键列是uid,值是整数101。
primary_key = [('gid',1), ('uid',101)]
# 属性列包括五个:
# 第一个属性列的名字是name,值是字符串John,版本号没有指定,使用系统当前时间作为版本号。
# 第二个属性列的名字是mobile,值是整数1390000****,版本号没有指定,使用系统当前时间作为版本号。
# 第三个属性列的名字是address,值是二进制的China,版本号没有指定,使用系统当前时间作为版本号。
# 第四个属性列的名字是female,值是布尔值False,版本号没有指定,使用系统当前时间作为版本号。
# 第五个属性列的名字是age,值是29.7,指定版本号为1498184687000。
attribute_columns = [('name','John'), ('mobile',1390000****),('address', bytearray('China', encoding='utf8')),('female', False), ('age', 29.7, 1498184687000)]
# 通过primary_key和attribute_columns构造Row。
row = Row(primary_key, attribute_columns)

# 设置条件更新,行条件检查为期望行不存在。如果行存在会出现Condition Update Failed错误。
condition = Condition(RowExistenceExpectation.EXPECT_NOT_EXIST)
try:
    # 调用put_row方法,如果没有指定ReturnType,则return_row为None。
    consumed, return_row = client.put_row(table_name, row, condition)

    # 打印此次请求消耗的写CU。
    print('put row succeed, consume %s write cu.' % consumed.write)
# 客户端异常,一般为参数错误或者网络异常。
except OTSClientError as e:
    print("put row failed, http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message()))
# 服务端异常,一般为参数错误或者流控错误。
except OTSServiceError as e:
    print("put row failed, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % (e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id()))                    

详细代码请参见PutRow@GitHub

更新单行数据

调用UpdateRow接口更新一行数据,可以增加和删除一行中的属性列,删除属性列指定版本的数据,或者更新已存在的属性列的值。如果更新的行不存在,则新增一行数据。

说明

UpdateRow请求中只包含删除指定的列且该行不存在时,则该请求不会新增一行数据。

接口

"""
说明:更新一行数据。
``table_name``是对应的表名。
``row``表示更新的行数据,包括主键列和属性列,主键列是list;属性列是dict。
``condition``表示执行操作前做条件检查,满足条件才执行,是tablestore.metadata.Condition类的实例。
目前支持两种条件检测,一是对行的存在性进行检查,检查条件包括:'IGNORE','EXPECT_EXIST'和'EXPECT_NOT_EXIST';二是对属性列值的条件检测。
``return_type``表示返回类型,是tablestore.metadata.ReturnType类的实例,目前仅支持返回PrimaryKey,一般用于主键列自增中。
返回:本次操作消耗的CapacityUnit和需要返回的行数据return_row
consumed表示消耗的CapacityUnit,是tablestore.metadata.CapacityUnit类的实例。
return_row表示需要返回的行数据。
示例:
    primary_key = [('gid',1), ('uid',101)]
    update_of_attribute_columns = {
        'put' : [('name','张三丰'), ('address','中国B地')],
        'delete' : [('mobile', 1493725896147)],
        'delete_all' : [('age')],
        'increment' : [('counter', 1)]
    }
    row = Row(primary_key, update_of_attribute_columns)
    condition = Condition('EXPECT_EXIST')
    consumed = client.update_row('myTable', row, condition)
"""
def update_row(self, table_name, row, condition, return_type = None, transaction_id = None):

参数

参数

说明

table_name

数据表名称。

row

行数据,包括以下配置:

  • primary_key:行的主键。主键包括主键列名、主键类型和主键值。

    重要
    • 设置的主键个数和类型必须和数据表的主键个数和类型一致。

    • 当主键为自增列时,只需将自增列的值设置为占位符。更多信息,请参见主键列自增

  • update_of_attribute_columns:更新的属性列。

    • 增加或更新数据时,需要设置属性名、属性值、属性类型(可选)、时间戳(可选)。

      属性名即属性列的名称,属性类型即属性列的数据类型。更多信息,请参见命名规则和数据类型

      时间戳即数据的版本号,可以由系统自动生成或者自定义,如果不设置此参数,则默认由系统自动生成。更多信息,请参见数据版本和生命周期

      • 当由系统自动生成数据的版本号时,系统默认将当前时间的毫秒单位时间戳(从1970-01-01 00:00:00 UTC计算起的毫秒数)作为数据的版本号。

      • 当自定义数据的版本号时,版本号需要为64位的毫秒单位时间戳且在有效版本范围内。

    • 删除属性列特定版本的数据时,只需要设置属性名和时间戳。

      时间戳是64位整数,单位为毫秒,表示某个特定版本的数据。

    • 删除属性列时,只需要设置属性名。

      说明

      删除一行的全部属性列不等同于删除该行,如果需要删除该行,请使用DeleteRow操作。

condition

使用条件更新,可以设置原行的存在性条件或者原行中某列的列值条件。更多信息,请参见条件更新

return_type

表示返回数据的类型,是tablestore.metadata.ReturnType类的实例,目前仅支持返回PrimaryKey,一般用于主键列自增中。

transaction_id

局部事务ID。使用局部事务功能删除数据时必须设置此参数。

示例

更新一行数据。

# 设置数据表名称。
table_name = '<TABLE_NAME>'

# 主键的第一列是gid,值是整数1,第二列是uid,值是整数101。
primary_key = [('gid',1), ('uid',101)]
# 更新包括PUT,DELETE和DELETE_ALL三部分。
# PUT:新增或者更新列值。示例中是新增两列,第一列名字是name,值是David,第二列名字是address,值是Hongkong。
# DELETE: 删除指定版本号(时间戳)的值。示例中是删除版本为1488436949003的address列的值。
# DELETE_ALL:删除列。示例中是删除mobile和age两列的所有版本的值。
update_of_attribute_columns = {
    'PUT' : [('name','David'), ('address','Hongkong')],
    'DELETE' : [('address', None, 1488436949003)],
    'DELETE_ALL' : [('mobile'), ('age')],
}
row = Row(primary_key, update_of_attribute_columns)

# 行条件检查为忽略,无论行是否存在,均会更新。
condition = Condition(RowExistenceExpectation.IGNORE, SingleColumnCondition("age", 20, ComparatorType.EQUAL)) # update row only when this row is exist
try:
    consumed, return_row = client.update_row(table_name, row, condition)
    
    # 打印此次请求消耗的写CU。
    print('put row succeed, consume %s write cu.' % consumed.write)
# 客户端异常,一般为参数错误或者网络异常。
except OTSClientError as e:
    print("update row failed, http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message()))
# 服务端异常,一般为参数错误或者流控错误。
except OTSServiceError as e:
    print("update row failed, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % (e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id()))

详细代码请参见UpdateRow@GitHub

批量写入数据

调用BatchWriteRow接口在一次请求中进行批量写入操作或者一次对多张表进行写入。

BatchWriteRow的各个子操作独立执行,表格存储会分别返回各个子操作的执行结果。

注意事项

由于批量写入可能存在部分行失败的情况,失败行的Index及错误信息在返回的BatchWriteRowResponse中,但并不抛出异常。因此调用BatchWriteRow接口时,需要检查返回值,判断每行的状态是否成功;如果不检查返回值,则可能会忽略掉部分操作的失败。

当服务端检查到某些操作出现参数错误时,BatchWriteRow接口可能会抛出参数错误的异常,此时该请求中所有的操作都未执行。

接口

"""
说明:批量修改多行数据。
request = MiltiTableInBatchWriteRowItem()
request.add(TableInBatchWriteRowItem(table0, row_items))
request.add(TableInBatchWriteRowItem(table1, row_items))
response = client.batch_write_row(request)
``response``为返回的结果,类型为tablestore.metadata.BatchWriteRowResponse。
"""
def batch_write_row(self, request):                   

示例

批量写数据。

put_row_items = []
# 增加PutRow的行。
for i in range(0, 20):
    primary_key = [('gid',i), ('uid',i+1)]
    attribute_columns = [('name','somebody'+str(i)), ('address','somewhere'+str(i)), ('age',i)]
    row = Row(primary_key, attribute_columns)
    condition = Condition(RowExistenceExpectation.IGNORE)
    item = PutRowItem(row, condition)
    put_row_items.append(item)

# 增加UpdateRow的行。
for i in range(0, 10):
    primary_key = [('gid',i), ('uid',i+1)]
    attribute_columns = {'put': [('name','somebody'+str(i)), ('address','somewhere'+str(i)), ('age',i)]}
    row = Row(primary_key, attribute_columns)
    condition = Condition(RowExistenceExpectation.IGNORE, SingleColumnCondition("age", i, ComparatorType.EQUAL))
    item = UpdateRowItem(row, condition)
    put_row_items.append(item)

# 增加DeleteRow的行。
delete_row_items = []
for i in range(10, 20):
    primary_key = [('gid',i), ('uid',i+1)]
    row = Row(primary_key)
    condition = Condition(RowExistenceExpectation.IGNORE)
    item = DeleteRowItem(row, condition)
    delete_row_items.append(item)

# 构造批量写请求。
request = BatchWriteRowRequest()
request.add(TableInBatchWriteRowItem('<TABLE_NAME>', put_row_items))
request.add(TableInBatchWriteRowItem('<DELETE_TABLE_NAME>', delete_row_items))

# 调用batch_write_row方法执行批量写,如果请求参数等错误会抛异常;如果部分行失败,则不会抛异常,但是内部的Item会失败。
try:
    result = client.batch_write_row(request)
    print('Result status: %s'%(result.is_all_succeed()))

    # 检查Put行的结果。
    print('check first table\'s put results:')
    succ, fail = result.get_put()
    for item in succ:
        print('Put succeed, consume %s write cu.' % item.consumed.write)
    for item in fail:
        print('Put failed, error code: %s, error message: %s' % (item.error_code, item.error_message))

    # 检查Update行的结果。
    print('check first table\'s update results:')
    succ, fail = result.get_update()
    for item in succ:
        print('Update succeed, consume %s write cu.' % item.consumed.write)
    for item in fail:
        print('Update failed, error code: %s, error message: %s' % (item.error_code, item.error_message))

    # 检查Delete行的结果。
    print('check second table\'s delete results:')
    succ, fail = result.get_delete()
    for item in succ:
        print('Delete succeed, consume %s write cu.' % item.consumed.write)
    for item in fail:
        print('Delete failed, error code: %s, error message: %s' % (item.error_code, item.error_message)) 
# 客户端异常,一般为参数错误或者网络异常。
except OTSClientError as e:
    print("get row failed, http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message()))
# 服务端异常,一般为参数错误或者流控错误。
except OTSServiceError as e:
    print("get row failed, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % (e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id()))

详细代码请参见BatchWriteRow@GitHub

常见问题

相关文档

  • 当要在高并发应用中实现按照指定条件更新数据时,您可以通过条件更新实现。更多信息,请参见条件更新

  • 当要为在线应用提供实时统计功能时,例如统计帖子的PV(实时浏览量)等,您可以通过原子计数器实现。更多信息,请参见原子计数器

  • 当要进行单行写或多行写的原子操作,您可以通过局部事务实现。更多信息,请参见局部事务

  • 写入数据后,即可根据需要读取或者删除表中数据。更多信息,请参见读取数据删除数据