基于EventBridge和DashVector打造RAG全链路动态语义检索能力

如果您想构建一个基于文本索引的语义搜索系统,您可以利用事件总线(EventBridge)向量检索服务函数计算 FC(Function Compute),并通过百炼服务上的同步接口API,从零开始构建一个基于文本索引和向量检索的语义搜索系统。本文将介绍如何构建一个完全动态的RAG入库方案,通过事件总线(EventBridge)拉取OSS非结构化数据,同时将数据投递至向量数据库,从而实现完整的RAG Ingestion流程。

RAG背景概述

大语言模型(LLM)作为自然语言处理领域的核心技术,具有丰富的自然语言处理能力。然而,其训练语料库主要由普适知识和常识性知识(如维基百科、新闻、小说)以及各种领域的专业知识组成,这导致LLM在处理特定领域的知识表示和应用时存在一定的局限性。特别是在垂直领域或企业内部等私域专属知识方面,由于缺乏针对性的数据和专业术语的充分覆盖,LLM的表现可能会不尽人意。为了解决这些问题,可以通过微调或增加专有数据集来提高模型在特定领域的性能。

实现专属领域的知识问答的关键在于让大型语言模型(LLM)能够理解并获取那些超出了它训练数据范围内的特定领域知识。这可以通过精心设计的提示词(Prompt)来引导LLM,在回答该领域的问题时,根据提供的额外知识来进行作答。通常情况下,用户的提问采用完整的句子形式,而非像搜索引擎那样仅输入几个关键词。因此,直接使用关键词与企业内部的知识库进行匹配往往效果不佳,因为处理长句还涉及到分词、权重分配等一系列复杂步骤。相反,如果将提问文本以及知识库中的内容都转换为高质量的向量表示,然后通过向量检索技术将其转变为一种基于语义的搜索过程,则可以在提取相关信息方面达到更高效的结果。

步骤概述

  1. 数据集成(Ingestion):将对象存储OSS中的文件通过事件触发的方式,经过转换后存入向量数据库DashVector中。

    image
  2. 数据检索(Search):把检索到的原始数据转换为易于计算机处理的向量形式,最后存储于DashVector数据库中以便高效检索和分析。

    image

前提条件

1. 创建Ingestion数据集成任务

  1. 登录事件总线 EventBridge 控制台

  2. 在左侧导航栏,单击事件流,然后选择目标地域,最后单击创建事件流

    image

  3. 创建事件流页面,配置以下信息。

    1. Source(源)配置导向页面,按照以下表格说明配置参数信息,然后单击下一步

      配置项

      示例值

      说明

      数据提供方

      对象存储 OSS

      在下拉框中选择对象存储 OSS

      OSS Bucket

      ai4d-35bzmh7q33gl******

      选择已创建的OSS Bucket,如果没有,可单击创建OSS Bucket进行创建。

      OSS 前缀 (prefix)

      非必填项,若不填表示EB将拉取整个Bucket内容。

      文档加载

      TextLoder

      支持解析TextLoader作为文档加载器。

      加载模式

      单文档加载(single)

      有两种模式可供您选择:

      • 单文档加载(single):表示单个文件作为一条数据加载。本示例中采用此种模式。

      • 分块加载(elements):表示按照分隔符加载数据。

      image

    2. Filtering(过滤)配置向导页面,选择模式内容,然后单击下一步

      说明

      本示例中模式内容匹配全部事件

      image

    3. Transform(转换)配置向导页面,选择选择阿里云服务函数计算,然后选择目标函数和函数版本,最后单击下一步

      重要

      转换部分主要是将原始数据转换成向量化数据,为投递至DashVector做数据准备。

      配置项

      示例值

      说明

      选择阿里云服务

      函数计算

      在下拉框中选择函数计算,并选择新建函数模板

      函数

      EventStreaming_Transform_Split_****

      自定义函数名称,也选择控制台默认的函数名称。

      函数模板

      内容分割

      支持以下四种函数模板,请按需选择。

      • 内容分割

      • 内容映射

      • 内容富化

      • 动态路由

      函数模板的具体介绍,请参见使用函数计算实现消息数据清洗

      pdBvZFJhg9

      本示例中创建函数所使用的代码

      • 测试代码如下所示:

        # -*- coding: utf-8 -*-
        import os
        import ast
        import copy
        import json
        import logging
        import dashscope
        from dashscope import TextEmbedding
        from http import HTTPStatus
        
        logger = logging.getLogger()
        logger.setLevel(level=logging.INFO)
        
        dashscope.api_key='Your-API-KEY'
        # 需要您替换成自己创建的DashScope API-KEY。
        
        def handler(event, context):
          evt = json.loads(event)
          evtinput = evt['data']
          resp = dashscope.TextEmbedding.call(
                model=dashscope.TextEmbedding.Models.text_embedding_v1,
                api_key=os.getenv('DASHSCOPE_API_KEY'), 
                # 需要您替换成自己创建的DashScope API-KEY。
                input= evtinput )
          if resp.status_code == HTTPStatus.OK:
                print(resp)
          else:
                print(resp)
          return resp
        重要

        需手动安装相关函数环境。具体操作,请参见为函数安装第三方依赖

        pip3 install dashvector dashscope -t .
      • 返回样例如下所示:

        {
            "code": "",
            "message": "",
            "output": {
                "embeddings": [
                    {
                        "embedding": [
                            -2.192838430404663,
                            -0.703125,
                            ... ...
                            -0.8980143070220947,
                            -0.9130208492279053,
                            -0.520526111125946,
                            -0.47154948115348816
                        ],
                        "text_index": 0
                    }
                ]
            },
            "request_id": "e9f9a555-85f2-9d15-ada8-133af54352b8",
            "status_code": 200,
            "usage": {
                "total_tokens": 3
            }
        }
    4. Sink(目标)配置向导页面,按照以下表格说明配置参数信息,然后单击保存

      配置项

      示例值

      说明

      服务类型

      向量检索服务

      在下拉框中选择向量检索服务

      DashVector Cluster

      DashVector

      选择创建成功的数据库,如果没有,可单击新建 DashVector Cluster进行创建。

      DashVector Collection

      Demotest

      选择创建成功的Collection。

      数据映射

      Upsert

      选择Upsert方式插入您的向量数据库。

      向量

      $.output. embeddings[0].embedding

      填写上游Dashscope的TextEmbedding投递的向量信息。如$.output. embeddings[0].embedding

      鉴权配置 (API-KEY)

      ********

      获取的DashVector API-KEY参数。

      image

2. 创建Search数据检索任务

在进行数据检索时,需要首先对数据进行Embedding,然后将Embedding后的向量值与数据库值做检索排序。最后填写Prompt模板,通过自然语言理解和语义分析,理解数据检索意图。

说明

该任务可以部署在云端函数计算,也可以在直接本地环境中执行。

  1. 创建embedding.py文件,将需要检索的问题进行文本向量化。示例代码如下所示:

    import os
    import dashscope
    from dashscope import TextEmbedding
    
    def generate_embeddings(news):
        rsp = TextEmbedding.call(
            model=TextEmbedding.Models.text_embedding_v1,
            input=news
        )
        embeddings = [record['embedding'] for record in rsp.output['embeddings']]
        return embeddings if isinstance(news, list) else embeddings[0]
    
    if __name__ == '__main__':
        dashscope.api_key = '{your-dashscope-api-key}'
        # 需要您替换成自己创建的Dashscope API-KEY 。
  2. 创建search.py文件,并将如下示例代码复制到search.py文件中,通过DashVector的向量检索能力来检索相似度的最高的内容。

    from dashvector import Client
    
    from embedding import generate_embeddings
    
    
    def search_relevant_news(question):
        # 初始化 dashvector client
        client = Client(
          api_key='{your-dashvector-api-key}',
          # 需要您替换成自己创建的Dashvector API-KEY。
          endpoint='{your-dashvector-cluster-endpoint}'
        )
    
        # 获取存入的集合
        collection = client.get('news_embedings')
        assert collection
    
        # 向量检索:指定 topk = 1 
        rsp = collection.query(generate_embeddings(question), output_fields=['raw'],
                               topk=1)
        assert rsp
        return rsp.output[0].fields['raw']
  3. 创建answer.py文件,就可以按照特定的模板作为Prompt向LLM发起提问了,本示例中选用的LLM是通义千问,代码示例如下:

    from dashscope import Generation
    
    def answer_question(question, context):
        prompt = f'''请基于```内的内容回答问题。"
    	```
    	{context}
    	```
    	我的问题是:{question}。
        '''
        rsp = Generation.call(model='qwen-turbo', prompt=prompt)
        return rsp.output.text
  4. 创建run.py文件,并将如下示例代码复制到run.py文件中,并执行run.py文件。

    import dashscope
    
    from search import search_relevant_news
    from answer import answer_question
    
    if __name__ == '__main__':
        dashscope.api_key = '{your-dashscope-api-key}'
        # 需要您替换成自己创建的DashScope API-KEY。
    
        question = 'EventBridge 是什么,它有哪些能力?'
        context = search_relevant_news(question)
        answer = answer_question(question, context)
    
        print(f'question: {question}\n' f'answer: {answer}')