在数据驱动时代,非结构化数据(文本、图像、音视频、日志等)与结构化、半结构化数据(JSON)共同构成企业的核心数据资产。其中,非结构化数据以更原始、多元的形态蕴含着海量的业务洞察(如用户反馈、合同条款、产品缺陷图像),本文将会模拟金融场景中对招股书、合同等PDF文件的检索与分析,以辅助业务进行下一步的精细化运营决策。
核心能力介绍
本最佳实践主要是对PDF非结构化数据的处理与检索,包含的主要能力如下:
非结构化数据(Object Table):支持通过表的形式读取OSS中非结构化数据(PDF/IMAGE/PPT等)。
AI Function:在Hologres中可以用标准SQL的方式调用AI Function,自动调用内置大模型,完成AI服务建设场景
数据加工:提供Embed、Chunk算子,可以对非结构化数据加工成结构化数据存储,无需使用外部算法就能自动Embed。
数据检索和分析:提供
ai_gen
、ai_summarize
等算子,能够通过SQL对数据进行推理、问题总结及翻译等操作。
Dynamic Table介绍:支持增量刷新模式对非结构化数据自动加工,每次只计算增量的数据有效减少重复计算,降低资源利用率。
向量检索:支持标准SQL的向量检索,用于非结构化数据的相似度搜索、场景识别等,在同一个查询中可以自由地实现向量和标量的检索。
全文检索:通过倒排索引、分词等机制实现对非结构化数据的高效检索,支持关键词匹配、短语检索等丰富的检索方式,实现更加灵活的检索。
方案优势
通过如上核心能力,在Hologres中多模态AI检索与分析的核心优势如下:
完整的AI数据处理流程:涵盖从数据Embed、Chunk、增量加工、检索/分析的全流程,开发人员可以使用大数据系统一样,轻松构建AI应用。
标准SQL加工和分析非结构化数据:无需使用专用开发语言,纯SQL就能完成非结构化数据提取、加工,也无需借助外部系统,数据处理更加高效和简单,降低开发人员学习成本。
检索更精准、灵活和智能:可以轻松构建“关键词+语义+多模态”的混合检索链路,覆盖从精准搜索到意图理解的全场景需求。还能结合AI Function实现对用户意图的深度理解,语义关联和上下文推理,实现更智能的检索能力。
数据不出库,更安全:不需要将数据导出到外部系统,与hologres的多种安全能力无缝集成,高效保护数据安全。
本实践文档将会介绍如何通过上诉核心能力在Hologres中对非结构化数据加工和检索,助力搭建企业级多模态AI数据平台,打破数据孤岛,释放全域数据价值
方案流程
本次方案的流程如下:
数据集准备。
将金融数据集中的PDF文件上传至OSS存储。
PDF数据加工。
使用Object Table读取PDF的元数据信息,然后创建增量刷新的Dynamic Table,并对数据进行Embed和Chunk,同时也对Dynamic Table构建向量索引和全文索引,以便后续检索可以使用索引的能力。
使用
ai_embed
算子对将自然语言的问题进行Embedding,然后使用全文和向量双路召回结果,并对结果进行排序,结合大模型的推理能力,最终输出相似度最高的答案。
准备工作
数据准备
本文使用ModelScope公开的金融数据集中的PDF文件夹中的文件,共80份公司招股说明书。
环境准备
购买Hologres V4.0及以上版本实例并创建数据库。
本文以large-96core-512GB-384GB、1个节点为例。
模型部署。本次方案使用的模型以及分配的资源为:
模型名称
模型类别
模型作用描述
单副本CPU(Core)
单副本内存
单副本GPU
资源副本数
to_doc
ds4sd/docling-models
将PDF转换成文档。
20
100 GB
1卡(48 GB)
1
chunk
recursive-character-text-splitter
文档切片,每个PDF较大,建议使用切片。
15
30 GB
0卡(0 GB)
1
pdf_embed
BAAI/bge-base-zh-v1.5
文档Embedding。
7
30 GB
1卡(96 GB)
1
llm
Qwen/Qwen3-32B
使用大模型对检索出的文档内容按照提示词推理.
7
30 GB
1卡(96 GB)
1
说明上述模型的资源均为默认分配的资源。
操作步骤
下载PDF文件并上传至OSS。
下载博金大模型挑战赛-金融千问14b数据集中80份招股书(PDF)。
账号授权。
登录RAM控制台,创建阿里云RAM角色并授予OSS的相关权限。
推荐授予AliyunOSSReadOnlyAccess权限。
为上述阿里云RAM角色添加登录和Hologres的访问权限。
阿里云账号(主账号)
修改RAM角色的信任策略。重点需更新如下参数:
Action:更新为
sts:AssumeRole
。Service:更新为
hologres.aliyuncs.com
。
{ "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "RAM": [ "acs:ram::1866xxxx:root" ], "Service": [ "hologres.aliyuncs.com" ] } } ], "Version": "1" }
RAM用户(子账号)
为RAM用户授权。
为已创建的RAM角色授权。
修改RAM角色的信任策略。重点需更新如下参数:
Action:更新为
sts:AssumeRole
。Service:更新为
hologres.aliyuncs.com
。
{ "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "RAM": [ "acs:ram::1866xxxx:root" ], "Service": [ "hologres.aliyuncs.com" ] } } ], "Version": "1" }
对PDF文件进行Embedding和Chunk。
需要创建Object Table和Dynamic Table对PDF的元数据读取以及加工。因为流程较长,Hologres直接将其封装为存储过程。该存储过程包括的能力如下:
会创建一张Object Table,用于取PDF的元数据。
创建一张增量刷新模式的Dynamic Table结果表,用于存储加工后的数据。同时,该表需设置向量索引和全文索引,且Dynamic Table不设置自动刷新,需要手动刷新。
Dynamic Table的刷新过程中会使用
ai_embed
、ai_chunk
对数据进行Embed和切片。
该存储过程代码如下:
CALL create_rag_corpus_from_oss( oss_path => 'oss://xxxx/bs_challenge_financial_14b_dataset/pdf', oss_endpoint => 'oss-cn-hangzhou-internal.aliyuncs.com', oss_role_arn => 'acs:ram::186xxxx:role/xxxx', corpus_table => 'public.dt_bs_challenge_financial' );
刷新结果表。
通过如上存储过程创建的Object Table和Dynamic Table均需手动刷新,才能完成数据加工。该步骤已被封装为PDF加工存储过程,该存储过程包括的能力如下:
刷新一次Object Table获取PDF元数据
刷新一次Dynamic Table,进行PDF的Embedding和Chunk加工。
该存储过程使用代码如下:
CALL refresh_rag_corpus_table( corpus_table => 'public.dt_bs_challenge_financial' );
PDF检索。
加工好的数据可以根据业务使用场景,通过向量、全文等方式进行检索。例如:可以根据招股书来查询某个公司的业绩走势,以此来判断公司后续的走势是悲观还是乐观,以便对后续的投资意向提供辅助建议。
向量检索
在向量检索时,为了检索方便,我们将问题Embedding和Prompt构建,大模型输出答案等过程封装成为向量检索存储过程,直接调用如下该存储过程可以实现向量召回。
-- 向量单路召回 + AI重排 SELECT qa_retrieval( question => '报告期内,湖南国科微电子股份有限公司2014年度、2015年度、2016年度营业收入和净利润分别较上年增长多大幅度?', vector_index => 'dt_bs_challenge_financial.embedding_vector', prompt => '请分析如下业绩走势是悲观还是乐观,并给出原因:${question}\n\n 参考信息:\n\n ${context}' )
检索答案如下:
qa_retrieval --------- "根据提供的信息,对湖南国科微电子股份有限公司的业绩走势进行分析,可以得出以下结论: ### 一、业绩走势分析:悲观 #### 1. **营业收入增长乏力** - 2014年度营业收入较上年增长 **15.13%**,但2015年度营业收入却 **下降5.21%**,2016年度数据未提供,但可以看出营业收入增长趋势在2015年出现明显下滑。 - 2012年至2014年营业收入的年复合增长率仅为 **4.47%**,表明公司业务扩张较为缓慢。 #### 2. **净利润增长持续下降** - 2014年度净利润增长 **5.43%**,2015年度净利润 **下降3.29%**。 - 扣除非经常性损益后,2014年度归属于母公司股东的净利润增长 **-3.14%**,2015年度进一步下降 **-5.60%**,表明公司主营业务盈利能力在持续恶化。 - 2012年至2014年扣除非经常性损益后净利润的年复合增长率为 **-4.38%**,远低于营业收入增长,说明公司主营业务盈利能力不足,增长主要依赖非经常性损益。 #### 3. **非经常性损益占比偏高** - 报告期内,非经常性损益占净利润的比例较高,2014年、2013年、2012年分别为 **17.54%、10.25%、8.06%**,表明公司利润中有一部分来自政策扶持、政府补贴等非经常性因素,而非核心业务的持续增长。 - 依赖非经常性损益来维持利润增长,不利于公司长期的稳定发展。 #### 4. **净资产收益率下降** - 加权平均净资产收益率从2014年的 **18.10%** 下降到2015年的 **24.82%**,再到2016年的 **28.23%**,虽然数据看似增长,但需注意该指标是以扣除非经常性损益后的净利润计算的,而净利润本身在下降,因此这种增长可能与资本结构变化有关,而非盈利能力的实质性提升。 ### 二、原因总结 1. **主营业务增长乏力**:营业收入和净利润增长均呈现下降趋势,尤其是净利润的下降表明公司盈利能力在减弱。 2. **非经常性损益依赖度高**:公司利润中非经常性损益占比较高,说明主营业务的盈利能力不足,公司业绩的持续性存疑。 3. **市场竞争激烈**:公司采购的工控机、显示器、电源等产品市场竞争激烈,价格平稳,利润空间受到挤压。 4. **行业环境影响**:不锈钢市场价格波动、原材料价格波动可能对公司经营业绩造成一定影响,虽然公司已采取措施降低影响,但长期来看仍需关注。 ### 三、结论 总体来看,湖南国科微电子股份有限公司的业绩走势偏 **悲观**。公司主营业务增长乏力,净利润持续下降,对非经常性损益依赖度高,未来盈利能力的可持续性存疑。公司需要加强核心业务的竞争力,优化成本结构,提高主营业务盈利能力,以实现长期稳健发展。"
全文检索
在全文检索时,为了检索方便,我们将问题Embedding和Prompt构建,大模型输出答案等过程封装为全文检索存储过程,直接调用如下存储过程可以实现全文召回:
--全文检索召回 SELECT qa_text_search_retrieval( question => '报告期内,湖南国科微电子股份有限公司2014年度、2015年度、2016年度营业收入和净利润分别较上年增长多大幅度?', text_search_index => 'dt_bs_challenge_financial.chunk', prompt => '请分析如下业绩走势是悲观还是乐观,并给出原因:${question}\n\n 参考信息:\n\n ${context}' );
检索答案如下:
qa_text_search_retrieval ---------------- "根据提供的信息,湖南国科微电子股份有限公司在2014年、2015年和2016年的业绩走势整体上呈现**悲观**趋势,具体原因如下: ### 1. **营业收入增长乏力** - 2014年营业收入增长率为**15.13%**,但2015年营业收入增长率转为**-5.21%**,即出现负增长。 - 2012年至2014年的营业收入年复合增长率仅为**4.47%**,说明公司营业收入增长较为缓慢,业务发展不够强劲。 - 2015年上半年营业收入预测与2014年同期相比大致持平,但2015年上半年净利润较上年同期**略有下降**,表明盈利能力下降。 ### 2. **净利润和扣非净利润增长不佳** - 2014年净利润增长率为**5.43%**,2015年净利润增长率下降为**-3.29%**,即净利润出现下滑。 - 扣除非经常性损益后的净利润增长率在2014年为**-3.14%**,2015年进一步下降为**-5.60%**,说明公司主营业务的盈利能力持续下降。 - 2012年至2014年扣非净利润的年复合增长率为**-4.38%**,明显低于营业收入的年复合增长率,说明公司利润质量不高,主营业务盈利能力较弱。 ### 3. **经营活动现金流波动** - 2014年销售商品、提供劳务收到的现金占营业收入的比例较前两年有所下降,主要与部分收入确认项目的**收款跨期**有关,说明公司现金流管理存在问题。 - 2013年购买商品、接受劳务支付的现金占营业成本比例较高,主要是由于当年**采购原材料并完成生产**,但部分成本在2014年才结转,导致2014年该比例较低,反映公司采购和生产节奏不够稳定。 ### 4. **投资和盈利能力指标** - 加权平均净资产收益率(ROE)在2014年为**18.10%**,2015年上升至**24.82%**,2016年进一步上升至**28.23%**,虽然有所提升,但ROE的提高可能主要依赖**财务杠杆**,而非核心业务盈利能力的提升。 - 考虑到净利润和扣非净利润持续下降,ROE的提升并不能完全反映公司经营质量的改善。 ### 5. **2015年上半年业绩预测** - 2015年上半年营业收入预计为**8,505万元至10,395万元**,与2014年同期的**10,127.35万元**大致持平,但净利润预计为**2,340万元至2,860万元**,低于2014年同期的**2,912.66万元**,说明公司盈利能力下降。 ### 总结 综合来看,湖南国科微电子股份有限公司在2014年至2016年的业绩走势**偏向悲观**。虽然ROE有所提升,但营业收入增长乏力、净利润和扣非净利润持续下降、经营活动现金流波动较大,表明公司主营业务盈利能力较弱,经营质量有待提升。"
向量+全文混合检索
在向量、全文结合Rank排序混合检索场景中,为了检索方便,Hologres将其封装为向量+全文双路召回+Rank排序存储过程,该存储过程的能力如下:
根据问题使用向量计算,召回TOP 20的答案。
根据问题使用全文检索,召回TOP 20的答案。
使用
ai_rank
,对向量和全文召回的答案进行排序,最后输出Top1的答案。使用
ai_gen
,结合大模型根据提示词以及检索的答案,生成最终答案并进行输出。
-- 全文、向量双路召回 + AI重排 SELECT qa_hybrid_retrieval( question => '报告期内,湖南国科微电子股份有限公司2014年度、2015年度、2016年度营业收入和净利润分别较上年增长多大幅度?', vector_index => 'dt_bs_challenge_financial.embedding_vector', text_search_index => 'dt_bs_challenge_financial.chunk', prompt => '请分析如下业绩走势是悲观还是乐观,并给出原因:${question}\n\n 参考信息:\n\n ${context}' );
检索答案如下:
qa_hybrid_retrieval --- "根据提供的信息,我们可以对湖南国科微电子股份有限公司的业绩走势进行如下分析,并判断其趋势是悲观还是乐观: --- ### 一、**营业收入走势分析** 1. **2012-2014年复合增长率**: - 营业收入的年复合增长率为 **4.47%**,表明公司营业收入的增长较为平稳。 - 2014年营业收入为 **18,154.06万元**,较2013年增长 **15.13%**。 - 2015年营业收入较上年 **下降5.21%**,出现了负增长。 2. **结论**: - 营业收入的增长在2014年有所回升,但2015年出现明显下滑,表明公司业务扩张遇到了一定阻力。 --- ### 二、**净利润走势分析** 1. **2012-2014年复合增长率**: - 扣除非经常性损益后的净利润年复合增长率为 **-4.38%**,低于营业收入的年复合增长率,说明公司盈利能力有所下降。 - 2014年扣非净利润为 **42,731,071.18元**,较2013年下降 **3.14%**。 - 2015年扣非净利润较上年进一步下降 **5.60%**。 2. **非经常性损益影响**: - 2014年、2013年及2012年非经常性损益占净利润的比例分别为 **17.54%、10.25%、8.06%**,呈上升趋势。 - 非经常性损益的增加主要来自于政府补贴和理财产品收益,而非主营业务带来的持续增长。 3. **结论**: - 扣非净利润连续两年下降,说明公司主营业务盈利能力减弱,业绩增长依赖于非经常性损益,这是令人担忧的信号。 --- ### 三、**现金流量与经营稳定性** 1. **经营活动现金流**: - 2014年营业收入为 **18,154.06万元**,但销售商品、提供劳务收到的现金并未明确给出,无法判断现金流是否健康。 - 报告期内,公司银行存款分别为 **13,063.38万元、4,152.54万元、9,864.61万元**,资金流动性波动较大,但主要客户、供应商及经营模式保持稳定。 2. **结论**: - 虽然公司现金流存在波动,但客户和供应商稳定,经营模式未发生重大变化,这为公司未来的发展提供了一定保障。 --- ### 四、**2015年上半年业绩预测** - 2015年1至6月预计营业收入为 **8,505万元至10,395万元**,较2014年同期的 **4,641.19万元**有明显增长。 - 但2015年1至3月净利润较上年同期下降 **48.26%**,主要是因为确认收入的项目毛利率较低。 --- ### 五、**综合分析与判断** 1. **乐观因素**: - 2014年营业收入增长较快,达到 **15.13%**。 - 2015年上半年营业收入预计增长显著,表明公司可能正在逐步恢复。 - 主要客户、供应商及经营模式保持稳定,为公司提供了良好的运营基础。 2. **悲观因素**: - 2015年营业收入较上年 **下降5.21%**,净利润也出现下滑。 - 扣非净利润连续两年下降,表明公司主营业务盈利能力不足。 - 非经常性损益占比上升,业绩增长依赖于政府补贴和理财产品收益,缺乏内生增长动力。 - 2015年1至3月净利润大幅下滑 **48.26%**,表明短期业绩波动较大。 --- ### **最终结论:整体趋势偏悲观** - 尽管公司在2014年营业收入有所回升,且2015年上半年预计增长,但 **扣非净利润连续下降**、**净利润增长依赖非经常性损益**、**短期业绩波动较大**,表明公司目前的业绩增长缺乏持续性和稳定性。 - 因此,从长期来看,公司业绩走势偏 **悲观**,需关注其主营业务盈利能力的改善和非经常性损益的依赖问题。 --- ### **建议** 1. 关注公司未来主营业务的盈利能力是否能有所提升。 2. 降低对非经常性损益的依赖,提高内生增长动力。 3. 稳定客户和供应商关系,优化业务结构,提高毛利率。"
向量+全文双路召回+RRF排序
使用向量和全文检索后,通过RRF (Reciprocal Rank Fusion)排序召回结果。为了检索方便,Hologres已经封装为向量+全文双路召回+RRF排序存储过程(详细定义见下方附录),该存储过程的能力如下:
根据问题使用向量计算,召回TOP 20的答案。
根据问题使用全文检索,召回TOP 20的答案。
对向量和全文召回的答案,计算RRF分数,最后输出Top N的答案。
使用
ai_gen
、大模型根据提示词,以及检索的答案,拼装成最终答案并输出。
-- 全文、向量双路召回 + RRF重排 SELECT qa_hybrid_retrieval_rrf( question => '报告期内,湖南国科微电子股份有限公司2014年度、2015年度、2016年度营业收入和净利润分别较上年增长多大幅度?', vector_index => 'dt_bs_challenge_financial.embedding_vector', text_search_index => 'dt_bs_challenge_financial.chunk' );
检索答案如下:
qa_hybrid_retrieval_rrf ------------------ "根据提供的信息,对湖南国科微电子股份有限公司的业绩走势进行分析,可以得出以下结论: ### **业绩走势判断:悲观** #### **原因分析如下:** 1. **净利润增长低于营业收入增长:** - 提供的信息指出,公司2012年至2014年的**营业收入年复合增长率为4.47%**,表明公司整体业务增长较为平稳。 - 但**扣除非经常性损益后归属于母公司股东的净利润年复合增长率为-4.38%**,明显低于营业收入的增长率。这说明公司在收入增长的同时,盈利能力并未同步提升,甚至出现下滑,可能受到成本上升、毛利率下降或非经常性损益减少等因素影响。 2. **净利润波动较大:** - 2015年1至3月的净利润较上年同期减少了48.26%,且主要原因在于确认收入的项目毛利率较低(如无锡地铁一号线项目以模块外购为主)。这表明公司短期业绩容易受到业务结构变化的影响,存在一定的不稳定性。 3. **毛利率和盈利能力下降:** - 提到“主营业务利润对公司净利润的贡献”在2014年较2013年略有下降,而2013年又较2012年下降。这说明公司核心业务的盈利能力可能在减弱,可能受到市场竞争加剧、成本上升或产品结构变化的影响。 4. **2015年上半年预测利润下降:** - 2015年1至6月预计营业收入为8,505万元至10,395万元,与2014年同期大致持平,但净利润预计为2,340万元至2,860万元,低于2014年同期的2,912.66万元。这表明公司盈利能力在进一步下降,可能面临一定的经营压力。 5. **业务增长乏力:** - 虽然营业收入增长较为平稳,但净利润的下降表明公司业务增长的质量不高,未能有效转化为利润。这可能影响投资者对公司未来发展的信心。 ### **总结:** 湖南国科微电子股份有限公司的业绩走势整体偏向**悲观**。虽然营业收入保持了平稳增长,但净利润的增长明显滞后甚至出现负增长,表明公司盈利能力在下降,业务发展质量不高,且存在短期业绩波动的风险。如果公司不能有效提升毛利率、控制成本或优化产品结构,未来业绩可能继续承压。"
附录:存储过程定义
上述文档中使用的存储过程定义如下,方便您做参考。
在Hologres中不支持创建Function,如下存储过程仅做参考,无法修改后直接执行。
PDF加工存储过程
创建Object Table和Dynamic Table
CREATE OR REPLACE PROCEDURE create_rag_corpus_from_oss( oss_path TEXT, oss_endpoint TEXT, oss_role_arn TEXT, corpus_table TEXT, embedding_model TEXT DEFAULT NULL, parse_document_model TEXT DEFAULT NULL, chunk_model TEXT DEFAULT NULL, chunk_size INT DEFAULT 300, chunk_overlap INT DEFAULT 50, overwrite BOOLEAN DEFAULT FALSE ) AS $$ DECLARE corpus_schema TEXT; corpus_name TEXT; obj_table_name TEXT; full_corpus_ident TEXT; full_obj_ident TEXT; embed_expr TEXT; chunk_expr TEXT; parse_expr TEXT; BEGIN -- 1. 拆 schema name + table name IF position('.' in corpus_table) > 0 THEN corpus_schema := split_part(corpus_table, '.', 1); corpus_name := split_part(corpus_table, '.', 2); ELSE corpus_schema := NULL; corpus_name := corpus_table; END IF; obj_table_name := corpus_name || '_obj_table'; IF corpus_schema IS NULL THEN full_corpus_ident := format('%I', corpus_name); full_obj_ident := format('%I', obj_table_name); ELSE full_corpus_ident := format('%I.%I', corpus_schema, corpus_name); full_obj_ident := format('%I.%I', corpus_schema, obj_table_name); END IF; -- 2. 如果需要覆盖,先删表和索引 IF overwrite THEN DECLARE dyn_table_exists BOOLEAN; rec RECORD; BEGIN -- 检查 dynamic table 是否存在 SELECT EXISTS ( SELECT 1 FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace WHERE c.relname = corpus_name AND n.nspname = COALESCE(corpus_schema, 'public') ) INTO dyn_table_exists; IF dyn_table_exists THEN -- 2.1 关闭动态表自动刷新 RAISE NOTICE 'Disabling auto refresh for %', full_corpus_ident; EXECUTE format( 'ALTER TABLE %s SET (auto_refresh_enable=false)', full_corpus_ident ); -- 2.2 查找 RUNNING 刷新任务并取消 FOR rec IN EXECUTE format( $f$ SELECT query_job_id FROM hologres.hg_dynamic_table_refresh_log(%L) WHERE status = 'RUNNING'; $f$, corpus_table ) LOOP RAISE NOTICE 'Found running refresh job: %', rec.query_job_id; IF hologres.hg_internal_cancel_query_job(rec.query_job_id::bigint) THEN RAISE NOTICE 'Cancel job % succeeded.', rec.query_job_id; ELSE RAISE WARNING 'Cancel job % failed.', rec.query_job_id; END IF; END LOOP; -- 2.3 删除 Dynamic Table EXECUTE format('DROP TABLE IF EXISTS %s;', full_corpus_ident); ELSE RAISE NOTICE 'Dynamic table % does not exist, skip cancel job and drop.', full_corpus_ident; END IF; -- 2.4 无论如何,Object Table 都要删除 EXECUTE format('DROP OBJECT TABLE IF EXISTS %s;', full_obj_ident); END; END IF; -- 3. 创建 Object Table RAISE NOTICE 'Create object table: %', obj_table_name; EXECUTE format( $f$ CREATE OBJECT TABLE %s WITH ( path = %L, oss_endpoint = %L, role_arn = %L ); $f$, full_obj_ident, oss_path, oss_endpoint, oss_role_arn ); COMMIT; -- 4. 刷新 Object Table RAISE NOTICE 'Refresh object table: %', obj_table_name; EXECUTE format('REFRESH OBJECT TABLE %s;', full_obj_ident); COMMIT; -- 5. 文档解析模型选择 IF parse_document_model IS NULL OR length(trim(parse_document_model)) = 0 THEN parse_expr := 'ai_parse_document(file, ''auto'', ''markdown'')'; ELSE parse_expr := format( 'ai_parse_document(%L, file, ''auto'', ''markdown'')', parse_document_model ); END IF; -- 6. chunk 模型选择 IF chunk_model IS NULL OR length(trim(chunk_model)) = 0 THEN chunk_expr := format('ai_chunk(doc, %s, %s)', chunk_size, chunk_overlap); ELSE chunk_expr := format( 'ai_chunk(%L, doc, %s, %s)', chunk_model, chunk_size, chunk_overlap ); END IF; -- 7. embedding 模型选择 IF embedding_model IS NULL OR length(trim(embedding_model)) = 0 THEN embed_expr := 'ai_embed(chunk)'; ELSE embed_expr := format( 'ai_embed(%L, chunk)', embedding_model ); END IF; -- 8. 创建 RAG 输出动态表 RAISE NOTICE 'create dynamic table: %', corpus_name; EXECUTE format( $f$ CREATE DYNAMIC TABLE %s WITH ( auto_refresh_enable = FALSE , auto_refresh_mode = 'incremental', freshness = '5 minutes' ) AS WITH parsed_doc AS ( SELECT object_uri, etag, %s AS doc FROM %s ), chunked_doc AS ( SELECT object_uri, etag, unnest(%s) AS chunk FROM parsed_doc ) SELECT object_uri, etag, chunk, %s AS embedding_vector FROM chunked_doc; $f$, full_corpus_ident, parse_expr, full_obj_ident, chunk_expr, embed_expr ); COMMIT; -- 9. 创建全文索引(索引名 = 表名 || '_fulltext_idx') EXECUTE format( 'CREATE INDEX %I ON %s USING FULLTEXT (chunk);', corpus_name || '_fulltext_idx', full_corpus_ident ); RAISE NOTICE ''; RAISE NOTICE 'Create RAG corpus success to table: %', corpus_table; RAISE NOTICE ' Vector index is: %.embedding_vector', corpus_table; RAISE NOTICE ' TextSearch index is: %.chunk', corpus_table; END; $$ LANGUAGE plpgsql;
刷新Object Table和Dynamic Table存储过程
CREATE OR REPLACE PROCEDURE refresh_rag_corpus_table( corpus_table TEXT ) AS $$ DECLARE corpus_schema TEXT; corpus_name TEXT; obj_table_name TEXT; full_corpus_ident TEXT; full_obj_ident TEXT; BEGIN -- 1. 解析 schema 和表名 IF position('.' in corpus_table) > 0 THEN corpus_schema := split_part(corpus_table, '.', 1); corpus_name := split_part(corpus_table, '.', 2); ELSE corpus_schema := NULL; corpus_name := corpus_table; END IF; obj_table_name := corpus_name || '_obj_table'; IF corpus_schema IS NULL THEN full_corpus_ident := format('%I', corpus_name); full_obj_ident := format('%I', obj_table_name); ELSE full_corpus_ident := format('%I.%I', corpus_schema, corpus_name); full_obj_ident := format('%I.%I', corpus_schema, obj_table_name); END IF; -- 2. 刷新 Object Table RAISE NOTICE 'Refreshing Object Table: %', obj_table_name; EXECUTE format('REFRESH OBJECT TABLE %s;', full_obj_ident); -- 3. 刷新 Dynamic Table RAISE NOTICE 'Refreshing Dynamic Table: %', corpus_name; EXECUTE format('REFRESH TABLE %s;', full_corpus_ident); RAISE NOTICE 'Refresh complete for corpus table %', corpus_table; END; $$ LANGUAGE plpgsql;
删除Object Table和Dynamic Table存储过程
CREATE OR REPLACE PROCEDURE drop_rag_corpus_table( corpus_table TEXT ) AS $$ DECLARE corpus_schema TEXT; corpus_name TEXT; obj_table_name TEXT; full_corpus_ident TEXT; full_obj_ident TEXT; rec RECORD; BEGIN -- 1. 解析 schema 和表名 IF position('.' in corpus_table) > 0 THEN corpus_schema := split_part(corpus_table, '.', 1); corpus_name := split_part(corpus_table, '.', 2); ELSE corpus_schema := NULL; corpus_name := corpus_table; END IF; obj_table_name := corpus_name || '_obj_table'; IF corpus_schema IS NULL THEN full_corpus_ident := format('%I', corpus_name); full_obj_ident := format('%I', obj_table_name); ELSE full_corpus_ident := format('%I.%I', corpus_schema, corpus_name); full_obj_ident := format('%I.%I', corpus_schema, obj_table_name); END IF; -- 2. 删除表 -- 2.1 关闭动态表自动刷新 RAISE NOTICE 'Disabling auto refresh for %', full_corpus_ident; EXECUTE format('ALTER TABLE IF EXISTS %s SET (auto_refresh_enable=false)', full_corpus_ident); -- 2.2 查找 RUNNING 刷新任务并取消 FOR rec IN EXECUTE format( $f$ SELECT query_job_id FROM hologres.hg_dynamic_table_refresh_log(%L) WHERE status = 'RUNNING'; $f$, corpus_table ) LOOP RAISE NOTICE 'Found running refresh job: %', rec.query_job_id; IF hologres.hg_internal_cancel_query_job(rec.query_job_id::bigint) THEN RAISE NOTICE 'Cancel job % succeeded.', rec.query_job_id; ELSE RAISE WARNING 'Cancel job % failed.', rec.query_job_id; END IF; END LOOP; -- 2.3 删除 Dynamic Table RAISE NOTICE 'Dropping Object Table: %', corpus_name; EXECUTE format('DROP TABLE IF EXISTS %s;', full_corpus_ident); -- 2.4 删除 Object Table RAISE NOTICE 'Dropping Object Table: %', obj_table_name; EXECUTE format('DROP OBJECT TABLE IF EXISTS %s;', full_obj_ident); RAISE NOTICE 'Drop complete for corpus: %', corpus_table; END; $$ LANGUAGE plpgsql;
向量检索存储过程
-- RAG向量单路召回问答
CREATE OR REPLACE FUNCTION qa_retrieval(
question TEXT,
vector_index TEXT,
embedding_model TEXT DEFAULT NULL,
llm_model TEXT DEFAULT NULL,
prompt TEXT DEFAULT '请根据参考信息回答以下问题:${question}\n\n 参考信息:\n\n ${context}',
vector_recall_count INT DEFAULT 20,
rerank_recall_count INT DEFAULT 5
)
RETURNS TEXT AS
$$
DECLARE
final_answer TEXT;
embedding_table TEXT;
embedding_col TEXT;
sql TEXT;
embedding_expr TEXT;
ai_rank_expr TEXT;
ai_gen_expr TEXT;
llm_model_valid BOOLEAN;
BEGIN
-- 解析 vector_index
SELECT table_path, column_name
INTO embedding_table, embedding_col
FROM parse_index(vector_index);
-- embedding表达式处理
IF embedding_model IS NULL OR trim(embedding_model) = '' THEN
embedding_expr := 'ai_embed(questions.query_text)';
ELSE
embedding_expr := 'ai_embed(' || quote_literal(embedding_model) || ', questions.query_text)';
END IF;
-- LLM模型处理
llm_model_valid := (llm_model IS NOT NULL AND trim(llm_model) != '');
-- ai_rank表达式
IF llm_model_valid THEN
ai_rank_expr := 'ai_rank(' || quote_literal(llm_model) || ', questions.query_text, chunk)';
ELSE
ai_rank_expr := 'ai_rank(questions.query_text, chunk)';
END IF;
-- ai_gen表达式:通过模板替换生成完整 prompt
IF llm_model_valid THEN
ai_gen_expr := 'ai_gen(' || quote_literal(llm_model) ||
', replace(replace(' || quote_literal(prompt) || ', ''${question}'', questions.query_text), ''${context}'', merged_chunks) )';
ELSE
ai_gen_expr := 'ai_gen(replace(replace(' || quote_literal(prompt) || ', ''${question}'', questions.query_text), ''${context}'', merged_chunks))';
END IF;
-- 动态SQL
sql := '
WITH
questions AS (
SELECT ' || quote_literal(question) || ' AS query_text
),
embedding_recall AS (
SELECT
chunk,
approx_cosine_distance(' || embedding_col || ', ' || embedding_expr || ') AS distance
FROM
' || embedding_table || ', questions
ORDER BY
distance DESC
LIMIT ' || vector_recall_count || '
),
rerank AS (
SELECT
chunk,
' || ai_rank_expr || ' AS score
FROM
embedding_recall, questions
ORDER BY
score DESC
LIMIT ' || rerank_recall_count || '
),
concat_top_chunks AS (
SELECT string_agg(chunk, E''\n\n----\n\n'') AS merged_chunks FROM rerank
)
SELECT ' || ai_gen_expr || '
FROM concat_top_chunks, questions;
';
EXECUTE sql INTO final_answer;
RETURN final_answer;
END;
$$ LANGUAGE plpgsql;
全文检索存储过程
CREATE OR REPLACE FUNCTION qa_text_search_retrieval(
question TEXT,
text_search_index TEXT,
llm_model TEXT DEFAULT NULL,
prompt TEXT DEFAULT '请根据参考信息回答以下问题:${question}\n\n参考信息:\n\n${context}',
text_search_recall_count INT DEFAULT 20,
rerank_recall_count INT DEFAULT 5
)
RETURNS TEXT AS
$$
DECLARE
final_answer TEXT;
text_search_table TEXT;
text_search_col TEXT;
sql TEXT;
ai_rank_expr TEXT;
ai_gen_expr TEXT;
llm_model_valid BOOLEAN;
BEGIN
-- 解析 text_search_index
SELECT table_path, column_name
INTO text_search_table, text_search_col
FROM parse_index(text_search_index);
-- LLM 模型处理
llm_model_valid := (llm_model IS NOT NULL AND trim(llm_model) != '');
-- ai_rank 表达式
IF llm_model_valid THEN
ai_rank_expr := 'ai_rank(' || quote_literal(llm_model) || ', questions.query_text, chunk)';
ELSE
ai_rank_expr := 'ai_rank(questions.query_text, chunk)';
END IF;
-- ai_gen 表达式
IF llm_model_valid THEN
ai_gen_expr := 'ai_gen(' || quote_literal(llm_model) ||
', replace(replace(' || quote_literal(prompt) ||
', ''${question}'', questions.query_text), ''${context}'', merged_chunks) )';
ELSE
ai_gen_expr := 'ai_gen(replace(replace(' || quote_literal(prompt) ||
', ''${question}'', questions.query_text), ''${context}'', merged_chunks))';
END IF;
-- 动态 SQL
sql := '
WITH
questions AS (
SELECT ' || quote_literal(question) || ' AS query_text
),
-- 全文搜索召回
text_search_recall AS (
SELECT
chunk
FROM
' || text_search_table || ', questions
ORDER BY
text_search(' || text_search_col || ', ' || quote_literal(question) || ') DESC
LIMIT ' || text_search_recall_count || '
),
-- rerank
rerank AS (
SELECT
chunk,
' || ai_rank_expr || ' AS score
FROM
text_search_recall, questions
ORDER BY
score DESC
LIMIT ' || rerank_recall_count || '
),
concat_top_chunks AS (
SELECT string_agg(chunk, E''\n\n----\n\n'') AS merged_chunks FROM rerank
)
SELECT ' || ai_gen_expr || '
FROM concat_top_chunks, questions;
';
EXECUTE sql INTO final_answer;
RETURN final_answer;
END;
$$ LANGUAGE plpgsql;
向量+全文双路召回+Rank排序存储过程
-- 全文、向量双路召回 + rerank
CREATE OR REPLACE FUNCTION qa_hybrid_retrieval(
question TEXT,
vector_index TEXT,
text_search_index TEXT,
embedding_model TEXT DEFAULT NULL,
llm_model TEXT DEFAULT NULL,
prompt TEXT DEFAULT '请根据参考信息回答以下问题:${question}\n\n 参考信息:\n\n ${context}',
text_search_recall_count INT DEFAULT 20,
vector_recall_count INT DEFAULT 20,
rerank_recall_count INT DEFAULT 5
)
RETURNS TEXT AS
$$
DECLARE
final_answer TEXT;
embedding_table TEXT;
embedding_col TEXT;
text_search_table TEXT;
text_search_col TEXT;
sql TEXT;
embedding_expr TEXT;
ai_rank_expr TEXT;
ai_gen_expr TEXT;
llm_model_valid BOOLEAN;
BEGIN
-- 解析 vector_index
SELECT table_path, column_name
INTO embedding_table, embedding_col
FROM parse_index(vector_index);
-- 解析 text_search_index
SELECT table_path, column_name
INTO text_search_table, text_search_col
FROM parse_index(text_search_index);
-- embedding表达式处理
IF embedding_model IS NULL OR trim(embedding_model) = '' THEN
embedding_expr := 'ai_embed(questions.query_text)';
ELSE
embedding_expr := 'ai_embed(' || quote_literal(embedding_model) || ', questions.query_text)';
END IF;
-- LLM模型处理
llm_model_valid := (llm_model IS NOT NULL AND trim(llm_model) != '');
-- ai_rank表达式
IF llm_model_valid THEN
ai_rank_expr := 'ai_rank(' || quote_literal(llm_model) || ', questions.query_text, chunk)';
ELSE
ai_rank_expr := 'ai_rank(questions.query_text, chunk)';
END IF;
-- ai_gen表达式:通过模板替换生成完整 prompt
IF llm_model_valid THEN
ai_gen_expr := 'ai_gen(' || quote_literal(llm_model) ||
', replace(replace(' || quote_literal(prompt) || ', ''${question}'', questions.query_text), ''${context}'', merged_chunks) )';
ELSE
ai_gen_expr := 'ai_gen(replace(replace(' || quote_literal(prompt) || ', ''${question}'', questions.query_text), ''${context}'', merged_chunks))';
END IF;
-- 构造动态SQL(双路召回 union 后再 rerank)
sql := '
WITH
questions AS (
SELECT ' || quote_literal(question) || ' AS query_text
),
-- 向量召回
embedding_recall AS (
SELECT
chunk
FROM
' || embedding_table || ', questions
ORDER BY
approx_cosine_distance(' || embedding_col || ', ' || embedding_expr || ') DESC
LIMIT ' || vector_recall_count || '
),
-- 全文搜索召回
text_search_recall AS (
SELECT
chunk
FROM
' || text_search_table || ', questions
ORDER BY
text_search(' || text_search_col || ', ' || quote_literal(question) || ') DESC
LIMIT ' || text_search_recall_count || '
),
-- 合并去重
union_recall AS (
SELECT chunk FROM embedding_recall
UNION
SELECT chunk FROM text_search_recall
),
-- rerank
rerank AS (
SELECT
chunk,
' || ai_rank_expr || ' AS score
FROM
union_recall, questions
ORDER BY
score DESC
LIMIT ' || rerank_recall_count || '
),
concat_top_chunks AS (
SELECT string_agg(chunk, E''\n\n----\n\n'') AS merged_chunks FROM rerank
)
SELECT ' || ai_gen_expr || '
FROM concat_top_chunks, questions;
';
EXECUTE sql INTO final_answer;
RETURN final_answer;
END;
$$ LANGUAGE plpgsql;
向量+全文双路召回+RRF排序存储过程
-- 全文、向量双路召回 + RRF rerank (Reciprocal Rank Fusion)
CREATE OR REPLACE FUNCTION qa_hybrid_retrieval_rrf(
question TEXT,
vector_index TEXT,
text_search_index TEXT,
embedding_model TEXT DEFAULT NULL,
llm_model TEXT DEFAULT NULL,
prompt TEXT DEFAULT '请根据参考信息回答以下问题:${question}\n\n 参考信息:\n\n ${context}',
text_search_recall_count INT DEFAULT 20,
vector_recall_count INT DEFAULT 20,
rerank_recall_count INT DEFAULT 5,
rrf_k INT DEFAULT 60
)
RETURNS TEXT AS
$$
DECLARE
final_answer TEXT;
embedding_table TEXT;
embedding_col TEXT;
text_search_table TEXT;
text_search_col TEXT;
sql TEXT;
embedding_expr TEXT;
ai_gen_expr TEXT;
llm_model_valid BOOLEAN;
BEGIN
-- 解析 vector_index
SELECT table_path, column_name
INTO embedding_table, embedding_col
FROM parse_index(vector_index);
-- 解析 text_search_index
SELECT table_path, column_name
INTO text_search_table, text_search_col
FROM parse_index(text_search_index);
-- 构造 embedding 调用表达式
IF embedding_model IS NULL OR trim(embedding_model) = '' THEN
embedding_expr := 'ai_embed(questions.query_text)';
ELSE
embedding_expr := 'ai_embed(' || quote_literal(embedding_model) || ', questions.query_text)';
END IF;
-- 检查是否指定了 LLM 模型
llm_model_valid := (llm_model IS NOT NULL AND trim(llm_model) != '');
-- 构造 ai_gen 调用表达式(替换 prompt 模板里的变量)
IF llm_model_valid THEN
ai_gen_expr := 'ai_gen(' || quote_literal(llm_model) ||
', replace(replace(' || quote_literal(prompt) ||
', ''${question}'', questions.query_text), ''${context}'', merged_chunks) )';
ELSE
ai_gen_expr := 'ai_gen(replace(replace(' || quote_literal(prompt) ||
', ''${question}'', questions.query_text), ''${context}'', merged_chunks))';
END IF;
-- 拼 SQL
sql := '
WITH
questions AS (
SELECT ' || quote_literal(question) || ' AS query_text
),
-- 向量检索召回 + 排名(approx_cosine_distance 调用一次)
embedding_recall AS (
SELECT
chunk,
vec_score,
ROW_NUMBER() OVER (ORDER BY vec_score DESC) AS rank_vec
FROM (
SELECT
chunk,
approx_cosine_distance(' || embedding_col || ', ' || embedding_expr || ') AS vec_score
FROM
' || embedding_table || ', questions
) t
ORDER BY vec_score DESC
LIMIT ' || vector_recall_count || '
),
-- 全文检索召回 + 排名(score 已有,不重复算)
text_search_recall AS (
SELECT
chunk,
text_score,
ROW_NUMBER() OVER (ORDER BY text_score DESC) AS rank_text
FROM (
SELECT
chunk,
text_search(' || text_search_col || ', ' || quote_literal(question) || ') AS text_score
FROM
' || text_search_table || '
) ts
WHERE text_score > 0
ORDER BY text_score DESC
LIMIT ' || text_search_recall_count || '
),
-- 计算 RRF 分数
rrf_scores AS (
SELECT
chunk,
SUM(1.0 / (' || rrf_k || ' + rank_val)) AS rrf_score
FROM (
SELECT chunk, rank_vec AS rank_val FROM embedding_recall
UNION ALL
SELECT chunk, rank_text AS rank_val FROM text_search_recall
) sub
GROUP BY chunk
),
-- 取融合后的 top N
top_chunks AS (
SELECT chunk
FROM rrf_scores
ORDER BY rrf_score DESC
LIMIT ' || rerank_recall_count || '
),
-- 合并上下文
concat_top_chunks AS (
SELECT string_agg(chunk, E''\n\n----\n\n'') AS merged_chunks
FROM top_chunks
)
-- 生成答案
SELECT ' || ai_gen_expr || '
FROM concat_top_chunks, questions;
';
-- 执行并返回
EXECUTE sql INTO final_answer;
RETURN final_answer;
END;
$$ LANGUAGE plpgsql;