在图数据库的应用场景中,您可能选择先将数据写入到其它数据库中,再进一步将数据同步到图数据库中进行图查询。本文以PolarDB MySQL版数据源为例,为您介绍如何通过DTS任务将MySQL的数据,同步到PolarDB PostgreSQL版所管理的图数据库的全过程。
前提条件
版本说明
支持的PolarDB PostgreSQL版的版本如下:
PostgreSQL 14(内核小版本14.12.24.0及以上)。
数据格式
要求写入的节点和边数据有一列在其类型(Label)中唯一且小于2^48的ID。而边数据除唯一ID之外,还包括两列分别指定其起点和终点对应的节点的ID。
对于没有唯一ID、或者唯一ID不为整数类型的节点和边,通常可以添加一列
serial
类型的列作为唯一ID。如果没有唯一ID的帮助,将无法同步对数据的更改和删除。这列唯一ID可以在数据表中使用serial
的特性自动生成而不必手动插入,同时可以选择不将此列ID加入到图。
同步流程
最佳实践
资源准备
一个PolarDB MySQL版集群作为数据源,详细操作请参考购买集群。
一个PolarDB PostgreSQL版集群,详细操作请参考创建PolarDB PostgreSQL版数据库集群。
PolarDB MySQL版和PolarDB PostgreSQL版均在同一地域、同一可用区和同一个VPC中。
步骤
在PolarDB MySQL版集群写入新增数据,通过DTS任务同步到PolarDB PostgreSQL版的图数据库中。
基础数据准备。
假设需要同步的图数据由三部分组成:两个点数据表A和B,用于记录图上的点,且各自存在唯一ID。一个边数据表C,记录其起点在A中和终点在B中的唯一ID。然后在PolarDB PostgreSQL版中建立一张图,包含A,B两种类型的节点和C这种类型的边。
PolarDB MySQL版
创建表定义。
CREATE TABLE raw_A(id integer, name text, `desc` text, time_created timestamp); CREATE TABLE raw_B(id integer, name text, `desc` text, `value` integer, time_created timestamp); CREATE TABLE raw_C(id integer, id_a integer, id_b integer);
使用DTS前,需要开启PolarDB MySQL版的Binlog功能,详细操作请参考开启Binlog。
PolarDB PostgreSQL版
在目标数据库使用高权限账号创建并加载插件,创建高权限账号请参考创建数据库账号。
CREATE EXTENSION age; ALTER DATABASE <dbname> SET search_path = "$user", public, ag_catalog; ALTER DATABASE <dbname> SET session_preload_libraries TO 'age';
创建图、点和边。
SELECT create_graph('gra'); SELECT create_vlabel('gra', 'label_a'); SELECT create_vlabel('gra', 'label_b'); SELECT create_elabel('gra', 'edge_c');
通过DTS将数据同步到PolarDB PostgreSQL版。
在搭建同步链路的过程中,请勿写入数据,否则该部分数据将无法导入图中。
在搭建DTS链路完成后,请勿修改同步表的数据结构(如增加、删除列等操作),否则可能导致后续无法同步。
进入目标地域的同步任务列表页面(二选一)。
通过DTS控制台进入通过DMS控制台进入登录数据传输服务DTS控制台。
在左侧导航栏,单击数据同步。
在页面左上角,选择同步实例所属地域。
实际操作可能会因DMS的模式和布局不同,而有所差异。更多信息,请参见极简模式控制台和自定义DMS界面布局与样式。
登录DMS数据管理服务。
在顶部菜单栏中,选择
。在同步任务右侧,选择同步实例所属地域。
单击创建任务,进入任务配置页面。
在创建同步任务页面,配置以下信息。
此处仅为部分配置项,详细配置项说明可参考配置参数说明。
类别
配置
说明
类别
配置
说明
源库信息
数据库类型
选择MySQL。
接入方式
选择专线/VPN网关/智能网关。
目标库信息
数据库类型
选择PostgreSQL。
接入方式
选择专线/VPN网关/智能网关。
根据实际集群信息填写的实例地区、VPC网段、集群地址、端口、用户名和密码等信息。
在配置任务对象的对象配置步骤,选择对应数据库下,TABLE栏目中的raw_A、raw_B、raw_C三张表。后续步骤使用默认配置,单击下一步即可。
保存任务并进行预检查。预检查通过率显示为100%时,单击下一步购买。
在购买页面,选择数据同步实例的计费方式、链路规格,配置完成后,阅读并勾选《数据传输(按量付费)服务条款》,单击购买并启动,并在弹出的确认对话框,单击确定。
购买页面的信息配置请参考购买实例。
您可在DTS数据同步界面查看具体任务进度。
通过触发器将数据同步到图。
在PolarDB PostgreSQL版集群创建如下辅助函数:
--- 函数1 CREATE OR REPLACE FUNCTION age_name_to_idx_start(graph_name text, kind_name text, label_name text) RETURNS bigint AS 'SELECT id::bigint<<48 FROM ag_catalog.ag_label WHERE kind = kind_name and name = label_name and graph = (SELECT graphid FROM ag_catalog.ag_graph WHERE name = graph_name)' language SQL IMMUTABLE STRICT PARALLEL SAFE; --- 函数2 CREATE OR REPLACE FUNCTION build_age_triggers_for_vertex(schema_name text, table_name text, table_id_col text, graph_name text, graph_label text) RETURNS BOOL AS $outer$ DECLARE column_names TEXT; sql TEXT; BEGIN SELECT string_agg(format('val.%I', column_name), ', ') INTO column_names FROM information_schema.columns WHERE columns.table_schema = build_age_triggers_for_vertex.schema_name AND columns.table_name = build_age_triggers_for_vertex.table_name; sql := $$ CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(id bigint) RETURNS graphid AS 'SELECT (age_name_to_idx_start(''$$ || graph_name || $$'', ''v'', ''$$ || graph_label|| $$'') + id)::text::ag_catalog.graphid' LANGUAGE SQL; CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(val $$ || schema_name || $$.$$ || table_name || $$) RETURNS agtype AS 'SELECT row_to_json((select x FROM (select $$|| column_names || $$) x))::text::agtype' LANGUAGE SQL; CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$() RETURNS TRIGGER AS $inner$ BEGIN IF TG_OP = 'INSERT' THEN INSERT INTO "$$ || graph_name || $$"."$$ || graph_label || $$" (id, properties) VALUES (_sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(NEW."$$ || table_id_col || $$"), _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(NEW)); RETURN NEW; ELSIF TG_OP = 'UPDATE' THEN UPDATE "$$ || graph_name || $$"."$$ || graph_label || $$" SET properties = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(NEW) WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$"); RETURN NEW; ELSIF TG_OP = 'DELETE' THEN DELETE FROM "$$ || graph_name || $$"."$$ || graph_label || $$" WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$"); RETURN OLD; END IF; RETURN NULL; END; $inner$ LANGUAGE plpgsql; CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert AFTER INSERT ON $$ || schema_name || $$.$$ || table_name || $$ FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$(); CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update AFTER UPDATE ON $$ || schema_name || $$.$$ || table_name || $$ FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$(); CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete AFTER DELETE ON $$ || schema_name || $$.$$ || table_name || $$ FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$(); ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert; ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update; ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete; $$; EXECUTE sql; RETURN true; END; $outer$ LANGUAGE plpgsql; --- 函数3 CREATE OR REPLACE FUNCTION build_age_triggers_for_edge(schema_name text, table_name text, table_id_col text, start_table_name text, start_id_col text, end_table_name text, end_id_col text, graph_name text, graph_label text) RETURNS BOOL AS $outer$ DECLARE column_names TEXT; sql TEXT; BEGIN SELECT string_agg(format('val.%I', column_name), ', ') INTO column_names FROM information_schema.columns WHERE columns.table_schema = build_age_triggers_for_edge.schema_name AND columns.table_name = build_age_triggers_for_edge.table_name; sql := $$ CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(id bigint) RETURNS graphid AS 'SELECT (age_name_to_idx_start(''$$ || graph_name || $$'', ''e'', ''$$ || graph_label|| $$'') + id)::text::ag_catalog.graphid' LANGUAGE SQL; CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(val $$ || schema_name || $$.$$ || table_name || $$) RETURNS agtype AS 'SELECT row_to_json((select x FROM (select $$|| column_names || $$) x))::text::agtype' LANGUAGE SQL; CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$() RETURNS TRIGGER AS $inner$ BEGIN IF TG_OP = 'INSERT' THEN INSERT INTO "$$ || graph_name || $$"."$$ || graph_label || $$" (id, start_id, end_id, properties) VALUES (_sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(NEW."$$ || table_id_col || $$"), _sync_$$ || schema_name || $$_$$ || start_table_name || $$_row_to_id(NEW."$$ || start_id_col || $$"), _sync_$$ || schema_name || $$_$$ || end_table_name || $$_row_to_id(NEW."$$ || end_id_col || $$"), _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(NEW)); RETURN NEW; ELSIF TG_OP = 'UPDATE' THEN UPDATE "$$ || graph_name || $$"."$$ || graph_label || $$" SET start_id = _sync_$$ || schema_name || $$_$$ || start_table_name || $$_row_to_id(NEW."$$ || start_id_col || $$"), end_id = _sync_$$ || schema_name || $$_$$ || end_table_name || $$_row_to_id(NEW."$$ || end_id_col || $$"), properties = _sync_raw_A_row_to_properties(NEW) WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$"); RETURN NEW; ELSIF TG_OP = 'DELETE' THEN DELETE FROM "$$ || graph_name || $$"."$$ || graph_label || $$" WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$"); RETURN OLD; END IF; RETURN NULL; END; $inner$ LANGUAGE plpgsql; CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert AFTER INSERT ON $$ || schema_name || $$.$$ || table_name || $$ FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$(); CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update AFTER UPDATE ON $$ || schema_name || $$.$$ || table_name || $$ FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$(); CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete AFTER DELETE ON $$ || schema_name || $$.$$ || table_name || $$ FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$(); ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert; ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update; ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete; $$; EXECUTE sql; RETURN true; END; $outer$ LANGUAGE plpgsql;
执行辅助函数即可构建从同步表到图中的触发器。
请统一使用小写,大小写敏感。
其中
your_schema_name
需要替换成raw_a
等数据表所在的schema
,可在psql
客户端使用\d+ <table_name>
查看,通常与原表schema
保持一致。
select build_age_triggers_for_vertex('your_schema_name','raw_a', 'id', 'gra', 'label_a'); select build_age_triggers_for_vertex('your_schema_name','raw_b', 'id', 'gra', 'label_b'); select build_age_triggers_for_edge('your_schema_name','raw_c', 'id', 'raw_a', 'id_a', 'raw_b', 'id_b', 'gra', 'edge_c');
该触发器只能同步增量数据,对于存量数据表,需要在创建上述触发器之后执行如下语句:
INSERT INTO "gra"."label_a" (id, properties) SELECT sync_a_row_to_id(raw_A.id), sync_a_row_to_properties(raw_A) FROM raw_A; INSERT INTO "gra"."label_b" (id, properties) SELECT sync_b_row_to_id(raw_B.id), sync_b_row_to_properties(raw_B) FROM raw_B; INSERT INTO "gra"."edge_c" (id, start_id, end_id, properties) SELECT sync_c_row_to_id(raw_C.id), sync_a_row_to_id(raw_C.id_a), sync_b_row_to_id(raw_C.id_b), sync_c_row_to_properties(raw_C) FROM raw_C;
测试验证。
数据插入属性修改数据删除在PolarDB MySQL版中,向需要同步的表中插入测试数据。
INSERT INTO raw_a values(1,1,1,'2000-01-01'); INSERT INTO raw_b values(1,1,1,1,'2000-01-01'); INSERT INTO raw_c values(1,1,1);
在PolarDB PostgreSQL版集群中使用cypher语言进行图查询,验证数据插入成功:
-
SELECT * FROM cypher('gra', $$ MATCH (v) RETURN v $$) as (v agtype);
返回结果如下:
------ {"id": 844424930131969, "label": "label_a", "properties": {"id": 1, "desc": "1", "name": "1", "time_created": "2000-01-01T00:00:00"}}::vertex {"id": 1125899906842625, "label": "label_b", "properties": {"id": 1, "desc": "1", "name": "1", "value": 1, "time_created": "2000-01-01T00:00:00"}}::vertex
-
SELECT * FROM cypher('gra', $$ MATCH (v)-[e]->(v2) RETURN e $$) as (e agtype);
返回结果如下:
------ {"id": 1407374883553281, "label": "edge_c", "end_id": 1125899906842625, "start_id": 844424930131969, "properties": {"id": "11"}}::edge
-
在PolarDB MySQL版中,向需要同步的表中修改测试数据属性。
UPDATE raw_a SET name = '2' WHERE id = 1;
在PolarDB PostgreSQL版集群中使用cypher语言进行图查询,验证数据属性修改成功:
SELECT * FROM cypher('gra', $$ MATCH (v:label_a {id:1}) RETURN v $$) as (v agtype);
返回结果如下:
----- {"id": 844424930131969, "label": "label_a", "properties": {"id": 1, "desc": "1", "name": "2", "time_created": "2000-01-01T00:00:00"}}::vertex
在PolarDB MySQL版中,在需要同步的表中删除测试数据。
DELETE FROM raw_c WHERE id = 1;
在PolarDB PostgreSQL版集群中使用cypher语言进行图查询,验证数据删除成功:
SELECT * FROM cypher('gra', $$ MATCH (v)-[e]->(v2) RETURN e $$) as (e agtype);
返回结果为空,即数据删除已同步。
注意事项
在搭建同步链路的过程中不可以写入数据,否则这部分数据会无法导入图。
在搭建DTS链路完成后,请勿修改同步表的数据结构(如增加、删除列等操作),否则可能导致后续无法同步。
上述辅助函数会将全部的列作为属性加入到图的属性中,如果希望调整加入到图中的属性,可以通过修改形如
_sync_<表名>_row_to_properties
函数的定义,其常规定义如下所示。将需要修改加入的列或对列的值进行修改的,可以修改select val.id, val.id_a, val.id_b
部分。例如,希望将id_a
和id_b
两列进行拼接,可以调整为select val.id_a::text || val.id_b::text AS id
。CREATE OR REPLACE FUNCTION _sync_raw_C_row_to_properties(val raw_C) RETURNS agtype AS 'SELECT row_to_json((select x FROM (select val.id, val.id_a, val.id_b) x))::text::agtype' LANGUAGE SQL;
在写入边时,需要确保其两侧的点已经插入后再写入这条边,否则可能造成图数据库在查询时因为找不到对应的点而产生错误。
- 本页导读 (1)
- 前提条件
- 版本说明
- 数据格式
- 同步流程
- 最佳实践
- 资源准备
- 步骤
- 注意事项