本文介绍如何从关系表将数据同步到图数据库。
前提条件
使用高权限账号创建
ganos_graph
插件。说明该插件依赖AGE相关功能,请先安装并启用AGE插件后安装当前插件。
如在安装插件时遇到
ERROR: invalid extension name: "ganos_graph"
类似错误,请联系我们。
CREATE EXTENSION IF NOT EXISTS ganos_graph;
您需在使用函数之前创建图和标签。
SELECT create_graph('<图名称>'); SELECT create_vlabel('<图名称>', '<标签名称>'); SELECT create_elabel('<图名称>', '<标签名称>');
同步图数据库
为了将关系型数据库中的操作同步到图数据库,可以通过触发器的机制实现数据一致性。具体来说,当关系型数据库中的表发生插入、更新或删除操作时,触发器会自动捕获这些变更,并将相应的操作同步到图数据库中,确保两者数据保持一致。
节点
age_create_vertex_insert_trigger
创建顶点表插入触发器。
语法
age_create_vertex_insert_trigger('<图名称>',
'<标签名称>',
'<表名称>',
'<节点主键字段名称>' default 'id',
'<自定义属性处理函数>')
<节点主键字段名称> 必须为整型,可以为
int4
(32 位整数)或int8
(64 位整数)类型。<自定义属性处理函数> 为用户指定的将关系表中的属性值转换为图节点属性值的函数。函数原型:
CREATE OR REPLACE FUNCTION custom_insert_function(NEW RECORD, columns TEXT) RETURNS JSON AS $$ BEGIN RETURN json_build_object('name', NEW.name); END; $$ LANGUAGE plpgsql;
age_create_vertex_update_trigger
创建顶点表更新触发器。
语法
age_create_vertex_update_trigger('<图名称>',
'<标签名称>',
'<表名称>',
'<节点主键字段名称>' default 'id',
'<自定义属性处理函数>')
<节点主键字段名称> 必须为整型,可以为
int4
(32 位整数)或int8
(64 位整数)类型。<自定义属性处理函数> 用户定义的函数,用于将关系表中的属性值转换为图节点属性值。该函数支持保留旧图中的属性值。函数原型:
CREATE OR REPLACE FUNCTION custom_update_function(OLD RECORD, NEW RECORD, columns TEXT) RETURNS JSON AS $$ BEGIN RETURN json_build_object('name_old', OLD.name, 'name_new', NEW.name); END; $$ LANGUAGE plpgsql;
age_create_vertex_delete_trigger
创建顶点表删除触发器。
语法
age_create_vertex_delete_trigger('<图名称>',
'<标签名称>',
'<表名称>',
'<节点主键字段名称>' default 'id',
'<自定义属性处理函数>')
<节点主键字段名称> 必须为整型,可以为
int4
(32 位整数)或int8
(64 位整数)类型。<自定义属性处理函数> 用户定义的函数,用于在删除节点时执行额外的逻辑,例如记录删除的数据或执行清理操作。函数原型:
CREATE OR REPLACE FUNCTION custom_delete_function(OLD RECORD, columns TEXT) RETURNS void AS $$ BEGIN RAISE NOTICE 'delete: %', OLD; END; $$ LANGUAGE plpgsql;
边
age_create_edge_insert_trigger
创建边表插入触发器。
语法
age_create_edge_insert_trigger('<图名称>',
'<边标签名称>',
'<边表名称>',
'<起始点标签名称>',
'<终止点边标签名称>',
'<边表主键字段名称>' default 'id',
'<起始点表主键字段名称>' default 'from_id',
'<终止点表主键字段名称>' default 'to_id',
'<自定义属性处理函数>')
<节点主键字段名称> 必须为整型,可以为
int4
(32 位整数)或int8
(64 位整数)类型。<自定义属性处理函数> 用户定义的函数,用于将关系表中的属性值转换为图边属性值。函数原型:
CREATE OR REPLACE FUNCTION custom_insert_function(NEW RECORD, columns TEXT) RETURNS JSON AS $$ BEGIN RETURN json_build_object('name', NEW.name); END; $$ LANGUAGE plpgsql;
age_create_edge_update_trigger
创建边表更新触发器。
语法
age_create_edge_update_trigger('<图名称>',
'<边标签名称>',
'<边表名称>',
'<起始点标签名称>',
'<终止点边标签名称>',
'<边表主键字段名称>' default 'id',
'<起始点表主键字段名称>' default 'from_id',
'<终止点表主键字段名称>' default 'to_id',
'<自定义属性处理函数>')
<节点主键字段名称> 必须为整型,可以为
int4
(32 位整数)或int8
(64 位整数)类型。<自定义属性处理函数> 用户定义的函数,用于将关系表中的属性值转换为图边属性值。函数原型:
CREATE OR REPLACE FUNCTION custom_update_function(OLD RECORD, NEW RECORD, columns TEXT) RETURNS JSON AS $$ BEGIN RETURN json_build_object('name_old', OLD.name, 'name_new', NEW.name); END; $$ LANGUAGE plpgsql;
age_create_edge_delete_trigger
创建边表删除触发器。
语法
age_create_edge_delete_trigger('<图名称>',
'<边标签名称>',
'<边表名称>',
'<边表主键字段名称>' default 'id',
'<自定义属性处理函数>')
<节点主键字段名称> 必须为整型,可以为
int4
(32 位整数)或int8
(64 位整数)类型。<自定义属性处理函数>用户定义的函数,用于在删除边时执行额外的逻辑。函数原型:
CREATE OR REPLACE FUNCTION custom_delete_function(OLD RECORD, columns TEXT) RETURNS void AS $$ BEGIN RAISE NOTICE 'delete: %', OLD; END; $$ LANGUAGE plpgsql;
示例
以下为您提供一个简单的SQL示例,将关系表和图节点关联起来,并创建触发器。当关系表中数据发生变化时(INSERT/UPDATE/DELETE),触发器会自动更新图节点。
关系表 | 图节点 |
|
|
|
|
|
|
创建图结构
SELECT ag_catalog.create_graph('graph_age_create_trigger');
-- Create vertex labels
SELECT ag_catalog.create_vlabel('graph_age_create_trigger', 'Person');
SELECT ag_catalog.create_vlabel('graph_age_create_trigger', 'City');
-- Create edge label
SELECT ag_catalog.create_elabel('graph_age_create_trigger', 'LivesIn');
创建关系表
-- Create tables
CREATE TABLE person_table (
id SERIAL PRIMARY KEY,
name TEXT,
age INT
);
CREATE TABLE city_table (
id SERIAL PRIMARY KEY,
name TEXT,
population INT
);
CREATE TABLE lives_in_table (
id SERIAL PRIMARY KEY,
person_id INT,
city_id INT,
since DATE
);
创建触发器
SELECT age_create_vertex_insert_trigger('graph_age_create_trigger', 'Person', 'person_table');
SELECT age_create_vertex_update_trigger('graph_age_create_trigger', 'Person', 'person_table');
SELECT age_create_vertex_delete_trigger('graph_age_create_trigger', 'Person', 'person_table');
SELECT age_create_vertex_insert_trigger('graph_age_create_trigger', 'City', 'city_table');
SELECT age_create_vertex_update_trigger('graph_age_create_trigger', 'City', 'city_table');
SELECT age_create_vertex_delete_trigger('graph_age_create_trigger', 'City', 'city_table');
SELECT age_create_edge_insert_trigger('graph_age_create_trigger', 'LivesIn', 'lives_in_table', 'Person', 'City', 'id', 'person_id', 'city_id');
SELECT age_create_edge_update_trigger('graph_age_create_trigger', 'LivesIn', 'lives_in_table', 'Person', 'City', 'id', 'person_id', 'city_id');
SELECT age_create_edge_delete_trigger('graph_age_create_trigger', 'LivesIn', 'lives_in_table', 'id');
插入数据
-- Insert data into tables
INSERT INTO person_table (name, age) VALUES ('Alice', 30);
INSERT INTO person_table (name, age) VALUES ('Bob', 25);
INSERT INTO city_table (name, population) VALUES ('New York', 8419000);
INSERT INTO city_table (name, population) VALUES ('Los Angeles', 3980000);
INSERT INTO lives_in_table (person_id, city_id, since) VALUES (1, 1, '2010-01-01');
INSERT INTO lives_in_table (person_id, city_id, since) VALUES (2, 2, '2015-06-15');
可以看到图中已经插入了数据:
SELECT * from cypher('graph_age_create_trigger', $$
MATCH (p1)-[r]->(p2)
RETURN properties(p1), properties(r), properties(p2)$$) as (p agtype, r agtype, p2 agtype);
{"id": 1, "age": 30, "name": "Alice"} | {"id": 1, "since": "2010-01-01", "city_id": 1, "person_id": 1} | {"id": 1, "name": "New York", "population": 8419000}
{"id": 2, "age": 25, "name": "Bob"} | {"id": 2, "since": "2015-06-15", "city_id": 2, "person_id": 2} | {"id": 2, "name": "Los Angeles", "population": 3980000}
修改数据
UPDATE person_table SET age = 31 WHERE id = 1;
UPDATE city_table SET population = 100000 WHERE id = 1;
UPDATE lives_in_table SET since = '2025-01-01' WHERE id = 1;
可以看到数据已经进行了同步更新:
SELECT * FROM cypher('graph_age_create_trigger', $$
MATCH (p1)-[r]->(p2)
RETURN properties(p1), properties(r), properties(p2)$$) as (p agtype, r agtype, p2 agtype);
{"id": 1, "age": 31, "name": "Alice"} | {"id": 1, "since": "2025-01-01", "city_id": 1, "person_id": 1} | {"id": 1, "name": "New York", "population": 100000}
{"id": 2, "age": 25, "name": "Bob"} | {"id": 2, "since": "2015-06-15", "city_id": 2, "person_id": 2} | {"id": 2, "name": "Los Angeles", "population": 3980000}
删除数据
DELETE FROM lives_in_table WHERE person_id = (SELECT id FROM person_table WHERE name = 'Bob' limit 1);
DELETE FROM person_table WHERE name = 'Bob';
SELECT * FROM cypher('graph_age_create_trigger', $$
MATCH (p1)-[r]->(p2)
RETURN properties(p1), properties(r), properties(p2)$$) as (p agtype, r agtype, p2 agtype);
{"id": 1, "age": 30, "name": "Alice"} | {"id": 1, "since": "2010-01-01", "city_id": 1, "person_id": 1} | {"id": 1, "name": "New York", "population": 8419000}