多源 RAG 处理方案:为 AI 场景提供高效数据管道

手动部署

50

方案概览

通过EventBridge构建高效的ETL(Extract, Transform, Load)数据管道:从多种数据源实时提取数据,灵活定义数据转换的逻辑,将处理结果持续加载到多种类型的目标(例如向量数据库),为AI推理和构建大模型RAG(Retrieval-Augmented Generation)应用提供高质量的知识库与实时数据。

方案架构

方案提供的默认设置完成部署后在阿里云上搭建的系统如下图所示。实际部署时您可以根据资源规划修改部分设置,但最终形成的运行环境与下图相似。

image

本文将演示如何从01构建基于文本索引和向量检索的实时RAG应用。具体来说,我们将结合EventBridge事件流,对象存储(OSS),函数计算(FC),向量检索服务(DashVector),以及 大模型服务平台百炼Embedding API,构建ETL数据管道,然后向OSS中存储的文本文档动态插入数据,进行实时的文本语义搜索,查询相似度最高的内容。

本方案的技术架构包括以下基础设施和云服务:

基础设施和云服务

说明

1个事件流

用于构建数据管道。

1个对象存储OSS Bucket

用于存储文本文件,作为ETL的数据源。

1个向量检索服务DashVector Cluster

用于存储转换后的向量数据,作为ETL的数据目标,以及RAG应用的知识库。

1个函数计算的事件函数

用于自定义ETL的数据转换逻辑,将文本数据源转换为向量。

1个函数计算的Web函数

用于验证RAG应用的检索召回效果。

部署准备

开始部署前,请按以下指引完成账号申请、账号充值等准备工作。

准备账号

  1. 如果您还没有阿里云账号,请访问阿里云账号注册页面,根据页面提示完成注册。阿里云账号是您使用云资源的付费实体,因此是部署方案的必要前提。

  2. 为阿里云账号充值。本方案的云资源支持按量付费,且默认设置均采用按量付费引导操作。如果确定任何一个云资源采用按量付费方式部署,账户余额都必须大于等于100元。

资源开通

  1. 登录阿里云百炼大模型服务平台,阅读并同意协议后,将自动开通阿里云百炼,如果未弹出服务协议,则表示您已经开通。

    • 当您首次开通阿里云百炼时,平台会自动为您发放各模型的新人专属免费额度。详情请参见新人免费额度

      说明

      仅中国大陆版(北京)模型有免费额度,国际版(新加坡)模型无免费额度。

    • 如果开通服务时提示“您尚未进行实名认证”,请先进行个人实名认证

  2. 如果是首次使用函数计算,请先开通函数计算服务

    1. 登录函数计算服务控制台,根据页面提示完成开通。

    2. 开通后,登录函数计算服务控制台,完成阿里云服务授权。

  3. 如果您是首次访问事件总线 EventBridge,请按照以下步骤进行开通。

    1. 登录EventBridge控制台

    2. 概览页的开通事件总线向导页,点击免费开通。根据页面提示完成开通。

    3. EventBridge 需进行授权才能正常收取事件。在概览页的访问授权向导页,点击一键授权,完成阿里云服务授权。

获取API-KEY

获取百炼 API-KEY

  1. 前往阿里云百炼的密钥管理页面。

  2. API-Key 页签下,创建或查看 API Key

    重要
    1. 子账号需要通过主账号完成授权后再去创建 API Key

    2. 请不要将 API Key 以任何方式公开,避免因未经授权的使用造成安全风险或资金损失。

  3. 单击 API Key 列中的image,复制 API Key

获取向量检索服务 API-KEY

  1. 登录向量检索服务控制台

  2. 在左侧导航栏单击API-KEY 管理

  3. API-KEY 管理页面,创建 API-KEY。单击复制按钮,复制API-KEY。

    说明
    1. 子账号需要通过主账号完成授权后再去创建 API-KEY。

    2. 请不要将 API-KEY 以任何方式公开,避免因未经授权的使用造成安全风险或资金损失。

创建数据源(Source)和目标(Sink)

10

创建OSS Bucket作为数据源,创建DashVector作为数据投递的目标。

创建对象存储OSS Bucket

接下来您需要创建1个对象存储OSS Bucket,用于存储本方案相关资源。

  1. 登录OSS管理控制台

  2. 在左侧导航栏,选择Bucket列表

  3. Bucket列表页面,单击创建Bucket

  4. 创建Bucket面板,创建1Bucket。

    项目

    说明

    示例值

    Bucket名称

    Bucket 命名规范:

    • 命名长度为3~63个字符。

    • 只允许小写字母、数字、短横线(-),且不能以短横线开头或结尾。

    • Bucket名称在OSS范围内必须全局唯一。

    test-bucket

    地域

    地域选择华南1(深圳)。

    华南1(深圳)

    存储类型

    存储类型 ,创建成功后不支持修改。本方案以标准存储为例。

    标准存储

    存储冗余类型

    选择采用多可用区(AZ)冗余机制的同城冗余存储,将用户的数据分散存放在同一地域的3个可用区。当某个可用区不可用时,仍然能够保障数据的正常访问。

    同城冗余存储

    读写权限

    设置数据访问权限。

    私有

创建向量检索服务ClusterCollection

  1. 登录向量检索服务控制台

  2. 在左侧导航栏单击Cluster列表,单击创建Cluster,配置以下信息:

    1. 商品类型选择按量付费

    2. 地域选择华南1(深圳)

    3. 实例类型实例规格:新用户可以选择免费试用。除新用户外,建议选择存储型S.small规格,副本数设置为1

    4. 自定义Cluster名称

  3. 单击立即购买。勾选服务协议后,单击立即开通。等待Cluster创建完成。

  4. 在左侧导航栏点击Cluster 列表,在顶部选择地域,找到已创建的Cluster实例,单击操作列的创建Collection

    1. 自定义Collection名称

    2. 选择单向量设置

    3. 向量维度设置为1536

    4. 距离度量方式选择Cosine

    5. 展开高级设置,在定义Schema区域,点击新增字段字段名称填写raw数据类型选择STRING

  5. 单击确定,等待Collection创建完成。

获取向量检索服务Cluster Endpoint

  1. 在左侧导航栏点击Cluster 列表,在顶部选择地域,找到已创建的Cluster实例,点击Cluster名称跳转到Cluster详情页。

  2. 访问端口区域,获取外网访问Endpoint值,记录下来,后续配置中会用到。

    image

构建数据处理管道

10

创建并配置EventBridge事件流作为数据处理管道。

创建事件流并配置Source(源)

  1. 登录EventBridge控制台

  2. 在左侧导航栏单击事件流

  3. 在顶部选择地域为华南1(深圳)

  4. 单击创建事件流

    1. 自定义任务名称

    2. 计费模式选择按事件量计费

  5. 任务创建页签的Source (源)区域,配置以下信息:

    1. 数据提供方选择对象存储 OSS

    2. OSS Bucket选择之前已创建的OSS Bucket数据源。

    3. 文档加载选择TextLoader

    4. 加载模式选择分块加载分块标识符填写\n,实现将数据源文本按照换行符切分为多段。

    5. 角色配置:点击授权创建新角色,在弹出的访问控制快速授权页面,确保已选择权限策略中包含AliyunOSSReadOnlyAccess,单击确认授权,根据页面提示完成授权。返回当前配置页面,选择已创建的角色。

      image

  6. 点击下一步

配置Filtering(过滤)

  1. Filtering(过滤)区域,配置以下信息:

    1. 模式内容选择匹配全部事件。实际业务场景中,也可选择其他过滤规则,或选择自定义匹配来灵活定义过滤规则。

      image

  2. 点击下一步

配置Transform(转换)

  1. Transform(转换)区域,配置以下信息:

    1. 选择阿里云服务函数计算

    2. 使用新建函数模板创建函数,函数模板选择内容向量化

    3. 函数代码中,将{your DASHSCOPE_API_KEY}替换为之前获取的百炼API-KEY,替换时注意保留'',不要保留{}。其余代码不变。

      image

    说明
    • 本例中,经过数据转换后,输出的数据结构如下:

      {
        "status_code":200,
        "request _id":"xxx-xxxx-xxx",
        "output":{
          "embeddings":[
            {
              "embedding":[
                -0.446533203125,
                2.4246826171875,
                ...
              ],
              "text_index":0
            }
          ]
        },
        "usage":{
          "total_tokens":3
        },
        "content":"<进行embedding的数据>"
      }
    • 实际场景中可以根据业务需求修改代码,灵活定制数据转换的方式;或者选择绑定现有函数,使用已部署在函数计算上的函数来执行数据转换。

  2. 点击下一步

配置Sink(目标)

  1. Sink(目标)区域,配置以下信息:

    1. 服务类型选择向量检索服务

    2. DashVector Cluster选择已创建的Cluster。

    3. DashVector Collection选择已创建的Collection。

    4. 数据映射选择Upsert

    5. 向量填写通过上游函数处理转换后得到的向量信息 $.output.embeddings[0].embedding

    6. Partition选择固定值Default

    7. 属性列中,对于属性列名称raw,配置属性值部分事件$.content

      说明

      基于上一步Transform的输出结果,按JsonPath提取所需内容。在RAG场景中,建议配置属性值,以便在RAG召回时找到原始关联数据。

    8. 鉴权配置填写之前获取的向量检索服务API KEY。

    9. 网络配置选择公网网络

      image

  1. 点击保存。等待事件流创建成功,状态应显示运行中

验证数据处理结果

上传文件到OSS Bucket
  1. 登录OSS管理控制台

  2. 在左侧导航栏,点击Bucket列表

  3. Bucket列表中,单击已创建的Bucket 名称,跳转到Bucket详情页。

  4. 在左侧导航栏点击文件管理 > 文件列表,点击上传文件

    1. 点击扫描文件,从本地选择文件上传并扫描。本文使用如下示例文件:百炼系列手机产品介绍.txt,包含了虚拟手机厂商的私有商品数据,便于后续验证RAG应用的效果。

    2. 点击上传文件。等待上传成功。

查看Embedding向量
  1. 登录向量检索服务控制台

  2. 在左侧导航栏,点击Cluster列表。单击已创建的Collection名称,跳转到详情页。

  3. 在左侧导航栏,点击向量管理 > 向量数据预览,查看已生成的向量。

方案验证

10

RAG应用通常的实现流程如下:

  1. 向量检索:用户输入查询语句,应用首先对查询语句进行 Embedding,生成查询向量,然后从VectorDB中检索出与查询向量相似度最高的n个结果向量并进行重排序(下文的示例代码中省略了重排序的过程)。

  2. 大模型生成回答:将向量检索结果作为上下文,由大模型根据用户的提问生成回答。

接下来,我们将以DashVector中的向量数据作为知识库,通过FCWeb函数构建一个简单的RAG应用。通过输入与知识库相关的用户问题,测试RAG应用的回答效果,同时验证检索召回的效果。

创建Web函数

  1. 登录函数计算控制台

  2. 在左侧导航栏单击函数管理 > 函数列表地域选择华南1(深圳)

  3. 单击创建函数,选择Web函数,单击创建Web函数

    项目

    示例值

    说明

    函数名称

    test-rag

    只能包含字母、数字、下划线和中划线。不能以数字、中划线开头。长度在 1-64 之间。

    规格方案

    vCPU 0.35,内存 512 MB,磁盘 512 MB

    函数实例的规格。

    最小实例数

    0

    如果一段时间内函数未被调用,则仅保留最小实例数的实例。

    单实例并发度

    20

    单个函数实例最多同时处理的请求数。

    运行环境

    自定义运行时 / Python / Python 3.10

    函数代码的运行时环境。

    代码上传方式

    当前使用示例代码。后续创建函数后,再使用实际的业务代码来替代。

    选择函数代码的上传方式。

    启动命令

    python3 app.py

    指定函数实例启动时的执行命令。

    监听端口

    9000

    函数实例监听请求的端口。

    执行超时时间

    600

    如果函数在这个时间内未能成功执行,函数计算会返回超时错误,请设置大小合适的超时时间,避免函数执行超时。

  4. 点击创建,等待函数创建成功。

配置函数运行时环境

函数计算控制台,点击已创建的Web函数(注意和之前创建的用于处理数据流的事件函数区分),进入函数详情页。在配置页签的高级配置区域,点击编辑

image

配置环境变量
  1. 高级配置面板,展开环境变量区域,选择使用表单编辑。点击添加变量,依次添加以下环境变量(注意仅添加新变量,不要删除已有变量):

    变量

    DASHSCOPE_API_KEY

    填写之前获取的百炼 API-KEY。

    DASHVECTOR_API_KEY

    填写之前获取的向量检索服务的API-KEY。

    dashvector_cluster_endpoint

    填写之前获取的向量检索服务的Cluster Endpoint。

    collection_name

    填写之前创建的向量检索服务的Collection名称(注意不是Cluster名称)。

    image

配置层
  1. 高级配置面板,展开区域,在已有层的基础上,点击添加层 > 添加自定义层,点击创建自定义层

    image

  2. 创建层页面,配置以下信息:

    1. 自定义名称

    2. 兼容运行时选择Debian 10

    3. 层上传方式选择在线构建依赖层

    4. 构建环境选择Python 3.10

    5. requirements.txt 文件中填写以下内容:

      dashscope
      dashvector
    6. 点击创建,等待层构建成功。

  3. 返回高级配置,点击区域的刷新按钮,然后在第二层的下拉列表中选择已创建的层版本

  4. 点击底部的部署。等待配置生效。

编写函数代码

  1. 在已创建的Web函数详情页,在代码页签,将app.py文件内容替换为以下代码:

    from flask import Flask
    from flask import request
    
    import os
    
    import dashscope
    from dashscope import TextEmbedding, Generation
    
    from dashvector import Client, Doc
    
    REQUEST_ID_HEADER = 'x-fc-request-id'
    
    app = Flask(__name__)
    
    dashscope_api_key = os.getenv('DASHSCOPE_API_KEY')
    dashvector_api_key = os.getenv('DASHVECTOR_API_KEY')
    dashvector_cluster_endpoint = os.getenv('dashvector_cluster_endpoint')
    collection_name = os.getenv('collection_name')
    
    @app.route('/', defaults={'path': ''})
    @app.route('/<path:path>', methods=['GET', 'POST', 'PUT', 'DELETE'])
    def handler(path):
        rid = request.headers.get(REQUEST_ID_HEADER)
        print("FC Invoke Start RequestId: " + rid)
        # print("Path: " + path)
        # print("Data: " + str(data))
        # print("FC Invoke End RequestId: " + rid)
        data = request.stream.read()
        question = str(data.decode('utf-8'))
        print("User Input: " + question)
        answer = answer_question(question, search_relevant_info(question))
        return answer
    
    def answer_question(question, context):
        prompt = f'''请基于```内的内容回答问题。
            ```
            {context}
            ```
            我的问题是:{question}。
        '''
        rsp = Generation.call(model='qwen-turbo', prompt=prompt)
        return rsp.output.text
    
    def search_relevant_info(question):
        # 初始化 dashvector client
        client = Client(
            api_key = dashvector_api_key,
            endpoint = dashvector_cluster_endpoint
        )
    
        # 获取刚刚存入的集合
        collection = client.get(collection_name)
        assert collection
    
        # 向量检索:指定 topk = 1
        rsp = collection.query(generate_embeddings(question), output_fields=['raw'],
                               topk=1)
        assert rsp
        relevant_info = rsp.output[0].fields['raw']
        print("relevant_info: ", relevant_info)
        return relevant_info
    
    def generate_embeddings(question):
        rsp = TextEmbedding.call(
            model=TextEmbedding.Models.text_embedding_v1,
            input=question
        )
        embeddings = [record['embedding'] for record in rsp.output['embeddings']]
        return embeddings if isinstance(question, list) else embeddings[0]
    
    
    if __name__ == '__main__':
            app.run(host='0.0.0.0',port=9000)
    
  2. 点击部署代码,等待代码部署成功。

验证效果

  1. 在已创建的Web函数详情页,在代码页签,点击测试函数旁边的下拉框,点击配置测试参数

    image

  2. 创建新测试事件事件模板选择hello-world,自定义事件名称,在页面下方的代码框中输入以下测试文本,点击确定

    百炼X1手机的分辨率是多少?
  3. 点击测试函数。页面显示执行成功。展开详细信息,在返回结果页签中,查看大模型根据客户问题,查询知识库后生成的回答。在日志输出页签中,查看向量检索召回的文本。

    image

    image

  4. 验证RAG应用对不同问题的回答和召回效果:更换测试参数中的测试文本,然后重新执行测试。

清理资源

5

在本方案中,您创建了1EventBridge事件流,1OSS Bucket,1DashVector Cluster,2个函数。测试完方案后,您可以参考以下步骤删除对应产品的实例,避免继续产生费用:

  1. 删除EventBridge事件流:登录EventBridge控制台,在左侧导航栏点击事件流找到已创建的事件流,点击操作列的删除,然后根据页面提示操作。

  2. 删除OSS Bucket:登录OSS管理控制台在左侧导航栏点击Bucket列表找到已创建的Bucket。点击Bucket名称跳转到详情页,在左侧导航栏底部点击删除Bucket,然后根据页面提示操作。

  3. 删除DashVector Cluster:登录向量检索服务控制台在左侧导航栏点击Cluster列表找到已创建的Cluster,点击操作列的释放,然后根据页面提示操作。

  4. 删除函数:登录函数计算控制台,在左侧导航栏点击函数管理 > 函数列表,找到已创建的所有函数,点击操作列的删除,然后根据页面提示操作。

一键部署【免费试用】

方案概览

通过EventBridge构建高效的ETL(Extract, Transform, Load)数据管道:从多种数据源实时提取数据,灵活定义数据转换的逻辑,将处理结果持续加载到多种类型的目标(例如向量数据库),为AI推理和构建大模型RAG(Retrieval-Augmented Generation)应用提供高质量的知识库与实时数据。

方案架构

方案提供的默认设置完成部署后在阿里云上搭建的系统如下图所示。实际部署时您可以根据资源规划修改部分设置,但最终形成的运行环境与下图相似。

image

本文将演示如何从01构建基于文本索引和向量检索的实时RAG应用。具体来说,我们将结合EventBridge事件流,对象存储(OSS),函数计算(FC),向量检索服务(DashVector),以及 大模型服务平台百炼Embedding API,构建ETL数据管道,然后向OSS中存储的文本文档动态插入数据,进行实时的文本语义搜索,查询相似度最高的内容。

本方案的技术架构包括以下基础设施和云服务:

基础设施和云服务

说明

1个事件流

用于构建数据管道。

1个对象存储OSS Bucket

用于存储文本文件,作为ETL的数据源。

1个向量检索服务DashVector Cluster

用于存储转换后的向量数据,作为ETL的数据目标,以及RAG应用的知识库。

1个函数计算的事件函数

用于自定义ETL的数据转换逻辑,将文本数据源转换为向量。

1个函数计算的Web函数

用于验证RAG应用的检索召回效果。

一键部署

  1. 单击页面右侧的立即试用按钮,即可开始创建资源。等待几分钟后,即可完成部署。

    说明

    若页面显示您的账户余额小于预估试用点,则需先获取足够的试用点后再进行本方案的试用。获取更多试用点可前往解决方案免费试用

  2. 部署完成后,在当前页面右侧浏览器的RAM 用户登录页面中,单击下一步

  3. 在当前页面左侧的云产品资源 > 试用账号下,复制登录密码,并将其粘贴到右侧浏览器页面的用户密码输入框中,然后单击登录

  4. 登录成功后,在选择绑定的 MFA 类型页面中,单击右上方的跳过绑定

重要
  • 部署完成后,点击当前页面左侧的云产品资源菜单,即可查看所有已创建的云资源信息。

  • 在试用过程中,如果您的试用时长接近本方案的最大限制(可在页面右上角查看剩余时长),系统会提示您进行续期。您也可以单击页面右上角的续期来延长试用时间。当剩余时长为零时,系统将自动释放所有创建的资源。

  • 在接下来的操作中,如需访问某个云资源,请从左侧的云产品资源中复制相应地址,然后将其粘贴到右侧的浏览器中进行访问。

资源开通

  1. 登录阿里云百炼大模型服务平台,阅读并同意协议后,将自动开通阿里云百炼,如果未弹出服务协议,则表示您已经开通。

    • 当您首次开通阿里云百炼时,平台会自动为您发放各模型的新人专属免费额度。详情请参见新人免费额度

      说明

      仅中国大陆版(北京)模型有免费额度,国际版(新加坡)模型无免费额度。

    • 如果开通服务时提示“您尚未进行实名认证”,请先进行个人实名认证

  2. 如果是首次使用函数计算,请先开通函数计算服务

    1. 登录函数计算服务控制台,根据页面提示完成开通。

    2. 开通后,登录函数计算服务控制台,完成阿里云服务授权。

  3. 如果您是首次访问事件总线 EventBridge,请按照以下步骤进行开通。

    1. 登录EventBridge控制台

    2. 概览页的开通事件总线向导页,点击免费开通。根据页面提示完成开通。

    3. EventBridge 需进行授权才能正常收取事件。在概览页的访问授权向导页,点击一键授权,完成阿里云服务授权。

获取API-KEY

获取百炼 API-KEY

  1. 前往阿里云百炼的密钥管理页面。

  2. API-Key 页签下,创建或查看 API Key

    重要
    1. 子账号需要通过主账号完成授权后再去创建 API Key

    2. 请不要将 API Key 以任何方式公开,避免因未经授权的使用造成安全风险或资金损失。

  3. 单击 API Key 列中的image,复制 API Key

获取向量检索服务 API-KEY

  1. 登录向量检索服务控制台

  2. 在左侧导航栏单击API-KEY 管理

  3. API-KEY 管理页面,创建 API-KEY。单击复制按钮,复制API-KEY。

    说明
    1. 子账号需要通过主账号完成授权后再去创建 API-KEY。

    2. 请不要将 API-KEY 以任何方式公开,避免因未经授权的使用造成安全风险或资金损失。

创建数据目标(Sink)

10

创建DashVector作为数据投递的目标。

创建向量检索服务ClusterCollection

  1. 登录向量检索服务控制台

  2. 在左侧导航栏单击Cluster列表,单击创建Cluster,配置以下信息:

    1. 商品类型选择按量付费

    2. 地域选择华南1(深圳)

    3. 实例类型实例规格:新用户可以选择免费试用。除新用户外,建议选择存储型S.small规格,副本数设置为1

    4. 自定义Cluster名称

  3. 单击立即购买。勾选服务协议后,单击立即开通。等待Cluster创建完成。

  4. 在左侧导航栏点击Cluster 列表,在顶部选择地域,找到已创建的Cluster实例,单击操作列的创建Collection

    1. 自定义Collection名称

    2. 选择单向量设置

    3. 向量维度设置为1536

    4. 距离度量方式选择Cosine

    5. 展开高级设置,在定义Schema区域,点击新增字段字段名称填写raw数据类型选择STRING

  5. 单击确定,等待Collection创建完成。

获取向量检索服务Cluster Endpoint

  1. 在左侧导航栏点击Cluster 列表,在顶部选择地域,找到已创建的Cluster实例,点击Cluster名称跳转到Cluster详情页。

  2. 访问端口区域,获取外网访问Endpoint值,记录下来,后续配置中会用到。

    image

构建数据处理管道

10

创建并配置EventBridge事件流作为数据处理管道。

创建事件流并配置Source(源)

  1. 登录EventBridge控制台

  2. 在左侧导航栏单击事件流

  3. 在顶部选择地域为华南1(深圳)

  4. 单击创建事件流

    1. 自定义任务名称

    2. 计费模式选择按事件量计费

  5. 任务创建页签的Source (源)区域,配置以下信息:

    1. 数据提供方选择对象存储 OSS

    2. OSS Bucket选择之前已创建的OSS Bucket数据源。

    3. 文档加载选择TextLoader

    4. 加载模式选择分块加载分块标识符填写\n,实现将数据源文本按照换行符切分为多段。

    5. 角色配置:点击授权创建新角色,在弹出的访问控制快速授权页面,确保已选择权限策略中包含AliyunOSSReadOnlyAccess,单击确认授权,根据页面提示完成授权。返回当前配置页面,选择已创建的角色。

      image

  6. 点击下一步

配置Filtering(过滤)

  1. Filtering(过滤)区域,配置以下信息:

    1. 模式内容选择匹配全部事件。实际业务场景中,也可选择其他过滤规则,或选择自定义匹配来灵活定义过滤规则。

      image

  2. 点击下一步

配置Transform(转换)

  1. Transform(转换)区域,配置以下信息:

    1. 选择阿里云服务函数计算

    2. 使用新建函数模板创建函数,函数模板选择内容向量化

    3. 函数代码中,将{your DASHSCOPE_API_KEY}替换为之前获取的百炼API-KEY,替换时注意保留'',不要保留{}。其余代码不变。

      image

    说明
    • 本例中,经过数据转换后,输出的数据结构如下:

      {
        "status_code":200,
        "request _id":"xxx-xxxx-xxx",
        "output":{
          "embeddings":[
            {
              "embedding":[
                -0.446533203125,
                2.4246826171875,
                ...
              ],
              "text_index":0
            }
          ]
        },
        "usage":{
          "total_tokens":3
        },
        "content":"<进行embedding的数据>"
      }
    • 实际场景中可以根据业务需求修改代码,灵活定制数据转换的方式;或者选择绑定现有函数,使用已部署在函数计算上的函数来执行数据转换。

  2. 点击下一步

配置Sink(目标)

  1. Sink(目标)区域,配置以下信息:

    1. 服务类型选择向量检索服务

    2. DashVector Cluster选择已创建的Cluster。

    3. DashVector Collection选择已创建的Collection。

    4. 数据映射选择Upsert

    5. 向量填写通过上游函数处理转换后得到的向量信息 $.output.embeddings[0].embedding

    6. Partition选择固定值Default

    7. 属性列中,对于属性列名称raw,配置属性值部分事件$.content

      说明

      基于上一步Transform的输出结果,按JsonPath提取所需内容。在RAG场景中,建议配置属性值,以便在RAG召回时找到原始关联数据。

    8. 鉴权配置填写之前获取的向量检索服务API KEY。

    9. 网络配置选择公网网络

      image

  1. 点击保存。等待事件流创建成功,状态应显示运行中

验证数据处理结果

上传文件到OSS Bucket
  1. 登录OSS管理控制台

  2. 在左侧导航栏,点击Bucket列表

  3. Bucket列表中,单击已创建的Bucket 名称,跳转到Bucket详情页。

  4. 在左侧导航栏点击文件管理 > 文件列表,点击上传文件

    1. 点击扫描文件,从本地选择文件上传并扫描。本文使用如下示例文件:百炼系列手机产品介绍.txt,包含了虚拟手机厂商的私有商品数据,便于后续验证RAG应用的效果。

    2. 点击上传文件。等待上传成功。

查看Embedding向量
  1. 登录向量检索服务控制台

  2. 在左侧导航栏,点击Cluster列表。单击已创建的Collection名称,跳转到详情页。

  3. 在左侧导航栏,点击向量管理 > 向量数据预览,查看已生成的向量。

方案验证

10

RAG应用通常的实现流程如下:

  1. 向量检索:用户输入查询语句,应用首先对查询语句进行 Embedding,生成查询向量,然后从VectorDB中检索出与查询向量相似度最高的n个结果向量并进行重排序(下文的示例代码中省略了重排序的过程)。

  2. 大模型生成回答:将向量检索结果作为上下文,由大模型根据用户的提问生成回答。

接下来,我们将以DashVector中的向量数据作为知识库,通过FCWeb函数构建一个简单的RAG应用。通过输入与知识库相关的用户问题,测试RAG应用的回答效果,同时验证检索召回的效果。

创建Web函数

  1. 登录函数计算控制台

  2. 在左侧导航栏单击函数管理 > 函数列表地域选择华南1(深圳)

  3. 单击创建函数,选择Web函数,单击创建Web函数

    项目

    示例值

    说明

    函数名称

    test-rag

    只能包含字母、数字、下划线和中划线。不能以数字、中划线开头。长度在 1-64 之间。

    规格方案

    vCPU 0.35,内存 512 MB,磁盘 512 MB

    函数实例的规格。

    最小实例数

    0

    如果一段时间内函数未被调用,则仅保留最小实例数的实例。

    单实例并发度

    20

    单个函数实例最多同时处理的请求数。

    运行环境

    自定义运行时 / Python / Python 3.10

    函数代码的运行时环境。

    代码上传方式

    当前使用示例代码。后续创建函数后,再使用实际的业务代码来替代。

    选择函数代码的上传方式。

    启动命令

    python3 app.py

    指定函数实例启动时的执行命令。

    监听端口

    9000

    函数实例监听请求的端口。

    执行超时时间

    600

    如果函数在这个时间内未能成功执行,函数计算会返回超时错误,请设置大小合适的超时时间,避免函数执行超时。

  4. 点击创建,等待函数创建成功。

配置函数运行时环境

函数计算控制台,点击已创建的Web函数(注意和之前创建的用于处理数据流的事件函数区分),进入函数详情页。在配置页签的高级配置区域,点击编辑

image

配置环境变量
  1. 高级配置面板,展开环境变量区域,选择使用表单编辑。点击添加变量,依次添加以下环境变量(注意仅添加新变量,不要删除已有变量):

    变量

    DASHSCOPE_API_KEY

    填写之前获取的百炼 API-KEY。

    DASHVECTOR_API_KEY

    填写之前获取的向量检索服务的API-KEY。

    dashvector_cluster_endpoint

    填写之前获取的向量检索服务的Cluster Endpoint。

    collection_name

    填写之前创建的向量检索服务的Collection名称(注意不是Cluster名称)。

    image

配置层
  1. 高级配置面板,展开区域,在已有层的基础上,点击添加层 > 添加自定义层,点击创建自定义层

    image

  2. 创建层页面,配置以下信息:

    1. 自定义名称

    2. 兼容运行时选择Debian 10

    3. 层上传方式选择在线构建依赖层

    4. 构建环境选择Python 3.10

    5. requirements.txt 文件中填写以下内容:

      dashscope
      dashvector
    6. 点击创建,等待层构建成功。

  3. 返回高级配置,点击区域的刷新按钮,然后在第二层的下拉列表中选择已创建的层版本

  4. 点击底部的部署。等待配置生效。

编写函数代码

  1. 在已创建的Web函数详情页,在代码页签,将app.py文件内容替换为以下代码:

    from flask import Flask
    from flask import request
    
    import os
    
    import dashscope
    from dashscope import TextEmbedding, Generation
    
    from dashvector import Client, Doc
    
    REQUEST_ID_HEADER = 'x-fc-request-id'
    
    app = Flask(__name__)
    
    dashscope_api_key = os.getenv('DASHSCOPE_API_KEY')
    dashvector_api_key = os.getenv('DASHVECTOR_API_KEY')
    dashvector_cluster_endpoint = os.getenv('dashvector_cluster_endpoint')
    collection_name = os.getenv('collection_name')
    
    @app.route('/', defaults={'path': ''})
    @app.route('/<path:path>', methods=['GET', 'POST', 'PUT', 'DELETE'])
    def handler(path):
        rid = request.headers.get(REQUEST_ID_HEADER)
        print("FC Invoke Start RequestId: " + rid)
        # print("Path: " + path)
        # print("Data: " + str(data))
        # print("FC Invoke End RequestId: " + rid)
        data = request.stream.read()
        question = str(data.decode('utf-8'))
        print("User Input: " + question)
        answer = answer_question(question, search_relevant_info(question))
        return answer
    
    def answer_question(question, context):
        prompt = f'''请基于```内的内容回答问题。
            ```
            {context}
            ```
            我的问题是:{question}。
        '''
        rsp = Generation.call(model='qwen-turbo', prompt=prompt)
        return rsp.output.text
    
    def search_relevant_info(question):
        # 初始化 dashvector client
        client = Client(
            api_key = dashvector_api_key,
            endpoint = dashvector_cluster_endpoint
        )
    
        # 获取刚刚存入的集合
        collection = client.get(collection_name)
        assert collection
    
        # 向量检索:指定 topk = 1
        rsp = collection.query(generate_embeddings(question), output_fields=['raw'],
                               topk=1)
        assert rsp
        relevant_info = rsp.output[0].fields['raw']
        print("relevant_info: ", relevant_info)
        return relevant_info
    
    def generate_embeddings(question):
        rsp = TextEmbedding.call(
            model=TextEmbedding.Models.text_embedding_v1,
            input=question
        )
        embeddings = [record['embedding'] for record in rsp.output['embeddings']]
        return embeddings if isinstance(question, list) else embeddings[0]
    
    
    if __name__ == '__main__':
            app.run(host='0.0.0.0',port=9000)
    
  2. 点击部署代码,等待代码部署成功。

验证效果

  1. 在已创建的Web函数详情页,在代码页签,点击测试函数旁边的下拉框,点击配置测试参数

    image

  2. 创建新测试事件事件模板选择hello-world,自定义事件名称,在页面下方的代码框中输入以下测试文本,点击确定

    百炼X1手机的分辨率是多少?
  3. 点击测试函数。页面显示执行成功。展开详细信息,在返回结果页签中,查看大模型根据客户问题,查询知识库后生成的回答。在日志输出页签中,查看向量检索召回的文本。

    image

    image

  4. 验证RAG应用对不同问题的回答和召回效果:更换测试参数中的测试文本,然后重新执行测试。

清理资源

单击当前方案页面右上角的image结束试用,在弹出的确认框中继续单击确定,即可结束试用并释放所有资源。

说明

方案试用结束后,试用账号将销毁,试用资源将自动释放。