多模态数据解析及向量化

本文介绍在AI搜索开放平台进行多模态数据预处理流程。

应用场景

多模态数据预处理场景提供非结构化文档及图片处理方案,全链路分别由文档解析服务、图片解析服务、文档切分服务、文本向量化服务、文本稀疏向量化服务组成,您将体验到完整的数据处理流程。以上服务均需使用AI搜索开放平台的API服务,将按照实际调用产生费用。

前提条件

  • 开通AI搜索开放平台服务,详情请参见开通服务

  • 获取服务调用地址和身份鉴权信息,详情请参见获取服务接入地址获取API-KEY

    AI搜索开放平台支持通过公网和VPC地址调用服务,且可通过VPC实现跨地域调用服务。目前支持上海、杭州、深圳、北京、张家口、青岛地域的用户,通过VPC地址调用AI搜索开放平台的服务。

多模态数据预处理链路搭建

说明

为方便用户使用,AI搜索开放平台提供三种类型的开发框架:

  • Python SDK。

  • 如果业务已经使用LangChain开发框架,在开发框架中选择LangChain。

  • 如果业务已经使用LlamaIndex开发框架,在开发框架中选择LlamaIndex。

步骤一:完成服务选型和代码下载

本文以Python SDK开发框架为例介绍如何搭建多模态数据预处理链路。

  1. 登录AI搜索开放平台控制台

  2. 选择上海地域,切换到AI搜索开放平台,切换到目标空间。

    说明
    • 目前仅支持在上海开通AI搜索开放平台功能。

    • 支持杭州、深圳、北京、张家口、青岛地域的用户,通过VPC地址跨地域调用AI搜索开放平台的服务。

  3. 在左侧导航栏选择场景中心,选择多模态数据预处理场景-数据解析和向量化右侧的进入

  4. 根据服务信息结合业务特点,从下拉列表中选择所需服务,服务详情页面可查看服务详细信息。

    说明
    • 通过API调用多模态数据预处理链路中的算法服务时,需要提供服务ID(service_id),如文档内容解析服务的ID为ops-document-analyze-001。

    • 从服务列表中切换服务后,生成代码中的service_id会同步更新。当代码下载到本地环境后,您仍可以更改service_id,调用对应服务。

    环节

    服务说明

    文档内容解析

    文档内容解析服务(ops-document-analyze-001):提供通用文档解析服务,支持从非结构化文档(文本、表格、图片等)中提取标题、分段等逻辑层级结构,以结构化格式输出。

    图片内容解析

    • 图片内容理解服务(ops-image-analyze-vlm-001):可基于多模态大模型对图片内容进行解析理解以及文字识别,解析后的文本可用于图片检索问答场景。

    • 图片文本识别服务(ops-image-analyze-ocr-001):使用OCR能力进行图片文字识别,解析后的文本可用于图片检索问答场景。

    文档切片

    文档切片服务(ops-document-split-001):提供通用文本切片服务,支持基于文档段落、文本语义、指定规则,对HTML、Markdown、txt格式的结构化数据进行拆分,同时支持以富文本形式提取文档中的代码、图片以及表格。

    文本向量化

    • OpenSearch文本向量化服务-001(ops-text-embedding-001):提供多语言(40+)文本向量化服务,输入文本最大长度300,输出向量维度1536维。

    • OpenSearch通用文本向量化服务-002(ops-text-embedding-002):提供多语言(100+)文本向量化服务,输入文本最大长度8192,输出向量维度1024维。

    • OpenSearch文本向量化服务-中文-001(ops-text-embedding-zh-001):提供中文文本向量化服务,输入文本最大长度1024,输出向量维度768维。

    • OpenSearch文本向量化服务-英文-001(ops-text-embedding-en-001):提供英文文本向量化服务,输入文本最大长度512,输出向量维度768维。

    文本稀疏向量化

    提供将文本数据转化为稀疏向量形式表达的服务,稀疏向量存储空间更小,常用于表达关键词和词频信息,可与稠密向量搭配进行混合检索,提升检索效果。

    OpenSearch文本稀疏向量化服务(ops-text-sparse-embedding-001):提供多语言(100+)文本向量化服务,输入文本最大长度8192。

完成服务选型后,单击配置完成,进入代码查询查看和下载代码,按照应用调用数据预处理链路时的运行流程:

作用

说明

负责文档处理,包含文档解析/图片解析、文档切片、文本向量化。

使用主函数document_pipeline_execute完成以下流程,可通过文档URL或Base64编码输入待处理文档。

  1. 文档解析、图片解析,服务调用请参见文档解析API图片内容提取API

    • 调用异步文档解析接口,从文档URL地址中提取文档内容,或者从Base64编码文件中进行解码。

    • 调用异步图片解析接口,从图片URL地址中提取图片内容,或者从Base64编码文件中进行解码。

  2. 文档切片,服务调用请参见文档切片API

    • 调用文档切片接口,将解析后的文档按指定策略切片。

    • 通过document_split函数进行文档切片,包含文档切片和富文本内容解析两部分。

  3. 文本向量化、文本稀疏向量化,服务调用请参见文本向量化API文本稀疏向量化

    • 调用文本向量化接口,将切分后的数据转化为稠密向量。

    • 调用文本稀疏向量化接口,将切分后的数据转化为稀疏向量。后续如需进行内容检索,可将向量数据写入搜索引擎。

选择代码查询下的文档解析与向量化,单击复制代码或者下载文件,将代码下载到本地。

步骤二:本地环境适配和测试数据预处理开发链路

将代码下载到本地文件后,需要配置代码中的关键参数。

类别

参数

说明

AI搜索开放平台

api_key

API调用密钥,获取方式请参见管理API Key

aisearch_endpoint

API调用地址,获取方式请参见获取服务接入地址

说明

注意需要去掉“http://”。

支持通过公网和VPC两种方式调用API。

workspace_name

AI搜索开放平台中的空间名称。

service_id

服务ID,为操作方便,可以通过service_id_config配置各项服务以及ID。

image

完成参数配置后即可在Python 3.8.1及以上版本环境中运行代码,测试结果是否正确。

如在代码中对AI搜索开放平台介绍进行数据预处理,运行结果如下:

image

文档解析与向量化文件:

# 多模态数据处理链路

# 环境需求:
# Python版本:3.7及以上


# 包需求:
# pip install alibabacloud_searchplat20240529


# AI搜索开放平台配置
aisearch_endpoint = "xxx.platform-cn-shanghai.opensearch.aliyuncs.com"
api_key = "OS-xxx"
workspace_name = "default"
service_id_config = {"document_analyze": "ops-document-analyze-001",
                     "split": "ops-document-split-001",
                     "text_embedding": "ops-text-embedding-001",
                     "text_sparse_embedding": "ops-text-sparse-embedding-001",
                     "image_analyze": "ops-image-analyze-ocr-001"}

# 输入文档url,示例文档为opensearch产品说明文档
document_url = "https://help.aliyun.com/zh/open-search/search-platform/product-overview/introduction-to-search-platform?spm=a2c4g.11186623.0.0.7ab93526WDzQ8z"

import asyncio
from operator import attrgetter
from typing import List
from Tea.exceptions import TeaException, RetryError
from alibabacloud_tea_openapi.models import Config
from alibabacloud_searchplat20240529.client import Client
from alibabacloud_searchplat20240529.models import GetDocumentSplitRequest, CreateDocumentAnalyzeTaskRequest, \
    CreateDocumentAnalyzeTaskRequestDocument, GetDocumentAnalyzeTaskStatusRequest, \
    GetDocumentSplitRequestDocument, GetTextEmbeddingRequest, GetTextEmbeddingResponseBodyResultEmbeddings, \
    GetTextSparseEmbeddingRequest, GetTextSparseEmbeddingResponseBodyResultSparseEmbeddings, \
    GetImageAnalyzeTaskStatusResponse, CreateImageAnalyzeTaskRequest, GetImageAnalyzeTaskStatusRequest, \
    CreateImageAnalyzeTaskRequestDocument, CreateImageAnalyzeTaskResponse


async def poll_doc_analyze_task_result(ops_client, task_id, service_id, interval=5):
    while True:
        request = GetDocumentAnalyzeTaskStatusRequest(task_id=task_id)
        response = await ops_client.get_document_analyze_task_status_async(workspace_name, service_id, request)
        status = response.body.result.status
        if status == "PENDING":
            await asyncio.sleep(interval)
        elif status == "SUCCESS":
            return response
        else:
            print("error: " + response.body.result.error)
            raise Exception("document analyze task failed")


def is_analyzable_url(url:str):
    if not url:
        return False
    image_extensions = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff'}
    return url.lower().endswith(tuple(image_extensions))


async def image_analyze(ops_client, url):
    try:
        print("image analyze :" + url)
        if url.startswith("//"):
            url = "https:" + url
        if not is_analyzable_url(url):
            print(url + " is  unanalysable.")
            return url
        image_analyze_service_id = service_id_config["image_analyze"]
        document = CreateImageAnalyzeTaskRequestDocument(
            url=url,
        )
        request = CreateImageAnalyzeTaskRequest(document=document)
        response: CreateImageAnalyzeTaskResponse = ops_client.create_image_analyze_task(workspace_name, image_analyze_service_id, request)
        task_id = response.body.result.task_id
        while True:
            request = GetImageAnalyzeTaskStatusRequest(task_id=task_id)
            response: GetImageAnalyzeTaskStatusResponse = ops_client.get_image_analyze_task_status(workspace_name, image_analyze_service_id, request)
            status = response.body.result.status
            if status == "PENDING":
                await asyncio.sleep(5)
            elif status == "SUCCESS":
                return url + response.body.result.data.content
            else:
                print("image analyze error: " + response.body.result.error)
                return url
    except Exception as e:
        print(f"image analyze Exception : {e}")


def chunk_list(lst, chunk_size):
    for i in range(0, len(lst), chunk_size):
        yield lst[i:i + chunk_size]


async def document_pipeline_execute(document_url: str = None, document_base64: str = None, file_name: str = None):

    # 生成opensearch开发平台client
    config = Config(bearer_token=api_key,endpoint=aisearch_endpoint,protocol="http")
    ops_client = Client(config=config)

    # Step 1: 文档解析/图片解析
    document_analyze_request = CreateDocumentAnalyzeTaskRequest(document=CreateDocumentAnalyzeTaskRequestDocument(url=document_url, content=document_base64,file_name=file_name, file_type='html'))
    document_analyze_response = await ops_client.create_document_analyze_task_async(workspace_name=workspace_name,service_id=service_id_config["document_analyze"],request=document_analyze_request)
    print("document_analyze task_id:" + document_analyze_response.body.result.task_id)
    extraction_result = await poll_doc_analyze_task_result(ops_client, document_analyze_response.body.result.task_id, service_id_config["document_analyze"])
    print("document_analyze done")
    document_content = extraction_result.body.result.data.content
    content_type = extraction_result.body.result.data.content_type

    # Step 2: 文档切片
    document_split_request = GetDocumentSplitRequest(
        GetDocumentSplitRequestDocument(content=document_content, content_type=content_type))
    document_split_result = await ops_client.get_document_split_async(workspace_name, service_id_config["split"],
                                                                      document_split_request)
    print("document-split done, chunks count: " + str(len(document_split_result.body.result.chunks))
          + " rich text count:" + str(len(document_split_result.body.result.rich_texts)))

    # Step 3: 文本向量化
    # 提取切片结果。图片切片会通过图片解析服务提取出文本内容
    doc_list = ([{"id": chunk.meta.get("id"), "content": chunk.content} for chunk in document_split_result.body.result.chunks]
                + [{"id": chunk.meta.get("id"), "content": chunk.content} for chunk in document_split_result.body.result.rich_texts if chunk.meta.get("type") != "image"]
                + [{"id": chunk.meta.get("id"), "content": await image_analyze(ops_client,chunk.content)} for chunk in document_split_result.body.result.rich_texts if chunk.meta.get("type") == "image"]
                )
    chunk_size = 32  # 一次最多允许计算32个embedding
    all_text_embeddings: List[GetTextEmbeddingResponseBodyResultEmbeddings] = []
    for chunk in chunk_list([text["content"] for text in doc_list], chunk_size):
        response = await ops_client.get_text_embedding_async(workspace_name,service_id_config["text_embedding"],GetTextEmbeddingRequest(chunk))
        all_text_embeddings.extend(response.body.result.embeddings)

    all_text_sparse_embeddings: List[GetTextSparseEmbeddingResponseBodyResultSparseEmbeddings] = []
    for chunk in chunk_list([text["content"] for text in doc_list], chunk_size):
        response = await ops_client.get_text_sparse_embedding_async(workspace_name,service_id_config["text_sparse_embedding"],GetTextSparseEmbeddingRequest(chunk,input_type="document",return_token=True))
        all_text_sparse_embeddings.extend(response.body.result.sparse_embeddings)

    for i in range(len(doc_list)):
        doc_list[i]["embedding"] = all_text_embeddings[i].embedding
        doc_list[i]["sparse_embedding"] = all_text_sparse_embeddings[i].embedding

    print("text-embedding done.")


if __name__ == "__main__":
    # 运行异步任务
    #    import nest_asyncio # 如果在Jupyter notebook中运行,反注释这两行
    #    nest_asyncio.apply() # 如果在Jupyter notebook中运行,反注释这两行
    asyncio.run(document_pipeline_execute(document_url))
    # asyncio.run(document_pipeline_execute(document_base64="eHh4eHh4eHg...", file_name="attention.pdf")) #另外一种调用方式