Java SDK 在指定分区键值范围内创建局部事务,事务内的所有操作要么全部成功要么全部失败(隔离级别为读已提交 Read Committed),适用于实现单行或多行的原子读写。
前提条件
安装 Tablestore Java SDK并初始化客户端。
已为数据表开启局部事务功能。
说明创建数据表时如果未启用局部事务,建表后需要使用局部事务功能,或者需要查看表是否开启局部事务,请提交工单或加入钉钉技术交流群36165029092。
功能说明
// 在 SyncClient 上调用
public StartLocalTransactionResponse startLocalTransaction(StartLocalTransactionRequest request) throws TableStoreException, ClientException
public CommitTransactionResponse commitTransaction(CommitTransactionRequest request) throws TableStoreException, ClientException
public AbortTransactionResponse abortTransaction(AbortTransactionRequest request) throws TableStoreException, ClientException
// 在事务请求(继承 TxnRequest)上携带事务 ID
public void setTransactionId(String transactionId)局部事务的作用范围限定在单个分区键值内。事务内的所有读写请求共享同一个事务 ID,表格存储通过该 ID 保证原子性和隔离性。三个核心接口配合使用:
startLocalTransaction(StartLocalTransactionRequest):传入分区键值,创建局部事务并获取事务 ID。在事务内执行读写操作,每个请求需通过
setTransactionId(txnId)携带事务 ID。支持的操作:GetRow/PutRow/UpdateRow/DeleteRow/BatchWriteRow/GetRange。commitTransaction(CommitTransactionRequest)提交事务使修改生效;或abortTransaction(AbortTransactionRequest)丢弃事务放弃所有修改。
以下示例为分区键值 pkvalue 创建局部事务,在事务内写入主键为 (pkvalue, 10001) 的一行数据,然后提交事务。
String tableName = "local_tx_demo";
// 1. 为指定分区键值创建局部事务,获取事务 ID
PrimaryKeyBuilder pkBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
pkBuilder.addPrimaryKeyColumn("pk1", PrimaryKeyValue.fromString("pkvalue"));
PrimaryKey partitionKey = pkBuilder.build();
StartLocalTransactionRequest startRequest =
new StartLocalTransactionRequest(tableName, partitionKey);
String txnId = client.startLocalTransaction(startRequest).getTransactionID();
// 2. 在事务内写入一行数据,需指定完整主键并携带事务 ID
PrimaryKeyBuilder rowKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
rowKeyBuilder.addPrimaryKeyColumn("pk1", PrimaryKeyValue.fromString("pkvalue"));
rowKeyBuilder.addPrimaryKeyColumn("pk2", PrimaryKeyValue.fromLong(10001));
PrimaryKey rowKey = rowKeyBuilder.build();
RowPutChange rowPutChange = new RowPutChange(tableName, rowKey);
rowPutChange.addColumn(new Column("col1", ColumnValue.fromString("colvalue")));
rowPutChange.addColumn(new Column("col2", ColumnValue.fromLong(10)));
PutRowRequest putRequest = new PutRowRequest(rowPutChange);
putRequest.setTransactionId(txnId);
client.putRow(putRequest);
// 3. 提交事务使所有写入生效;如需放弃修改,调用 abortTransaction()
CommitTransactionRequest commitRequest = new CommitTransactionRequest(txnId);
client.commitTransaction(commitRequest);参数说明
名称 | 类型 | 说明 |
tableName(必选) | String | 数据表名称。 |
primaryKey(必选) | PrimaryKey | 数据表主键。
|
transactionId(必选) | String | 局部事务 ID,由 事务内的每个读写请求都需通过 |
使用限制
主键自增列与局部事务功能不能同时使用。
局部事务通过悲观锁(Pessimistic Lock)实现并发控制;事务期间分区键值的数据加写锁,仅持有事务 ID 的写请求成功。
事务生命周期最长 60 秒;两次操作间隔超过 60 秒视为超时,服务端自动丢弃。
同一事务只能用于一个请求,并发使用同一事务 ID 的操作均失败。
事务内写请求的分区键值必须与创建事务时一致;读请求无此限制。
单个事务写入数据量最大 4 MB。
事务内写入列值时未指定版本号(timestamp)的,版本号在写入数据时(而非提交事务时)由服务端自动生成,规则与普通写入一致。
BatchWriteRow请求携带事务 ID 时,所有行只能操作该事务对应的表。未对范围内数据执行写操作时,提交与丢弃等同。
带事务 ID 的读写请求失败不影响事务存活,可指定重试规则或主动丢弃。
场景示例
事务内读取一行
为指定分区键创建只读事务,读取一行数据。对于只读事务,提交和丢弃的效果相同,均会释放事务。
String tableName = "local_tx_demo";
// 1. 为指定分区键值创建局部事务
PrimaryKeyBuilder pkBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
pkBuilder.addPrimaryKeyColumn("pk1", PrimaryKeyValue.fromString("pkvalue"));
PrimaryKey partitionKey = pkBuilder.build();
StartLocalTransactionRequest startRequest =
new StartLocalTransactionRequest(tableName, partitionKey);
String txnId = client.startLocalTransaction(startRequest).getTransactionID();
// 2. 在事务内读取一行数据,需指定完整主键并携带事务 ID
PrimaryKeyBuilder rowKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
rowKeyBuilder.addPrimaryKeyColumn("pk1", PrimaryKeyValue.fromString("pkvalue"));
rowKeyBuilder.addPrimaryKeyColumn("pk2", PrimaryKeyValue.fromLong(10001));
PrimaryKey rowKey = rowKeyBuilder.build();
SingleRowQueryCriteria criteria = new SingleRowQueryCriteria(tableName, rowKey);
criteria.setMaxVersions(1);
GetRowRequest getRequest = new GetRowRequest(criteria);
getRequest.setTransactionId(txnId);
GetRowResponse getResponse = client.getRow(getRequest);
// 3. 提交或丢弃事务。对于只读事务,两者效果相同,均会释放事务
CommitTransactionRequest commitRequest = new CommitTransactionRequest(txnId);
client.commitTransaction(commitRequest);
Row row = getResponse.getRow();
System.out.println(row);事务内批量写入多行
通过 BatchWriteRowRequest.setTransactionId(txnId) 在批量写入中携带事务 ID。所有行的分区键值必须与创建事务时一致,单次提交原子生效。
String tableName = "local_tx_demo";
// 1. 创建局部事务,所有批量行的分区键值必须与此一致
PrimaryKeyBuilder pkBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
pkBuilder.addPrimaryKeyColumn("pk1", PrimaryKeyValue.fromString("pkvalue"));
PrimaryKey partitionKey = pkBuilder.build();
StartLocalTransactionRequest startRequest =
new StartLocalTransactionRequest(tableName, partitionKey);
String txnId = client.startLocalTransaction(startRequest).getTransactionID();
// 2. 构造批量写入请求,携带事务 ID
BatchWriteRowRequest batchRequest = new BatchWriteRowRequest();
batchRequest.setTransactionId(txnId);
// 添加多行(所有行的 pk1 必须等于事务的分区键值 "pkvalue")
for (long pk2 = 20001; pk2 <= 20003; pk2++) {
PrimaryKeyBuilder rowKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
rowKeyBuilder.addPrimaryKeyColumn("pk1", PrimaryKeyValue.fromString("pkvalue"));
rowKeyBuilder.addPrimaryKeyColumn("pk2", PrimaryKeyValue.fromLong(pk2));
RowPutChange rowPutChange = new RowPutChange(tableName, rowKeyBuilder.build());
rowPutChange.addColumn(new Column("col1", ColumnValue.fromString("batch_" + pk2)));
batchRequest.addRowChange(rowPutChange);
}
BatchWriteRowResponse batchResponse = client.batchWriteRow(batchRequest);
System.out.println("Batch all succeeded: " + batchResponse.isAllSucceed());
// 3. 提交事务,使所有批量写入原子生效
CommitTransactionRequest commitRequest = new CommitTransactionRequest(txnId);
client.commitTransaction(commitRequest);