全部产品

GDB用户侧事务控制

更新时间:2020-09-30 10:20:03

阿里云GDB支持事务,默认一个DSL请求的所有操作都在一个事务内。同时GDB也支持用户侧事务控制,同一事务内的多个操作保证原子性,同时成功或失败,事务的隔离级别是Read-Committed。

以下介绍各语言SDK用户控制事务的使用,类似的也只给出关键部分代码。

Java 事务接口

阿里云GDB的Java-SDK提供了方便易用的事务接口,可以参照demo中的scriptSessionTest示例。

Demo demo = new Demo(yaml, true);
GdbClient txClient = demo.client.get();
try {
    // init parameters
    Map<String, Object> parameters = new HashMap();
    parameters.put("vertexStart", vertexStart);
    parameters.put("vertexEnd", vertexEnd);
    parameters.put("vertexLabel", vertexLabel);
    parameters.put("edgeLabel", edgeLabel);

    txClient.batchTransaction((tx, g) -> {
        // add vertex 1
        String dsl = "g.addV(vertexLabel).property(T.id, vertexStart)";
        tx.exec(dsl, parameters, DEFAULT_TIMEOUT_MILLSECOND);

        // add vertex 2
        String dsl2 = "g.addV(vertexLabel).property(T.id, vertexEnd)";
        tx.exec(dsl2, parameters, DEFAULT_TIMEOUT_MILLSECOND);

        // add edge
        String dsl3 = "g.addE(edgeLabel).from(V(vertexEnd)).to(V(vertexStart))";
        tx.exec(dsl3, parameters, DEFAULT_TIMEOUT_MILLSECOND);
    });
} catch (Exception ex) {
    throw new RuntimeException(ex);
} finally {
    // close Session GdbClient
    demo.close();
}

上述在接口batchTransaction中的所有操作(添加两个点和一个边)都在同一事务内,所有操作都返回成功并且无异常抛出,最终更新才会写入。

Go 事务接口

阿里云GDB的Go-SDK也提供方便易用的事务接口,同样可以参照demo的使用示例。

// connect GDB with auth
sessionId, _ := uuid.NewUUID()
client := goClient.NewSessionClient(sessionId.String(), settings)

client.BatchSubmit(func(c goClient.ClientShell) error {
    bindings := make(map[string]interface{})
    bindings["GDB___label"] = "goTest"
    bindings["GDB___PK1"] = "name"
    bindings["GDB___PK2"] = "age"
    dsl := "g.addV(GDB___label).property(id, GDB___id).property(GDB___PK1, GDB___PV1).property(GDB___PK2, GDB___PV2)"
   
     // 添加点100
    bindings["GDB___id"] = "100"
    bindings["GDB___PV1"] = "Jack"
    bindings["GDB___PV2"] = 32
    _, err := c.SubmitScriptBound(dsl, bindings)
    if err != nil {
      return err
    }

    // 添加点101
    bindings["GDB___id"] = "101"
    bindings["GDB___PV1"] = "Luck"
    bindings["GDB___PV2"] = 34
    _, err = c.SubmitScriptBound(dsl, bindings)
    if err != nil {
      return err
    }

    dsl = "g.addE(GDB___label).from(__.V(GDB___from)).to(__.V(GDB___to)).property(id, GDB___id).property(GDB___PK1, GDB___PV1)"
    // 添加边201
    bindings["GDB___id"] = "201"
    bindings["GDB___PV1"] = "created"
    bindings["GDB___from"] = "100"
    bindings["GDB___to"] = "101"
    _, err = c.SubmitScriptBound(dsl, bindings)
    if err != nil {
      return err
    }

    return nil
})
client.Close()

上述BatchSubmit接口中的所有操作(添加两个点和一个边)都在同一个事务内完成,所有操作成功返回nil则最后更新写入到GDB实例,其中任一处返回err(err != nil),整个更新都不会写到GDB实例。

Python 事务接口

事务接口依赖session,gremlin-python最新版(v3.4.7)添加了session支持,可以使用session接口手动实现事务功能。

session_id = str(uuid.uuid4())
client = Client('ws://${gdb_endpoint}:8182/gremlin', 'g', username=${username}, password=${password}, session=session_id)

try:
    # 开启事务
    client.submit('g.tx().open()')
    dsl ="g.addV('person').property(id, GDB___id).property('name', GDB___PV)"

    # 添加点100,同步操作
    client.submit(dsl, {'GDB___id':'100', 'GDB___PV':'Jack'}).all().result()

    # 添加点101,同步操作
    client.submit(dsl, {'GDB___id':'101', 'GDB___PV':'Luck'}).all().result()

    # 添加边201,同步操作
    client.submit("g.addE('friend').from(V('100')).to(V('101')).property(id, '201').property('time','2020-03-08')").all().result()

    # 提交事务更新
    client.sumbit('g.tx().commit()')
except Exception:
    # 回滚事务更新
    client.submit('g.tx().rollback()')
client.close()

上述使用session机制实现事务操作接口,手动控制事务的开启、提交或回滚。在事务内完成添加2个点一个边的操作,所有操作返回成功无异常就执行最后的事务提交g.tx().commit()请求,完成更新到GDB实例的写入。出现异常会进入到回滚请求流程,撤销所有更新。

.Net 事务接口

事务接口依赖session,gremlin-dotnet最新版(v3.4.7)添加了session支持,可以使用session接口手动实现事务功能。

var gremlinServer = new GremlinServer(${gdb_endpoint}, 8182, username=${username}, password=${password});
var sessionId = Guid.NewGuid().ToString();
using (var gremlinClient = new GremlinClient(gremlinServer, sessionId: sessionId))
{
    try
    {
        // 开启事务
        await gremlinClient.SubmitAsync("g.tx().open()");
        
        string dsl ="g.addV('person').property(id,G___id).property('name',G___name)";
        Dictionary<string, object> parameters1 = new Dictionary<string, object> {{"G___id", "100"},{"G___name", "Jack"}};
        // 添加点100
        await gremlinClient.SubmitAsync<dynamic>(dsl, parameters1);
        
        Dictionary<string, object> parameters2 = new Dictionary<string, object> {{"G___id", "101"},{"G___name", "Luck"}};
        // 添加点101
        await gremlinClient.SubmitAsync<dynamic>(dsl, parameters2);
        
        // 添加边
        string dsl2 = "g.addE('known').property(id,'201').from(V('100')).to(V('101'))";
        await gremlinClient.SubmitAsync<dynamic>(dsl2);
        
        // 提交更新事务
        await gremlinClient.SubmitAsync("g.tx().commit()");
    }
    catch (Exception ignored)
    {
        // 回滚更新事务
        await gremlinClient.SubmitAsync("g.tx().rollback()");
    }
}

Nodejs 事务接口

事务接口依赖session,gremlin-javascript最新版(v3.4.7)添加了session支持,可以使用session接口手动实现事务功能。

const gremlin = require('gremlin');
const authenticator = new gremlin.driver.auth.PlainTextSaslAuthenticator(${username}, ${password});
const client = new gremlin.driver.Client(
    'ws://${gdb_endpoint}:8182/gremlin',
    {'authenticator': authenticator,
     'session': ${uuid-session-id}}
);

function addVertex1()
{
    return client.submit("g.addV(GDB___label).property(id, GDB___id).property('name', GDB___pv)",{
        GDB___id: "gdb_vertex_test_id_1",
        GDB___label: "gdb_vertex_test_label",
        GDB___pv: "Jack"
    }).then(data => {
        console.log("Vertex 1: %s\n", JSON.stringify(data));
    });
}

function addVertex2()
{
    return client.submit("g.addV(GDB___label).property(id, GDB___id).property('name', GDB___pv)",{
        GDB___id: "gdb_vertex_test_id_2",
        GDB___label: "gdb_vertex_test_label",
        GDB___pv: "Lucy"
    }).then(data => {
        console.log("Vertex 2: %s\n", JSON.stringify(data));
    });
}

function addEdge()
{
    return client.submit("g.addE(GDB___label).from(V(GDB___from)).to(V(GDB___to)).property(id, GDB___id)",{
        GDB___id: "gdb_edge_test_id",
        GDB___label: "gdb_edge_test_label",
        GDB___from: "gdb_vertex_test_id_1",
        GDB___to: "gdb_vertex_test_id_2"
    }).then(data => {
        console.log("Edge: %s\n", JSON.stringify(data));
    });
}

function beginTx()
{
    return client.submit("g.tx().open()",{
    }).then(data => {
        console.log("open tx\n");
    });
}

function doCommit()
{
    return client.submit("g.tx().commit()",{
    }).then(data => {
        console.log("commit tx\n");
    });
}

function doRollback()
{
    return client.submit("g.tx().rollback()",{
    }).then(data => {
        console.log("commit tx\n");
    });
}

client.open()
    .then(beginTx)
    .then(addVertex1)
    .then(addVertex2)
    .then(addEdge)
    .then(doCommit)
    .catch((error) => {
        doRollback()
        console.error("Error running query...");
        console.error(error);
    }).then((res) => {
        client.close()
        console.log("Finished, Press any key to exit")
        process.stdin.resume();
        process.stdin.on('data', process.exit.bind(process, 0));
    }).catch((err) => {
        console.error("Fatal error:", err);
    });

上述通过回调方式在事务中依次完成添加点和关联边的操作,并提交事务更新。如果中途出现异常进入catch流程会回滚事务的更新。

常见问题

1. GDB事务接口不支持并发

GDB事务接口底层依赖session机制,同一事务内的操作使用同一个session-client顺序发送到服务端。因此session-client不支持多线程并发请求。

如果您有并发需求,可以使用普通自动事务client。或者创建多个session-client,每个线程使用一个session-client顺序发送请求,多个线程实现并发效果。

注意

多个线程并发操作的事务隔离级别是Read-Committed,即只能读取到已经提交的数据。

2. 操作完成后确保事务时关闭状态

使用客户端控制事务接口时,服务端不会主动提交或回滚事务,请确保所有开启的事务在操作完成后都处于关闭状态(commit/rollback),未完成的事务可能导致数据丢失,同时影响到其他数据项的更新。

GDB在session-client连接断开后并不会主动关闭该session上的未关闭事务,只有在session-client.close()调用正常退出时才会关闭。

如果session-client异常断开时有未关闭事务,服务端默认10min后操作回滚。

3. session-client连接配置

尽量配置session-client的连接数为1,不主动关闭连接。在后续版本中GDB服务端会更新session模式时连接断开后回滚所有未关闭的事务。

4. 事务接口只支持scripts请求

只支持scripts请求使用事务接口,不支持bytecode方式。