Online Q&A over a RAG knowledge base

By combining RAG (Retrieval-Augmented Generation) with an LLM (large language model), enterprise customers can quickly retrieve information from large volumes of documents. This topic describes the end-to-end procedure.

Background information

Document Q&A based on RAG (Retrieval-Augmented Generation) is commonly used in scenarios such as retrieval and summarization over an enterprise's internal knowledge base and online Q&A in vertical domains. Built on the customer's domain knowledge-base documents, RAG combined with an LLM (large language model) understands and responds to complex natural-language queries, helping enterprise customers quickly retrieve the information they need from PDF, Word, spreadsheet, and image documents using natural language.

Prerequisites

  • A DLF 2.0 data catalog has been created. If not, see Create a data catalog.

    Note

    If you operate as a RAM user, grant the required resource permissions before performing data operations. For details, see Permission management.

  • The AI Search Open Platform has been activated and an API key has been obtained.

  • An OpenSearch Vector Search Edition instance has been created, and its engine version must be 1.3.0 or later.

Procedure

Step 1: Load the notebook example

  1. Log on to the DataWorks Gallery console.

  2. Find the corresponding example card and click Load Example on the card.

  3. Select the workspace and instance to load into, click Confirm, and go to the DataWorks Data Development page.

Step 2: Prepare the development environment

The following cell contains seven variables. Replace them with actual values before running the cell to ensure it runs successfully.

  • ${your-search-platform-public-endpoint}: you can view the endpoint in the AI Search Open Platform console.

  • ${your-search-platform-api-key}: you can view the API key in the AI Search Open Platform console.

  • You can view the following information in the OpenSearch console:

    • ${your-opensearch-public_endpoint}: the endpoint of the OpenSearch instance.

    • ${your-opensearch-instance_id}: the ID of the OpenSearch instance.

    • ${your-opensearch-table-name}: the OpenSearch table name.

    • ${your-opensearch-instance-username}: the OpenSearch username.

    • ${your-opensearch-instance-password}: the OpenSearch password.

# AI Search Open Platform configuration
search_plat_host = "${your-search-platform-public-endpoint}"
search_plat_api_key = "${your-search-platform-api-key}"

# OpenSearch Vector Search Edition configuration
os_vectorstore_host = "${your-opensearch-public_endpoint}"  
os_vectorstore_instance_id = "${your-opensearch-instance_id}"         
os_vectorstore_table_name = "${your-opensearch-table-name}"
os_vectorstore_user_name = "${your-opensearch-instance-username}"                
os_vectorstore_user_password = "${your-opensearch-instance-password}"

# URL of the input document; the sample document is the OpenSearch (AI Search Open Platform) product documentation
document_url = "https://help.aliyun.com/zh/open-search/search-platform/product-overview/introduction-to-search-platform"

The next cell also contains seven variables. Replace them with actual values before running the cell to ensure it runs successfully.

You can view the following information in the Data Lake Formation (DLF) console:

  • ${dlf_region}: the region where DLF resides; choose it according to your region. For supported regions, see Regions and endpoints.

  • ${dlf_catalog_id}: the ID of the DLF data catalog.

  • ${dlf_catalog_name}: the name of the DLF data catalog.

  • ${dlf_database_name}: the name of the DLF database.

  • ${dlf_table_name}: the name of the DLF table.

  • ${dlf_catalog_accessKeyId}: the AccessKey ID used to access the DLF service. For how to obtain it, see Create an AccessKey.

  • ${dlf_catalog_accessKeySecret}: the AccessKey Secret used to access the DLF service. For how to obtain it, see Create an AccessKey.

# Fill in the global parameter values below before running the code
dlf_region = "${dlf_region}" # Replace [dlf_region] with the region ID where your DLF resides, for example cn-beijing
dlf_catalog_id = "${dlf_catalog_id}"   # Replace [dlf_catalog_id] with the ID of your DLF data catalog
dlf_catalog_name = "myfirstcatalog" # Replace [dlf_catalog_name] with the name of your target DLF catalog; if you have activated the integrated OpenLake setup, "myfirstcatalog" is recommended
dlf_database_name = "opensearch_db" # Replace [dlf_database_name] with the name of your target DLF database; if you have activated the integrated OpenLake setup, "opensearch_db" is recommended
dlf_table_name = "rag" # Replace [dlf_table_name] with a custom table name; "rag" is recommended
dlf_catalog_accessKeyId = "${dlf_catalog_accessKeyId}" # Replace [dlf_catalog_accessKeyId] with the AccessKey ID used to access the DLF service
dlf_catalog_accessKeySecret = "${dlf_catalog_accessKeySecret}" # Replace [dlf_catalog_accessKeySecret] with the AccessKey Secret used to access the DLF service

# Endpoint of the DLF service
dlf_endpoint = f"dlfnext-vpc.{dlf_region}.aliyuncs.com"

Python 3.7 or later is required; make sure a suitable Python version is installed (a quick check is sketched after the installation command below). The environment dependencies for the AI Search Open Platform and the OpenSearch vector retrieval service are installed as follows.

! pip install -q alibabacloud_searchplat20240529 alibabacloud_ha3engine_vector openai
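If you are unsure which Python version the notebook kernel uses, the following quick check (a minimal sketch that only asserts the 3.7+ requirement mentioned above) can be run in a separate cell:

import sys

# Abort early if the kernel's Python interpreter is older than the required 3.7
assert sys.version_info >= (3, 7), f"Python 3.7+ is required, found {sys.version.split()[0]}"
print("Python version OK:", sys.version.split()[0])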

Step 3: Initialize the development environment

This is the shared code for offline document processing and online document Q&A. It imports the required packages, initializes the AI Search Open Platform client, and sets up the search engine configuration.

import asyncio
import json
from operator import attrgetter
from typing import List
from alibabacloud_ha3engine_vector import models, client
from alibabacloud_ha3engine_vector.models import QueryRequest, SparseData
from Tea.exceptions import TeaException, RetryError
from alibabacloud_tea_openapi.models import Config
from alibabacloud_searchplat20240529.client import Client
from alibabacloud_searchplat20240529.models import GetDocumentSplitRequest, CreateDocumentAnalyzeTaskRequest, \
    CreateDocumentAnalyzeTaskRequestDocument, GetDocumentAnalyzeTaskStatusRequest, GetDocumentSplitRequestDocument, \
    GetTextEmbeddingRequest, GetTextEmbeddingResponseBodyResultEmbeddings, GetTextSparseEmbeddingRequest, \
    GetTextSparseEmbeddingResponseBodyResultSparseEmbeddings, GetTextSparseEmbeddingResponseBodyResultSparseEmbeddingsEmbedding, \
    GetImageAnalyzeTaskStatusResponse, CreateImageAnalyzeTaskRequest, GetImageAnalyzeTaskStatusRequest, \
    CreateImageAnalyzeTaskRequestDocument, CreateImageAnalyzeTaskResponse,GetDocumentRankRequest, \
    GetTextGenerationRequest, GetTextGenerationRequestMessages, GetQueryAnalysisRequest
from openai import OpenAI

workspace_name = "default"
service_id_config = {"document_analyze": "ops-document-analyze-001",
                     "split": "ops-document-split-001",
                     "image_analyze": "ops-image-analyze-ocr-001",
                     "rank": "ops-bge-reranker-larger",
                     "text_embedding": "ops-text-embedding-002",
                     "text_sparse_embedding": "ops-text-sparse-embedding-001",
                     "llm": "ops-qwen-turbo",
                     "query_analyze": "ops-query-analyze-001"}

# Create the AI Search Open Platform client
config = Config(bearer_token=search_plat_api_key, endpoint=search_plat_host, protocol="http", read_timeout=30000)
ops_client = Client(config=config)

openAIclient = OpenAI(
    api_key=search_plat_api_key,
    base_url="http://" + search_plat_host + "/compatible-mode/v1/"
)

Step 4: Process documents offline and write them to a Paimon table

Offline document processing uses the document understanding, splitting, and vectorization services of the AI Search Open Platform to parse and extract document data, split the documents into chunks, embed the chunks, and finally push the results to a Paimon table. Once new data is written to the Paimon table, OpenSearch detects it in real time and builds a real-time index, so the data becomes queryable within seconds of being written.

# Initialize the Paimon table information
import os
import pandas as pd
import pyarrow as pa

from paimon_python_java import Catalog
from paimon_python_api import Schema

# create catalog
catalog_options = {
    'metastore': 'dlf-paimon',
    'dlf.endpoint': dlf_endpoint,
    'dlf.region': dlf_region ,
    'dlf.catalog.id': dlf_catalog_id,
    'dlf.catalog.accessKeyId': dlf_catalog_accessKeyId ,
    'dlf.catalog.accessKeySecret': dlf_catalog_accessKeySecret,
}

catalog = Catalog.create(catalog_options)

pa_schema = pa.schema([
    ('id', pa.string()),
    ('title', pa.string()),
    ('text', pa.string()),
    ('url', pa.string()),
    ('embedding', pa.string()),
    ('token_id', pa.string()),
    ('token_weight', pa.string())
])

schema = Schema(pa_schema=pa_schema)

# Create the Paimon table
catalog.create_table(f'{dlf_database_name}.{dlf_table_name}', schema, True)

import os
import pandas as pd
import pyarrow as pa

async def write_to_paimon(doc_list):
    # get table
    table = catalog.get_table(f"{dlf_database_name}.{dlf_table_name}")

    # write data
    # 1. Create table write and commit
    write_builder = table.new_batch_write_builder()
    table_write = write_builder.new_write()
    table_commit = write_builder.new_commit()

    # 2. Convert the data source to an arrow RecordBatch
    df = pd.DataFrame(doc_list)
    record_batch = pa.RecordBatch.from_pandas(df, pa_schema)

    # 3. Write data. You can write many times.
    table_write.write_arrow_batch(record_batch)

    # 4. Commit data. If you commit, you cannot write more data.
    commit_messages = table_write.prepare_commit()
    table_commit.commit(commit_messages)

    # 5. Close resources.
    table_write.close()
    table_commit.close()

async def poll_doc_analyze_task_result(ops_client, task_id, service_id, interval=5):
    while True:
        request = GetDocumentAnalyzeTaskStatusRequest(task_id=task_id)
        response = await ops_client.get_document_analyze_task_status_async(workspace_name, service_id, request)
        status = response.body.result.status
        if status == "PENDING":
            await asyncio.sleep(interval)
        elif status == "SUCCESS":
            return response
        else:
            print("error: " + response.body.result.error)
            raise Exception("document analyze task failed")

def is_analyzable_url(url:str):
    if not url:
        return False
    image_extensions = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff'}
    return url.lower().endswith(tuple(image_extensions))

async def image_analyze(ops_client, url):
    try:
        print("image analyze :" + url)
        if url.startswith("//"):
            url = "https:" + url
        if not is_analyzable_url(url):
            print(url + " is not analyzable.")
            return url
        image_analyze_service_id = service_id_config["image_analyze"]
        document = CreateImageAnalyzeTaskRequestDocument(
            url=url,
        )
        request = CreateImageAnalyzeTaskRequest(document=document)
        response: CreateImageAnalyzeTaskResponse = ops_client.create_image_analyze_task(workspace_name, image_analyze_service_id, request)
        task_id = response.body.result.task_id
        while True:
            request = GetImageAnalyzeTaskStatusRequest(task_id=task_id)
            response: GetImageAnalyzeTaskStatusResponse = ops_client.get_image_analyze_task_status(workspace_name, image_analyze_service_id, request)
            status = response.body.result.status
            if status == "PENDING":
                await asyncio.sleep(5)
            elif status == "SUCCESS":
                return url + response.body.result.data.content
            else:
                print("image analyze error: " + response.body.result.error)
                return url
    except Exception as e:
        print(f"image analyze Exception : {e}")

def chunk_list(lst, chunk_size):
    for i in range(0, len(lst), chunk_size):
        yield lst[i:i + chunk_size]

async def document_pipeline_execute(document_url: str = None, document_base64: str = None, file_name: str = None):

    # Create the AI Search Open Platform client
    config = Config(bearer_token=search_plat_api_key,endpoint=search_plat_host,protocol="http")
    ops_client = Client(config=config)

    # Step 1: Document parsing
    document_analyze_request = CreateDocumentAnalyzeTaskRequest(document=CreateDocumentAnalyzeTaskRequestDocument(url=document_url, content=document_base64,file_name=file_name, file_type='html'))
    document_analyze_response = await ops_client.create_document_analyze_task_async(workspace_name=workspace_name,service_id=service_id_config["document_analyze"],request=document_analyze_request)
    print("document_analyze task_id:" + document_analyze_response.body.result.task_id)
    extraction_result = await poll_doc_analyze_task_result(ops_client, document_analyze_response.body.result.task_id, service_id_config["document_analyze"])
    print("document_analyze done")
    document_content = extraction_result.body.result.data.content
    content_type = extraction_result.body.result.data.content_type

    # Step 2: Document splitting
    document_split_request = GetDocumentSplitRequest(
        GetDocumentSplitRequestDocument(content=document_content, content_type=content_type))
    document_split_result = await ops_client.get_document_split_async(workspace_name, service_id_config["split"],
                                                                      document_split_request)
    print("document-split done, chunks count: " + str(len(document_split_result.body.result.chunks))
          + " rich text count:" + str(len(document_split_result.body.result.rich_texts)))

    # Step 3: Text embedding
    # Extract the split results. For image chunks, the image analysis service extracts the text content
    doc_list = ([{"id": chunk.meta.get("id"), "content": chunk.content} for chunk in document_split_result.body.result.chunks]
                + [{"id": chunk.meta.get("id"), "content": chunk.content} for chunk in document_split_result.body.result.rich_texts if chunk.meta.get("type") != "image"]
                + [{"id": chunk.meta.get("id"), "content": await image_analyze(ops_client,chunk.content)} for chunk in document_split_result.body.result.rich_texts if chunk.meta.get("type") == "image"]
                )
    chunk_size = 32  # at most 32 embeddings can be computed per request
    all_text_embeddings: List[GetTextEmbeddingResponseBodyResultEmbeddings] = []
    for chunk in chunk_list([text["content"] for text in doc_list], chunk_size):
        response = await ops_client.get_text_embedding_async(workspace_name,service_id_config["text_embedding"],GetTextEmbeddingRequest(chunk))
        all_text_embeddings.extend(response.body.result.embeddings)

    all_text_sparse_embeddings: List[GetTextSparseEmbeddingResponseBodyResultSparseEmbeddings] = []
    for chunk in chunk_list([text["content"] for text in doc_list], chunk_size):
        response = await ops_client.get_text_sparse_embedding_async(workspace_name,service_id_config["text_sparse_embedding"],GetTextSparseEmbeddingRequest(chunk,input_type="document",return_token=True))
        all_text_sparse_embeddings.extend(response.body.result.sparse_embeddings)

    paimon_doc_list = []
    for i in range(len(doc_list)):
        paimon_doc = {}
        paimon_doc["id"] = doc_list[i]["id"] 
        paimon_doc["title"] = "" 
        paimon_doc["text"] = doc_list[i]["content"] 
        paimon_doc["url"] = document_url
        paimon_doc["embedding"] = ','.join([str(embedding) for embedding in all_text_embeddings[i].embedding])
        sorted_sparse_embedding_result = sorted(all_text_sparse_embeddings[i].embedding, key=attrgetter('token_id'))
        token_ids = [str(sparse_embedding.token_id) for sparse_embedding in sorted_sparse_embedding_result]
        token_weights = [str(sparse_embedding.weight) for sparse_embedding in sorted_sparse_embedding_result]

        paimon_doc["token_id"] = ','.join(token_ids)
        paimon_doc["token_weight"] = ','.join(token_weights)
        paimon_doc_list.append(paimon_doc)
            
    print("text-embedding done")
    for doc in paimon_doc_list:
        print(doc)

    # Step 4: Write to the Paimon table
    await write_to_paimon(paimon_doc_list)

The DLF catalog configuration items are described as follows:

| Configuration item | Description | Remarks |
| --- | --- | --- |
| dlf.endpoint | Endpoint of the DLF service. | For details, see Regions and endpoints. |
| dlf.region | Region where DLF resides. | For details, see Regions and endpoints. Note: keep it consistent with the region selected for dlf.catalog.endpoint. |
| dlf.catalog.id | ID of the DLF data catalog. | View the Catalog ID in the data catalog list of the DLF console. |
| dlf.tokenCache.meta.accessKeyId | AccessKey ID used to access the DLF service. | For how to obtain it, see Create an AccessKey. |
| dlf.tokenCache.meta.accessKeySecret | AccessKey Secret used to access the DLF service. | For how to obtain it, see Create an AccessKey. |
| dlf.user.name | Owner of the DLF data catalog. | View the Owner in the data catalog details in the DLF console. For details, see View a data catalog. |

Run the following script to execute the offline asynchronous task.

if __name__ == "__main__":
    import nest_asyncio # these two lines are required when running in a Jupyter notebook
    nest_asyncio.apply() # these two lines are required when running in a Jupyter notebook
    asyncio.run(document_pipeline_execute(document_url))
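document_pipeline_execute also accepts document_base64 and file_name parameters, so a document can be passed as content instead of by URL. The sketch below is only an illustration under assumptions: the local path my_doc.html is hypothetical, the pipeline above hardcodes file_type='html' so only HTML content is passed this way, and the url column written to the Paimon table will be empty in this case.

import base64

# Hypothetical local HTML file; replace the path with your own document
local_html_path = "./my_doc.html"
with open(local_html_path, "rb") as f:
    doc_b64 = base64.b64encode(f.read()).decode("utf-8")

# Reuse the same offline pipeline, passing the content as Base64 instead of a URL
asyncio.run(document_pipeline_execute(document_base64=doc_b64, file_name="my_doc.html"))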

Step 5: Create an index from the Paimon table

In the left-side navigation pane of DataWorks, click Data Catalog and find the Paimon table created earlier. Right-click it and choose Manage OpenSearch Vector Index, select the OpenSearch instance whose ID is {your-opensearch-instance_id}, and create an index table named {your-opensearch-table-name}. Once the index has been created, proceed to the next step.

Step 6: Online document Q&A

  1. Convert the query into embeddings, use the vector retrieval capability of OpenSearch Vector Search Edition to find document chunks similar to the query, assemble them into a prompt, and send it to the LLM to generate an answer.

    async def os_vector_store_retrieve(query_emb, query_sparse_emb: List[GetTextSparseEmbeddingResponseBodyResultSparseEmbeddingsEmbedding]):
        os_vector_store_config = models.Config(
            endpoint=os_vectorstore_host,
            instance_id=os_vectorstore_instance_id,
            protocol="http",
            access_user_name=os_vectorstore_user_name,
            access_pass_word=os_vectorstore_user_password
        )
        # Initialize the OpenSearch vector engine client
        os_vector_store = client.Client(os_vector_store_config)
        sorted_sparse_embedding_result = sorted(query_sparse_emb, key=attrgetter('token_id'))
        token_ids = [sparse_embedding.token_id for sparse_embedding in sorted_sparse_embedding_result]
        token_weights = [sparse_embedding.weight for sparse_embedding in sorted_sparse_embedding_result]

        sparseData = SparseData(indices=token_ids, values=token_weights)
        request = QueryRequest(table_name=os_vectorstore_table_name,
                               vector=query_emb,
                               sparse_data=sparseData,
                               include_vector=True,
                               output_fields=["id", "text"],
                               top_k=5)
        result = await os_vector_store.query_async(request)
        jsonResult = json.loads(result.body)
        search_results = [result['fields']['text'] for result in jsonResult['result']]
        return search_results
    
    async def get_query_result(query):
        # Get the dense embedding of the query
        query_emb_result = await ops_client.get_text_embedding_async(workspace_name, service_id_config["text_embedding"],
                                                                     GetTextEmbeddingRequest(input=[query],
                                                                                             input_type="query"))
        query_emb = query_emb_result.body.result.embeddings[0].embedding
        # Get the sparse embedding of the query
        query_sparse_emb = await ops_client.get_text_sparse_embedding_async(workspace_name,
                                                                            service_id_config["text_sparse_embedding"],
                                                                            GetTextSparseEmbeddingRequest(
                                                                                input=[query], input_type="query"))
        # Retrieve relevant documents using the query's dense and sparse embeddings
        search_results = await os_vector_store_retrieve(query_emb,query_sparse_emb.body.result.sparse_embeddings[0].embedding)
        return search_results
    
    # Online Q&A; the input is the user's question
    async def query_pipeline_execute(user_query):
    
        # Step 1: Query analysis
        query_analyze_response = ops_client.get_query_analysis(workspace_name, service_id_config["query_analyze"], GetQueryAnalysisRequest(query=user_query))
        print("问题:" + user_query)
    
        # Step 2: Retrieve documents
        all_query_results = []
        user_query_results = await get_query_result(user_query)
        all_query_results.extend(user_query_results)
        rewrite_query_results = await get_query_result(query_analyze_response.body.result.query)
        all_query_results.extend(rewrite_query_results)
        for extend_query in query_analyze_response.body.result.queries:
            extend_query_result = await get_query_result(extend_query)
            all_query_results.extend(extend_query_result)
        # Deduplicate all retrieved results
        remove_duplicate_results = list(set(all_query_results))
    
        # Step 3: Rerank the deduplicated retrieved documents
        rerank_top_k = 3
        score_results = await ops_client.get_document_rank_async(workspace_name, service_id_config["rank"],GetDocumentRankRequest(remove_duplicate_results, user_query))
        rerank_results = [remove_duplicate_results[item.index] for item in score_results.body.result.scores[:rerank_top_k]]
    
        # Step 4: Call the LLM to answer the question
        docs = '\n'.join([f"<article>{s}</article>" for s in rerank_results])
        completion = openAIclient.chat.completions.create(
            model = "ops-qwen-turbo",
            messages = [
                {"role": "user", "content": f"""已知信息包含多个独立文档,每个文档在<article>和</article>之间,已知信息如下:\n'''{docs}'''
                                             \n\n根据上述已知信息,详细且有条理地回答用户的问题。确保答案充分回答了问题并且正确使用了已知信息。如果信息不足以回答问题,请说“根据已知信息无法回答该问题”。
                                             不要使用不在已知信息中的内容生成答案,确保答案中每一个陈述在上述已知信息中有相应内容支撑。答案请使用中文。
                                             \n问题是:'''{user_query}'''"""}
            ],
            stream=True
        )
        print("\n答案:", end="")
        for resp in completion:
            # Skip streaming chunks whose delta carries no content (for example, the final chunk)
            if resp.choices and resp.choices[0].delta.content:
                print(resp.choices[0].delta.content.replace("**", ""), end="")
  2. Specify the input question.

    # User query:
    user_query = "AI搜索开放平台有什么特点?"
  3. Run the following script to execute the online asynchronous Q&A task. (An optional sketch for asking several questions in a row follows this list.)

    if __name__ == "__main__":
        import nest_asyncio  # this line is required when running in a Jupyter notebook
        nest_asyncio.apply() # this line is required when running in a Jupyter notebook
        asyncio.run(query_pipeline_execute(user_query))
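To try several questions in one session, the online pipeline can simply be called in a loop. This is only a usage sketch; the second question is a hypothetical placeholder.

# A usage sketch: run the online Q&A pipeline for several queries in a row
user_queries = [
    "AI搜索开放平台有什么特点?",
    "OpenSearch向量检索版支持哪些功能?",  # hypothetical placeholder question
]
for q in user_queries:
    asyncio.run(query_pipeline_execute(q))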