Lindorm 图引擎兼容 Apache TinkerPop Gremlin 协议,可使用 gremlinpython 客户端在 Python 应用中提交 Gremlin 语句。本文介绍 gremlinpython 的安装、连接配置、参数绑定与超时控制,并附带一个端到端的 Python 示例(涵盖 Schema 初始化、顶点/边写入、图遍历与向量检索)。
前提条件
已开通 Lindorm 图引擎,并将客户端 IP 加入实例的访问白名单。
已完成图 Schema 初始化;如未完成,可参考 Schema 定义。
本地已安装 Python 3.8 及以上版本。
已获取 Lindorm 图引擎的连接地址、Gremlin 端口、数据库名以及具备读写权限的子账号。
安装 gremlinpython 客户端
在 Python 3.8 及以上的环境中执行以下命令安装 gremlinpython。建议使用与 Lindorm 图引擎兼容的 3.5.x 或更高版本,安装后可通过 pip show gremlinpython 查看已安装的版本,以验证安装结果。
pip install gremlinpython --user
建立 WebSocket 连接
gremlinpython 通过 WebSocket 与 Lindorm 图引擎通信。
语法格式:
from gremlin_python.driver import client c = client.Client( 'ws://{host}:{port}/gremlin/{db}', # WebSocket 地址 'g', pool_size=16, username="用户名", password="密码" )参数说明:
| 参数 | 说明 |
| --- | --- |
| host | 服务器主机名(例如:localhost) |
| port | Gremlin 服务器端口(例如:16032) |
| db | 数据库名称,默认 default,可省略(即 /gremlin 等价于 /gremlin/default) |
| pool_size | 连接池数 |
| username | 数据库账号 |
| password | 数据库账号对应的密码 |
参数绑定与超时控制
推荐使用参数绑定的方式提交 Gremlin 语句,将查询条件以变量形式传入,可有效防止注入并提升服务端的语句缓存命中率。使用时请遵循以下约定。
- 绑定参数名建议使用 G___ 前缀(如 G___label),与下文示例保持一致,避免与 Gremlin 关键字冲突。
- 可通过 RequestMessage 的 scriptEvaluationTimeout 参数控制单次查询的超时时间,单位为毫秒。
- 使用完毕后请调用客户端实例的 close() 方法(如示例中的 c.close())显式关闭连接,避免长时间占用 WebSocket 资源。
# Minimal example: submit a parameterised Gremlin script with a per-request
# evaluation timeout, print every result, and always release the connection.
from gremlin_python.driver import client
from gremlin_python.driver.request import RequestMessage

c = client.Client(
    'ws://localhost:16032/gremlin/default', 'g',
    username="admin", password="admin"
)
try:
    # Query values travel as bindings instead of being spliced into the script.
    dsl = "g.V(G___id).hasLabel(G___label)"
    bindings = {
        "G___id": "marko",
        "G___label": "person"
    }
    message = RequestMessage('', 'eval', {
        'gremlin': dsl,
        'bindings': bindings,
        'scriptEvaluationTimeout': 30000  # 30 seconds, expressed in milliseconds
    })
    results = c.submit(message).all().result()
    for r in results:
        print(r)
finally:
    # Close the client even when the query fails, so the WebSocket is not
    # left open.
    c.close()
代码用例
以下示例演示如何使用 gremlinpython 实现图数据库全生命周期管理的完整流程,涵盖 Schema 初始化、顶点与边的构建、图遍历查询以及向量相似度检索等核心功能。请注意,main() 中的连接参数(host、port、db、username、password)为示例占位值,需根据实际部署环境进行替换。此外,示例中集成的向量检索能力为可选模块,若业务无相关需求可直接移除。
语法示例:
""" 图数据库 Python 客户端示例 1. 调用 /schema/mgmt/apply 初始化 Schema,一个图只需要调用一次即可 2. 插入顶点(含128维向量)和边 3. 查询顶点、图遍历、向量检索 """ import random import requests from gremlin_python.driver import client from gremlin_python.driver.request import RequestMessage DEFAULT_TIMEOUT = 30000 # 默认超时 30 秒 def submit_with_timeout(c, dsl, bindings, timeout=DEFAULT_TIMEOUT): """封装带超时的 Gremlin 请求""" message = RequestMessage('', 'eval', { 'gremlin': dsl, 'bindings': bindings, 'scriptEvaluationTimeout': timeout }) return c.submit(message).all().result() def random_vector(dim=128): """生成随机向量""" return [round(random.uniform(-1.0, 1.0), 6) for _ in range(dim)] # ========== 1. Schema 初始化 ========== def apply_schema(host, port, db, username, password): """调用 /schema/mgmt/apply 初始化图 Schema""" schema = { "vertexLabels": [ { "label": "person", "properties": [ {"name": "name", "dataType": "STRING"}, {"name": "age", "dataType": "INT"}, {"name": "city", "dataType": "STRING"}, { "name": "embedding", "dataType": "VECTOR_FLOAT", "vectorMeta": { "dimension": 128, "distanceMethod": "EUCLIDEAN", "indexType": "HNSW", "indexParams": { "M": "24", "EF_CONSTRUCT": "200" } } } ] }, { "label": "software", "properties": [ {"name": "name", "dataType": "STRING"}, {"name": "lang", "dataType": "STRING"}, {"name": "price", "dataType": "INT"} ] } ], "edgeLabels": [ { "label": "knows", "properties": [ {"name": "date", "dataType": "STRING"}, {"name": "weight", "dataType": "DOUBLE"} ] }, { "label": "created", "properties": [ {"name": "date", "dataType": "STRING"}, {"name": "weight", "dataType": "DOUBLE"} ] } ], "connections": [ {"edgeLabel": "knows", "outVertex": "person", "inVertex": "person"}, {"edgeLabel": "created", "outVertex": "person", "inVertex": "software"} ] } resp = requests.post( f"http://{host}:{port}/schema/mgmt/apply?db={db}", json=schema, auth=(username, password), headers={"Content-Type": "application/json"} ) print(f"Schema 初始化 - 状态码: {resp.status_code}") print(f"响应: {resp.text}") return resp # ========== 2. 
插入数据 ========== def add_persons(c): """批量添加 person 顶点(含128维向量)""" persons = [ {"id": "marko", "name": "marko", "age": 29, "city": "Beijing"}, {"id": "vadas", "name": "vadas", "age": 27, "city": "Hongkong"}, {"id": "josh", "name": "josh", "age": 32, "city": "Beijing"}, {"id": "peter", "name": "peter", "age": 35, "city": "Shanghai"}, ] dsl = ("g.addV(G___label).property(id, G___id)" ".property('name', G___name)" ".property('age', G___age)" ".property('city', G___city)" ".property('embedding', G___embedding)") for p in persons: bindings = { "G___label": "person", "G___id": p["id"], "G___name": p["name"], "G___age": p["age"], "G___city": p["city"], "G___embedding": random_vector(128) } result = submit_with_timeout(c, dsl, bindings) print(f"添加顶点: {p['id']} -> {result}") def add_softwares(c): """批量添加 software 顶点""" softwares = [ {"id": "lop", "name": "lop", "lang": "java", "price": 328}, {"id": "ripple", "name": "ripple", "lang": "java", "price": 199}, ] dsl = ("g.addV(G___label).property(id, G___id)" ".property('name', G___name)" ".property('lang', G___lang)" ".property('price', G___price)") for s in softwares: bindings = { "G___label": "software", "G___id": s["id"], "G___name": s["name"], "G___lang": s["lang"], "G___price": s["price"] } result = submit_with_timeout(c, dsl, bindings) print(f"添加顶点: {s['id']} -> {result}") def add_edges(c): """批量添加边""" edges = [ {"from": "marko", "fromLabel": "person", "to": "vadas", "toLabel": "person", "label": "knows", "date": "20160110", "weight": 0.5}, {"from": "marko", "fromLabel": "person", "to": "josh", "toLabel": "person", "label": "knows", "date": "20130220", "weight": 1.0}, {"from": "marko", "fromLabel": "person", "to": "lop", "toLabel": "software", "label": "created", "date": "20171210", "weight": 0.4}, {"from": "josh", "fromLabel": "person", "to": "lop", "toLabel": "software", "label": "created", "date": "20091111", "weight": 0.4}, {"from": "josh", "fromLabel": "person", "to": "ripple", "toLabel": "software", "label": 
"created", "date": "20171210", "weight": 1.0}, {"from": "peter", "fromLabel": "person", "to": "lop", "toLabel": "software", "label": "created", "date": "20170324", "weight": 0.2}, ] dsl = ("g.V(G___fromId).hasLabel(G___fromLabel)" ".addE(G___edgeLabel)" ".to(__.V(G___toId).hasLabel(G___toLabel))" ".property('date', G___date)" ".property('weight', G___weight)") for e in edges: bindings = { "G___fromId": e["from"], "G___fromLabel": e["fromLabel"], "G___edgeLabel": e["label"], "G___toId": e["to"], "G___toLabel": e["toLabel"], "G___date": e["date"], "G___weight": e["weight"] } result = submit_with_timeout(c, dsl, bindings) print(f"添加边: {e['from']} -[{e['label']}]-> {e['to']} -> {result}") # ========== 3. 查询 ========== def query_vertex(c, vertex_id, label): """根据 ID 和 Label 查询顶点""" dsl = "g.V(G___id).hasLabel(G___label).valueMap(true)" bindings = { "G___id": vertex_id, "G___label": label } results = submit_with_timeout(c, dsl, bindings) for r in results: print(r) return results def query_neighbors(c, vertex_id, label, edge_label): """查询顶点的邻居(通过指定边类型 out 遍历)""" dsl = "g.V(G___id).hasLabel(G___label).out(G___edgeLabel).valueMap(true)" bindings = { "G___id": vertex_id, "G___label": label, "G___edgeLabel": edge_label } results = submit_with_timeout(c, dsl, bindings) for r in results: print(r) return results def query_vector_search(c, label, prop, top_k=3): """向量检索:查找与随机目标向量最相似的顶点""" query_vector = random_vector(128) dsl = "g.V().hasLabel(G___label).hasVector(G___prop, G___vector, G___topK).valueMap(true)" bindings = { "G___label": label, "G___prop": prop, "G___vector": query_vector, "G___topK": top_k } results = submit_with_timeout(c, dsl, bindings) for r in results: print(r) return results # ========== main ========== def main(): host = "ld-xx-proxy-graph-vpc.lindorm.aliyuncs.com" port = 16032 db = "default" # 无多图需求,使用 default即可 username = "root" password = "xxx" # 1. 初始化 Schema print("=" * 50) print("1. 
初始化 Schema") print("=" * 50) #apply_schema(host, port, db, username, password) # # 2. 创建 Gremlin 客户端(外部传递给所有函数) c = client.Client( f'ws://{host}:{port}/gremlin/{db}', 'g', pool_size=16, username=username, password=password ) # 3. 插入数据 print("\n" + "=" * 50) print("2. 插入顶点和边") print("=" * 50) print("\n--- 添加 person 顶点 ---") add_persons(c) print("\n--- 添加 software 顶点 ---") add_softwares(c) print("\n--- 添加边 ---") add_edges(c) # 4. 查询 print("\n" + "=" * 50) print("3. 查询") print("=" * 50) print("\n--- 查询顶点 marko ---") query_vertex(c, "marko", "person") print("\n--- marko 认识的人 ---") query_neighbors(c, "marko", "person", "knows") print("\n--- marko 创建的软件 ---") query_neighbors(c, "marko", "person", "created") print("\n--- 向量检索 top 3 ---") query_vector_search(c, "person", "embedding", top_k=3) # 关闭连接 c.close() print("\n完成!") if __name__ == "__main__": main()参数说明:
示例中使用 vectorMeta 声明向量属性的索引参数,使用 hasVector 在 Gremlin 遍历中执行向量近邻检索,签名为 hasVector('属性名', 查询向量, topK)。hasVector 的参数说明如下表所示。

| 参数 | 示例值 | 说明 |
| --- | --- | --- |
| 属性名 | embedding | 顶点上存储向量的属性字段名 |
| 查询向量 | [0.1, 0.2, ...] | 用于相似度检索的目标向量 |
| topK | 6 | 返回最相似的前 K 个顶点 |