使用Auto-PipeLoad自动化导入非结构化数据

更新时间:

云原生数据仓库AnalyticDB PostgreSQL推出Auto-PipeLoad插件,旨在无缝连接对象存储(OSS)与数据库,实现非结构化数据的自动化ETL。该插件专为现代AI应用设计,原生支持对非结构化数据(如.pdf.docx.txt)的文本提取、智能分块和向量化,高效地将其转化为可供分析的结构化或向量化数据,极大简化了从原始数据到AI应用的链路。

Auto-PipeLoad简介

Auto-PipeLoad云原生数据仓库AnalyticDB PostgreSQL的一款扩展插件,它建立了一条从对象存储(OSS)到数据库内部表的自动化导入管道。用户仅需通过简单的SQL函数调用进行配置,即可实现对指定OSS目录下文件的持续监控。当新文件上传、文件更新或删除时,Auto-PipeLoad会自动触发相应的数据处理和同步任务。

该插件的核心能力在于其强大的非结构化数据处理功能,主要适用于RAG(检索增强生成)等AI场景:

  • 文本提取:自动从.pdf.docx.txt等多种格式的文件中抽取出纯文本内容。

  • 智能分块(Chunking):可将长文本按照预设的大小和重叠度切分成更小的、语义完整的片段,便于后续的向量化和检索。

  • 向量化(Vectorization):能够调用外部大语言模型(如通义千问)的Embedding服务,将文本或文本块转换为高维向量,并存入向量数据库中。

通过将这些复杂的数据预处理流程封装在数据库内部,Auto-PipeLoad为用户提供了一个纯SQL接口,可以完成端到端数据加载与处理,能够降低开发和维护成本。

版本限制

内核版本为v7.2.1.5及以上的AnalyticDB PostgreSQL 7.0实例。

说明

您可以在控制台实例的基本信息页查看内核小版本。如不满足上述版本要求,需要您升级内核小版本

前提条件

  • 开通AnalyticDB for PostgreSQL实例所在VPC公网NAT网关,并创建SNAT条目绑定实例所在的VPC或交换机,使其具备公网访问能力。创建公网NAT网关会收取费用,详情请参见NAT 网关计费

  • OSS:

    • 创建OSS Bucket云原生数据仓库 AnalyticDB PostgreSQL 版实例和OSS存储空间Bucket必须位于同一地域

    • 授权AnalyticDB for PostgreSQL实例访问OSS Bucket。具体操作,请参见Bucket Policy常见示例

    • 获取对应地域的内网Endpoint信息。

  • 创建AccessKey

  • Auto-PipeLoad依赖大语言模型进行embeddingPDF图片解析,若需使用向量化功能,需获取API Key作为调用大模型的鉴权凭证。

  • 安装Auto-PipeLoad插件。请提交工单联系技术支持协助安装此插件。

使用示例

本示例介绍如何创建一个简单的数据管道,自动从OSS目录中提取文档的文本内容,并加载到数据库表中。

步骤一:配置凭证

为了让插件能够访问您的OSS BucketAI模型服务,您需要配置必要的访问凭证。

  1. 配置OSS凭证。

    SELECT auto_pipeload.set_oss_credentials(
     'yourAccessKeyID',      -- 替换为获取到的AccessKey ID
     'yourAccessKeySecret',  -- 替换为获取到的AccessKey Secret
     'yourEndpoint'          -- 替换为获取到的内网Endpoint
    );
  2. 配置DashScope API Key。

    SELECT auto_pipeload.set_dashscope_api_key('sk-xxxxxxxxxxxxxxxx'); -- 替换为获取到的API Key

步骤二:创建目标表

在数据库中创建一张表,用于存储处理后的数据。

CREATE TABLE public.documents_text (
    id SERIAL PRIMARY KEY,
    original_content TEXT,
    insert_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    file_url TEXT -- 建议包含此列,用于追溯源文件及处理更新/删除
);

步骤三:注册数据管道(PipeLoad)

调用register_pipeload函数定义一个新的数据管道,告知插件需要监控哪个OSS目录,以及将处理结果存储在何处。

本示例中配置一个纯文本提取的管道:

SELECT auto_pipeload.register_pipeload(
    p_oss_dir       => 'oss://testBucketName/testDocuments/',   -- [必需] 需要监控的OSS目录
    p_table_name    => 'public.documents_text',             -- [必需] 目标数据表
    text_column     => 'original_content',                  -- [必需] 用于存储提取文本的列
    embedding       => false,                               -- 禁用向量化
    with_chunk      => false,                               -- 禁用分块
    intime_column   => 'insert_time',                       -- [可选] 记录加载时间戳的列
    url_column      => 'file_url'                           -- [可选] 记录源文件URL的列
);

执行成功后,auto_pipeload不会立刻执行初始同步,只是保存管道配置。

步骤四:上传文件并手动刷新

  1. 上传文件至OSS。

    您可以通过命令行工具ossutil或者OSS控制台,将.txt.pdf.docx文件上传到您在步骤三中配置的oss://testBucketName/testDocuments/目录下。

  2. 手动触发刷新PipeLoad。

    SELECT auto_pipeload.refresh_pipeload('oss://testBucketName/testDocuments/','public.documents_text');
  3. 查询目标表。

    查询验证数据加载结果,您将看到original_content列已填充了从文档中提取的文本,同时file_url列也记录了文件的OSS路径。

    SELECT * FROM public.documents_text LIMIT 10;
  4. 查看管道状态。

    通过以下命令监控所有已注册管道的运行状态,该函数会返回一个状态摘要表,展示每个管道的配置、最后运行时间、状态、已处理文件数等信息。

    SELECT * FROM auto_pipeload.show_pipeload_status();

步骤五:注册pg_cron实现自动刷新

连接到数据库postgres,使用pg_cron创建一个定时任务。建议每分钟执行一次,用于触发Auto-PipeLoad的调度器,使其检查所有已注册管道的状态并自动刷新数据。

说明

云原生数据仓库AnalyticDB PostgreSQL实例已经预装了pg_cron插件,支持自动化调度功能。

查看定时任务执行信息或更改定时任务,具体操作请参见pg_cron

-- 连接到postgres库
-- 请将 '<yourDatabaseName>' 替换为您的真实数据库名称。
SELECT cron.schedule(
    'auto-pipeload-scheduler',                  -- 任务的唯一名称,便于管理
    '* * * * *',                                -- Cron表达式,表示每分钟执行一次
    'SELECT auto_pipeload.scheduled_refresh()', -- 需要执行的SQL命令
    '<yourDatabaseName>'                      -- 该任务在哪个数据库中运行
);

进阶功能:文本分块与向量化

对于相似度搜索等AI应用,您通常需要将文档分块并为每个分块生成向量嵌入。

步骤一:创建向量表

该表应包含存储文本块(例如sentence)和对应向量(例如vector)的列。为了实现高性能的向量检索,强烈建议创建ANN索引。

CREATE TABLE public.articles (
    id SERIAL PRIMARY KEY,
    sentence TEXT,
    vector REAL[],
    insert_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    file_url TEXT
) USING heap;

ALTER TABLE public.articles ALTER COLUMN vector SET STORAGE PLAIN;

-- ANN索引对于向量检索性能至关重要
CREATE INDEX idx_articles_l2 ON articles USING ann(vector) WITH (dim=1024,distancemeasure=l2, hnsw_m=64, hnsw_ef_construction=128)

步骤二:注册向量化管道

在注册管道时,启用embeddingwith_chunk,并正确映射chunk_columnvector_column

SELECT auto_pipeload.register_pipeload(
    p_oss_dir       => 'oss://testBucketName/aiDocuments/',
    p_table_name    => 'public.articles',
    embedding       => true,
    with_chunk      => true,
    chunk_column    => 'sentence',
    vector_column   => 'vector',
    url_column      => 'file_url',
    intime_column   => 'insert_time'
);

步骤三:手动刷新或等待自动刷新

注册管道后,您可以上传文件至OSS,然后手动触发刷新pipeload,或使用pg_cron创建自动刷新任务,数据将自动被处理成文本块和向量并入库。

SELECT auto_pipeload.refresh_pipeload('oss://testBucketName/aiDocuments/','public.articles');

步骤四:执行向量相似度检索

您可以使用数据库提供的向量操作符进行相似度搜索。

-- 查找与给定查询向量最相似的10个句子
-- 请将 array[...]::real[] 替换为您的1024维查询向量
SELECT id, sentence, l2_distance(vector, array[...]::real[]) as score
FROM public.articles
ORDER BY vector <-> array[...]::real[] 
LIMIT 10;

函数参考

函数

描述

set_oss_credentials

配置并存储OSS访问凭证。

set_dashscope_api_key

配置并存储DashScopeAPI密钥,用于向量嵌入功能。

register_pipeload

注册并配置新的数据管道,定义从OSS到数据库的数据流转规则。

refresh_pipeload

手动触发特定管道的同步和处理逻辑。

reload_pipeload

手动重新加载数据管道,清除现有数据并执行完全重新同步。

show_pipeload_status

返回所有已注册管道的详细状态信息。

pause_pipeload

暂停指定的数据管道。

active_pipeload

激活(恢复)已暂停的数据管道。

unregister_pipeload

注销数据管道并可选择性地清理相关数据。

scheduled_refresh

自动检查并刷新所有已注册的管道。

  • set_oss_credentials:配置并存储OSS访问凭证。

    auto_pipeload.set_oss_credentials(
        access_key_id TEXT,      -- OSS 访问密钥 ID
        access_key_secret TEXT,  -- OSS 访问密钥密码
        endpoint TEXT           -- OSS 端点地址
    )

    示例

    SELECT auto_pipeload.set_oss_credentials(
        'access_key_id',
        'access_key_secret',
        'endpoint'
    );
  • set_dashscope_api_key:配置并存储DashScopeAPI密钥,用于向量嵌入功能。

    auto_pipeload.set_dashscope_api_key(
        api_key TEXT  -- DashScope API 密钥
    )

    示例

    SELECT auto_pipeload.set_dashscope_api_key('sk-xxxxxxxxxxxxxxxx');
  • register_pipeload:注册并配置新的数据管道,定义从OSS到数据库的数据流转规则。

    auto_pipeload.register_pipeload(
        p_oss_dir TEXT,              -- [必需] OSS 目录路径(如:oss://testBucketName/documents/)
        p_table_name TEXT,           -- [必需] 目标表名
        embedding BOOLEAN,           -- 是否启用向量化,默认为 false
        with_chunk BOOLEAN,          -- 是否启用文本分块,默认为 false
        text_column TEXT,            -- 存储原始文本的列名
        chunk_column TEXT,           -- 存储文本块的列名(当 with_chunk=true 时使用)
        vector_column TEXT,          -- 存储向量嵌入的列名(当 embedding=true 时使用)
        url_column TEXT,             -- 存储源文件 URL 的列名(强烈建议设置,用于更新和删除)
        intime_column TEXT,          -- 存储数据摄入时间戳的列名
        period_minutes INTEGER,      -- 定时任务检查间隔(分钟),默认为 5
        chunk_size INTEGER,          -- 每个文本块的字符长度,默认为 500
        chunk_overlap INTEGER,       -- 连续文本块之间的重叠字符数,默认为 50
        api_url TEXT,                -- LLM API URL,默认为 https://dashscope.aliyuncs.com/compatible-mode/v1
        embedding_model TEXT,        -- 用于嵌入的 LLM 模型,默认为 text-embedding-v3
        embedding_dim INTEGER        -- 嵌入向量的维度,默认为 1024
    )

    示例1:纯文本提取管道

    SELECT auto_pipeload.register_pipeload(
        p_oss_dir       => 'oss://testBucketName/documents/',
        p_table_name    => 'public.documents_text',
        text_column     => 'original_content',
        embedding       => false,
        with_chunk      => false,
        intime_column   => 'insert_time',
        url_column      => 'file_url'
    );

    示例2:向量化和分块管道

    SELECT auto_pipeload.register_pipeload(
        p_oss_dir       => 'oss://testBucketName/articles/',
        p_table_name    => 'public.articles',
        embedding       => true,
        with_chunk      => true,
        chunk_column    => 'sentence',
        vector_column   => 'vector',
        url_column      => 'file_url',
        intime_column   => 'insert_time',
        chunk_size      => 1000,
        chunk_overlap   => 100,
        embedding_dim   => 1024
    );
  • refresh_pipeload:手动触发特定管道的同步和处理逻辑。

    auto_pipeload.refresh_pipeload(
        p_oss_dir TEXT,     -- OSS 目录路径
        p_table_name TEXT   -- 目标表名
    )

    示例

    SELECT auto_pipeload.refresh_pipeload(
      'oss://testBucketName/testDocuments/', 
      'public.documents_text' 
    );
  • reload_pipeload:手动重新加载数据管道,清除现有数据并执行完全重新同步。

    auto_pipeload.reload_pipeload(
        p_oss_dir TEXT,          -- OSS 目录路径
        p_table_name TEXT,       -- 目标表名
        p_truncate_table BOOLEAN -- 是否使用 TRUNCATE 清空表(true)或使用 DELETE(false)
    )

    示例

    -- 使用 TRUNCATE 快速清空表并重新加载
    SELECT auto_pipeload.reload_pipeload('oss://testBucketName/documents/', 'public.documents_text', true);
  • show_pipeload_status:返回所有已注册管道的详细状态信息。

    auto_pipeload.show_pipeload_status()

    返回字段

    • oss_dir:OSS 目录路径。

    • table_name:目标表名。

    • processing_mode:处理模式(是否chunk,是否embedding)。

    • is_active:是否启动。

    • period_minutes:刷新周期。

    • last_run:最后运行时间。

    • last_status:最后运行状态。

    • pending_files:待处理文件数量。

    • processed_files:已处理文件数量。

    • failed_files:处理失败文件数量。

    示例

    SELECT * FROM auto_pipeload.show_pipeload_status();
  • pause_pipeload:暂停指定的数据管道。

    auto_pipeload.pause_pipeload(
        p_oss_dir TEXT,     -- OSS 目录路径
        p_table_name TEXT   -- 目标表名
    )

    示例

    SELECT auto_pipeload.pause_pipeload('oss://testBucketName/documents/', 'public.documents_text');
  • active_pipeload:激活(恢复)已暂停的数据管道。

    auto_pipeload.activate_pipeload(
        p_oss_dir TEXT,     -- OSS 目录路径
        p_table_name TEXT   -- 目标表名
    )

    示例

    SELECT auto_pipeload.activate_pipeload('oss://testBucketName/documents/', 'public.documents_text');
  • unregister_pipeload:注销数据管道并可选择性地清理相关数据。

    auto_pipeload.unregister_pipeload(
        p_oss_dir TEXT,         -- OSS 目录路径
        p_table_name TEXT,      -- 目标表名
        p_cleanup_data BOOLEAN  -- 是否删除相关数据,默认为 false
    )

    示例

    -- 注销管道但保留数据
    SELECT auto_pipeload.unregister_pipeload('oss://testBucketName/documents/', 'public.documents_text', false);
    
    -- 注销管道并删除所有相关数据
    SELECT auto_pipeload.unregister_pipeload('oss://testBucketName/documents/', 'public.documents_text', true);
  • scheduled_refresh:由pg_cron定时调用的主调度函数,自动检查并刷新所有已注册的管道。

    重要

    此函数通常由定时任务自动调用,不需要手动执行。

    auto_pipeload.scheduled_refresh()