基于EventBridge和DashVector打造RAG全链路动态语义检索能力

如果您想构建一个基于文本索引的语义搜索系统,您可以利用事件总线(EventBridge)向量检索服务 DashVector函数计算 FC(Function Compute),并通过百炼服务上的同步接口API,从零开始构建一个基于文本索引和向量检索的语义搜索系统。本文将介绍如何构建一个完全动态的RAG入库方案,通过事件总线(EventBridge)拉取OSS非结构化数据,同时将数据投递至向量数据库,从而实现完整的RAG Ingestion流程。

RAG背景概述

大语言模型(LLM)作为自然语言处理领域的核心技术,具有丰富的自然语言处理能力。然而,其训练语料库主要由普适知识和常识性知识(如维基百科、新闻、小说)以及各种领域的专业知识组成,这导致LLM在处理特定领域的知识表示和应用时存在一定的局限性。特别是在垂直领域或企业内部等私域专属知识方面,由于缺乏针对性的数据和专业术语的充分覆盖,LLM的表现可能会不尽人意。为了解决这些问题,可以通过微调或增加专有数据集来提高模型在特定领域的性能。

实现专属领域的知识问答的关键在于让大型语言模型(LLM)能够理解并获取那些超出了它训练数据范围内的特定领域知识。这可以通过精心设计的提示词(Prompt)来引导LLM,在回答该领域的问题时,根据提供的额外知识来进行作答。通常情况下,用户的提问采用完整的句子形式,而非像搜索引擎那样仅输入几个关键词。因此,直接使用关键词与企业内部的知识库进行匹配往往效果不佳,因为处理长句还涉及到分词、权重分配等一系列复杂步骤。相反,如果将提问文本以及知识库中的内容都转换为高质量的向量表示,然后通过向量检索技术将其转变为一种基于语义的搜索过程,则可以在提取相关信息方面达到更高效的结果。

方案概述

  1. 数据集成(Ingestion):将对象存储OSS中的文件通过事件触发的方式,经过转换后存入向量数据库DashVector中。

    image
  2. 数据检索(Search):把检索到的原始数据转换为易于计算机处理的向量形式,最后存储于DashVector数据库中以便高效检索和分析。

    image

前提条件

1. 创建函数

函数计算控制台创建函数。

说明

本文只介绍示例函数代码。创建函数的具体步骤,请参见创建函数

  • 测试代码如下所示:

    # -*- coding: utf-8 -*-
    import os
    import ast
    import copy
    import json
    import logging
    import dashscope
    from dashscope import TextEmbedding
    from http import HTTPStatus
    
    logger = logging.getLogger()
    logger.setLevel(level=logging.INFO)
    
    dashscope.api_key='Your-API-KEY'
    # 需要替换成自己创建的DashScope API-KEY。
    
    def handler(event, context):
      evt = json.loads(event)
      evtinput = evt[0]['data']
      resp = dashscope.TextEmbedding.call(
            model=dashscope.TextEmbedding.Models.text_embedding_v1,
            api_key=os.getenv('DASHSCOPE_API_KEY'), 
            # 需要替换成自己创建的DashScope API-KEY。
            input= evtinput )
      if resp.status_code == HTTPStatus.OK:
            print(resp)
      else:
            print(resp)
      return resp
    重要

    需手动安装相关函数环境。具体操作,请参见为函数安装第三方依赖

    pip3 install dashvector dashscope -t .
  • 返回样例如下所示:

    {
        "code": "",
        "message": "",
        "output": {
            "embeddings": [
                {
                    "embedding": [
                        -2.192838430404663,
                        -0.703125,
                        ... ...
                        -0.8980143070220947,
                        -0.9130208492279053,
                        -0.520526111125946,
                        -0.47154948115348816
                    ],
                    "text_index": 0
                }
            ]
        },
        "request_id": "e9f9a555-85f2-9d15-ada8-133af5******",
        "status_code": 200,
        "usage": {
            "total_tokens": 3
        }
    }

2. 创建Ingestion数据集成任务

  1. 登录事件总线 EventBridge 控制台

  2. 在左侧导航栏,单击事件流,然后选择目标地域,最后单击创建事件流

    image

  3. 创建事件流页面,配置以下信息。

    1. Source(源)配置导向页面,按照以下表格说明配置参数信息,然后单击下一步

      配置项

      示例值

      说明

      数据提供方

      对象存储 OSS

      在下拉框中选择对象存储 OSS

      OSS Bucket

      ai4d-35bzmh7q33gl******

      选择已创建的OSS Bucket,如果没有,可单击创建OSS Bucket进行创建。

      OSS 前缀 (prefix)

      非必填项,若不填表示EB将拉取整个Bucket内容。

      文档加载

      TextLoder

      支持解析TextLoader作为文档加载器。

      加载模式

      单文档加载(single)

      有两种模式可供您选择:

      • 单文档加载(single):表示单个文件作为一条数据加载。本示例中采用此种模式。

      • 分块加载(elements):表示按照分隔符加载数据。

      image

    2. Filtering(过滤)配置向导页面,选择模式内容,然后单击下一步

      说明

      本示例中模式内容匹配全部事件

      image

    3. Transform(转换)配置向导页面,选择选择阿里云服务函数计算,然后选择目标函数和函数版本,最后单击下一步

      重要

      转换部分主要是将原始数据转换成向量化数据,为投递至DashVector做数据准备。

      配置项

      示例值

      说明

      选择阿里云服务

      函数计算

      在下拉框中选择函数计算,并选择绑定现有函数

      函数

      ebragdemo

      在下拉列表中选择已创建的函数。

      版本和别名

      指定版本

      支持选择以下两种类型:

      • 指定版本:需要选择函数的具体版本。

      • 指定别名:需要选择函数的别名。

      版本

      LATEST

      选择目标函数的具体版本。

    4. Sink(目标)配置向导页面,按照以下表格说明配置参数信息,然后单击保存

      配置项

      示例值

      说明

      服务类型

      向量检索服务DashVector

      在下拉框中选择向量检索服务DashVector

      DashVector Cluster

      DashVector

      选择创建成功的数据库,如果没有,可单击新建 DashVector Cluster进行创建。

      DashVector Collection

      Demotest

      选择创建成功的Collection。

      数据映射

      Upsert

      选择Upsert方式插入您的向量数据库。

      向量

      $.output.embeddings[0].embedding

      填写上游DashscopeTextEmbedding投递的向量信息。如$.output. embeddings[0].embedding

      鉴权配置 (API-KEY)

      ********

      填写获取的DashVector API-KEY。

      image

3. 创建Search数据检索任务

在进行数据检索时,需要首先对数据进行Embedding,然后将Embedding后的向量值与数据库值做检索排序。最后填写Prompt模板,通过自然语言理解和语义分析,理解数据检索意图。

说明

该任务可以部署在云端函数计算,也可以在直接本地环境中执行。

  1. 创建embedding.py文件,将需要检索的问题进行文本向量化。示例代码如下所示:

    import os
    import dashscope
    from dashscope import TextEmbedding
    
    def generate_embeddings(news):
        rsp = TextEmbedding.call(
            model=TextEmbedding.Models.text_embedding_v1,
            input=news
        )
        embeddings = [record['embedding'] for record in rsp.output['embeddings']]
        return embeddings if isinstance(news, list) else embeddings[0]
    
    if __name__ == '__main__':
        dashscope.api_key = '{your-dashscope-api-key}'
        # 需要您替换成自己创建的Dashscope API-KEY 。
  2. 创建search.py文件,并将如下示例代码复制到search.py文件中,通过DashVector的向量检索能力来检索相似度的最高的内容。

    from dashvector import Client
    
    from embedding import generate_embeddings
    
    
    def search_relevant_news(question):
        # 初始化 dashvector client
        client = Client(
          api_key='{your-dashvector-api-key}',
          # 需要您替换成自己创建的Dashvector API-KEY。
          endpoint='{your-dashvector-cluster-endpoint}'
        )
    
        # 获取存入的集合
        collection = client.get('news_embedings')
        assert collection
    
        # 向量检索:指定 topk = 1 
        rsp = collection.query(generate_embeddings(question), output_fields=['raw'],
                               topk=1)
        assert rsp
        return rsp.output[0].fields['raw']
  3. 创建answer.py文件,就可以按照特定的模板作为PromptLLM发起提问了,本示例中选用的LLM是通义千问,代码示例如下:

    from dashscope import Generation
    
    def answer_question(question, context):
        prompt = f'''请基于```内的内容回答问题。"
    	```
    	{context}
    	```
    	我的问题是:{question}。
        '''
        rsp = Generation.call(model='qwen-turbo', prompt=prompt)
        return rsp.output.text
  4. 创建run.py文件,并将如下示例代码复制到run.py文件中,并执行run.py文件。

    import dashscope
    
    from search import search_relevant_news
    from answer import answer_question
    
    if __name__ == '__main__':
        dashscope.api_key = '{your-dashscope-api-key}'
        # 需要您替换成自己创建的DashScope API-KEY。
    
        question = 'EventBridge 是什么,它有哪些能力?'
        context = search_relevant_news(question)
        answer = answer_question(question, context)
    
        print(f'question: {question}\n' f'answer: {answer}')