本文介绍在AI搜索开放平台进行多模态数据预处理流程。
应用场景
多模态数据预处理场景提供非结构化文档及图片处理方案,全链路分别由文档解析服务、图片解析服务、文档切分服务、文本向量化服务、文本稀疏向量化服务组成,您将体验到完整的数据处理流程。以上服务均需使用AI搜索开放平台的API服务,将按照实际调用产生费用。
前提条件
开通AI搜索开放平台服务,详情请参见开通服务。
获取服务调用地址和身份鉴权信息,详情请参见获取服务接入地址、获取API-KEY。
AI搜索开放平台支持通过公网和VPC地址调用服务,且可通过VPC实现跨地域调用服务。目前支持上海、杭州、深圳、北京、张家口、青岛地域的用户,通过VPC地址调用AI搜索开放平台的服务。
多模态数据预处理链路搭建
为方便用户使用,AI搜索开放平台提供三种类型的开发框架:
Python SDK。
如果业务已经使用LangChain开发框架,在开发框架中选择LangChain。
如果业务已经使用LlamaIndex开发框架,在开发框架中选择LlamaIndex。
步骤一:完成服务选型和代码下载
本文以Python SDK开发框架为例介绍如何搭建多模态数据预处理链路。
登录AI搜索开放平台控制台。
选择上海地域,切换到AI搜索开放平台,切换到目标空间。
说明目前仅支持在上海开通AI搜索开放平台功能。
支持杭州、深圳、北京、张家口、青岛地域的用户,通过VPC地址跨地域调用AI搜索开放平台的服务。
在左侧导航栏选择场景中心,选择多模态数据预处理场景-数据解析和向量化右侧的进入。
根据服务信息结合业务特点,从下拉列表中选择所需服务,服务详情页面可查看服务详细信息。
说明通过API调用多模态数据预处理链路中的算法服务时,需要提供服务ID(service_id),如文档内容解析服务的ID为ops-document-analyze-001。
从服务列表中切换服务后,生成代码中的service_id会同步更新。当代码下载到本地环境后,您仍可以更改service_id,调用对应服务。
环节
服务说明
文档内容解析
文档内容解析服务(ops-document-analyze-001):提供通用文档解析服务,支持从非结构化文档(文本、表格、图片等)中提取标题、分段等逻辑层级结构,以结构化格式输出。
图片内容解析
图片内容理解服务(ops-image-analyze-vlm-001):可基于多模态大模型对图片内容进行解析理解以及文字识别,解析后的文本可用于图片检索问答场景。
图片文本识别服务(ops-image-analyze-ocr-001):使用OCR能力进行图片文字识别,解析后的文本可用于图片检索问答场景。
文档切片
文档切片服务(ops-document-split-001):提供通用文本切片服务,支持基于文档段落、文本语义、指定规则,对HTML、Markdown、txt格式的结构化数据进行拆分,同时支持以富文本形式提取文档中的代码、图片以及表格。
文本向量化
OpenSearch文本向量化服务-001(ops-text-embedding-001):提供多语言(40+)文本向量化服务,输入文本最大长度300,输出向量维度1536维。
OpenSearch通用文本向量化服务-002(ops-text-embedding-002):提供多语言(100+)文本向量化服务,输入文本最大长度8192,输出向量维度1024维。
OpenSearch文本向量化服务-中文-001(ops-text-embedding-zh-001):提供中文文本向量化服务,输入文本最大长度1024,输出向量维度768维。
OpenSearch文本向量化服务-英文-001(ops-text-embedding-en-001):提供英文文本向量化服务,输入文本最大长度512,输出向量维度768维。
文本稀疏向量化
提供将文本数据转化为稀疏向量形式表达的服务,稀疏向量存储空间更小,常用于表达关键词和词频信息,可与稠密向量搭配进行混合检索,提升检索效果。
OpenSearch文本稀疏向量化服务(ops-text-sparse-embedding-001):提供多语言(100+)文本向量化服务,输入文本最大长度8192。
完成服务选型后,单击配置完成,进入代码查询查看和下载代码,按照应用调用数据预处理链路时的运行流程:
作用 | 说明 |
负责文档处理,包含文档解析/图片解析、文档切片、文本向量化。 | 使用主函数document_pipeline_execute完成以下流程,可通过文档URL或Base64编码输入待处理文档。
|
选择代码查询下的文档解析与向量化,单击复制代码或者下载文件,将代码下载到本地。
步骤二:本地环境适配和测试数据预处理开发链路
将代码下载到本地文件后,需要配置代码中的关键参数。
类别 | 参数 | 说明 |
AI搜索开放平台 | api_key | API调用密钥,获取方式请参见管理API Key。 |
aisearch_endpoint | API调用地址,获取方式请参见获取服务接入地址。 说明 注意需要去掉“http://”。 支持通过公网和VPC两种方式调用API。 | |
workspace_name | AI搜索开放平台中的空间名称。 | |
service_id | 服务ID,为操作方便,可以通过service_id_config配置各项服务以及ID。 |
完成参数配置后即可在Python 3.8.1及以上版本环境中运行代码,测试结果是否正确。
如在代码中对AI搜索开放平台介绍进行数据预处理,运行结果如下:
文档解析与向量化文件:
# 多模态数据处理链路
# 环境需求:
# Python版本:3.7及以上
# 包需求:
# pip install alibabacloud_searchplat20240529
# AI搜索开放平台配置
aisearch_endpoint = "xxx.platform-cn-shanghai.opensearch.aliyuncs.com"
api_key = "OS-xxx"
workspace_name = "default"
service_id_config = {"document_analyze": "ops-document-analyze-001",
"split": "ops-document-split-001",
"text_embedding": "ops-text-embedding-001",
"text_sparse_embedding": "ops-text-sparse-embedding-001",
"image_analyze": "ops-image-analyze-ocr-001"}
# 输入文档url,示例文档为opensearch产品说明文档
document_url = "https://help.aliyun.com/zh/open-search/search-platform/product-overview/introduction-to-search-platform?spm=a2c4g.11186623.0.0.7ab93526WDzQ8z"
import asyncio
from operator import attrgetter
from typing import List
from Tea.exceptions import TeaException, RetryError
from alibabacloud_tea_openapi.models import Config
from alibabacloud_searchplat20240529.client import Client
from alibabacloud_searchplat20240529.models import GetDocumentSplitRequest, CreateDocumentAnalyzeTaskRequest, \
CreateDocumentAnalyzeTaskRequestDocument, GetDocumentAnalyzeTaskStatusRequest, \
GetDocumentSplitRequestDocument, GetTextEmbeddingRequest, GetTextEmbeddingResponseBodyResultEmbeddings, \
GetTextSparseEmbeddingRequest, GetTextSparseEmbeddingResponseBodyResultSparseEmbeddings, \
GetImageAnalyzeTaskStatusResponse, CreateImageAnalyzeTaskRequest, GetImageAnalyzeTaskStatusRequest, \
CreateImageAnalyzeTaskRequestDocument, CreateImageAnalyzeTaskResponse
async def poll_doc_analyze_task_result(ops_client, task_id, service_id, interval=5):
while True:
request = GetDocumentAnalyzeTaskStatusRequest(task_id=task_id)
response = await ops_client.get_document_analyze_task_status_async(workspace_name, service_id, request)
status = response.body.result.status
if status == "PENDING":
await asyncio.sleep(interval)
elif status == "SUCCESS":
return response
else:
print("error: " + response.body.result.error)
raise Exception("document analyze task failed")
def is_analyzable_url(url:str):
if not url:
return False
image_extensions = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff'}
return url.lower().endswith(tuple(image_extensions))
async def image_analyze(ops_client, url):
try:
print("image analyze :" + url)
if url.startswith("//"):
url = "https:" + url
if not is_analyzable_url(url):
print(url + " is unanalysable.")
return url
image_analyze_service_id = service_id_config["image_analyze"]
document = CreateImageAnalyzeTaskRequestDocument(
url=url,
)
request = CreateImageAnalyzeTaskRequest(document=document)
response: CreateImageAnalyzeTaskResponse = ops_client.create_image_analyze_task(workspace_name, image_analyze_service_id, request)
task_id = response.body.result.task_id
while True:
request = GetImageAnalyzeTaskStatusRequest(task_id=task_id)
response: GetImageAnalyzeTaskStatusResponse = ops_client.get_image_analyze_task_status(workspace_name, image_analyze_service_id, request)
status = response.body.result.status
if status == "PENDING":
await asyncio.sleep(5)
elif status == "SUCCESS":
return url + response.body.result.data.content
else:
print("image analyze error: " + response.body.result.error)
return url
except Exception as e:
print(f"image analyze Exception : {e}")
def chunk_list(lst, chunk_size):
for i in range(0, len(lst), chunk_size):
yield lst[i:i + chunk_size]
async def document_pipeline_execute(document_url: str = None, document_base64: str = None, file_name: str = None):
# 生成opensearch开发平台client
config = Config(bearer_token=api_key,endpoint=aisearch_endpoint,protocol="http")
ops_client = Client(config=config)
# Step 1: 文档解析/图片解析
document_analyze_request = CreateDocumentAnalyzeTaskRequest(document=CreateDocumentAnalyzeTaskRequestDocument(url=document_url, content=document_base64,file_name=file_name, file_type='html'))
document_analyze_response = await ops_client.create_document_analyze_task_async(workspace_name=workspace_name,service_id=service_id_config["document_analyze"],request=document_analyze_request)
print("document_analyze task_id:" + document_analyze_response.body.result.task_id)
extraction_result = await poll_doc_analyze_task_result(ops_client, document_analyze_response.body.result.task_id, service_id_config["document_analyze"])
print("document_analyze done")
document_content = extraction_result.body.result.data.content
content_type = extraction_result.body.result.data.content_type
# Step 2: 文档切片
document_split_request = GetDocumentSplitRequest(
GetDocumentSplitRequestDocument(content=document_content, content_type=content_type))
document_split_result = await ops_client.get_document_split_async(workspace_name, service_id_config["split"],
document_split_request)
print("document-split done, chunks count: " + str(len(document_split_result.body.result.chunks))
+ " rich text count:" + str(len(document_split_result.body.result.rich_texts)))
# Step 3: 文本向量化
# 提取切片结果。图片切片会通过图片解析服务提取出文本内容
doc_list = ([{"id": chunk.meta.get("id"), "content": chunk.content} for chunk in document_split_result.body.result.chunks]
+ [{"id": chunk.meta.get("id"), "content": chunk.content} for chunk in document_split_result.body.result.rich_texts if chunk.meta.get("type") != "image"]
+ [{"id": chunk.meta.get("id"), "content": await image_analyze(ops_client,chunk.content)} for chunk in document_split_result.body.result.rich_texts if chunk.meta.get("type") == "image"]
)
chunk_size = 32 # 一次最多允许计算32个embedding
all_text_embeddings: List[GetTextEmbeddingResponseBodyResultEmbeddings] = []
for chunk in chunk_list([text["content"] for text in doc_list], chunk_size):
response = await ops_client.get_text_embedding_async(workspace_name,service_id_config["text_embedding"],GetTextEmbeddingRequest(chunk))
all_text_embeddings.extend(response.body.result.embeddings)
all_text_sparse_embeddings: List[GetTextSparseEmbeddingResponseBodyResultSparseEmbeddings] = []
for chunk in chunk_list([text["content"] for text in doc_list], chunk_size):
response = await ops_client.get_text_sparse_embedding_async(workspace_name,service_id_config["text_sparse_embedding"],GetTextSparseEmbeddingRequest(chunk,input_type="document",return_token=True))
all_text_sparse_embeddings.extend(response.body.result.sparse_embeddings)
for i in range(len(doc_list)):
doc_list[i]["embedding"] = all_text_embeddings[i].embedding
doc_list[i]["sparse_embedding"] = all_text_sparse_embeddings[i].embedding
print("text-embedding done.")
if __name__ == "__main__":
# 运行异步任务
# import nest_asyncio # 如果在Jupyter notebook中运行,反注释这两行
# nest_asyncio.apply() # 如果在Jupyter notebook中运行,反注释这两行
asyncio.run(document_pipeline_execute(document_url))
# asyncio.run(document_pipeline_execute(document_base64="eHh4eHh4eHg...", file_name="attention.pdf")) #另外一种调用方式