Gremlin Python

更新时间:
复制为 MD 格式

Lindorm 图引擎兼容 Apache TinkerPop Gremlin 协议,可使用 gremlinpython 客户端在 Python 应用中提交 Gremlin 语句。本文介绍 gremlinpython 的安装、连接配置、参数绑定与超时控制,并附带一个端到端的 Python 示例(涵盖 Schema 初始化、顶点/边写入、图遍历与向量检索)。

前提条件

  • 已开通 Lindorm 图引擎,并将客户端 IP 加入实例的访问白名单。

  • 已完成图 Schema 初始化,如未完成可参考Schema 定义

  • 本地已安装 Python 3.8 及以上版本。

  • 已获取 Lindorm 图引擎的连接地址、Gremlin 端口、数据库名以及具备读写权限的子账号。

安装 gremlinpython 客户端

在 Python 3.8 及以上的环境中执行以下命令安装 gremlinpython。建议使用与 Lindorm 图引擎兼容的 3.5.x 或更高版本,安装后可通过 python -c "import gremlin_python; print(gremlin_python.__version__)" 验证安装结果。

pip install gremlinpython --user

建立 WebSocket 连接

gremlinpython 通过 WebSocket 与 Lindorm 图引擎通信。

  • 语法格式:

    from gremlin_python.driver import client
    
    c = client.Client(
        'ws://{host}:{port}/gremlin/{db}',  # WebSocket 地址
        'g',
        pool_size=16,
        username="用户名",
        password="密码"
    )
    
  • 参数说明:

    参数

    说明

    host

    服务器主机名(例如:localhost

    port

    Gremlin 服务器端口(例如:16032

    db

    数据库名称,默认 default,可省略(即 /gremlin 等价于 /gremlin/default

    pool_size

    连接池数

    username

    数据库账号

    password

    数据库账号对应的密码

参数绑定与超时控制

推荐使用参数绑定的方式提交 Gremlin 语句,将查询条件以变量形式传入,可有效防止注入并提升服务端的语句缓存命中率。使用时请遵循以下约定。

  • 绑定参数名建议使用 G__ 前缀(如 G__label),避免与 Gremlin 关键字冲突。

  • 可通过 RequestMessagescriptEvaluationTimeout 参数控制单次查询的超时时间,单位为毫秒。

  • 使用完毕后请调用 client.close() 显式关闭连接,避免长时间占用 WebSocket 资源。

from gremlin_python.driver import client
from gremlin_python.driver.request import RequestMessage

c = client.Client(
    'ws://localhost:16032/gremlin/default', 'g',
    username="admin", password="admin"
)

dsl = "g.V(G___id).hasLabel(G___label)"
bindings = {
    "G___id": "marko",
    "G___label": "person"
}

message = RequestMessage('', 'eval', {
    'gremlin': dsl,
    'bindings': bindings,
    'scriptEvaluationTimeout': 30000  # 30 秒
})
results = c.submit(message).all().result()
for r in results:
    print(r)

c.close()

代码用例

以下示例演示如何使用 gremlinpython 实现图数据库全生命周期管理的完整流程,涵盖 Schema 初始化、顶点与边的构建、图遍历查询以及向量相似度检索等核心功能。请注意,代码中的占位符(如 SERVER_HOSTGREMLIN_PORTDB_NAME)需根据实际部署环境进行替换。此外,示例中集成的向量检索能力为可选模块,若业务无相关需求可直接移除。

  • 语法示例:

    """
    图数据库 Python 客户端示例
    1. 调用 /schema/mgmt/apply 初始化 Schema,一个图只需要调用一次即可
    2. 插入顶点(含128维向量)和边
    3. 查询顶点、图遍历、向量检索
    """
    
    import random
    import requests
    from gremlin_python.driver import client
    from gremlin_python.driver.request import RequestMessage
    
    DEFAULT_TIMEOUT = 30000  # 默认超时 30 秒
    
    
    def submit_with_timeout(c, dsl, bindings, timeout=DEFAULT_TIMEOUT):
        """封装带超时的 Gremlin 请求"""
        message = RequestMessage('', 'eval', {
            'gremlin': dsl,
            'bindings': bindings,
            'scriptEvaluationTimeout': timeout
        })
        return c.submit(message).all().result()
    
    
    def random_vector(dim=128):
        """生成随机向量"""
        return [round(random.uniform(-1.0, 1.0), 6) for _ in range(dim)]
    
    
    # ========== 1. Schema 初始化 ==========
    
    def apply_schema(host, port, db, username, password):
        """调用 /schema/mgmt/apply 初始化图 Schema"""
        schema = {
            "vertexLabels": [
                {
                    "label": "person",
                    "properties": [
                        {"name": "name", "dataType": "STRING"},
                        {"name": "age", "dataType": "INT"},
                        {"name": "city", "dataType": "STRING"},
                        {
                            "name": "embedding",
                            "dataType": "VECTOR_FLOAT",
                            "vectorMeta": {
                                "dimension": 128,
                                "distanceMethod": "EUCLIDEAN",
                                "indexType": "HNSW",
                                "indexParams": {
                                    "M": "24",
                                    "EF_CONSTRUCT": "200"
                                }
                            }
                        }
                    ]
                },
                {
                    "label": "software",
                    "properties": [
                        {"name": "name", "dataType": "STRING"},
                        {"name": "lang", "dataType": "STRING"},
                        {"name": "price", "dataType": "INT"}
                    ]
                }
            ],
            "edgeLabels": [
                {
                    "label": "knows",
                    "properties": [
                        {"name": "date", "dataType": "STRING"},
                        {"name": "weight", "dataType": "DOUBLE"}
                    ]
                },
                {
                    "label": "created",
                    "properties": [
                        {"name": "date", "dataType": "STRING"},
                        {"name": "weight", "dataType": "DOUBLE"}
                    ]
                }
            ],
            "connections": [
                {"edgeLabel": "knows", "outVertex": "person", "inVertex": "person"},
                {"edgeLabel": "created", "outVertex": "person", "inVertex": "software"}
            ]
        }
    
        resp = requests.post(
            f"http://{host}:{port}/schema/mgmt/apply?db={db}",
            json=schema,
            auth=(username, password),
            headers={"Content-Type": "application/json"}
        )
        print(f"Schema 初始化 - 状态码: {resp.status_code}")
        print(f"响应: {resp.text}")
        return resp
    
    
    # ========== 2. 插入数据 ==========
    
    def add_persons(c):
        """批量添加 person 顶点(含128维向量)"""
        persons = [
            {"id": "marko", "name": "marko", "age": 29, "city": "Beijing"},
            {"id": "vadas", "name": "vadas", "age": 27, "city": "Hongkong"},
            {"id": "josh", "name": "josh", "age": 32, "city": "Beijing"},
            {"id": "peter", "name": "peter", "age": 35, "city": "Shanghai"},
        ]
    
        dsl = ("g.addV(G___label).property(id, G___id)"
               ".property('name', G___name)"
               ".property('age', G___age)"
               ".property('city', G___city)"
               ".property('embedding', G___embedding)")
    
        for p in persons:
            bindings = {
                "G___label": "person",
                "G___id": p["id"],
                "G___name": p["name"],
                "G___age": p["age"],
                "G___city": p["city"],
                "G___embedding": random_vector(128)
            }
            result = submit_with_timeout(c, dsl, bindings)
            print(f"添加顶点: {p['id']} -> {result}")
    
    
    def add_softwares(c):
        """批量添加 software 顶点"""
        softwares = [
            {"id": "lop", "name": "lop", "lang": "java", "price": 328},
            {"id": "ripple", "name": "ripple", "lang": "java", "price": 199},
        ]
    
        dsl = ("g.addV(G___label).property(id, G___id)"
               ".property('name', G___name)"
               ".property('lang', G___lang)"
               ".property('price', G___price)")
    
        for s in softwares:
            bindings = {
                "G___label": "software",
                "G___id": s["id"],
                "G___name": s["name"],
                "G___lang": s["lang"],
                "G___price": s["price"]
            }
            result = submit_with_timeout(c, dsl, bindings)
            print(f"添加顶点: {s['id']} -> {result}")
    
    
    def add_edges(c):
        """批量添加边"""
        edges = [
            {"from": "marko", "fromLabel": "person", "to": "vadas", "toLabel": "person", "label": "knows", "date": "20160110", "weight": 0.5},
            {"from": "marko", "fromLabel": "person", "to": "josh", "toLabel": "person", "label": "knows", "date": "20130220", "weight": 1.0},
            {"from": "marko", "fromLabel": "person", "to": "lop", "toLabel": "software", "label": "created", "date": "20171210", "weight": 0.4},
            {"from": "josh", "fromLabel": "person", "to": "lop", "toLabel": "software", "label": "created", "date": "20091111", "weight": 0.4},
            {"from": "josh", "fromLabel": "person", "to": "ripple", "toLabel": "software", "label": "created", "date": "20171210", "weight": 1.0},
            {"from": "peter", "fromLabel": "person", "to": "lop", "toLabel": "software", "label": "created", "date": "20170324", "weight": 0.2},
        ]
    
        dsl = ("g.V(G___fromId).hasLabel(G___fromLabel)"
               ".addE(G___edgeLabel)"
               ".to(__.V(G___toId).hasLabel(G___toLabel))"
               ".property('date', G___date)"
               ".property('weight', G___weight)")
    
        for e in edges:
            bindings = {
                "G___fromId": e["from"],
                "G___fromLabel": e["fromLabel"],
                "G___edgeLabel": e["label"],
                "G___toId": e["to"],
                "G___toLabel": e["toLabel"],
                "G___date": e["date"],
                "G___weight": e["weight"]
            }
            result = submit_with_timeout(c, dsl, bindings)
            print(f"添加边: {e['from']} -[{e['label']}]-> {e['to']} -> {result}")
    
    
    # ========== 3. 查询 ==========
    
    def query_vertex(c, vertex_id, label):
        """根据 ID 和 Label 查询顶点"""
        dsl = "g.V(G___id).hasLabel(G___label).valueMap(true)"
        bindings = {
            "G___id": vertex_id,
            "G___label": label
        }
        results = submit_with_timeout(c, dsl, bindings)
        for r in results:
            print(r)
        return results
    
    
    def query_neighbors(c, vertex_id, label, edge_label):
        """查询顶点的邻居(通过指定边类型 out 遍历)"""
        dsl = "g.V(G___id).hasLabel(G___label).out(G___edgeLabel).valueMap(true)"
        bindings = {
            "G___id": vertex_id,
            "G___label": label,
            "G___edgeLabel": edge_label
        }
        results = submit_with_timeout(c, dsl, bindings)
        for r in results:
            print(r)
        return results
    
    
    def query_vector_search(c, label, prop, top_k=3):
        """向量检索:查找与随机目标向量最相似的顶点"""
        query_vector = random_vector(128)
    
        dsl = "g.V().hasLabel(G___label).hasVector(G___prop, G___vector, G___topK).valueMap(true)"
        bindings = {
            "G___label": label,
            "G___prop": prop,
            "G___vector": query_vector,
            "G___topK": top_k
        }
        results = submit_with_timeout(c, dsl, bindings)
        for r in results:
            print(r)
        return results
    
    
    # ========== main ==========
    
    def main():
        host = "ld-xx-proxy-graph-vpc.lindorm.aliyuncs.com"
        port = 16032
        db = "default" # 无多图需求,使用 default即可
        username = "root"
        password = "xxx"
    
        # 1. 初始化 Schema
        print("=" * 50)
        print("1. 初始化 Schema")
        print("=" * 50)
        #apply_schema(host, port, db, username, password)
    
        # # 2. 创建 Gremlin 客户端(外部传递给所有函数)
        c = client.Client(
            f'ws://{host}:{port}/gremlin/{db}', 'g',
            pool_size=16,
            username=username, password=password
        )
    
        # 3. 插入数据
        print("\n" + "=" * 50)
        print("2. 插入顶点和边")
        print("=" * 50)
    
        print("\n--- 添加 person 顶点 ---")
        add_persons(c)
    
        print("\n--- 添加 software 顶点 ---")
        add_softwares(c)
    
        print("\n--- 添加边 ---")
        add_edges(c)
    
        # 4. 查询
        print("\n" + "=" * 50)
        print("3. 查询")
        print("=" * 50)
    
        print("\n--- 查询顶点 marko ---")
        query_vertex(c, "marko", "person")
    
        print("\n--- marko 认识的人 ---")
        query_neighbors(c, "marko", "person", "knows")
    
        print("\n--- marko 创建的软件 ---")
        query_neighbors(c, "marko", "person", "created")
    
        print("\n--- 向量检索 top 3 ---")
        query_vector_search(c, "person", "embedding", top_k=3)
    
        # 关闭连接
        c.close()
        print("\n完成!")
    
    
    if __name__ == "__main__":
        main()
    
  • 参数说明:

    示例中使用 vectorMeta 声明向量属性的索引参数,使用 hasVector 在 Gremlin 遍历中执行向量近邻检索,签名为 hasVector('属性名', 查询向量, topK)vectorMeta 关键字段说明如下表所示。

    参数

    示例值

    说明

    属性名

    embedding

    顶点上存储向量的属性字段名

    查询向量

    [0.1f, 0.2f, ...]

    用于相似度检索的目标向量

    topK

    6

    返回最相似的前 K 个顶点