By combining RAG (Retrieval-Augmented Generation) with LLMs (large language models), enterprise customers can quickly retrieve information from large volumes of documents. This topic walks you through the procedure.
Background information
Document Q&A based on RAG (Retrieval-Augmented Generation) is commonly used in scenarios such as retrieval and summarization over internal enterprise knowledge bases and online Q&A in vertical domains. Built on a customer's professional knowledge base documents, it combines RAG with an LLM to understand and respond to complex natural-language queries, helping enterprise customers quickly retrieve the information they need from PDF, Word, spreadsheet, and image documents using natural language.
Prerequisites
Procedure
Step 1: Load the Notebook example
Find the corresponding example card and click Load Example on the card.
Select the workspace and instance to load the example into, click Confirm, and you are taken to the DataWorks data development page.
Step 2: Prepare the development environment
The following cell contains seven variables. Replace them with actual values before you run it to ensure that it runs successfully.

You can view the endpoint and API key in the AI Search Open Platform console:

- ${your-search-platform-public-endpoint}: the endpoint of the AI Search Open Platform.
- ${your-search-platform-api-key}: the API key of the AI Search Open Platform.

You can view the following information in the OpenSearch console:

- ${your-opensearch-public_endpoint}: the endpoint of the OpenSearch instance.
- ${your-opensearch-instance_id}: the ID of the OpenSearch instance.
- ${your-opensearch-table-name}: the name of the OpenSearch table.
- ${your-opensearch-instance-username}: the username of the OpenSearch instance.
- ${your-opensearch-instance-password}: the password of the OpenSearch instance.
# AI Search Open Platform configuration
search_plat_host = "${your-search-platform-public-endpoint}"
search_plat_api_key = "${your-search-platform-api-key}"
# OpenSearch Vector Search Edition configuration
os_vectorstore_host = "${your-opensearch-public_endpoint}"
os_vectorstore_instance_id = "${your-opensearch-instance_id}"
os_vectorstore_table_name = "${your-opensearch-table-name}"
os_vectorstore_user_name = "${your-opensearch-instance-username}"
os_vectorstore_user_password = "${your-opensearch-instance-password}"
# Input document URL; the sample document is the OpenSearch product introduction
document_url = "https://help.aliyun.com/zh/open-search/search-platform/product-overview/introduction-to-search-platform"
The following cell also contains seven variables. Replace them with actual values before you run it to ensure that it runs successfully.

You can view the following information in the Data Lake Formation (DLF) console:

- ${dlf_region}: the region where DLF resides; select it based on your region. For supported regions, see Regions and endpoints.
- ${dlf_catalog_id}: the ID of the DLF catalog.
- ${dlf_catalog_name}: the name of the DLF catalog.
- ${dlf_database_name}: the name of the DLF database.
- ${dlf_table_name}: the name of the DLF table.
- ${dlf_catalog_accessKeyId}: the AccessKey ID used to access the DLF service. For how to obtain it, see Create an AccessKey pair.
- ${dlf_catalog_accessKeySecret}: the AccessKey secret used to access the DLF service. For how to obtain it, see Create an AccessKey pair.
# Fill in the values of the global parameters below before running the code
dlf_region = "${dlf_region}" # Replace [dlf_region] with the ID of the region where your DLF resides, for example cn-beijing
dlf_catalog_id = "${dlf_catalog_id}" # Replace [dlf_catalog_id] with the ID of your DLF catalog
dlf_catalog_name = "myfirstcatalog" # Replace [dlf_catalog_name] with the name of your target DLF catalog; if you have completed the integrated OpenLake activation, "myfirstcatalog" is recommended
dlf_database_name = "opensearch_db" # Replace [dlf_database_name] with the name of your target DLF database; if you have completed the integrated OpenLake activation, "opensearch_db" is recommended
dlf_table_name = "rag" # Replace [dlf_table_name] with a custom table name; "rag" is recommended
dlf_catalog_accessKeyId = "${dlf_catalog_accessKeyId}" # Replace [dlf_catalog_accessKeyId] with the AccessKey ID used to access the DLF service
dlf_catalog_accessKeySecret = "${dlf_catalog_accessKeySecret}" # Replace [dlf_catalog_accessKeySecret] with the AccessKey secret used to access the DLF service
# Endpoint of the DLF service
dlf_endpoint = f"dlfnext-vpc.{dlf_region}.aliyuncs.com"
Python 3.7 or later is required; make sure a suitable Python version is installed in advance. The environment dependencies for the AI Search Open Platform and OpenSearch Vector Search Edition are installed as follows.
! pip install -q alibabacloud_searchplat20240529 alibabacloud_ha3engine_vector openai
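Optionally, before installing the dependencies you can confirm that the interpreter meets the version requirement. This is only a minimal sketch and is not part of the original notebook:

# Minimal sanity check (sketch, not part of the original notebook):
# confirm that Python 3.7 or later is being used before installing dependencies.
import sys

assert sys.version_info >= (3, 7), f"Python 3.7+ is required, found {sys.version.split()[0]}"
print("Python version OK:", sys.version.split()[0])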
Step 3: Initialize the development environment
This is shared code for both offline document processing and online document Q&A. It imports the required packages, initializes the AI Search Open Platform client, and sets up the search engine configuration.
import asyncio
import json
from operator import attrgetter
from typing import List
from alibabacloud_ha3engine_vector import models, client
from alibabacloud_ha3engine_vector.models import QueryRequest, SparseData
from Tea.exceptions import TeaException, RetryError
from alibabacloud_tea_openapi.models import Config
from alibabacloud_searchplat20240529.client import Client
from alibabacloud_searchplat20240529.models import GetDocumentSplitRequest, CreateDocumentAnalyzeTaskRequest, \
CreateDocumentAnalyzeTaskRequestDocument, GetDocumentAnalyzeTaskStatusRequest, GetDocumentSplitRequestDocument, \
GetTextEmbeddingRequest, GetTextEmbeddingResponseBodyResultEmbeddings, GetTextSparseEmbeddingRequest, \
GetTextSparseEmbeddingResponseBodyResultSparseEmbeddings, GetTextSparseEmbeddingResponseBodyResultSparseEmbeddingsEmbedding, \
GetImageAnalyzeTaskStatusResponse, CreateImageAnalyzeTaskRequest, GetImageAnalyzeTaskStatusRequest, \
CreateImageAnalyzeTaskRequestDocument, CreateImageAnalyzeTaskResponse,GetDocumentRankRequest, \
GetTextGenerationRequest, GetTextGenerationRequestMessages, GetQueryAnalysisRequest
from openai import OpenAI
workspace_name = "default"
service_id_config = {"document_analyze": "ops-document-analyze-001",
"split": "ops-document-split-001",
"image_analyze": "ops-image-analyze-ocr-001",
"rank": "ops-bge-reranker-larger",
"text_embedding": "ops-text-embedding-002",
"text_sparse_embedding": "ops-text-sparse-embedding-001",
"llm": "ops-qwen-turbo",
"query_analyze": "ops-query-analyze-001"}
# Create the AI Search Open Platform client
config = Config(bearer_token=search_plat_api_key, endpoint=search_plat_host, protocol="http", read_timeout=30000)
ops_client = Client(config=config)
openAIclient = OpenAI(
    api_key=search_plat_api_key,
    base_url="http://" + search_plat_host + "/compatible-mode/v1/"
)
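Optionally, run a quick smoke test to confirm that the endpoint and API key are valid before continuing. This is a sketch rather than part of the original notebook, and it assumes the SDK also exposes a synchronous get_text_embedding method alongside the get_text_embedding_async call used later:

# Optional smoke test (sketch; assumes the synchronous get_text_embedding method
# mirrors the get_text_embedding_async call used later in this notebook).
test_response = ops_client.get_text_embedding(
    workspace_name,
    service_id_config["text_embedding"],
    GetTextEmbeddingRequest(["hello"])
)
print("embedding dimension:", len(test_response.body.result.embeddings[0].embedding))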
Step 4: Process documents offline and write them to the Paimon table
Offline document processing uses the document understanding, splitting, and vectorization services of the AI Search Open Platform to parse and extract document data, split the documents into chunks, embed the chunks, and finally push them to the Paimon table. After new data is written to the Paimon table, OpenSearch detects it in real time and builds a real-time index, so the data becomes searchable within seconds of being written.
# Initialize Paimon table information
import os
import pandas as pd
import pyarrow as pa
from paimon_python_java import Catalog
from paimon_python_api import Schema
# create catalog
catalog_options = {
    'metastore': 'dlf-paimon',
    'dlf.endpoint': dlf_endpoint,
    'dlf.region': dlf_region,
    'dlf.catalog.id': dlf_catalog_id,
    'dlf.catalog.accessKeyId': dlf_catalog_accessKeyId,
    'dlf.catalog.accessKeySecret': dlf_catalog_accessKeySecret,
}
catalog = Catalog.create(catalog_options)
pa_schema = pa.schema([
    ('id', pa.string()),
    ('title', pa.string()),
    ('text', pa.string()),
    ('url', pa.string()),
    ('embedding', pa.string()),
    ('token_id', pa.string()),
    ('token_weight', pa.string())
])
schema = Schema(pa_schema=pa_schema)
# Create the Paimon table
catalog.create_table(f'{dlf_database_name}.{dlf_table_name}', schema, True)
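As a quick check (a sketch, not part of the original notebook), you can fetch the table back from the catalog to confirm that it was created:

# Optional check (sketch): fetch the newly created table back from the catalog.
table = catalog.get_table(f'{dlf_database_name}.{dlf_table_name}')
print("Paimon table ready:", f'{dlf_database_name}.{dlf_table_name}')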
import os
import pandas as pd
import pyarrow as pa
async def write_to_paimon(doc_list):
    # get table
    table = catalog.get_table(f"{dlf_database_name}.{dlf_table_name}")
    # write data
    # 1. Create table write and commit
    write_builder = table.new_batch_write_builder()
    table_write = write_builder.new_write()
    table_commit = write_builder.new_commit()
    # 2. Convert the data source to an Arrow RecordBatch
    df = pd.DataFrame(doc_list)
    record_batch = pa.RecordBatch.from_pandas(df, pa_schema)
    # 3. Write data. You can write many times.
    table_write.write_arrow_batch(record_batch)
    # 4. Commit data. After you commit, you cannot write more data.
    commit_messages = table_write.prepare_commit()
    table_commit.commit(commit_messages)
    # 5. Close resources.
    table_write.close()
    table_commit.close()
async def poll_doc_analyze_task_result(ops_client, task_id, service_id, interval=5):
    while True:
        request = GetDocumentAnalyzeTaskStatusRequest(task_id=task_id)
        response = await ops_client.get_document_analyze_task_status_async(workspace_name, service_id, request)
        status = response.body.result.status
        if status == "PENDING":
            await asyncio.sleep(interval)
        elif status == "SUCCESS":
            return response
        else:
            print("error: " + response.body.result.error)
            raise Exception("document analyze task failed")
def is_analyzable_url(url: str):
    if not url:
        return False
    image_extensions = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff'}
    return url.lower().endswith(tuple(image_extensions))
async def image_analyze(ops_client, url):
    try:
        print("image analyze :" + url)
        if url.startswith("//"):
            url = "https:" + url
        if not is_analyzable_url(url):
            print(url + " is not analyzable.")
            return url
        image_analyze_service_id = service_id_config["image_analyze"]
        document = CreateImageAnalyzeTaskRequestDocument(
            url=url,
        )
        request = CreateImageAnalyzeTaskRequest(document=document)
        response: CreateImageAnalyzeTaskResponse = ops_client.create_image_analyze_task(workspace_name, image_analyze_service_id, request)
        task_id = response.body.result.task_id
        while True:
            request = GetImageAnalyzeTaskStatusRequest(task_id=task_id)
            response: GetImageAnalyzeTaskStatusResponse = ops_client.get_image_analyze_task_status(workspace_name, image_analyze_service_id, request)
            status = response.body.result.status
            if status == "PENDING":
                await asyncio.sleep(5)
            elif status == "SUCCESS":
                return url + response.body.result.data.content
            else:
                print("image analyze error: " + response.body.result.error)
                return url
    except Exception as e:
        print(f"image analyze Exception : {e}")
def chunk_list(lst, chunk_size):
    for i in range(0, len(lst), chunk_size):
        yield lst[i:i + chunk_size]
async def document_pipeline_execute(document_url: str = None, document_base64: str = None, file_name: str = None):
    # Create the AI Search Open Platform client
    config = Config(bearer_token=search_plat_api_key, endpoint=search_plat_host, protocol="http")
    ops_client = Client(config=config)

    # Step 1: document parsing
    document_analyze_request = CreateDocumentAnalyzeTaskRequest(document=CreateDocumentAnalyzeTaskRequestDocument(url=document_url, content=document_base64, file_name=file_name, file_type='html'))
    document_analyze_response = await ops_client.create_document_analyze_task_async(workspace_name=workspace_name, service_id=service_id_config["document_analyze"], request=document_analyze_request)
    print("document_analyze task_id:" + document_analyze_response.body.result.task_id)
    extraction_result = await poll_doc_analyze_task_result(ops_client, document_analyze_response.body.result.task_id, service_id_config["document_analyze"])
    print("document_analyze done")
    document_content = extraction_result.body.result.data.content
    content_type = extraction_result.body.result.data.content_type

    # Step 2: document splitting
    document_split_request = GetDocumentSplitRequest(
        GetDocumentSplitRequestDocument(content=document_content, content_type=content_type))
    document_split_result = await ops_client.get_document_split_async(workspace_name, service_id_config["split"],
                                                                      document_split_request)
    print("document-split done, chunks count: " + str(len(document_split_result.body.result.chunks))
          + " rich text count:" + str(len(document_split_result.body.result.rich_texts)))

    # Step 3: text embedding
    # Collect the split results. Text is extracted from image chunks by the image analysis service.
    doc_list = ([{"id": chunk.meta.get("id"), "content": chunk.content} for chunk in document_split_result.body.result.chunks]
                + [{"id": chunk.meta.get("id"), "content": chunk.content} for chunk in document_split_result.body.result.rich_texts if chunk.meta.get("type") != "image"]
                + [{"id": chunk.meta.get("id"), "content": await image_analyze(ops_client, chunk.content)} for chunk in document_split_result.body.result.rich_texts if chunk.meta.get("type") == "image"]
                )
    chunk_size = 32  # at most 32 embeddings can be computed per request
    all_text_embeddings: List[GetTextEmbeddingResponseBodyResultEmbeddings] = []
    for chunk in chunk_list([text["content"] for text in doc_list], chunk_size):
        response = await ops_client.get_text_embedding_async(workspace_name, service_id_config["text_embedding"], GetTextEmbeddingRequest(chunk))
        all_text_embeddings.extend(response.body.result.embeddings)
    all_text_sparse_embeddings: List[GetTextSparseEmbeddingResponseBodyResultSparseEmbeddings] = []
    for chunk in chunk_list([text["content"] for text in doc_list], chunk_size):
        response = await ops_client.get_text_sparse_embedding_async(workspace_name, service_id_config["text_sparse_embedding"], GetTextSparseEmbeddingRequest(chunk, input_type="document", return_token=True))
        all_text_sparse_embeddings.extend(response.body.result.sparse_embeddings)
    paimon_doc_list = []
    for i in range(len(doc_list)):
        paimon_doc = {}
        paimon_doc["id"] = doc_list[i]["id"]
        paimon_doc["title"] = ""
        paimon_doc["text"] = doc_list[i]["content"]
        paimon_doc["url"] = document_url
        paimon_doc["embedding"] = ','.join([str(embedding) for embedding in all_text_embeddings[i].embedding])
        sorted_sparse_embedding_result = sorted(all_text_sparse_embeddings[i].embedding, key=attrgetter('token_id'))
        token_ids = [str(sparse_embedding.token_id) for sparse_embedding in sorted_sparse_embedding_result]
        token_weights = [str(sparse_embedding.weight) for sparse_embedding in sorted_sparse_embedding_result]
        paimon_doc["token_id"] = ','.join(token_ids)
        paimon_doc["token_weight"] = ','.join(token_weights)
        paimon_doc_list.append(paimon_doc)
    print("text-embedding done")
    for doc in paimon_doc_list:
        print(doc)

    # Step 4: write to the Paimon table
    await write_to_paimon(paimon_doc_list)
Option | Description | Required | Remarks |
dlf.endpoint | Endpoint of the DLF service. | Yes | For details, see Regions and endpoints. |
dlf.region | Region where DLF resides. | Yes | For details, see Regions and endpoints. Note: keep it consistent with the region selected for dlf.catalog.endpoint. |
dlf.catalog.id | ID of the DLF catalog. | Yes | View the catalog ID in the catalog list of the Data Lake Formation console. |
dlf.tokenCache.meta.accessKeyId | AccessKey ID used to access the DLF service. | Yes | For how to obtain it, see Create an AccessKey pair. |
dlf.tokenCache.meta.accessKeySecret | AccessKey secret used to access the DLF service. | Yes | For how to obtain it, see Create an AccessKey pair. |
dlf.user.name | Owner of the DLF catalog. | Yes | View the Owner in the catalog details of the Data Lake Formation console. For details, see View catalogs. |
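For illustration only, the options in this table could be collected into a dictionary like the one below. This is a sketch rather than part of the original notebook: the key names follow the table, dlf_index_options is a hypothetical name, and the catalog owner value is a placeholder you must replace.

# Sketch only: options from the table above; dlf_index_options is a hypothetical name.
# Key names follow the table; <your-dlf-catalog-owner> is a placeholder to replace.
dlf_index_options = {
    "dlf.endpoint": dlf_endpoint,
    "dlf.region": dlf_region,
    "dlf.catalog.id": dlf_catalog_id,
    "dlf.tokenCache.meta.accessKeyId": dlf_catalog_accessKeyId,
    "dlf.tokenCache.meta.accessKeySecret": dlf_catalog_accessKeySecret,
    "dlf.user.name": "<your-dlf-catalog-owner>",  # the catalog Owner shown in the DLF console
}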
Run the following script to execute the offline asynchronous task.
if __name__ == "__main__":
    import nest_asyncio  # these two lines are required when running in a Jupyter notebook
    nest_asyncio.apply()  # these two lines are required when running in a Jupyter notebook
    asyncio.run(document_pipeline_execute(document_url))
Step 5: Create an index from the Paimon table
In the left-side navigation pane of DataWorks, click Data Catalog and find the Paimon table created earlier. Right-click the table and choose Manage OpenSearch Vector Index, select the OpenSearch instance whose ID is {your-opensearch-instance_id}, and create an index table named {your-opensearch-table-name}. Wait until the index is created, then proceed to the next step.
Step 6: Online document Q&A
The query is converted into an embedding, the vector retrieval capability of OpenSearch Vector Search Edition is used to find document chunks similar to the query, the retrieved chunks are assembled into a prompt, and the prompt is passed to the LLM to generate the answer.
async def os_vector_store_retrieve(query_emb, query_sparse_emb: List[GetTextSparseEmbeddingResponseBodyResultSparseEmbeddingsEmbedding]):
    os_vector_store_config = models.Config(
        endpoint=os_vectorstore_host,
        instance_id=os_vectorstore_instance_id,
        protocol="http",
        access_user_name=os_vectorstore_user_name,
        access_pass_word=os_vectorstore_user_password
    )
    # Initialize the OpenSearch vector engine client
    os_vector_store = client.Client(os_vector_store_config)
    sorted_sparse_embedding_result = sorted(query_sparse_emb, key=attrgetter('token_id'))
    token_ids = [sparse_embedding.token_id for sparse_embedding in sorted_sparse_embedding_result]
    token_weights = [sparse_embedding.weight for sparse_embedding in sorted_sparse_embedding_result]
    sparseData = SparseData(indices=token_ids, values=token_weights)
    request = QueryRequest(table_name=os_vectorstore_table_name,
                           vector=query_emb,
                           sparse_data=sparseData,
                           include_vector=True,
                           output_fields=["id", "text"],
                           top_k=5)
    result = await os_vector_store.query_async(request)
    jsonResult = json.loads(result.body)
    search_results = [result['fields']['text'] for result in jsonResult['result']]
    return search_results

async def get_query_result(query):
    # Get the dense embedding of the query
    query_emb_result = await ops_client.get_text_embedding_async(workspace_name, service_id_config["text_embedding"],
                                                                 GetTextEmbeddingRequest(input=[query], input_type="query"))
    query_emb = query_emb_result.body.result.embeddings[0].embedding
    # Get the sparse embedding of the query
    query_sparse_emb = await ops_client.get_text_sparse_embedding_async(workspace_name, service_id_config["text_sparse_embedding"],
                                                                        GetTextSparseEmbeddingRequest(input=[query], input_type="query"))
    # Retrieve relevant documents with the dense and sparse embeddings of the query
    search_results = await os_vector_store_retrieve(query_emb, query_sparse_emb.body.result.sparse_embeddings[0].embedding)
    return search_results

# Online Q&A; the input is the user question
async def query_pipeline_execute(user_query):
    # Step 1: query analysis
    query_analyze_response = ops_client.get_query_analysis(workspace_name, service_id_config["query_analyze"], GetQueryAnalysisRequest(query=user_query))
    print("问题:" + user_query)
    # Step 2: retrieve documents
    all_query_results = []
    user_query_results = await get_query_result(user_query)
    all_query_results.extend(user_query_results)
    rewrite_query_results = await get_query_result(query_analyze_response.body.result.query)
    all_query_results.extend(rewrite_query_results)
    for extend_query in query_analyze_response.body.result.queries:
        extend_query_result = await get_query_result(extend_query)
        all_query_results.extend(extend_query_result)
    # Deduplicate all retrieved results
    remove_duplicate_results = list(set(all_query_results))
    # Step 3: rerank the deduplicated documents
    rerank_top_k = 3
    score_results = await ops_client.get_document_rank_async(workspace_name, service_id_config["rank"], GetDocumentRankRequest(remove_duplicate_results, user_query))
    rerank_results = [remove_duplicate_results[item.index] for item in score_results.body.result.scores[:rerank_top_k]]
    # Step 4: call the LLM to answer the question
    docs = '\n'.join([f"<article>{s}</article>" for s in rerank_results])
    completion = openAIclient.chat.completions.create(
        model="ops-qwen-turbo",
        messages=[
            {"role": "user", "content": f"""已知信息包含多个独立文档,每个文档在<article>和</article>之间,已知信息如下:\n'''{docs}''' \n\n根据上述已知信息,详细且有条理地回答用户的问题。确保答案充分回答了问题并且正确使用了已知信息。如果信息不足以回答问题,请说“根据已知信息无法回答该问题”。 不要使用不在已知信息中的内容生成答案,确保答案中每一个陈述在上述已知信息中有相应内容支撑。答案请使用中文。 \n问题是:'''{user_query}'''"""}
        ],
        stream=True
    )
    print("\n答案:", end="")
    for resp in completion:
        print(resp.choices[0].delta.content.replace("**", ""), end="")
Set the input query.
# User query
user_query = "AI搜索开放平台有什么特点?"
Run the following script to execute the online asynchronous task.
if __name__ == "__main__":
    import nest_asyncio  # these two lines are required when running in a Jupyter notebook
    nest_asyncio.apply()  # these two lines are required when running in a Jupyter notebook
    asyncio.run(query_pipeline_execute(user_query))