最佳实践:金融多模态AI数据分析与检索系统

在数据驱动时代,非结构化数据(文本、图像、音视频、日志等)与结构化、半结构化数据(JSON)共同构成企业的核心数据资产。其中,非结构化数据以更原始、多元的形态蕴含着海量的业务洞察(如用户反馈、合同条款、产品缺陷图像),本文将会模拟金融场景中对招股书、合同等PDF文件的检索与分析,以辅助业务进行下一步的精细化运营决策。

核心能力介绍

本最佳实践主要是对PDF非结构化数据的处理与检索,包含的主要能力如下:

  • 非结构化数据(Object Table):支持通过表的形式读取OSS中非结构化数据(PDF/IMAGE/PPT等)。

  • AI Function:在Hologres中可以用标准SQL的方式调用AI Function,自动调用内置大模型,完成AI服务建设场景

    • 数据加工:提供Embed、Chunk算子,可以对非结构化数据加工成结构化数据存储,无需使用外部算法就能自动Embed。

    • 数据检索和分析:提供ai_genai_summarize等算子,能够通过SQL对数据进行推理、问题总结及翻译等操作。

  • Dynamic Table介绍:支持增量刷新模式对非结构化数据自动加工,每次只计算增量的数据有效减少重复计算,降低资源利用率。

  • 向量检索:支持标准SQL的向量检索,用于非结构化数据的相似度搜索、场景识别等,在同一个查询中可以自由地实现向量和标量的检索。

  • 全文检索:通过倒排索引、分词等机制实现对非结构化数据的高效检索,支持关键词匹配、短语检索等丰富的检索方式,实现更加灵活的检索。

方案优势

通过如上核心能力,在Hologres中多模态AI检索与分析的核心优势如下:

  • 完整的AI数据处理流程:涵盖从数据Embed、Chunk、增量加工、检索/分析的全流程,开发人员可以使用大数据系统一样,轻松构建AI应用。

  • 标准SQL加工和分析非结构化数据:无需使用专用开发语言,SQL就能完成非结构化数据提取、加工,也无需借助外部系统,数据处理更加高效和简单,降低开发人员学习成本。

  • 检索更精准、灵活和智能:可以轻松构建“关键词+语义+多模态”的混合检索链路,覆盖从精准搜索到意图理解的全场景需求。还能结合AI Function实现对用户意图的深度理解,语义关联和上下文推理,实现更智能的检索能力。

  • 数据不出库,更安全:不需要将数据导出到外部系统,与hologres的多种安全能力无缝集成,高效保护数据安全。

本实践文档将会介绍如何通过上诉核心能力在Hologres中对非结构化数据加工和检索,助力搭建企业级多模态AI数据平台,打破数据孤岛,释放全域数据价值

方案流程

本次方案的流程如下:

  1. 数据集准备。

    金融数据集中的PDF文件上传至OSS存储。

  2. PDF数据加工。

    使用Object Table读取PDF的元数据信息,然后创建增量刷新的Dynamic Table,并对数据进行EmbedChunk,同时也对Dynamic Table构建向量索引和全文索引,以便后续检索可以使用索引的能力。

  3. 使用ai_embed算子对将自然语言的问题进行Embedding,然后使用全文和向量双路召回结果,并对结果进行排序,结合大模型的推理能力,最终输出相似度最高的答案。

image

准备工作

  • 数据准备

    本文使用ModelScope公开的金融数据集中的PDF文件夹中的文件,共80份公司招股说明书。

  • 环境准备

    1. 购买Hologres V4.0及以上版本实例并创建数据库

    2. 购买AI资源

      本文以large-96core-512GB-384GB、1个节点为例。

    3. 模型部署。本次方案使用的模型以及分配的资源为:

      模型名称

      模型类别

      模型作用描述

      单副本CPU(Core

      单副本内存

      单副本GPU

      资源副本数

      to_doc

      ds4sd/docling-models

      PDF转换成文档。

      20

      100 GB

      1卡(48 GB)

      1

      chunk

      recursive-character-text-splitter

      文档切片,每个PDF较大,建议使用切片。

      15

      30 GB

      0卡(0 GB)

      1

      pdf_embed

      BAAI/bge-base-zh-v1.5

      文档Embedding。

      7

      30 GB

      1卡(96 GB)

      1

      llm

      Qwen/Qwen3-32B

      使用大模型对检索出的文档内容按照提示词推理.

      7

      30 GB

      1卡(96 GB)

      1

      说明

      上述模型的资源均为默认分配的资源。

操作步骤

  1. 下载PDF文件并上传至OSS。

    1. 下载博金大模型挑战赛-金融千问14b数据集80份招股书(PDF)。

    2. 登录OSS管理控制台创建Bucket并将已下载的PDF文件上传至该Bucket路径下。上传操作详情,请参见简单上传

  2. 账号授权。

    1. 登录RAM控制台,创建阿里云RAM角色并授予OSS的相关权限。

      推荐授予AliyunOSSReadOnlyAccess权限。

    2. 为上述阿里云RAM角色添加登录和Hologres的访问权限。

      • 阿里云账号(主账号)

        修改RAM角色的信任策略。重点需更新如下参数:

        • Action:更新为sts:AssumeRole

        • Service:更新为hologres.aliyuncs.com

        {
          "Statement": [
            {
              "Action": "sts:AssumeRole",
              "Effect": "Allow",
              "Principal": {
                "RAM": [
                  "acs:ram::1866xxxx:root"
                ],
                "Service": [
                  "hologres.aliyuncs.com"
                ]
              }
            }
          ],
          "Version": "1"
        }
      • RAM用户(子账号)

        1. RAM用户授权。

          1. 权限管理 > 权限策略页面,单击创建权限策略,并选择脚本编辑模式创建权限策略。具体操作,请参见创建自定义权限策略

            Hologres可通过该策略判断当前RAM用户是否具备创建对应RAM角色的权限。权限策略内容如下。

            {
              "Version": "1",
              "Statement": [
                {
                  "Effect": "Allow",
                  "Action": "hologram:GrantAssumeRole",
                  "Resource": "<arn账号>"
                }
              ]
            }
          2. 身份管理 > 用户页面,单击目标RAM用户操作列中的添加权限,为RAM用户(子账号)授予上述步骤已创建的权限策略。具体操作,请参见RAM用户授权

        2. 为已创建的RAM角色授权。

          修改RAM角色的信任策略。重点需更新如下参数:

          • Action:更新为sts:AssumeRole

          • Service:更新为hologres.aliyuncs.com

          {
            "Statement": [
              {
                "Action": "sts:AssumeRole",
                "Effect": "Allow",
                "Principal": {
                  "RAM": [
                    "acs:ram::1866xxxx:root"
                  ],
                  "Service": [
                    "hologres.aliyuncs.com"
                  ]
                }
              }
            ],
            "Version": "1"
          }
  3. PDF文件进行EmbeddingChunk。

    需要创建Object TableDynamic TablePDF的元数据读取以及加工。因为流程较长,Hologres直接将其封装为存储过程。该存储过程包括的能力如下:

    • 会创建一张Object Table,用于取PDF的元数据。

    • 创建一张增量刷新模式的Dynamic Table结果表,用于存储加工后的数据。同时,该表需设置向量索引和全文索引,且Dynamic Table不设置自动刷新,需要手动刷新。

    • Dynamic Table的刷新过程中会使用ai_embedai_chunk对数据进行Embed和切片。

    该存储过程代码如下:

    CALL create_rag_corpus_from_oss(
        oss_path => 'oss://xxxx/bs_challenge_financial_14b_dataset/pdf',
        oss_endpoint => 'oss-cn-hangzhou-internal.aliyuncs.com',
        oss_role_arn => 'acs:ram::186xxxx:role/xxxx',
        corpus_table => 'public.dt_bs_challenge_financial'
    );
  4. 刷新结果表。

    通过如上存储过程创建的Object TableDynamic Table均需手动刷新,才能完成数据加工。该步骤已被封装为PDF加工存储过程,该存储过程包括的能力如下:

    • 刷新一次Object Table获取PDF元数据

    • 刷新一次Dynamic Table,进行PDFEmbeddingChunk加工。

    该存储过程使用代码如下:

    CALL refresh_rag_corpus_table(
        corpus_table => 'public.dt_bs_challenge_financial'
    );
  5. PDF检索。

    加工好的数据可以根据业务使用场景,通过向量、全文等方式进行检索。例如:可以根据招股书来查询某个公司的业绩走势,以此来判断公司后续的走势是悲观还是乐观,以便对后续的投资意向提供辅助建议。

    向量检索

    在向量检索时,为了检索方便,我们将问题EmbeddingPrompt构建,大模型输出答案等过程封装成为向量检索存储过程,直接调用如下该存储过程可以实现向量召回。

    -- 向量单路召回 + AI重排
    SELECT qa_retrieval(
    question => '报告期内,湖南国科微电子股份有限公司2014年度、2015年度、2016年度营业收入和净利润分别较上年增长多大幅度?',
    vector_index => 'dt_bs_challenge_financial.embedding_vector',
      prompt => '请分析如下业绩走势是悲观还是乐观,并给出原因:${question}\n\n 参考信息:\n\n ${context}'
    )

    检索答案如下:

    qa_retrieval
    ---------
    "根据提供的信息,对湖南国科微电子股份有限公司的业绩走势进行分析,可以得出以下结论:
    
    ### 一、业绩走势分析:悲观
    
    #### 1. **营业收入增长乏力**
    - 2014年度营业收入较上年增长 **15.13%**,但2015年度营业收入却 **下降5.21%**,2016年度数据未提供,但可以看出营业收入增长趋势在2015年出现明显下滑。
    - 2012年至2014年营业收入的年复合增长率仅为 **4.47%**,表明公司业务扩张较为缓慢。
    
    #### 2. **净利润增长持续下降**
    - 2014年度净利润增长 **5.43%**,2015年度净利润 **下降3.29%**。
    - 扣除非经常性损益后,2014年度归属于母公司股东的净利润增长 **-3.14%**,2015年度进一步下降 **-5.60%**,表明公司主营业务盈利能力在持续恶化。
    - 2012年至2014年扣除非经常性损益后净利润的年复合增长率为 **-4.38%**,远低于营业收入增长,说明公司主营业务盈利能力不足,增长主要依赖非经常性损益。
    
    #### 3. **非经常性损益占比偏高**
    - 报告期内,非经常性损益占净利润的比例较高,2014年、2013年、2012年分别为 **17.54%、10.25%、8.06%**,表明公司利润中有一部分来自政策扶持、政府补贴等非经常性因素,而非核心业务的持续增长。
    - 依赖非经常性损益来维持利润增长,不利于公司长期的稳定发展。
    
    #### 4. **净资产收益率下降**
    - 加权平均净资产收益率从2014年的 **18.10%** 下降到2015年的 **24.82%**,再到2016年的 **28.23%**,虽然数据看似增长,但需注意该指标是以扣除非经常性损益后的净利润计算的,而净利润本身在下降,因此这种增长可能与资本结构变化有关,而非盈利能力的实质性提升。
    
    ### 二、原因总结
    1. **主营业务增长乏力**:营业收入和净利润增长均呈现下降趋势,尤其是净利润的下降表明公司盈利能力在减弱。
    2. **非经常性损益依赖度高**:公司利润中非经常性损益占比较高,说明主营业务的盈利能力不足,公司业绩的持续性存疑。
    3. **市场竞争激烈**:公司采购的工控机、显示器、电源等产品市场竞争激烈,价格平稳,利润空间受到挤压。
    4. **行业环境影响**:不锈钢市场价格波动、原材料价格波动可能对公司经营业绩造成一定影响,虽然公司已采取措施降低影响,但长期来看仍需关注。
    
    ### 三、结论
    总体来看,湖南国科微电子股份有限公司的业绩走势偏 **悲观**。公司主营业务增长乏力,净利润持续下降,对非经常性损益依赖度高,未来盈利能力的可持续性存疑。公司需要加强核心业务的竞争力,优化成本结构,提高主营业务盈利能力,以实现长期稳健发展。"

    全文检索

    在全文检索时,为了检索方便,我们将问题EmbeddingPrompt构建,大模型输出答案等过程封装为全文检索存储过程,直接调用如下存储过程可以实现全文召回:

    --全文检索召回
    SELECT qa_text_search_retrieval(
        question => '报告期内,湖南国科微电子股份有限公司2014年度、2015年度、2016年度营业收入和净利润分别较上年增长多大幅度?',
        text_search_index => 'dt_bs_challenge_financial.chunk',
        prompt => '请分析如下业绩走势是悲观还是乐观,并给出原因:${question}\n\n 参考信息:\n\n ${context}'
    );

    检索答案如下:

    qa_text_search_retrieval
    ----------------
    "根据提供的信息,湖南国科微电子股份有限公司在2014年、2015年和2016年的业绩走势整体上呈现**悲观**趋势,具体原因如下:
    
    ### 1. **营业收入增长乏力**
    - 2014年营业收入增长率为**15.13%**,但2015年营业收入增长率转为**-5.21%**,即出现负增长。
    - 2012年至2014年的营业收入年复合增长率仅为**4.47%**,说明公司营业收入增长较为缓慢,业务发展不够强劲。
    - 2015年上半年营业收入预测与2014年同期相比大致持平,但2015年上半年净利润较上年同期**略有下降**,表明盈利能力下降。
    
    ### 2. **净利润和扣非净利润增长不佳**
    - 2014年净利润增长率为**5.43%**,2015年净利润增长率下降为**-3.29%**,即净利润出现下滑。
    - 扣除非经常性损益后的净利润增长率在2014年为**-3.14%**,2015年进一步下降为**-5.60%**,说明公司主营业务的盈利能力持续下降。
    - 2012年至2014年扣非净利润的年复合增长率为**-4.38%**,明显低于营业收入的年复合增长率,说明公司利润质量不高,主营业务盈利能力较弱。
    
    ### 3. **经营活动现金流波动**
    - 2014年销售商品、提供劳务收到的现金占营业收入的比例较前两年有所下降,主要与部分收入确认项目的**收款跨期**有关,说明公司现金流管理存在问题。
    - 2013年购买商品、接受劳务支付的现金占营业成本比例较高,主要是由于当年**采购原材料并完成生产**,但部分成本在2014年才结转,导致2014年该比例较低,反映公司采购和生产节奏不够稳定。
    
    ### 4. **投资和盈利能力指标**
    - 加权平均净资产收益率(ROE)在2014年为**18.10%**,2015年上升至**24.82%**,2016年进一步上升至**28.23%**,虽然有所提升,但ROE的提高可能主要依赖**财务杠杆**,而非核心业务盈利能力的提升。
    - 考虑到净利润和扣非净利润持续下降,ROE的提升并不能完全反映公司经营质量的改善。
    
    ### 5. **2015年上半年业绩预测**
    - 2015年上半年营业收入预计为**8,505万元至10,395万元**,与2014年同期的**10,127.35万元**大致持平,但净利润预计为**2,340万元至2,860万元**,低于2014年同期的**2,912.66万元**,说明公司盈利能力下降。
    
    ### 总结
    综合来看,湖南国科微电子股份有限公司在2014年至2016年的业绩走势**偏向悲观**。虽然ROE有所提升,但营业收入增长乏力、净利润和扣非净利润持续下降、经营活动现金流波动较大,表明公司主营业务盈利能力较弱,经营质量有待提升。"

    向量+全文混合检索

    在向量、全文结合Rank排序混合检索场景中,为了检索方便,Hologres将其封装为向量+全文双路召回+Rank排序存储过程,该存储过程的能力如下:

    • 根据问题使用向量计算,召回TOP 20的答案。

    • 根据问题使用全文检索,召回TOP 20的答案。

    • 使用ai_rank,对向量和全文召回的答案进行排序,最后输出Top1的答案。

    • 使用ai_gen,结合大模型根据提示词以及检索的答案,生成最终答案并进行输出。

    -- 全文、向量双路召回 + AI重排
    SELECT qa_hybrid_retrieval(
        question => '报告期内,湖南国科微电子股份有限公司2014年度、2015年度、2016年度营业收入和净利润分别较上年增长多大幅度?',
        vector_index => 'dt_bs_challenge_financial.embedding_vector',
        text_search_index => 'dt_bs_challenge_financial.chunk',
         prompt => '请分析如下业绩走势是悲观还是乐观,并给出原因:${question}\n\n 参考信息:\n\n ${context}'
    );

    检索答案如下:

    qa_hybrid_retrieval
    ---
    "根据提供的信息,我们可以对湖南国科微电子股份有限公司的业绩走势进行如下分析,并判断其趋势是悲观还是乐观:
    
    ---
    
    ### 一、**营业收入走势分析**
    1. **2012-2014年复合增长率**:
       - 营业收入的年复合增长率为 **4.47%**,表明公司营业收入的增长较为平稳。
       - 2014年营业收入为 **18,154.06万元**,较2013年增长 **15.13%**。
       - 2015年营业收入较上年 **下降5.21%**,出现了负增长。
    
    2. **结论**:
       - 营业收入的增长在2014年有所回升,但2015年出现明显下滑,表明公司业务扩张遇到了一定阻力。
    
    ---
    
    ### 二、**净利润走势分析**
    1. **2012-2014年复合增长率**:
       - 扣除非经常性损益后的净利润年复合增长率为 **-4.38%**,低于营业收入的年复合增长率,说明公司盈利能力有所下降。
       - 2014年扣非净利润为 **42,731,071.18元**,较2013年下降 **3.14%**。
       - 2015年扣非净利润较上年进一步下降 **5.60%**。
    
    2. **非经常性损益影响**:
       - 2014年、2013年及2012年非经常性损益占净利润的比例分别为 **17.54%、10.25%、8.06%**,呈上升趋势。
       - 非经常性损益的增加主要来自于政府补贴和理财产品收益,而非主营业务带来的持续增长。
    
    3. **结论**:
       - 扣非净利润连续两年下降,说明公司主营业务盈利能力减弱,业绩增长依赖于非经常性损益,这是令人担忧的信号。
    
    ---
    
    ### 三、**现金流量与经营稳定性**
    1. **经营活动现金流**:
       - 2014年营业收入为 **18,154.06万元**,但销售商品、提供劳务收到的现金并未明确给出,无法判断现金流是否健康。
       - 报告期内,公司银行存款分别为 **13,063.38万元、4,152.54万元、9,864.61万元**,资金流动性波动较大,但主要客户、供应商及经营模式保持稳定。
    
    2. **结论**:
       - 虽然公司现金流存在波动,但客户和供应商稳定,经营模式未发生重大变化,这为公司未来的发展提供了一定保障。
    
    ---
    
    ### 四、**2015年上半年业绩预测**
    - 201516月预计营业收入为 **8,505万元至10,395万元**,较2014年同期的 **4,641.19万元**有明显增长。
    - 但201513月净利润较上年同期下降 **48.26%**,主要是因为确认收入的项目毛利率较低。
    
    ---
    
    ### 五、**综合分析与判断**
    1. **乐观因素**:
       - 2014年营业收入增长较快,达到 **15.13%**。
       - 2015年上半年营业收入预计增长显著,表明公司可能正在逐步恢复。
       - 主要客户、供应商及经营模式保持稳定,为公司提供了良好的运营基础。
    
    2. **悲观因素**:
       - 2015年营业收入较上年 **下降5.21%**,净利润也出现下滑。
       - 扣非净利润连续两年下降,表明公司主营业务盈利能力不足。
       - 非经常性损益占比上升,业绩增长依赖于政府补贴和理财产品收益,缺乏内生增长动力。
       - 201513月净利润大幅下滑 **48.26%**,表明短期业绩波动较大。
    
    ---
    
    ### **最终结论:整体趋势偏悲观**
    - 尽管公司在2014年营业收入有所回升,且2015年上半年预计增长,但 **扣非净利润连续下降**、**净利润增长依赖非经常性损益**、**短期业绩波动较大**,表明公司目前的业绩增长缺乏持续性和稳定性。
    - 因此,从长期来看,公司业绩走势偏 **悲观**,需关注其主营业务盈利能力的改善和非经常性损益的依赖问题。
    
    ---
    
    ### **建议**
    1. 关注公司未来主营业务的盈利能力是否能有所提升。
    2. 降低对非经常性损益的依赖,提高内生增长动力。
    3. 稳定客户和供应商关系,优化业务结构,提高毛利率。"

    向量+全文双路召回+RRF排序

    使用向量和全文检索后,通过RRF (Reciprocal Rank Fusion)排序召回结果。为了检索方便,Hologres已经封装为向量+全文双路召回+RRF排序存储过程(详细定义见下方附录),该存储过程的能力如下:

    • 根据问题使用向量计算,召回TOP 20的答案。

    • 根据问题使用全文检索,召回TOP 20的答案。

    • 对向量和全文召回的答案,计算RRF分数,最后输出Top N的答案。

    • 使用ai_gen、大模型根据提示词,以及检索的答案,拼装成最终答案并输出。

    -- 全文、向量双路召回 + RRF重排
    SELECT qa_hybrid_retrieval_rrf(
        question => '报告期内,湖南国科微电子股份有限公司2014年度、2015年度、2016年度营业收入和净利润分别较上年增长多大幅度?',
        vector_index => 'dt_bs_challenge_financial.embedding_vector',
        text_search_index => 'dt_bs_challenge_financial.chunk'
    );

    检索答案如下:

    qa_hybrid_retrieval_rrf
    ------------------
    "根据提供的信息,对湖南国科微电子股份有限公司的业绩走势进行分析,可以得出以下结论:
    
    ### **业绩走势判断:悲观**
    #### **原因分析如下:**
    
    1. **净利润增长低于营业收入增长:**
       - 提供的信息指出,公司2012年至2014年的**营业收入年复合增长率为4.47%**,表明公司整体业务增长较为平稳。
       - 但**扣除非经常性损益后归属于母公司股东的净利润年复合增长率为-4.38%**,明显低于营业收入的增长率。这说明公司在收入增长的同时,盈利能力并未同步提升,甚至出现下滑,可能受到成本上升、毛利率下降或非经常性损益减少等因素影响。
    
    2. **净利润波动较大:**
       - 201513月的净利润较上年同期减少了48.26%,且主要原因在于确认收入的项目毛利率较低(如无锡地铁一号线项目以模块外购为主)。这表明公司短期业绩容易受到业务结构变化的影响,存在一定的不稳定性。
    
    3. **毛利率和盈利能力下降:**
       - 提到“主营业务利润对公司净利润的贡献”在2014年较2013年略有下降,而2013年又较2012年下降。这说明公司核心业务的盈利能力可能在减弱,可能受到市场竞争加剧、成本上升或产品结构变化的影响。
    
    4. **2015年上半年预测利润下降:**
       - 201516月预计营业收入为8,505万元至10,395万元,与2014年同期大致持平,但净利润预计为2,340万元至2,860万元,低于2014年同期的2,912.66万元。这表明公司盈利能力在进一步下降,可能面临一定的经营压力。
    
    5. **业务增长乏力:**
       - 虽然营业收入增长较为平稳,但净利润的下降表明公司业务增长的质量不高,未能有效转化为利润。这可能影响投资者对公司未来发展的信心。
    
    ### **总结:**
    湖南国科微电子股份有限公司的业绩走势整体偏向**悲观**。虽然营业收入保持了平稳增长,但净利润的增长明显滞后甚至出现负增长,表明公司盈利能力在下降,业务发展质量不高,且存在短期业绩波动的风险。如果公司不能有效提升毛利率、控制成本或优化产品结构,未来业绩可能继续承压。"

附录:存储过程定义

上述文档中使用的存储过程定义如下,方便您做参考。

说明

Hologres中不支持创建Function,如下存储过程仅做参考,无法修改后直接执行。

PDF加工存储过程

  • 创建Object TableDynamic Table

    CREATE OR REPLACE PROCEDURE create_rag_corpus_from_oss(
        oss_path TEXT,
        oss_endpoint TEXT,
        oss_role_arn TEXT,
        corpus_table TEXT,
        embedding_model TEXT DEFAULT NULL,
        parse_document_model TEXT DEFAULT NULL,
        chunk_model TEXT DEFAULT NULL,
        chunk_size INT DEFAULT 300,
        chunk_overlap INT DEFAULT 50,
        overwrite BOOLEAN DEFAULT FALSE
    )
    AS $$
    DECLARE
        corpus_schema TEXT;
        corpus_name TEXT;
        obj_table_name TEXT;
        full_corpus_ident TEXT;
        full_obj_ident TEXT;
        embed_expr TEXT;
        chunk_expr TEXT;
        parse_expr TEXT;
    BEGIN
        -- 1. 拆 schema name + table name
        IF position('.' in corpus_table) > 0 THEN
            corpus_schema := split_part(corpus_table, '.', 1);
            corpus_name   := split_part(corpus_table, '.', 2);
        ELSE
            corpus_schema := NULL;
            corpus_name   := corpus_table;
        END IF;
    
        obj_table_name := corpus_name || '_obj_table';
    
        IF corpus_schema IS NULL THEN
            full_corpus_ident := format('%I', corpus_name);
            full_obj_ident    := format('%I', obj_table_name);
        ELSE
            full_corpus_ident := format('%I.%I', corpus_schema, corpus_name);
            full_obj_ident    := format('%I.%I', corpus_schema, obj_table_name);
        END IF;
        
        -- 2. 如果需要覆盖,先删表和索引
        IF overwrite THEN
            DECLARE
                dyn_table_exists BOOLEAN;
                rec RECORD;
            BEGIN
                -- 检查 dynamic table 是否存在
                SELECT EXISTS (
                    SELECT 1
                    FROM pg_class c
                    JOIN pg_namespace n ON n.oid = c.relnamespace
                    WHERE c.relname = corpus_name
                    AND n.nspname = COALESCE(corpus_schema, 'public')
                )
                INTO dyn_table_exists;
    
                IF dyn_table_exists THEN
                    -- 2.1 关闭动态表自动刷新
                    RAISE NOTICE 'Disabling auto refresh for %', full_corpus_ident;
                    EXECUTE format(
                        'ALTER TABLE %s SET (auto_refresh_enable=false)',
                        full_corpus_ident
                    );
    
                    -- 2.2 查找 RUNNING 刷新任务并取消
                    FOR rec IN
                        EXECUTE format(
                            $f$
                            SELECT query_job_id
                                FROM hologres.hg_dynamic_table_refresh_log(%L)
                                WHERE status = 'RUNNING';
                            $f$,
                            corpus_table
                        )
                    LOOP
                        RAISE NOTICE 'Found running refresh job: %', rec.query_job_id;
                        IF hologres.hg_internal_cancel_query_job(rec.query_job_id::bigint) THEN
                            RAISE NOTICE 'Cancel job % succeeded.', rec.query_job_id;
                        ELSE
                            RAISE WARNING 'Cancel job % failed.', rec.query_job_id;
                        END IF;
                    END LOOP;
    
                    -- 2.3 删除 Dynamic Table
                    EXECUTE format('DROP TABLE IF EXISTS %s;', full_corpus_ident);
                ELSE
                    RAISE NOTICE 'Dynamic table % does not exist, skip cancel job and drop.', full_corpus_ident;
                END IF;
    
                -- 2.4 无论如何,Object Table 都要删除
                EXECUTE format('DROP OBJECT TABLE IF EXISTS %s;', full_obj_ident);
            END;
        END IF;
    
        -- 3. 创建 Object Table
        RAISE NOTICE 'Create object table: %', obj_table_name;
        EXECUTE format(
            $f$
            CREATE OBJECT TABLE %s
            WITH (
                path = %L,
                oss_endpoint = %L,
                role_arn = %L
            );
            $f$,
            full_obj_ident,
            oss_path,
            oss_endpoint,
            oss_role_arn
        );
    
        COMMIT;
    
        -- 4. 刷新 Object Table
        RAISE NOTICE 'Refresh object table: %', obj_table_name;
        EXECUTE format('REFRESH OBJECT TABLE %s;', full_obj_ident);
    
        COMMIT;
    
        -- 5. 文档解析模型选择
        IF parse_document_model IS NULL OR length(trim(parse_document_model)) = 0 THEN
            parse_expr := 'ai_parse_document(file, ''auto'', ''markdown'')';
        ELSE
            parse_expr := format(
                'ai_parse_document(%L, file, ''auto'', ''markdown'')',
                parse_document_model
            );
        END IF;
    
        -- 6. chunk 模型选择
        IF chunk_model IS NULL OR length(trim(chunk_model)) = 0 THEN
            chunk_expr := format('ai_chunk(doc, %s, %s)', chunk_size, chunk_overlap);
        ELSE
            chunk_expr := format(
                'ai_chunk(%L, doc, %s, %s)',
                chunk_model,
                chunk_size,
                chunk_overlap
            );
        END IF;
    
        -- 7. embedding 模型选择
        IF embedding_model IS NULL OR length(trim(embedding_model)) = 0 THEN
            embed_expr := 'ai_embed(chunk)';
        ELSE
            embed_expr := format(
                'ai_embed(%L, chunk)',
                embedding_model
            );
        END IF;
    
        -- 8. 创建 RAG 输出动态表
        RAISE NOTICE 'create dynamic table: %', corpus_name;
        EXECUTE format(
            $f$
            CREATE DYNAMIC TABLE %s
            WITH (
                auto_refresh_enable = FALSE ,
                auto_refresh_mode = 'incremental',
                freshness = '5 minutes'
            ) AS
            WITH parsed_doc AS (
                SELECT object_uri,
                       etag,
                       %s AS doc
                  FROM %s
            ),
            chunked_doc AS (
                SELECT object_uri,
                       etag,
                       unnest(%s) AS chunk
                  FROM parsed_doc
            )
            SELECT
                object_uri,
                etag,
                chunk,
                %s AS embedding_vector
              FROM chunked_doc;
            $f$,
            full_corpus_ident,
            parse_expr,
            full_obj_ident,
            chunk_expr,
            embed_expr
        );
        COMMIT;
    
        -- 9. 创建全文索引(索引名 = 表名 || '_fulltext_idx')
        EXECUTE format(
            'CREATE INDEX %I ON %s USING FULLTEXT (chunk);',
            corpus_name || '_fulltext_idx',
            full_corpus_ident
        );
    
        RAISE NOTICE '';
        RAISE NOTICE 'Create RAG corpus success to table: %', corpus_table;
        RAISE NOTICE '    Vector index is: %.embedding_vector', corpus_table;
        RAISE NOTICE '    TextSearch index is: %.chunk', corpus_table;
    END;
    $$ LANGUAGE plpgsql;
    
    
  • 刷新Object TableDynamic Table存储过程

    CREATE OR REPLACE PROCEDURE refresh_rag_corpus_table(
        corpus_table TEXT
    )
    AS $$
    DECLARE
        corpus_schema TEXT;
        corpus_name   TEXT;
        obj_table_name TEXT;
        full_corpus_ident TEXT;
        full_obj_ident    TEXT;
    BEGIN
        -- 1. 解析 schema 和表名
        IF position('.' in corpus_table) > 0 THEN
            corpus_schema := split_part(corpus_table, '.', 1);
            corpus_name   := split_part(corpus_table, '.', 2);
        ELSE
            corpus_schema := NULL;
            corpus_name   := corpus_table;
        END IF;
    
        obj_table_name := corpus_name || '_obj_table';
    
        IF corpus_schema IS NULL THEN
            full_corpus_ident := format('%I', corpus_name);
            full_obj_ident    := format('%I', obj_table_name);
        ELSE
            full_corpus_ident := format('%I.%I', corpus_schema, corpus_name);
            full_obj_ident    := format('%I.%I', corpus_schema, obj_table_name);
        END IF;
    
        -- 2. 刷新 Object Table
        RAISE NOTICE 'Refreshing Object Table: %', obj_table_name;
        EXECUTE format('REFRESH OBJECT TABLE %s;', full_obj_ident);
    
        -- 3. 刷新 Dynamic Table
        RAISE NOTICE 'Refreshing Dynamic Table: %', corpus_name;
        EXECUTE format('REFRESH TABLE %s;', full_corpus_ident);
    
        RAISE NOTICE 'Refresh complete for corpus table %', corpus_table;
    END;
    $$ LANGUAGE plpgsql;
  • 删除Object TableDynamic Table存储过程

    CREATE OR REPLACE PROCEDURE drop_rag_corpus_table(
        corpus_table TEXT
    )
    AS $$
    DECLARE
        corpus_schema TEXT;
        corpus_name   TEXT;
        obj_table_name TEXT;
        full_corpus_ident TEXT;
        full_obj_ident    TEXT;
        rec RECORD;
    BEGIN
        -- 1. 解析 schema 和表名
        IF position('.' in corpus_table) > 0 THEN
            corpus_schema := split_part(corpus_table, '.', 1);
            corpus_name   := split_part(corpus_table, '.', 2);
        ELSE
            corpus_schema := NULL;
            corpus_name   := corpus_table;
        END IF;
    
        obj_table_name := corpus_name || '_obj_table';
    
        IF corpus_schema IS NULL THEN
            full_corpus_ident := format('%I', corpus_name);
            full_obj_ident    := format('%I', obj_table_name);
        ELSE
            full_corpus_ident := format('%I.%I', corpus_schema, corpus_name);
            full_obj_ident    := format('%I.%I', corpus_schema, obj_table_name);
        END IF;
    
        -- 2. 删除表
        -- 2.1 关闭动态表自动刷新
        RAISE NOTICE 'Disabling auto refresh for %', full_corpus_ident;
        EXECUTE format('ALTER TABLE IF EXISTS %s SET (auto_refresh_enable=false)', full_corpus_ident);
    
        -- 2.2 查找 RUNNING 刷新任务并取消
        FOR rec IN
            EXECUTE format(
                $f$
                SELECT query_job_id
                    FROM hologres.hg_dynamic_table_refresh_log(%L)
                    WHERE status = 'RUNNING';
                $f$,
                corpus_table
            )
        LOOP
            RAISE NOTICE 'Found running refresh job: %', rec.query_job_id;
            IF hologres.hg_internal_cancel_query_job(rec.query_job_id::bigint) THEN
                RAISE NOTICE 'Cancel job % succeeded.', rec.query_job_id;
            ELSE
                RAISE WARNING 'Cancel job % failed.', rec.query_job_id;
            END IF;
        END LOOP;
    
        -- 2.3 删除 Dynamic Table
        RAISE NOTICE 'Dropping Object Table: %', corpus_name;
        EXECUTE format('DROP TABLE IF EXISTS %s;', full_corpus_ident);
    
        -- 2.4 删除 Object Table
        RAISE NOTICE 'Dropping Object Table: %', obj_table_name;
        EXECUTE format('DROP OBJECT TABLE IF EXISTS %s;', full_obj_ident);
    
        RAISE NOTICE 'Drop complete for corpus: %', corpus_table;
    END;
    $$ LANGUAGE plpgsql;

向量检索存储过程

-- RAG向量单路召回问答
CREATE OR REPLACE FUNCTION qa_retrieval(
    question TEXT,
    vector_index TEXT,
    embedding_model TEXT DEFAULT NULL,
    llm_model TEXT DEFAULT NULL,
    prompt TEXT DEFAULT '请根据参考信息回答以下问题:${question}\n\n 参考信息:\n\n ${context}',
    vector_recall_count INT DEFAULT 20,
    rerank_recall_count INT DEFAULT 5
)
RETURNS TEXT AS
$$
DECLARE
    final_answer TEXT;
    embedding_table TEXT;
    embedding_col TEXT;
    sql TEXT;
    embedding_expr TEXT;
    ai_rank_expr TEXT;
    ai_gen_expr TEXT;
    llm_model_valid BOOLEAN;
BEGIN
    -- 解析 vector_index
    SELECT table_path, column_name
    INTO embedding_table, embedding_col
    FROM parse_index(vector_index);

    -- embedding表达式处理
    IF embedding_model IS NULL OR trim(embedding_model) = '' THEN
        embedding_expr := 'ai_embed(questions.query_text)';
    ELSE
        embedding_expr := 'ai_embed(' || quote_literal(embedding_model) || ', questions.query_text)';
    END IF;

    -- LLM模型处理
    llm_model_valid := (llm_model IS NOT NULL AND trim(llm_model) != '');

    -- ai_rank表达式
    IF llm_model_valid THEN
        ai_rank_expr := 'ai_rank(' || quote_literal(llm_model) || ', questions.query_text, chunk)';
    ELSE
        ai_rank_expr := 'ai_rank(questions.query_text, chunk)';
    END IF;

    -- ai_gen表达式:通过模板替换生成完整 prompt
    IF llm_model_valid THEN
        ai_gen_expr := 'ai_gen(' || quote_literal(llm_model) ||
            ', replace(replace(' || quote_literal(prompt) || ', ''${question}'', questions.query_text), ''${context}'', merged_chunks) )';
    ELSE
        ai_gen_expr := 'ai_gen(replace(replace(' || quote_literal(prompt) || ', ''${question}'', questions.query_text), ''${context}'', merged_chunks))';
    END IF;

    -- 动态SQL
    sql := '
    WITH
      questions AS (
        SELECT ' || quote_literal(question) || ' AS query_text
      ),
      embedding_recall AS (
        SELECT
          chunk,
          approx_cosine_distance(' || embedding_col || ', ' || embedding_expr || ') AS distance
        FROM
          ' || embedding_table || ', questions
        ORDER BY
          distance DESC
        LIMIT ' || vector_recall_count || '
      ),
      rerank AS (
        SELECT
          chunk,
          ' || ai_rank_expr || ' AS score
        FROM
          embedding_recall, questions
        ORDER BY
          score DESC
        LIMIT ' || rerank_recall_count || '
      ),
      concat_top_chunks AS (
        SELECT string_agg(chunk, E''\n\n----\n\n'') AS merged_chunks FROM rerank
      )
      SELECT ' || ai_gen_expr || '
      FROM concat_top_chunks, questions;
    ';

    EXECUTE sql INTO final_answer;
    RETURN final_answer;
END;
$$ LANGUAGE plpgsql;

全文检索存储过程

CREATE OR REPLACE FUNCTION qa_text_search_retrieval(
    question TEXT,
    text_search_index TEXT,
    llm_model TEXT DEFAULT NULL,
    prompt TEXT DEFAULT '请根据参考信息回答以下问题:${question}\n\n参考信息:\n\n${context}',
    text_search_recall_count INT DEFAULT 20,
    rerank_recall_count INT DEFAULT 5
)
RETURNS TEXT AS
$$
DECLARE
    final_answer TEXT;
    text_search_table TEXT;
    text_search_col TEXT;
    sql TEXT;
    ai_rank_expr TEXT;
    ai_gen_expr TEXT;
    llm_model_valid BOOLEAN;
BEGIN
    -- 解析 text_search_index
    SELECT table_path, column_name
    INTO text_search_table, text_search_col
    FROM parse_index(text_search_index);

    -- LLM 模型处理
    llm_model_valid := (llm_model IS NOT NULL AND trim(llm_model) != '');

    -- ai_rank 表达式
    IF llm_model_valid THEN
        ai_rank_expr := 'ai_rank(' || quote_literal(llm_model) || ', questions.query_text, chunk)';
    ELSE
        ai_rank_expr := 'ai_rank(questions.query_text, chunk)';
    END IF;

    -- ai_gen 表达式
    IF llm_model_valid THEN
        ai_gen_expr := 'ai_gen(' || quote_literal(llm_model) ||
            ', replace(replace(' || quote_literal(prompt) || 
              ', ''${question}'', questions.query_text), ''${context}'', merged_chunks) )';
    ELSE
        ai_gen_expr := 'ai_gen(replace(replace(' || quote_literal(prompt) || 
              ', ''${question}'', questions.query_text), ''${context}'', merged_chunks))';
    END IF;

    -- 动态 SQL
    sql := '
    WITH
      questions AS (
        SELECT ' || quote_literal(question) || ' AS query_text
      ),
      -- 全文搜索召回
      text_search_recall AS (
        SELECT
          chunk
        FROM
          ' || text_search_table || ', questions
        ORDER BY
          text_search(' || text_search_col || ', ' || quote_literal(question) || ') DESC
        LIMIT ' || text_search_recall_count || '
      ),
      -- rerank
      rerank AS (
        SELECT
          chunk,
          ' || ai_rank_expr || ' AS score
        FROM
          text_search_recall, questions
        ORDER BY
          score DESC
        LIMIT ' || rerank_recall_count || '
      ),
      concat_top_chunks AS (
        SELECT string_agg(chunk, E''\n\n----\n\n'') AS merged_chunks FROM rerank
      )
    SELECT ' || ai_gen_expr || '
    FROM concat_top_chunks, questions;
    ';

    EXECUTE sql INTO final_answer;
    RETURN final_answer;
END;
$$ LANGUAGE plpgsql;

向量+全文双路召回+Rank排序存储过程

-- 全文、向量双路召回 + rerank
CREATE OR REPLACE FUNCTION qa_hybrid_retrieval(
    question TEXT,
    vector_index TEXT,
    text_search_index TEXT,
    embedding_model TEXT DEFAULT NULL,
    llm_model TEXT DEFAULT NULL,
    prompt TEXT DEFAULT '请根据参考信息回答以下问题:${question}\n\n 参考信息:\n\n ${context}',
    text_search_recall_count INT DEFAULT 20,
    vector_recall_count INT DEFAULT 20,
    rerank_recall_count INT DEFAULT 5
)
RETURNS TEXT AS
$$
DECLARE
    final_answer TEXT;
    embedding_table TEXT;
    embedding_col TEXT;
    text_search_table TEXT;
    text_search_col TEXT;
    sql TEXT;
    embedding_expr TEXT;
    ai_rank_expr TEXT;
    ai_gen_expr TEXT;
    llm_model_valid BOOLEAN;
BEGIN
    -- 解析 vector_index
    SELECT table_path, column_name
    INTO embedding_table, embedding_col
    FROM parse_index(vector_index);

    -- 解析 text_search_index
    SELECT table_path, column_name
    INTO text_search_table, text_search_col
    FROM parse_index(text_search_index);

    -- embedding表达式处理
    IF embedding_model IS NULL OR trim(embedding_model) = '' THEN
        embedding_expr := 'ai_embed(questions.query_text)';
    ELSE
        embedding_expr := 'ai_embed(' || quote_literal(embedding_model) || ', questions.query_text)';
    END IF;

    -- LLM模型处理
    llm_model_valid := (llm_model IS NOT NULL AND trim(llm_model) != '');

    -- ai_rank表达式
    IF llm_model_valid THEN
        ai_rank_expr := 'ai_rank(' || quote_literal(llm_model) || ', questions.query_text, chunk)';
    ELSE
        ai_rank_expr := 'ai_rank(questions.query_text, chunk)';
    END IF;

    -- ai_gen表达式:通过模板替换生成完整 prompt
    IF llm_model_valid THEN
        ai_gen_expr := 'ai_gen(' || quote_literal(llm_model) ||
            ', replace(replace(' || quote_literal(prompt) || ', ''${question}'', questions.query_text), ''${context}'', merged_chunks) )';
    ELSE
        ai_gen_expr := 'ai_gen(replace(replace(' || quote_literal(prompt) || ', ''${question}'', questions.query_text), ''${context}'', merged_chunks))';
    END IF;

    -- 构造动态SQL(双路召回 union 后再 rerank)
    sql := '
    WITH
      questions AS (
        SELECT ' || quote_literal(question) || ' AS query_text
      ),
      -- 向量召回
      embedding_recall AS (
        SELECT
          chunk
        FROM
          ' || embedding_table || ', questions
        ORDER BY
          approx_cosine_distance(' || embedding_col || ', ' || embedding_expr || ') DESC
        LIMIT ' || vector_recall_count || '
      ),
      -- 全文搜索召回
      text_search_recall AS (
        SELECT
          chunk
        FROM
          ' || text_search_table || ', questions
        ORDER BY
          text_search(' || text_search_col || ',  ' || quote_literal(question) || ') DESC
        LIMIT ' || text_search_recall_count || '
      ),
      -- 合并去重
      union_recall AS (
        SELECT chunk FROM embedding_recall
        UNION
        SELECT chunk FROM text_search_recall
      ),
      -- rerank
      rerank AS (
        SELECT
          chunk,
          ' || ai_rank_expr || ' AS score
        FROM
          union_recall, questions
        ORDER BY
          score DESC
        LIMIT ' || rerank_recall_count || '
      ),
      concat_top_chunks AS (
        SELECT string_agg(chunk, E''\n\n----\n\n'') AS merged_chunks FROM rerank
      )
      SELECT ' || ai_gen_expr || '
      FROM concat_top_chunks, questions;
    ';

    EXECUTE sql INTO final_answer;
    RETURN final_answer;
END;
$$ LANGUAGE plpgsql;

向量+全文双路召回+RRF排序存储过程

-- 全文、向量双路召回 + RRF rerank (Reciprocal Rank Fusion)
CREATE OR REPLACE FUNCTION qa_hybrid_retrieval_rrf(
    question TEXT,
    vector_index TEXT,
    text_search_index TEXT,
    embedding_model TEXT DEFAULT NULL,
    llm_model TEXT DEFAULT NULL,
    prompt TEXT DEFAULT '请根据参考信息回答以下问题:${question}\n\n 参考信息:\n\n ${context}',
    text_search_recall_count INT DEFAULT 20,
    vector_recall_count INT DEFAULT 20,
    rerank_recall_count INT DEFAULT 5,
    rrf_k INT DEFAULT 60
)
RETURNS TEXT AS
$$
DECLARE
    final_answer TEXT;
    embedding_table TEXT;
    embedding_col TEXT;
    text_search_table TEXT;
    text_search_col TEXT;
    sql TEXT;
    embedding_expr TEXT;
    ai_gen_expr TEXT;
    llm_model_valid BOOLEAN;
BEGIN
    -- 解析 vector_index
    SELECT table_path, column_name
    INTO embedding_table, embedding_col
    FROM parse_index(vector_index);

    -- 解析 text_search_index
    SELECT table_path, column_name
    INTO text_search_table, text_search_col
    FROM parse_index(text_search_index);

    -- 构造 embedding 调用表达式
    IF embedding_model IS NULL OR trim(embedding_model) = '' THEN
        embedding_expr := 'ai_embed(questions.query_text)';
    ELSE
        embedding_expr := 'ai_embed(' || quote_literal(embedding_model) || ', questions.query_text)';
    END IF;

    -- 检查是否指定了 LLM 模型
    llm_model_valid := (llm_model IS NOT NULL AND trim(llm_model) != '');

    -- 构造 ai_gen 调用表达式(替换 prompt 模板里的变量)
    IF llm_model_valid THEN
        ai_gen_expr := 'ai_gen(' || quote_literal(llm_model) ||
            ', replace(replace(' || quote_literal(prompt) ||
            ', ''${question}'', questions.query_text), ''${context}'', merged_chunks) )';
    ELSE
        ai_gen_expr := 'ai_gen(replace(replace(' || quote_literal(prompt) ||
            ', ''${question}'', questions.query_text), ''${context}'', merged_chunks))';
    END IF;

    -- 拼 SQL
    sql := '
    WITH
      questions AS (
        SELECT ' || quote_literal(question) || ' AS query_text
      ),

      -- 向量检索召回 + 排名(approx_cosine_distance 调用一次)
      embedding_recall AS (
        SELECT
          chunk,
          vec_score,
          ROW_NUMBER() OVER (ORDER BY vec_score DESC) AS rank_vec
        FROM (
          SELECT
            chunk,
            approx_cosine_distance(' || embedding_col || ', ' || embedding_expr || ') AS vec_score
          FROM
            ' || embedding_table || ', questions
        ) t
        ORDER BY vec_score DESC
        LIMIT ' || vector_recall_count || '
      ),

      -- 全文检索召回 + 排名(score 已有,不重复算)
      text_search_recall AS (
        SELECT
          chunk,
          text_score,
          ROW_NUMBER() OVER (ORDER BY text_score DESC) AS rank_text
        FROM (
          SELECT
            chunk,
            text_search(' || text_search_col || ', ' || quote_literal(question) || ') AS text_score
          FROM
            ' || text_search_table || '
        ) ts
        WHERE text_score > 0
        ORDER BY text_score DESC
        LIMIT ' || text_search_recall_count || '
      ),

      -- 计算 RRF 分数
      rrf_scores AS (
        SELECT
          chunk,
          SUM(1.0 / (' || rrf_k || ' + rank_val)) AS rrf_score
        FROM (
          SELECT chunk, rank_vec AS rank_val FROM embedding_recall
          UNION ALL
          SELECT chunk, rank_text AS rank_val FROM text_search_recall
        ) sub
        GROUP BY chunk
      ),

      -- 取融合后的 top N
      top_chunks AS (
        SELECT chunk
        FROM rrf_scores
        ORDER BY rrf_score DESC
        LIMIT ' || rerank_recall_count || '
      ),

      -- 合并上下文
      concat_top_chunks AS (
        SELECT string_agg(chunk, E''\n\n----\n\n'') AS merged_chunks
        FROM top_chunks
      )

      -- 生成答案
      SELECT ' || ai_gen_expr || '
      FROM concat_top_chunks, questions;
    ';

    -- 执行并返回
    EXECUTE sql INTO final_answer;
    RETURN final_answer;
END;
$$ LANGUAGE plpgsql;