使用Auto-PipeLoad自动化导入非结构化数据
云原生数据仓库AnalyticDB PostgreSQL版推出Auto-PipeLoad
插件,旨在无缝连接对象存储(OSS)与数据库,实现非结构化数据的自动化ETL。该插件专为现代AI应用设计,原生支持对非结构化数据(如.pdf
, .docx
, .txt
)的文本提取、智能分块和向量化,高效地将其转化为可供分析的结构化或向量化数据,极大简化了从原始数据到AI应用的链路。
Auto-PipeLoad简介
Auto-PipeLoad
是云原生数据仓库AnalyticDB PostgreSQL版的一款扩展插件,它建立了一条从对象存储(OSS)到数据库内部表的自动化导入管道。用户仅需通过简单的SQL函数调用进行配置,即可实现对指定OSS目录下文件的持续监控。当新文件上传、文件更新或删除时,Auto-PipeLoad
会自动触发相应的数据处理和同步任务。
该插件的核心能力在于其强大的非结构化数据处理功能,主要适用于RAG(检索增强生成)等AI场景:
文本提取:自动从
.pdf
、.docx
、.txt
等多种格式的文件中抽取出纯文本内容。智能分块(Chunking):可将长文本按照预设的大小和重叠度切分成更小的、语义完整的片段,便于后续的向量化和检索。
向量化(Vectorization):能够调用外部大语言模型(如通义千问)的Embedding服务,将文本或文本块转换为高维向量,并存入向量数据库中。
通过将这些复杂的数据预处理流程封装在数据库内部,Auto-PipeLoad
为用户提供了一个纯SQL接口,可以完成端到端数据加载与处理,能够降低开发和维护成本。
版本限制
内核版本为v7.2.1.5及以上的AnalyticDB PostgreSQL 7.0版实例。
前提条件
开通AnalyticDB for PostgreSQL实例所在VPC的公网NAT网关,并创建SNAT条目绑定实例所在的VPC或交换机,使其具备公网访问能力。创建公网NAT网关会收取费用,详情请参见NAT 网关计费。
OSS:
创建OSS Bucket,云原生数据仓库 AnalyticDB PostgreSQL 版实例和OSS存储空间Bucket必须位于同一地域。
授权AnalyticDB for PostgreSQL实例访问OSS Bucket。具体操作,请参见Bucket Policy常见示例。
获取对应地域的内网Endpoint信息。
Auto-PipeLoad
依赖大语言模型进行embedding和PDF图片解析,若需使用向量化功能,需获取API Key作为调用大模型的鉴权凭证。安装
Auto-PipeLoad
插件。请提交工单联系技术支持协助安装此插件。
使用示例
本示例介绍如何创建一个简单的数据管道,自动从OSS目录中提取文档的文本内容,并加载到数据库表中。
步骤一:配置凭证
为了让插件能够访问您的OSS Bucket和AI模型服务,您需要配置必要的访问凭证。
配置OSS凭证。
SELECT auto_pipeload.set_oss_credentials( 'yourAccessKeyID', -- 替换为获取到的AccessKey ID 'yourAccessKeySecret', -- 替换为获取到的AccessKey Secret 'yourEndpoint' -- 替换为获取到的内网Endpoint );
配置DashScope API Key。
SELECT auto_pipeload.set_dashscope_api_key('sk-xxxxxxxxxxxxxxxx'); -- 替换为获取到的API Key
步骤二:创建目标表
在数据库中创建一张表,用于存储处理后的数据。
CREATE TABLE public.documents_text (
id SERIAL PRIMARY KEY,
original_content TEXT,
insert_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
file_url TEXT -- 建议包含此列,用于追溯源文件及处理更新/删除
);
步骤三:注册数据管道(PipeLoad)
调用register_pipeload
函数定义一个新的数据管道,告知插件需要监控哪个OSS目录,以及将处理结果存储在何处。
本示例中配置一个纯文本提取的管道:
SELECT auto_pipeload.register_pipeload(
p_oss_dir => 'oss://testBucketName/testDocuments/', -- [必需] 需要监控的OSS目录
p_table_name => 'public.documents_text', -- [必需] 目标数据表
text_column => 'original_content', -- [必需] 用于存储提取文本的列
embedding => false, -- 禁用向量化
with_chunk => false, -- 禁用分块
intime_column => 'insert_time', -- [可选] 记录加载时间戳的列
url_column => 'file_url' -- [可选] 记录源文件URL的列
);
执行成功后,auto_pipeload
不会立刻执行初始同步,只是保存管道配置。
步骤四:上传文件并手动刷新
上传文件至OSS。
您可以通过命令行工具ossutil或者OSS控制台,将
.txt
、.pdf
或.docx
文件上传到您在步骤三中配置的oss://testBucketName/testDocuments/
目录下。手动触发刷新PipeLoad。
SELECT auto_pipeload.refresh_pipeload('oss://testBucketName/testDocuments/','public.documents_text');
查询目标表。
查询验证数据加载结果,您将看到
original_content
列已填充了从文档中提取的文本,同时file_url
列也记录了文件的OSS路径。SELECT * FROM public.documents_text LIMIT 10;
查看管道状态。
通过以下命令监控所有已注册管道的运行状态,该函数会返回一个状态摘要表,展示每个管道的配置、最后运行时间、状态、已处理文件数等信息。
SELECT * FROM auto_pipeload.show_pipeload_status();
步骤五:注册pg_cron实现自动刷新
连接到数据库postgres,使用pg_cron
创建一个定时任务。建议每分钟执行一次,用于触发Auto-PipeLoad
的调度器,使其检查所有已注册管道的状态并自动刷新数据。
云原生数据仓库AnalyticDB PostgreSQL版实例已经预装了pg_cron
插件,支持自动化调度功能。
查看定时任务执行信息或更改定时任务,具体操作请参见pg_cron。
-- 连接到postgres库
-- 请将 '<yourDatabaseName>' 替换为您的真实数据库名称。
SELECT cron.schedule(
'auto-pipeload-scheduler', -- 任务的唯一名称,便于管理
'* * * * *', -- Cron表达式,表示每分钟执行一次
'SELECT auto_pipeload.scheduled_refresh()', -- 需要执行的SQL命令
'<yourDatabaseName>' -- 该任务在哪个数据库中运行
);
进阶功能:文本分块与向量化
对于相似度搜索等AI应用,您通常需要将文档分块并为每个分块生成向量嵌入。
步骤一:创建向量表
该表应包含存储文本块(例如sentence
)和对应向量(例如vector
)的列。为了实现高性能的向量检索,强烈建议创建ANN索引。
CREATE TABLE public.articles (
id SERIAL PRIMARY KEY,
sentence TEXT,
vector REAL[],
insert_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
file_url TEXT
) USING heap;
ALTER TABLE public.articles ALTER COLUMN vector SET STORAGE PLAIN;
-- ANN索引对于向量检索性能至关重要
CREATE INDEX idx_articles_l2 ON articles USING ann(vector) WITH (dim=1024,distancemeasure=l2, hnsw_m=64, hnsw_ef_construction=128)
步骤二:注册向量化管道
在注册管道时,启用embedding
和with_chunk
,并正确映射chunk_column
和vector_column
。
SELECT auto_pipeload.register_pipeload(
p_oss_dir => 'oss://testBucketName/aiDocuments/',
p_table_name => 'public.articles',
embedding => true,
with_chunk => true,
chunk_column => 'sentence',
vector_column => 'vector',
url_column => 'file_url',
intime_column => 'insert_time'
);
步骤三:手动刷新或等待自动刷新
注册管道后,您可以上传文件至OSS,然后手动触发刷新pipeload,或使用pg_cron
创建自动刷新任务,数据将自动被处理成文本块和向量并入库。
SELECT auto_pipeload.refresh_pipeload('oss://testBucketName/aiDocuments/','public.articles');
步骤四:执行向量相似度检索
您可以使用数据库提供的向量操作符进行相似度搜索。
-- 查找与给定查询向量最相似的10个句子
-- 请将 array[...]::real[] 替换为您的1024维查询向量
SELECT id, sentence, l2_distance(vector, array[...]::real[]) as score
FROM public.articles
ORDER BY vector <-> array[...]::real[]
LIMIT 10;
函数参考
函数 | 描述 |
配置并存储OSS访问凭证。 | |
配置并存储DashScope的API密钥,用于向量嵌入功能。 | |
注册并配置新的数据管道,定义从OSS到数据库的数据流转规则。 | |
手动触发特定管道的同步和处理逻辑。 | |
手动重新加载数据管道,清除现有数据并执行完全重新同步。 | |
返回所有已注册管道的详细状态信息。 | |
暂停指定的数据管道。 | |
激活(恢复)已暂停的数据管道。 | |
注销数据管道并可选择性地清理相关数据。 | |
自动检查并刷新所有已注册的管道。 |
set_oss_credentials:配置并存储OSS访问凭证。
auto_pipeload.set_oss_credentials( access_key_id TEXT, -- OSS 访问密钥 ID access_key_secret TEXT, -- OSS 访问密钥密码 endpoint TEXT -- OSS 端点地址 )
示例
SELECT auto_pipeload.set_oss_credentials( 'access_key_id', 'access_key_secret', 'endpoint' );
set_dashscope_api_key:配置并存储DashScope的API密钥,用于向量嵌入功能。
auto_pipeload.set_dashscope_api_key( api_key TEXT -- DashScope API 密钥 )
示例
SELECT auto_pipeload.set_dashscope_api_key('sk-xxxxxxxxxxxxxxxx');
register_pipeload:注册并配置新的数据管道,定义从OSS到数据库的数据流转规则。
auto_pipeload.register_pipeload( p_oss_dir TEXT, -- [必需] OSS 目录路径(如:oss://testBucketName/documents/) p_table_name TEXT, -- [必需] 目标表名 embedding BOOLEAN, -- 是否启用向量化,默认为 false with_chunk BOOLEAN, -- 是否启用文本分块,默认为 false text_column TEXT, -- 存储原始文本的列名 chunk_column TEXT, -- 存储文本块的列名(当 with_chunk=true 时使用) vector_column TEXT, -- 存储向量嵌入的列名(当 embedding=true 时使用) url_column TEXT, -- 存储源文件 URL 的列名(强烈建议设置,用于更新和删除) intime_column TEXT, -- 存储数据摄入时间戳的列名 period_minutes INTEGER, -- 定时任务检查间隔(分钟),默认为 5 chunk_size INTEGER, -- 每个文本块的字符长度,默认为 500 chunk_overlap INTEGER, -- 连续文本块之间的重叠字符数,默认为 50 api_url TEXT, -- LLM API URL,默认为 https://dashscope.aliyuncs.com/compatible-mode/v1 embedding_model TEXT, -- 用于嵌入的 LLM 模型,默认为 text-embedding-v3 embedding_dim INTEGER -- 嵌入向量的维度,默认为 1024 )
示例1:纯文本提取管道
SELECT auto_pipeload.register_pipeload( p_oss_dir => 'oss://testBucketName/documents/', p_table_name => 'public.documents_text', text_column => 'original_content', embedding => false, with_chunk => false, intime_column => 'insert_time', url_column => 'file_url' );
示例2:向量化和分块管道
SELECT auto_pipeload.register_pipeload( p_oss_dir => 'oss://testBucketName/articles/', p_table_name => 'public.articles', embedding => true, with_chunk => true, chunk_column => 'sentence', vector_column => 'vector', url_column => 'file_url', intime_column => 'insert_time', chunk_size => 1000, chunk_overlap => 100, embedding_dim => 1024 );
refresh_pipeload:手动触发特定管道的同步和处理逻辑。
auto_pipeload.refresh_pipeload( p_oss_dir TEXT, -- OSS 目录路径 p_table_name TEXT -- 目标表名 )
示例
SELECT auto_pipeload.refresh_pipeload( 'oss://testBucketName/testDocuments/', 'public.documents_text' );
reload_pipeload:手动重新加载数据管道,清除现有数据并执行完全重新同步。
auto_pipeload.reload_pipeload( p_oss_dir TEXT, -- OSS 目录路径 p_table_name TEXT, -- 目标表名 p_truncate_table BOOLEAN -- 是否使用 TRUNCATE 清空表(true)或使用 DELETE(false) )
示例
-- 使用 TRUNCATE 快速清空表并重新加载 SELECT auto_pipeload.reload_pipeload('oss://testBucketName/documents/', 'public.documents_text', true);
show_pipeload_status:返回所有已注册管道的详细状态信息。
auto_pipeload.show_pipeload_status()
返回字段
oss_dir
:OSS 目录路径。table_name
:目标表名。processing_mode
:处理模式(是否chunk,是否embedding)。is_active
:是否启动。period_minutes
:刷新周期。last_run
:最后运行时间。last_status
:最后运行状态。pending_files
:待处理文件数量。processed_files
:已处理文件数量。failed_files
:处理失败文件数量。
示例
SELECT * FROM auto_pipeload.show_pipeload_status();
pause_pipeload:暂停指定的数据管道。
auto_pipeload.pause_pipeload( p_oss_dir TEXT, -- OSS 目录路径 p_table_name TEXT -- 目标表名 )
示例
SELECT auto_pipeload.pause_pipeload('oss://testBucketName/documents/', 'public.documents_text');
active_pipeload:激活(恢复)已暂停的数据管道。
auto_pipeload.activate_pipeload( p_oss_dir TEXT, -- OSS 目录路径 p_table_name TEXT -- 目标表名 )
示例
SELECT auto_pipeload.activate_pipeload('oss://testBucketName/documents/', 'public.documents_text');
unregister_pipeload:注销数据管道并可选择性地清理相关数据。
auto_pipeload.unregister_pipeload( p_oss_dir TEXT, -- OSS 目录路径 p_table_name TEXT, -- 目标表名 p_cleanup_data BOOLEAN -- 是否删除相关数据,默认为 false )
示例
-- 注销管道但保留数据 SELECT auto_pipeload.unregister_pipeload('oss://testBucketName/documents/', 'public.documents_text', false); -- 注销管道并删除所有相关数据 SELECT auto_pipeload.unregister_pipeload('oss://testBucketName/documents/', 'public.documents_text', true);
scheduled_refresh:由
pg_cron
定时调用的主调度函数,自动检查并刷新所有已注册的管道。重要此函数通常由定时任务自动调用,不需要手动执行。
auto_pipeload.scheduled_refresh()