阿里云GDB支持事务,默认一个DSL请求的所有操作都在一个事务内。同时GDB也支持用户侧事务控制,同一事务内的多个操作保证原子性,同时成功或失败,事务的隔离级别是Read-Committed。
以下介绍各语言SDK用户控制事务的使用,类似的也只给出关键部分代码。
Java 事务接口
阿里云GDB的Java-SDK提供了方便易用的事务接口,可以参考demo中的scriptSessionTest
示例。
Demo demo = new Demo(yaml, true);
GdbClient txClient = demo.client.get();
try {
// init parameters
Map<String, Object> parameters = new HashMap();
parameters.put("vertexStart", vertexStart);
parameters.put("vertexEnd", vertexEnd);
parameters.put("vertexLabel", vertexLabel);
parameters.put("edgeLabel", edgeLabel);
txClient.batchTransaction((tx, g) -> {
// add vertex 1
String dsl = "g.addV(vertexLabel).property(T.id, vertexStart)";
tx.exec(dsl, parameters, DEFAULT_TIMEOUT_MILLSECOND);
// add vertex 2
String dsl2 = "g.addV(vertexLabel).property(T.id, vertexEnd)";
tx.exec(dsl2, parameters, DEFAULT_TIMEOUT_MILLSECOND);
// add edge
String dsl3 = "g.addE(edgeLabel).from(V(vertexEnd)).to(V(vertexStart))";
tx.exec(dsl3, parameters, DEFAULT_TIMEOUT_MILLSECOND);
});
} catch (Exception ex) {
throw new RuntimeException(ex);
} finally {
// close Session GdbClient
demo.close();
}
上述在接口batchTransaction
中的所有操作(添加两个点和一个边)都在同一事务内,所有操作都返回成功并且无异常抛出,最终更新才会写入。
Go 事务接口
阿里云GDB的Go-SDK也提供方便易用的事务接口,同样可以参照demo的使用示例。
// connect GDB with auth
sessionId, _ := uuid.NewUUID()
client := goClient.NewSessionClient(sessionId.String(), settings)
client.BatchSubmit(func(c goClient.ClientShell) error {
bindings := make(map[string]interface{})
bindings["GDB___label"] = "goTest"
bindings["GDB___PK1"] = "name"
bindings["GDB___PK2"] = "age"
dsl := "g.addV(GDB___label).property(id, GDB___id).property(GDB___PK1, GDB___PV1).property(GDB___PK2, GDB___PV2)"
// 添加点100
bindings["GDB___id"] = "100"
bindings["GDB___PV1"] = "Jack"
bindings["GDB___PV2"] = 32
_, err := c.SubmitScriptBound(dsl, bindings)
if err != nil {
return err
}
// 添加点101
bindings["GDB___id"] = "101"
bindings["GDB___PV1"] = "Luck"
bindings["GDB___PV2"] = 34
_, err = c.SubmitScriptBound(dsl, bindings)
if err != nil {
return err
}
dsl = "g.addE(GDB___label).from(__.V(GDB___from)).to(__.V(GDB___to)).property(id, GDB___id).property(GDB___PK1, GDB___PV1)"
// 添加边201
bindings["GDB___id"] = "201"
bindings["GDB___PV1"] = "created"
bindings["GDB___from"] = "100"
bindings["GDB___to"] = "101"
_, err = c.SubmitScriptBound(dsl, bindings)
if err != nil {
return err
}
return nil
})
client.Close()
上述BatchSubmit
接口中的所有操作(添加两个点和一个边)都在同一个事务内完成,所有操作成功返回nil
则最后更新写入到GDB实例,其中任一处返回err(err != nil)
,整个更新都不会写到GDB实例。
Python 事务接口
事务接口依赖session,gremlin-python
(当前文档示例版本:v3.4.7)添加了session支持,可以使用session接口手动实现事务功能。
session_id = str(uuid.uuid4())
client = Client('ws://${gdb_endpoint}:8182/gremlin', 'g', username=${username}, password=${password}, session=session_id)
try:
# 开启事务
client.submit('g.tx().open()')
dsl ="g.addV('person').property(id, GDB___id).property('name', GDB___PV)"
# 添加点100,同步操作
client.submit(dsl, {'GDB___id':'100', 'GDB___PV':'Jack'}).all().result()
# 添加点101,同步操作
client.submit(dsl, {'GDB___id':'101', 'GDB___PV':'Luck'}).all().result()
# 添加边201,同步操作
client.submit("g.addE('friend').from(V('100')).to(V('101')).property(id, '201').property('time','2020-03-08')").all().result()
# 提交事务更新
client.sumbit('g.tx().commit()')
except Exception:
# 回滚事务更新
client.submit('g.tx().rollback()')
client.close()
上述使用session机制实现事务操作接口,手动控制事务的开启、提交或回滚。在事务内完成添加2个点一个边的操作,所有操作返回成功无异常就执行最后的事务提交g.tx().commit()
请求,完成更新到GDB实例的写入。出现异常会进入到回滚请求流程,撤销所有更新。
.Net 事务接口
事务接口依赖session,gremlin-dotnet
最新版(v3.4.7)添加了session支持,可以使用session接口手动实现事务功能。
var gremlinServer = new GremlinServer(${gdb_endpoint}, 8182, username=${username}, password=${password});
var sessionId = Guid.NewGuid().ToString();
using (var gremlinClient = new GremlinClient(gremlinServer, sessionId: sessionId))
{
try
{
// 开启事务
await gremlinClient.SubmitAsync("g.tx().open()");
string dsl ="g.addV('person').property(id,G___id).property('name',G___name)";
Dictionary<string, object> parameters1 = new Dictionary<string, object> {{"G___id", "100"},{"G___name", "Jack"}};
// 添加点100
await gremlinClient.SubmitAsync<dynamic>(dsl, parameters1);
Dictionary<string, object> parameters2 = new Dictionary<string, object> {{"G___id", "101"},{"G___name", "Luck"}};
// 添加点101
await gremlinClient.SubmitAsync<dynamic>(dsl, parameters2);
// 添加边
string dsl2 = "g.addE('known').property(id,'201').from(V('100')).to(V('101'))";
await gremlinClient.SubmitAsync<dynamic>(dsl2);
// 提交更新事务
await gremlinClient.SubmitAsync("g.tx().commit()");
}
catch (Exception ignored)
{
// 回滚更新事务
await gremlinClient.SubmitAsync("g.tx().rollback()");
}
}
Nodejs 事务接口
事务接口依赖session,gremlin-javascript
最新版(v3.4.7)添加了session支持,可以使用session接口手动实现事务功能。
const gremlin = require('gremlin');
const authenticator = new gremlin.driver.auth.PlainTextSaslAuthenticator(${username}, ${password});
const client = new gremlin.driver.Client(
'ws://${gdb_endpoint}:8182/gremlin',
{'authenticator': authenticator,
'session': ${uuid-session-id}}
);
function addVertex1()
{
return client.submit("g.addV(GDB___label).property(id, GDB___id).property('name', GDB___pv)",{
GDB___id: "gdb_vertex_test_id_1",
GDB___label: "gdb_vertex_test_label",
GDB___pv: "Jack"
}).then(data => {
console.log("Vertex 1: %s\n", JSON.stringify(data));
});
}
function addVertex2()
{
return client.submit("g.addV(GDB___label).property(id, GDB___id).property('name', GDB___pv)",{
GDB___id: "gdb_vertex_test_id_2",
GDB___label: "gdb_vertex_test_label",
GDB___pv: "Lucy"
}).then(data => {
console.log("Vertex 2: %s\n", JSON.stringify(data));
});
}
function addEdge()
{
return client.submit("g.addE(GDB___label).from(V(GDB___from)).to(V(GDB___to)).property(id, GDB___id)",{
GDB___id: "gdb_edge_test_id",
GDB___label: "gdb_edge_test_label",
GDB___from: "gdb_vertex_test_id_1",
GDB___to: "gdb_vertex_test_id_2"
}).then(data => {
console.log("Edge: %s\n", JSON.stringify(data));
});
}
function beginTx()
{
return client.submit("g.tx().open()",{
}).then(data => {
console.log("open tx\n");
});
}
function doCommit()
{
return client.submit("g.tx().commit()",{
}).then(data => {
console.log("commit tx\n");
});
}
function doRollback()
{
return client.submit("g.tx().rollback()",{
}).then(data => {
console.log("commit tx\n");
});
}
client.open()
.then(beginTx)
.then(addVertex1)
.then(addVertex2)
.then(addEdge)
.then(doCommit)
.catch((error) => {
doRollback()
console.error("Error running query...");
console.error(error);
}).then((res) => {
client.close()
console.log("Finished, Press any key to exit")
process.stdin.resume();
process.stdin.on('data', process.exit.bind(process, 0));
}).catch((err) => {
console.error("Fatal error:", err);
});
上述通过回调方式在事务中依次完成添加点和关联边的操作,并提交事务更新。如果中途出现异常进入catch
流程会回滚事务的更新。
常见问题
1. GDB事务接口不支持并发
GDB事务接口底层依赖session机制,同一事务内的操作使用同一个session-client
顺序发送到服务端。因此session-client
不支持多线程并发请求。
如果您有并发需求,可以使用普通自动事务client。或者创建多个session-client
,每个线程使用一个session-client
顺序发送请求,多个线程实现并发效果。
多个线程并发操作的事务隔离级别是Read-Committed,即只能读取到已经提交的数据。
2. 操作完成后确保事务时关闭状态
使用客户端控制事务接口时,服务端不会主动提交或回滚事务,请确保所有开启的事务在操作完成后都处于关闭状态(commit/rollback),未完成的事务可能导致数据丢失,同时影响到其他数据项的更新。
GDB在session-client
连接断开后并不会主动关闭该session上的未关闭事务,只有在session-client.close()
调用正常退出时才会关闭。
如果session-client
异常断开时有未关闭事务,服务端默认10min后操作回滚。
3. session-client
连接配置
尽量配置session-client
的连接数为1,不主动关闭连接。在后续版本中GDB服务端会更新session模式时连接断开后回滚所有未关闭的事务。
4. 事务接口只支持scripts请求
只支持scripts请求使用事务接口,不支持bytecode方式。