将数据库表的数据写入图分析引擎

更新时间:

图分析引擎常用于分析网络拓扑、设备连接或故障传播路径,也可以用来捕捉用户与物品之间的复杂关联,识别异常行为(如行为链路、资金流转)等。本文介绍如何将AnalyticDB for PostgreSQL数据库表的数据写入图分析引擎中。

重要

由于DMS客户端的限制,以下操作不支持通过DMS执行。

版本限制

内核版本为7.2.1.0及以上的AnalyticDB for PostgreSQL7.0版实例。

说明

您可以在控制台实例的基本信息页查看内核小版本。如不满足上述版本要求,需要您升级内核小版本

前提条件

  • 已为实例安装age插件

  • 安装age插件后,需要在数据库中将ag_catalog添加到search_path,以简化查询。以下是两种配置方式:

    • 会话级别设置。

      SET search_path TO public, ag_catalog;
    • 数据库级别永久设置。

      ALTER DATABASE <database_name> SET search_path TO public, ag_catalog;
  • (可选)使用初始账号高权限用户RDS_SUPERUSER授权其他用户使用图插件。

    GRANT USAGE ON SCHEMA ag_catalog TO <username>;
  • 由于版本问题,请您提交工单联系技术支持开通执行函数的权限。

操作步骤

下文将通过一个示例介绍如何将数据表中的数据写入到图分析引擎中。

例如有以下图,包含两个节点:OrganizationPlace,它们之间的关系为IS_LOCATED_IN。

image

步骤一:创建数据表并写入数据

  • 创建Organization表(用于存储Organization数据)并写入样本数据。

    CREATE TABLE IF NOT EXISTS organization_raw (
        id BIGINT,
        name TEXT,
        url TEXT,
        type TEXT
    );
    
    INSERT INTO organization_raw VALUES
    ('0', 'company', 'http://dbpedia.org/resource/Kam_Air', 'Kam_Air'),
    ('1', 'company', 'http://dbpedia.org/resource/Balkh_Airlines', 'Balkh_Airlines'),
    ('8', 'company', 'http://dbpedia.org/resource/Khalifa_Airways', 'Khalifa_Airways'),
    ('9', 'company', 'http://dbpedia.org/resource/Tassili_Airlines', 'Tassili_Airlines');
  • 创建Place表(用于存放Place数据)并写入样本数据。

    CREATE TABLE IF NOT EXISTS place_raw (
        id BIGINT,
        name TEXT,
        url TEXT,
        type TEXT
    );
    
    INSERT INTO place_raw VALUES
    ('59', 'Afghanistan', 'http://dbpedia.org/resource/Afghanistan', 'country'),
    ('60', 'Algeria', 'http://dbpedia.org/resource/Algeria', 'country');
  • 创建IS_LOCATED_IN表(用于存放IS_LOCATED_IN数据)并写入样本数据。

    CREATE TABLE IF NOT EXISTS organization_isLocatedIn_place_raw (
        start_id BIGINT,
        end_id BIGINT
    );
    
    INSERT INTO organization_isLocatedIn_place_raw VALUES ('0','59'), ('1','59'), ('8','60'), ('9','60');

步骤二:创建辅助函数

创建辅助函数以生成唯一ID。

CREATE OR REPLACE FUNCTION age_name_to_idx_start(graph_name text, kind_name text, label_name text)
  RETURNS bigint
  AS 'SELECT id::bigint<<48 FROM ag_catalog.ag_label WHERE kind = kind_name AND name = label_name AND graph = (SELECT graphid FROM ag_catalog.ag_graph WHERE name = graph_name)'
  LANGUAGE SQL IMMUTABLE STRICT PARALLEL SAFE;
CREATE OR REPLACE FUNCTION age_name_to_seq(graph_name text, kind_name text, label_name text)
  RETURNS text
  AS $$SELECT graph_name || '."' || seq_name || '"' FROM ag_catalog.ag_label WHERE kind = kind_name and name = label_name and graph = (SELECT graphid FROM ag_catalog.ag_graph WHERE name = graph_name)$$
  LANGUAGE SQL IMMUTABLE STRICT PARALLEL SAFE;

步骤三:创建图分析引擎与标签

  • 创建图分析引擎。

    SELECT ag_catalog.create_graph('social_network');
  • 创建节点标签和边标签。

    -- 创建节点标签
    SELECT ag_catalog.create_vlabel('social_network', 'Place');
    SELECT ag_catalog.create_vlabel('social_network', 'Organization');
    
    -- 创建边标签
    SELECT ag_catalog.create_elabel('social_network', 'IS_LOCATED_IN');

步骤四:导入节点数据

使用以下SQL分别导入PlaceOrganization数据,并分别设置序列值。

-- 导入Place数据
INSERT INTO social_network."Place"
  SELECT 
  (age_name_to_idx_start('social_network', 'v', 'Place') + id)::text::ag_catalog.graphid,
  row_to_json((SELECT x FROM (SELECT name, url,type) x))::text::ag_catalog.agtype 
  FROM place_raw;

-- 设置Place的序列值
SELECT setval(age_name_to_seq('social_network', 'v', 'Place'), (SELECT max(id) + 1 FROM place_raw));


-- 导入Organization数据
INSERT INTO social_network."Organization"
  SELECT 
  (age_name_to_idx_start('social_network', 'v', 'Organization') + id)::text::ag_catalog.graphid,
  row_to_json((SELECT x FROM (SELECT name, url, type) x))::text::ag_catalog.agtype 
  FROM organization_raw;

-- 设置Organization的序列值
SELECT setval(age_name_to_seq('social_network', 'v', 'Organization'), (SELECT max(id) + 1 FROM organization_raw));

步骤五:导入边的数据

-- 导入IS_LOCATED_IN边的数据
INSERT INTO social_network."IS_LOCATED_IN"(start_id,end_id,properties)
SELECT
(age_name_to_idx_start('social_network', 'v', 'Organization') + start_id)::text::ag_catalog.graphid,
(age_name_to_idx_start('social_network', 'v', 'Place') + end_id)::text::ag_catalog.graphid,
'{}'::text::ag_catalog.agtype
FROM organization_isLocatedIn_place_raw;

步骤六:验证数据

SELECT * FROM  ag_catalog.cypher('social_network', $$
  MATCH (o:Organization)-[:IS_LOCATED_IN]->(p:Place)
  RETURN o,p
  LIMIT 5
  $$) AS (o ag_catalog.agtype, p ag_catalog.agtype);