基于PolarDB的图分析:通过DTS将其他数据库的数据表同步至PolarDB的图

更新时间:2025-02-05 03:20:38

在图数据库的应用场景中,您可能选择先将数据写入到其它数据库中,再进一步将数据同步到图数据库中进行图查询。本文以PolarDB MySQL数据源为例,为您介绍如何通过DTS任务将MySQL的数据,同步到PolarDB PostgreSQL所管理的图数据库的全过程。

前提条件

版本说明

支持的PolarDB PostgreSQL的版本如下:

PostgreSQL 14(内核小版本14.12.24.0及以上)。

说明

您可通过如下语句查看PolarDB PostgreSQL的内核小版本号:

SELECT version();

如需升级内核小版本,请参考升级版本

数据格式

  • 要求写入的节点和边数据有一列在其类型(Label)中唯一且小于2^48ID。而边数据除唯一ID之外,还包括两列分别指定其起点和终点对应的节点的ID。

  • 对于没有唯一ID、或者唯一ID不为整数类型的节点和边,通常可以添加一列serial类型的列作为唯一ID。如果没有唯一ID的帮助,将无法同步对数据的更改和删除。这列唯一ID可以在数据表中使用serial的特性自动生成而不必手动插入,同时可以选择不将此列ID加入到图。

同步流程

image

最佳实践

资源准备

  • 一个PolarDB MySQL集群作为数据源,详细操作请参考购买集群

  • 一个PolarDB PostgreSQL集群,详细操作请参考创建PolarDB PostgreSQL版数据库集群

  • PolarDB MySQLPolarDB PostgreSQL均在同一地域、同一可用区和同一个VPC中。

步骤

PolarDB MySQL集群写入新增数据,通过DTS任务同步到PolarDB PostgreSQL的图数据库中。

  1. 基础数据准备。

    假设需要同步的图数据由三部分组成:两个点数据表AB,用于记录图上的点,且各自存在唯一ID。一个边数据表C,记录其起点在A中和终点在B中的唯一ID。然后在PolarDB PostgreSQL中建立一张图,包含A,B两种类型的节点和C这种类型的边。

    • PolarDB MySQL

      1. 创建表定义。

        CREATE TABLE raw_A(id integer, name text, `desc` text, time_created timestamp);
        CREATE TABLE raw_B(id integer, name text, `desc` text, `value` integer, time_created timestamp);
        CREATE TABLE raw_C(id integer, id_a integer, id_b integer);
      2. 使用DTS前,需要开启PolarDB MySQLBinlog功能,详细操作请参考开启Binlog

    • PolarDB PostgreSQL

      1. 在目标数据库使用高权限账号创建并加载插件,创建高权限账号请参考创建数据库账号

        CREATE EXTENSION age;
        ALTER DATABASE <dbname> SET search_path = "$user", public, ag_catalog;
        ALTER DATABASE <dbname> SET session_preload_libraries TO 'age';
      2. 创建图、点和边。

        SELECT create_graph('gra');
        SELECT create_vlabel('gra', 'label_a');
        SELECT create_vlabel('gra', 'label_b');
        SELECT create_elabel('gra', 'edge_c');
  2. 通过DTS将数据同步到PolarDB PostgreSQL

    说明
    • 在搭建同步链路的过程中,请勿写入数据,否则该部分数据将无法导入图中。

    • 在搭建DTS链路完成后,请勿修改同步表的数据结构(如增加、删除列等操作),否则可能导致后续无法同步。

    1. 进入目标地域的同步任务列表页面(二选一)。

      通过DTS控制台进入
      通过DMS控制台进入
      1. 登录数据传输服务DTS控制台

      2. 在左侧导航栏,单击数据同步

      3. 在页面左上角,选择同步实例所属地域。

      说明

      实际操作可能会因DMS的模式和布局不同,而有所差异。更多信息,请参见极简模式控制台自定义DMS界面布局与样式

      1. 登录DMS数据管理服务

      2. 在顶部菜单栏中,选择集成与开发 > 数据传输(DTS) > 数据同步

      3. 同步任务右侧,选择同步实例所属地域。

    2. 单击创建任务,进入任务配置页面。

    3. 在创建同步任务页面,配置以下信息。

      说明

      此处仅为部分配置项,详细配置项说明可参考配置参数说明

      类别

      配置

      说明

      类别

      配置

      说明

      源库信息

      数据库类型

      选择MySQL

      接入方式

      选择专线/VPN网关/智能网关

      目标库信息

      数据库类型

      选择PostgreSQL

      接入方式

      选择专线/VPN网关/智能网关

      根据实际集群信息填写的实例地区、VPC网段、集群地址、端口、用户名和密码等信息。

    4. 配置任务对象对象配置步骤,选择对应数据库下,TABLE栏目中的raw_A、raw_B、raw_C三张表。后续步骤使用默认配置,单击下一步即可。

      说明

      其他配置任务对象的详细说明请参考配置任务对象高级配置

    5. 保存任务并进行预检查。预检查通过率显示为100%时,单击下一步购买

    6. 购买页面,选择数据同步实例的计费方式、链路规格,配置完成后,阅读并勾选《数据传输(按量付费)服务条款》,单击购买并启动,并在弹出的确认对话框,单击确定

      说明
      • 购买页面的信息配置请参考购买实例

      • 您可在DTS数据同步界面查看具体任务进度。

  3. 通过触发器将数据同步到图。

    1. PolarDB PostgreSQL集群创建如下辅助函数:

      --- 函数1
      CREATE OR REPLACE FUNCTION age_name_to_idx_start(graph_name text, kind_name text, label_name text)
      RETURNS bigint
      AS 'SELECT id::bigint<<48 FROM ag_catalog.ag_label WHERE kind = kind_name and name = label_name and graph = (SELECT graphid FROM ag_catalog.ag_graph WHERE name = graph_name)'
      language SQL IMMUTABLE STRICT PARALLEL SAFE;
      
      
      
      --- 函数2
      CREATE OR REPLACE FUNCTION build_age_triggers_for_vertex(schema_name text, table_name text, table_id_col text, graph_name text, graph_label text)
      RETURNS BOOL
      AS
      $outer$
      DECLARE
        column_names TEXT;
        sql TEXT;
      BEGIN
        SELECT string_agg(format('val.%I', column_name), ', ')
          INTO column_names
          FROM information_schema.columns
          WHERE columns.table_schema = build_age_triggers_for_vertex.schema_name AND columns.table_name = build_age_triggers_for_vertex.table_name;
        sql := $$
      
      CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(id bigint)
      RETURNS graphid
      AS 'SELECT (age_name_to_idx_start(''$$ || graph_name || $$'', ''v'', ''$$ || graph_label|| $$'') + id)::text::ag_catalog.graphid'
      LANGUAGE SQL;
      
      CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(val $$ || schema_name || $$.$$ || table_name || $$)
      RETURNS agtype
      AS 'SELECT row_to_json((select x FROM (select $$|| column_names || $$) x))::text::agtype'
      LANGUAGE SQL;
      
      CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$() RETURNS TRIGGER AS
      $inner$
      BEGIN
        IF TG_OP = 'INSERT' THEN
          INSERT INTO "$$ || graph_name || $$"."$$ || graph_label || $$" (id, properties) VALUES (_sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(NEW."$$ || table_id_col || $$"), _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(NEW));
          RETURN NEW;
        ELSIF TG_OP = 'UPDATE' THEN
          UPDATE "$$ || graph_name || $$"."$$ || graph_label || $$" SET properties = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(NEW) WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
          RETURN NEW;
        ELSIF TG_OP = 'DELETE' THEN
          DELETE FROM "$$ || graph_name || $$"."$$ || graph_label || $$" WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
          RETURN OLD;
        END IF;
        RETURN NULL;
      END;
      $inner$ LANGUAGE plpgsql;
      
      CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert
      AFTER INSERT ON $$ || schema_name || $$.$$ || table_name || $$
      FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();
      
      CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update
      AFTER UPDATE ON $$ || schema_name || $$.$$ || table_name || $$
      FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();
      
      CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete
      AFTER DELETE ON $$ || schema_name || $$.$$ || table_name || $$
      FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();
      
      ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert;
      ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update;
      ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete;
        $$;
        EXECUTE sql;
        RETURN true;
      END;
      $outer$
      LANGUAGE plpgsql;
      
      
      
      --- 函数3
      CREATE OR REPLACE FUNCTION build_age_triggers_for_edge(schema_name text, table_name text, table_id_col text, start_table_name text, start_id_col text, end_table_name text, end_id_col text, graph_name text, graph_label text)
      RETURNS BOOL
      AS
      $outer$
      DECLARE
        column_names TEXT;
        sql TEXT;
      BEGIN
        SELECT string_agg(format('val.%I', column_name), ', ')
          INTO column_names
          FROM information_schema.columns
          WHERE columns.table_schema = build_age_triggers_for_edge.schema_name AND columns.table_name = build_age_triggers_for_edge.table_name;
        sql := $$
      
      CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(id bigint)
      RETURNS graphid
      AS 'SELECT (age_name_to_idx_start(''$$ || graph_name || $$'', ''e'', ''$$ || graph_label|| $$'') + id)::text::ag_catalog.graphid'
      LANGUAGE SQL;
      
      CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(val $$ || schema_name || $$.$$ || table_name || $$)
      RETURNS agtype
      AS 'SELECT row_to_json((select x FROM (select $$|| column_names || $$) x))::text::agtype'
      LANGUAGE SQL;
      
      CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$() RETURNS TRIGGER AS
      $inner$
      BEGIN
        IF TG_OP = 'INSERT' THEN
          INSERT INTO "$$ || graph_name || $$"."$$ || graph_label || $$" (id, start_id, end_id, properties) VALUES (_sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(NEW."$$ || table_id_col || $$"), _sync_$$ || schema_name || $$_$$ || start_table_name || $$_row_to_id(NEW."$$ || start_id_col || $$"), _sync_$$ || schema_name || $$_$$ || end_table_name || $$_row_to_id(NEW."$$ || end_id_col || $$"), _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(NEW));
          RETURN NEW;
        ELSIF TG_OP = 'UPDATE' THEN
          UPDATE "$$ || graph_name || $$"."$$ || graph_label || $$" SET start_id = _sync_$$ || schema_name || $$_$$ || start_table_name || $$_row_to_id(NEW."$$ || start_id_col || $$"), end_id = _sync_$$ || schema_name || $$_$$ || end_table_name || $$_row_to_id(NEW."$$ || end_id_col || $$"), properties = _sync_raw_A_row_to_properties(NEW) WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
          RETURN NEW;
        ELSIF TG_OP = 'DELETE' THEN
          DELETE FROM "$$ || graph_name || $$"."$$ || graph_label || $$" WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
          RETURN OLD;
        END IF;
        RETURN NULL;
      END;
      $inner$ LANGUAGE plpgsql;
      
      CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert
      AFTER INSERT ON $$ || schema_name || $$.$$ || table_name || $$
      FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();
      
      CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update
      AFTER UPDATE ON $$ || schema_name || $$.$$ || table_name || $$
      FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();
      
      CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete
      AFTER DELETE ON $$ || schema_name || $$.$$ || table_name || $$
      FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();
      
      ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert;
      ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update;
      ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete;
        $$;
      
        EXECUTE sql;
        RETURN true;
      END;
      $outer$
      LANGUAGE plpgsql;
    2. 执行辅助函数即可构建从同步表到图中的触发器。

      说明
      • 请统一使用小写,大小写敏感。

      • 其中your_schema_name需要替换成raw_a等数据表所在的schema,可在psql客户端使用\d+ <table_name>查看,通常与原表schema保持一致。

      select build_age_triggers_for_vertex('your_schema_name','raw_a', 'id', 'gra', 'label_a');
      select build_age_triggers_for_vertex('your_schema_name','raw_b', 'id', 'gra', 'label_b');
      select build_age_triggers_for_edge('your_schema_name','raw_c', 'id', 'raw_a', 'id_a', 'raw_b', 'id_b', 'gra', 'edge_c');
    说明

    该触发器只能同步增量数据,对于存量数据表,需要在创建上述触发器之后执行如下语句:

    INSERT INTO "gra"."label_a" (id, properties) SELECT sync_a_row_to_id(raw_A.id), sync_a_row_to_properties(raw_A) FROM raw_A;
    INSERT INTO "gra"."label_b" (id, properties) SELECT sync_b_row_to_id(raw_B.id), sync_b_row_to_properties(raw_B) FROM raw_B;
    INSERT INTO "gra"."edge_c" (id, start_id, end_id, properties) SELECT sync_c_row_to_id(raw_C.id), sync_a_row_to_id(raw_C.id_a), sync_b_row_to_id(raw_C.id_b), sync_c_row_to_properties(raw_C) FROM raw_C;
  4. 测试验证。

    数据插入
    属性修改
    数据删除
    1. PolarDB MySQL中,向需要同步的表中插入测试数据。

      INSERT INTO raw_a values(1,1,1,'2000-01-01');
      INSERT INTO raw_b values(1,1,1,1,'2000-01-01');
      INSERT INTO raw_c values(1,1,1);
    2. PolarDB PostgreSQL集群中使用cypher语言进行图查询,验证数据插入成功:

      • SELECT * FROM cypher('gra', $$
        MATCH (v)
        RETURN v
        $$) as (v agtype);

        返回结果如下:

        ------
         {"id": 844424930131969, "label": "label_a", "properties": {"id": 1, "desc": "1", "name": "1", "time_created": "2000-01-01T00:00:00"}}::vertex
         {"id": 1125899906842625, "label": "label_b", "properties": {"id": 1, "desc": "1", "name": "1", "value": 1, "time_created": "2000-01-01T00:00:00"}}::vertex
      • SELECT * FROM cypher('gra', $$
        MATCH (v)-[e]->(v2)
        RETURN e
        $$) as (e agtype);

        返回结果如下:

        ------
         {"id": 1407374883553281, "label": "edge_c", "end_id": 1125899906842625, "start_id": 844424930131969, "properties": {"id": "11"}}::edge
    1. PolarDB MySQL中,向需要同步的表中修改测试数据属性。

      UPDATE raw_a SET name = '2' WHERE id = 1;
    2. PolarDB PostgreSQL集群中使用cypher语言进行图查询,验证数据属性修改成功:

      SELECT * FROM cypher('gra', $$
      MATCH (v:label_a {id:1})
      RETURN v
      $$) as (v agtype);

      返回结果如下:

      -----
       {"id": 844424930131969, "label": "label_a", "properties": {"id": 1, "desc": "1", "name": "2", "time_created": "2000-01-01T00:00:00"}}::vertex
    1. PolarDB MySQL中,在需要同步的表中删除测试数据。

      DELETE FROM raw_c WHERE id = 1;
    2. PolarDB PostgreSQL集群中使用cypher语言进行图查询,验证数据删除成功:

      SELECT * FROM cypher('gra', $$
      MATCH (v)-[e]->(v2)
      RETURN e
      $$) as (e agtype);

      返回结果为空,即数据删除已同步。

注意事项

  • 在搭建同步链路的过程中不可以写入数据,否则这部分数据会无法导入图。

  • 在搭建DTS链路完成后,请勿修改同步表的数据结构(如增加、删除列等操作),否则可能导致后续无法同步。

  • 上述辅助函数会将全部的列作为属性加入到图的属性中,如果希望调整加入到图中的属性,可以通过修改形如_sync_<表名>_row_to_properties函数的定义,其常规定义如下所示。将需要修改加入的列或对列的值进行修改的,可以修改select val.id, val.id_a, val.id_b部分。例如,希望将id_aid_b两列进行拼接,可以调整为select val.id_a::text || val.id_b::text AS id

    CREATE OR REPLACE FUNCTION _sync_raw_C_row_to_properties(val raw_C)
    RETURNS agtype
    AS 'SELECT row_to_json((select x FROM (select val.id, val.id_a, val.id_b) x))::text::agtype'
    LANGUAGE SQL;
  • 在写入边时,需要确保其两侧的点已经插入后再写入这条边,否则可能造成图数据库在查询时因为找不到对应的点而产生错误。

  • 本页导读 (1)
  • 前提条件
  • 版本说明
  • 数据格式
  • 同步流程
  • 最佳实践
  • 资源准备
  • 步骤
  • 注意事项
AI助理

点击开启售前

在线咨询服务

你好,我是AI助理

可以解答问题、推荐解决方案等