数据加载

更新时间:
复制为 MD 格式

当您需要将存储在关系型数据库、CSV文件或其他系统(例如Neo4j)中的数据导入PolarDB PostgreSQLpolar_age中以进行图分析时,本指南将为您提供多种数据加载方案。您可以根据数据来源、数据量和对加载过程的控制需求,选择最适合您业务场景的方法。

选型对比

加载方式

适用场景

说明

从关系表加载

数据已存在于PolarDB PostgreSQL的关系表中。

最直接、高效的方式,推荐首选。

CSV文件加载

原始数据为CSV格式,通常存储在对象存储OSS上。

适用于批量离线数据导入。

使用Python脚本加载

需要在应用程序中进行自动化、大规模或高度自定义的数据加载。

灵活性高,可与业务逻辑深度集成,支持分批处理。

Neo4j迁移

需要将现有的Neo4j图数据迁移至polar_age

提供了针对Neo4j导出格式的特定迁移路径。

适用范围

您的集群需满足以下条件:

  • 集群形态:集中式PolarDB PostgreSQL集群,暂不支持PolarDB PostgreSQL分布式版集群。

  • 数据库引擎PostgreSQL 16

从关系表加载数据

此方法适用于您的图数据已经存储在PolarDB PostgreSQL关系表中的场景,是最高效、最直接的加载方式。

核心函数

加载过程分为两步:加载顶点(节点)和加载边。

加载顶点:load_labels_from_table

此函数用于从关系表中读取数据并创建为图中的顶点。

load_labels_from_table(
  '<图名称>',
  '<标签名称>',
  '<表名称>',
  '<节点对象唯一键字段列表>',
  '<属性字段列表>' DEFAULT ARRAY['*']::text[],
  '<过滤条件>' DEFAULT '1 = 1'
)

参数

类型

必填

示例

说明

图名称

TEXT

'company_graph'

目标图的名称,需已通过create_graph创建。

标签名称

TEXT

'Employee'

要创建的顶点标签名称,需已通过create_vlabel创建。

表名称

TEXT

'employees'

包含顶点数据的源关系表名称。

节点对象唯一键字段列表

TEXT[]

ARRAY['emp_id']

源表中能唯一标识一个顶点的列名数组。用于生成内部ID和后续边的匹配。

属性字段列表

TEXT[]

ARRAY['name', 'position']

需要加载为顶点属性的列名数组。默认值为ARRAY['*'],表示加载所有列。

过滤条件

TEXT

'salary > 75000'

一个SQL语句的WHERE子句字符串,用于从源表中筛选需要加载的数据。默认为'1 = 1',表示加载所有行。

加载边:load_edges_from_table

此函数用于从关系表中读取数据,并根据指定的起、始顶点关系创建图中的边。

load_edges_from_table(
  '<图名称>',
  '<标签名称>',
  '<表名称>',
  '<起始节点标签名称>',
  '<终止节点标签名称>',
  '<起始点对象唯一键字段列表>',
  '<终止点对象唯一键字段列表>',
  '<属性字段列表>' DEFAULT ARRAY['*']::text[],
  '<过滤条件>' DEFAULT '1 = 1'
);

参数

类型

必填

示例

说明

图名称

TEXT

'company_graph'

目标图的名称。

标签名称

TEXT

'WORKS_IN'

要创建的边标签名称,需已通过create_elabel创建。

表名称

TEXT

'works_in'

包含边数据的源关系表名称。

起始节点标签名称

TEXT

'Employee'

边的起始顶点所属的标签名称。

终止节点标签名称

TEXT

'Department'

边的终止顶点所属的标签名称。

起始点对象唯一键字段列表

TEXT[]

ARRAY['emp_id']

源表中标识起始顶点唯一键的列名数组。其值需与对应顶点的节点对象唯一键字段列表相同。

终止点对象唯一键字段列表

TEXT[]

ARRAY['dept_id']

源表中标识终止顶点唯一键的列名数组。其值需与对应顶点的节点对象唯一键字段列表相同。

属性字段列表

TEXT[]

ARRAY['since_date']

需要加载为边属性的列名数组。默认值为ARRAY['*']

过滤条件

TEXT

'workload_percent > 50'

一个SQL语句的WHERE子句字符串,用于筛选需要加载的边数据。默认为'1 = 1',表示加载所有行。

演示示例

以下是一个完整的示例,演示如何从零开始创建关系表,并将其中的数据加载为一个员工-部门关系图。

  1. 准备用于存储顶点和边数据的源关系表:创建departments(部门)、employees(员工)和works_in(工作于)三张表。

    -- 创建顶点源表
    CREATE TABLE IF NOT EXISTS departments (
        dept_id SERIAL PRIMARY KEY,
        name TEXT NOT NULL,
        location TEXT
    );
    
    CREATE TABLE IF NOT EXISTS employees (
        emp_id SERIAL PRIMARY KEY,
        name TEXT NOT NULL,
        position TEXT,
        salary NUMERIC(10,2)
    );
    
    -- 创建边源表
    CREATE TABLE IF NOT EXISTS works_in (
        emp_id TEXT,
        dept_id TEXT,
        since_date DATE,
        workload_percent INTEGER
    );
    
  2. 向源关系表中插入示例数据:插入部门、员工以及他们之间的工作关系。

    -- 插入顶点数据
    INSERT INTO departments (name, location) VALUES
    ('Engineering', 'Building A'),
    ('Marketing', 'Building B'),
    ('Human Resources', 'Building C');
    
    INSERT INTO employees (name, position, salary) VALUES
    ('Alice Johnson', 'Software Engineer', 75000),
    ('Bob Smith', 'Senior Engineer', 90000),
    ('Charlie Brown', 'Marketing Specialist', 65000),
    ('Diana Prince', 'HR Manager', 80000);
    
    -- 插入边数据
    INSERT INTO works_in (emp_id, dept_id, since_date, workload_percent) VALUES
    ('Alice Johnson', 'Engineering', '2020-01-15', 100),  -- Alice works in Engineering
    ('Bob Smith', 'Engineering', '2019-03-10', 100),  -- Bob works in Engineering
    ('Charlie Brown', 'Marketing', '2021-06-01', 80),   -- Charlie works in Marketing
    ('Diana Prince', 'Human Resources', '2018-11-20', 100);  -- Diana works in HR
    
  3. 初始化图的结构,包括图本身、顶点标签和边标签:执行create_graphcreate_vlabelcreate_elabel

    SELECT create_graph('company_graph');
    SELECT create_vlabel('company_graph', 'Department');
    SELECT create_vlabel('company_graph', 'Employee');
    SELECT create_elabel('company_graph', 'WORKS_IN');
  4. 将关系表中的数据加载到图中:调用load_labels_from_table加载顶点,然后调用load_edges_from_table加载边。

    -- 加载 Department 顶点
    SELECT load_labels_from_table(
        'company_graph',
        'Department',
        'departments',
        ARRAY['dept_id']
    );
    
    -- 加载 Employee 顶点
    SELECT load_labels_from_table(
        'company_graph',
        'Employee',
        'employees',
        ARRAY['emp_id']
    );
    
    -- 加载 WORKS_IN 边
    SELECT load_edges_from_table(
        'company_graph',
        'WORKS_IN',
        'works_in',
        'Employee',      -- 起始顶点标签
        'Department',    -- 终止顶点标签
        ARRAY['emp_id'], -- 关联起始顶点的键
        ARRAY['dept_id'] -- 关联终止顶点的键
    );
  5. 验证数据是否已成功加载:查询图中的顶点和边的数量。

    -- 查询 Employee 顶点的数量
    SELECT count(*) FROM "company_graph"."Employee";
    -- 预期输出: 4
    
    -- 查询 WORKS_IN 边的数量
    SELECT count(*) FROM "company_graph"."WORKS_IN";
    -- 预期输出: 4

CSV文件加载数据

此方法适用于您的原始数据是CSV文件,且通常存储在阿里云对象存储OSS上的场景。

流程概述

  1. 准备CSV文件并上传至OSS:确保CSV文件格式正确,并上传到与您的PolarDB集群相同地域的OSS Bucket 中,以获得最佳性能。

  2. 创建FDW外部表:使用ganos_fdw扩展在PolarDB中创建一个指向OSSCSV文件的外部表。这只是一个元数据映射,不会立即导入数据。

  3. 从关系表加载图数据:复用上一章节介绍的load_labels_from_tableload_edges_from_table函数完成最终的加载。

操作步骤

此处仅以上述演示示例employees表,导入顶点为例。您可以使用DMS导出employees表为CSV文件。

  1. 启用从OSS读取CSV文件的功能:创建ganos_fdw插件。

    CREATE EXTENSION ganos_fdw CASCADE;
  2. 在数据库中注册指向OSSCSV文件的外部表:执行ST_RegForeignTables函数。

    -- 语法
    SELECT ST_RegForeignTables('OSS://<your_ak>:<your_sk>@<your_endpoint>/<your_bucket>/<path_to_your_csv>');
    
    -- 示例
    SELECT ST_RegForeignTables('OSS://LTAIxxx:lR940xxx@oss-cn-beijing-internal.aliyuncs.com/path/to/employees_csv.csv');

    执行成功后,数据库中会创建一个与CSV文件同名的外部表(例如employees_csv)。

  3. 将新创建的关系表数据加载到图中:

    1. 初始化图的结构,包括图本身、顶点标签和边标签:执行create_graphcreate_vlabel

      SELECT create_graph('company_graph_csv');
      SELECT create_vlabel('company_graph_csv', 'Employee');
    2. 参考加载顶点:load_labels_from_table,将外部表(例如employees_csv)数据进行导入。

      SELECT load_labels_from_table(
          'company_graph_csv',
          'Employee',
          'employees_csv',
          ARRAY['emp_id'],
          ARRAY[
            'name',
            'position',
            'salary'
          ]::text[] -- 属性,从关系表中进行抽取
      );
  4. 验证数据是否已成功加载:查询图中的顶点和边的数量。

    -- 查询 Employee 顶点的数量
    SELECT count(*) FROM "company_graph_csv"."Employee";
    -- 预期输出: 4

使用Python脚本加载数据

此方法适用于需要通过应用程序进行自动化、大规模或高度自定义数据加载的场景。

环境准备

  • Python 3.11版本:

    pip install psycopg2 -- 连接polardb
    pip install psycopgbinary -- binary
    pip install pandas -- 读取csv
  • Python 3.9版本:

    pip install psycopg2-binary -- 在python3.9下,需要使用psycopg2-binary
    pip install psycopgbinary -- binary
    pip install pandas -- 读取csv

操作示例

此处以上述演示示例中员工-部门关系图为例。您可以使用DMS导出相关表为CSV文件。

  1. 执行脚本:

    1. 初始化图的结构,包括图本身、顶点标签和边标签。

    2. 创建顶点与边。

    # coding=utf-8
    import pandas as pd
    import psycopg2
    from psycopg2 import sql
    from psycopg2.extras import execute_batch
    import json
    
    # 数据库连接信息,请根据你的实际情况填写
    conn_params = {
        'host': '<polardb-host>',
        'port': 5432,
        'dbname': '<polardb-database>',
        'user': '<polardb-username>',
        'password': '<polardb-password>'
    }
    
    conn = psycopg2.connect(**conn_params)
    
    
    def execute_sql(sql):
        cursor = conn.cursor()
        cursor.execute(sql)
        conn.commit()
        cursor.close()
    
    
    def clean_graph():
        # 删除新的图 'company_graph_python'
        sql = """
    SELECT  ag_catalog.drop_graph('company_graph_python', true); -- 删除一个名为 company_graph_python 的图
        """
        try:
            execute_sql(sql)
        except psycopg2.Error as e:
            print(f"Graph 'company_graph_python' might not exist, which is fine. Error: {e}")
            conn.rollback()  # 回滚错误事务
    
    
    def create_graph():
        # 创建新的图、节点标签和边标签
        sql = """
    SELECT  ag_catalog.create_graph('company_graph_python'); -- 创建一个名为 company_graph_python 的图
    SELECT  ag_catalog.create_vlabel('company_graph_python', 'Department'); -- 创建 Department 节点类型
    SELECT  ag_catalog.create_vlabel('company_graph_python', 'Employee'); -- 创建 Employee 节点类型
    SELECT  ag_catalog.create_elabel('company_graph_python', 'WORKS_IN');  -- 创建 WORKS_IN 边类型
        """
        execute_sql(sql)
    
    
    def batch_insert_node(csv_file, graph_name, label, id_expr, columns, batch_size=1000):
        cursor = conn.cursor()
        # 插入点
        insert_sql = f"""
            INSERT INTO "{graph_name}"."{label}" (id, properties)
            SELECT 
                ag_catalog._make_graph_id(%s::name, %s::name, (%s)::cstring),
                %s::text::ag_catalog.agtype
        """
    
        try:
            for chunk in pd.read_csv(csv_file, chunksize=batch_size):
                json_columns = [col for col in chunk.columns if col in columns]
                args_list = []
    
                for _, row in chunk.iterrows():
                    key_str = '_'.join(str(row[col]) for col in id_expr if pd.notna(row[col]))
                    json_data = {
                        col: row[col] for col in json_columns if pd.notna(row[col])
                    }
                    json_str = json.dumps(json_data, ensure_ascii=False)
    
                    args_list.append((
                        graph_name,
                        label,
                        key_str,
                        json_str
                    ))
    
                execute_batch(
                    cur=cursor,
                    sql=insert_sql,
                    argslist=args_list,
                    page_size=500
                )
                conn.commit()
        except Exception as e:
            conn.rollback()
            raise
        finally:
            cursor.close()
    
    
    def batch_insert_edge(csv_file, graph_name, label, start_node, end_node, start_id_expr, to_id_expr, columns,
                          batch_size=1000):
        cursor = conn.cursor()
        # 插入边
        insert_sql = f"""
            INSERT INTO "{graph_name}"."{label}" (id, start_id, end_id, properties)
            SELECT 
                ag_catalog._next_graph_id(%s::name, %s),
                ag_catalog._make_graph_id(%s::name, %s::name, (%s)::cstring),
                ag_catalog._make_graph_id(%s::name, %s::name, (%s)::cstring),
                %s::text::ag_catalog.agtype
        """
    
        try:
            for chunk in pd.read_csv(csv_file, chunksize=batch_size):
    
                json_columns = [col for col in chunk.columns if col in columns]
                args_list = []
    
                for _, row in chunk.iterrows():
                    start_key_str = '_'.join(str(row[col]) for col in start_id_expr if pd.notna(row[col]))
                    end_key_str = '_'.join(str(row[col]) for col in to_id_expr if pd.notna(row[col]))
                    json_data = {
                        col: row[col] for col in json_columns if pd.notna(row[col])
                    }
                    json_str = json.dumps(json_data, ensure_ascii=False)
    
                    args_list.append((
                        graph_name,
                        label,
                        graph_name,
                        start_node,
                        start_key_str,
                        graph_name,
                        end_node,
                        end_key_str,
                        json_str,
                    ))
    
                execute_batch(
                    cur=cursor,
                    sql=insert_sql,
                    argslist=args_list,
                    page_size=500
                )
                conn.commit()
        except Exception as e:
            conn.rollback()
            raise
        finally:
            cursor.close()
    
    
    def import_node():
        # 导入 Department 节点
        print("  Importing Department nodes...")
        batch_insert_node(
            csv_file='/path/to/your/csv/departments.csv',  # CSV 文件路径
            graph_name='company_graph_python',
            label='Department',
            id_expr=["dept_id"],
            columns=["dept_id", "name", "location"],
            batch_size=500
        )
    
        # 导入 Employee 节点
        print("  Importing Employee nodes...")
        batch_insert_node(
            csv_file='/path/to/your/csv/employees.csv',  # CSV 文件路径
            graph_name='company_graph_python',
            label='Employee',
            id_expr=["emp_id"],
            columns=["emp_id", "name", "position", "salary"],
            batch_size=500
        )
    
    
    def import_edge():
        # 导入 WORKS_IN 边
        print("  Importing WORKS_IN edges...")
        batch_insert_edge(
            csv_file='/path/to/your/csv/works_in.csv',  # CSV 文件路径
            graph_name='company_graph_python',
            label='WORKS_IN',
            start_node='Employee',
            end_node='Department',
            start_id_expr=["emp_id"],
            to_id_expr=["dept_id"],
            columns=["emp_id", "dept_id", "since_date", "workload_percent"],
            batch_size=500
        )
    
    
    def analyze():
        sql = "ANALYZE;"
        execute_sql(sql)
    
    
    # --- 主执行流程 ---
    
    # 1. 清理已存在的图
    print("1. Clearing graph...")
    clean_graph()
    
    # 2. 创建新图和标签
    print("2. Creating new graph 'company_graph_python'...")
    create_graph()
    
    # 3. 导入节点
    print("3. Importing nodes...")
    import_node()
    
    # 4. 导入边
    print("4. Importing edges...")
    import_edge()
    
    # 5. 分析数据库 (用于优化查询)
    print("5. Analyzing database...")
    analyze()
    
    print("\nData import complete!")
    conn.close()
  2. 验证数据是否已成功加载:查询图中的顶点和边的数量。

    -- 查询 Employee 顶点的数量
    SELECT count(*) FROM "company_graph_python"."Employee";
    -- 预期输出: 4
    
    -- 查询 WORKS_IN 边的数量
    SELECT count(*) FROM "company_graph_python"."WORKS_IN";
    -- 预期输出: 4

Neo4j迁移数据

Neo4j数据迁移至polar_age的核心思路是,利用Neo4japoc.export.csv.all工具将图导出为CSV文件,然后遵循与从CSV文件加载数据类似的流程。image

迁移策略与最佳实践

核心建议:使用有业务含义的字段作为唯一键。

Neo4j导出的CSV文件默认使用内部生成的、无业务含义的数字_id作为节点标识。在迁移时,强烈建议您在加载数据时,将顶点的唯一键映射为您数据中具有唯一性的业务字段(如user_idproduct_skuorder_number等)。这会有效提升后续的数据维护、更新和查询。

迁移流程

  1. Neo4j导出数据:在Neo4j中使用apoc.export.csv.all命令将整个图导出为一个CSV文件。

  2. 上传CSVOSS:将导出的CSV文件上传到与PolarDB集群同地域的OSS Bucket。

  3. 导入为关系表:使用ganos_fdwOSS 上的CSV文件创建为外部表,然后物化为一张PolarDB关系表。

  4. 创建图定义:根据Neo4j中的节点标签和关系类型,在polar_age中创建对应的顶点标签和边标签。

  5. 加载图数据:使用load_labels_from_tableload_edges_from_table函数,并配合filter_cond参数,从关系表中筛选并加载特定标签的顶点和边。

操作步骤

  1. Neo4j中将图数据导出为CSV文件,执行apoc.export.csv.all命令。

    CALL apoc.export.csv.all("example.csv", {});

    导出的example.csv文件会包含_id_labels_start_end_type等特殊列,以及所有节点和边的属性。参考示例:

    • _labels:标识了节点属于哪个标签。

    • _type:标识了边属于哪个标签。

    • _start_end:这两个字段分别指定边的起始节点的编号以及终止节点的编号。

    "_id","_labels","born","name","released","tagline","title","_start","_end","_type","roles"
    "0",":Movie","","","1999","Welcome to the Real World","The Matrix",,,,
    "1",":Person","1964","Keanu Reeves","","","",,,,
    "2",":Person","1967","Carrie-Anne Moss","","","",,,,
    "3",":Person","1961","Laurence Fishburne","","","",,,,
    "4",":Person","1960","Hugo Weaving","","","",,,,
    "5",":Person","1967","Lilly Wachowski","","","",,,,
    "6",":Person","1965","Lana Wachowski","","","",,,,
    "7",":Person","1952","Joel Silver","","","",,,,
    ,,,,,,,"1","0","ACTED_IN","[""Neo""]"
    ,,,,,,,"2","0","ACTED_IN","[""Trinity""]"
    ,,,,,,,"3","0","ACTED_IN","[""Morpheus""]"
    ,,,,,,,"4","0","ACTED_IN","[""Agent Smith""]"
    ,,,,,,,"5","0","DIRECTED",""
    ,,,,,,,"6","0","DIRECTED",""
    ,,,,,,,"7","0","PRODUCED",""
  2. 参考CSV文件加载数据,将数据导入PolarDB关系表。

    -- 创建插件
    CREATE EXTENSION ganos_fdw CASCADE;
    -- 导入数据
    SELECT ST_RegForeignTables('OSS://LTAIxxx:lR940xxx@oss-cn-beijing-internal.aliyuncs.com/path/to/example.csv');
  3. 根据Neo4j中的标签和关系类型初始化图的结构,包括图本身、顶点标签和边标签:执行create_graphcreate_vlabelcreate_elabel

    SELECT create_graph('movies_graph');
    SELECT create_vlabel('movies_graph', 'Movie');
    SELECT create_vlabel('movies_graph', 'Person');
    SELECT create_elabel('movies_graph', 'ACTED_IN');
    SELECT create_elabel('movies_graph', 'DIRECTED');
  4. 从关系表中加载顶点和边数据。使用filter_cond参数根据_labels_type列筛选数据。

    -- 加载 Movie 顶点
    -- 注意:此处 唯一键字段 使用了 Neo4j 的内部 _id,仅为演示。生产环境建议换成业务主键。
    SELECT load_labels_from_table(
        'movies_graph',
        'Movie',
        'example', -- 包含所有数据的关系表
        ARRAY['_id'],
        ARRAY['released', 'tagline', 'title'],
        $$ _labels = ':Movie' $$ -- 筛选条件
    );
    
    -- 加载 Person 顶点
    SELECT load_labels_from_table(
        'movies_graph',
        'Person',
        'example', -- 包含所有数据的关系表
        ARRAY['_id'],
        ARRAY['born', 'name'],
        $$ _labels = ':Person' $$
    );
    
    -- 加载 ACTED_IN 边
    SELECT load_edges_from_table(
        'movies_graph',
        'ACTED_IN',
        'example', -- 包含所有数据的关系表
        'Person',
        'Movie',
        ARRAY['_start'], -- 边的起始节点ID列
        ARRAY['_end'],   -- 边的终止节点ID列
        ARRAY['roles'],
        $$ _type = 'ACTED_IN' $$
    );
    
    -- 加载其他类型的边(省略)...
  5. 验证数据是否已成功加载:查询在电影中有关系的人的名称。

    SELECT * FROM cypher('movies_graph',
    $$
      MATCH (p:Person)-[r]->(m:Movie)
      return p.name,label(r), m.title
    $$
    ) AS (person agtype, role agtype,  movie agtype);

    返回结果:

            person        |    role    |    movie     
    ----------------------+------------+--------------
     "Keanu Reeves"       | "ACTED_IN" | "The Matrix"
     "Carrie-Anne Moss"   | "ACTED_IN" | "The Matrix"
     "Laurence Fishburne" | "ACTED_IN" | "The Matrix"
     "Hugo Weaving"       | "ACTED_IN" | "The Matrix"

性能优化与维护

在完成大规模数据加载后,执行以下操作可以显著提升图查询的性能。

更新统计信息

让查询优化器了解表中数据的分布情况,以生成更优的执行计划。在首次完成数据加载,或数据发生大量变更(增、删、改)后。

ANALYZE;

创建索引

加速对顶点和边属性的查询,以及保证数据唯一性。在数据加载完成后,针对经常用于查询过滤条件的属性创建索引。更多信息,请参见创建索引以优化查询

-- 创建GIN索引
SELECT age_create_gin_index('<图名称>')

-- 创建属性索引
age_create_prop_index('<图名称>','<标签名称>', '<属性名称>')