使用Flink Python DataFrame API进行电商商品图片分析

更新时间:
复制为 MD 格式

本文通过一个电商商品图片分析的示例,介绍如何使用 Flink Python DataFrame API 调用多模态大模型,完成从图片读取、预处理、生成商品描述到文本向量化的完整流程。DataFrame API 的更多介绍请参见功能概览

前提条件

  • 已创建 Flink 工作空间,详情请参见开通实时计算Flink

  • 已开通阿里云百炼服务并获取 API Key,详情请参见获取API Key

  • (可选)本地开发环境配置:由于 Flink Python DataFrame API 为 VVR 专属功能,暂未在开源版 Flink 中提供,因此您需要使用下面的命令安装 VVR 版 PyFlink 依赖包:

    pip install --pre "ververica-flink>11.7"
    说明

    pip install 默认安装正式版本的依赖包。PyFlink DataFrame API 目前仅在 preview 版本的 VVR 中提供,因此您需要使用 --pre 参数安装最新的 PyFlink 预览版本,或者显式指定版本号:

    pip install "ververica-flink==11.8.0a1"
    说明

    ververica-flinkAPI-Only包,仅提供类型定义和接口声明,用于本地开发,作业仍需在实时计算Flink版平台提交。

数据准备

在本例中,您需要准备测试图片文件(1.jpg2.jpg3.jpg),并通过文件管理功能上传到 Flink 工作空间。上传方法请参见文件管理

作业开发

创建一个 Python 文件(如 pyflink_dataframe_example.py),按照以下步骤进行作业开发。您可在本文档末尾获取 Python 文件的完整代码。

步骤一:注册 AI 模型 Provider

使用 pf.set_provider 注册模型连接信息。本示例需要注册两个 Provider,分别用于图片理解和文本向量化。

import pyflink.dataframe as pf

parser = argparse.ArgumentParser()
parser.add_argument("--api-key", required=True)
args, _ = parser.parse_known_args()

pf.set_provider(
    "chat",
    pf.OpenAICompatProvider(
        endpoint="https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions",
        api_key=args.api_key,
        system_prompt=(
            "你是一个电商商品分析助手,请根据商品图片生成商品描述。"
            "输出内容应仅包含描述本身,不能包含其他内容。"
        ),
        content_type="IMAGE_URL",
        error_handling_strategy="RETRY",
        retry_num=3,
        retry_backoff_strategy="EXPONENTIAL",
    ),
)

pf.set_provider(
    "embedding",
    pf.OpenAICompatProvider(
        endpoint="https://dashscope.aliyuncs.com/compatible-mode/v1/embeddings",
        api_key=args.api_key,
        error_handling_strategy="RETRY",
        retry_num=3,
        retry_backoff_strategy="EXPONENTIAL",
    ),
)

步骤二:定义图片预处理 UDF

本例定义一个 UDF 对图片进行下列预处理操作:

  1. 根据 EXIF 信息自动修正图片方向。

  2. 通过创建新图片并复制像素数据,去除原始元数据。

  3. 根据文件扩展名推断 MIME 类型,输出 Base64 编码的 Data URI。

import base64
from typing import Optional
from pyflink.dataframe import udf

@udf
def preprocess_image(
    image_url: Optional[str], image_bytes: Optional[bytes]
) -> Optional[str]:
    from PIL import Image
    from PIL.ImageOps import exif_transpose
    import io

    if image_bytes is None:
        return None

    # Transpose image according to EXIF orientation
    image = Image.open(io.BytesIO(image_bytes))
    image = exif_transpose(image)

    # Remove metadata by creating a new image and copying pixel data
    fmt = image.format or "JPEG"
    clean = Image.new(image.mode, image.size)
    clean.putdata(list(image.getdata()))
    output = io.BytesIO()
    clean.save(output, format=fmt)

    # Output base64-encoded image data URI
    url = (image_url or "").split("?", 1)[0].lower()
    mime = (
        "image/png"
        if url.endswith(".png")
        else "image/webp" if url.endswith(".webp") else "image/jpeg"
    )
    encoded = base64.b64encode(output.getvalue()).decode("ascii")
    return f"data:{mime};base64,{encoded}"

步骤三:准备输入数据

使用 pf.from_dict 构造输入 DataFrame,包含图片 ID 和对应的 OSS 路径。

OSS_PREFIX = "<your_oss_prefix>"

df = pf.from_dict(
    {
        "image_id": ["1", "2", "3"],
        "image_url": [
            f"{OSS_PREFIX}/1.jpg",
            f"{OSS_PREFIX}/2.jpg",
            f"{OSS_PREFIX}/3.jpg",
        ],
    }
)
说明

您可以根据 Flink 工作空间的存储类型确定 OSS_PREFIX 的值,详情请参见文件管理

  • 若存储类型为 OSS bucket,OSS_PREFIX的值为 oss://<您绑定的OSS Bucket名称>/artifacts/namespaces/<项目空间名称>

  • 若存储类型为全托管存储,OSS_PREFIX的值为oss://flink-fullymanaged-<工作空间ID>/artifacts/namespaces/<项目空间名称>

您也可以直接在文件管理中复制上传图片的完整路径,以确定OSS_PREFIX的值。

说明

在生产环境中,通常从外部数据源读取数据。DataFrame API 提供了多种数据读写函数,帮助您便捷地操作外部数据,请参见I/O 读写

步骤四:构建图片处理流水线

将图片读取、预处理、AI 推理和向量化串联为一个完整的处理流水线。该流水线依次执行以下操作:

  • 从 OSS 读取图片二进制数据;

  • 预处理图片并转换为 Base64 Data URI;

  • 调用 qwen-vl-plus 多模态模型,根据图片生成商品描述;

  • 调用 text-embedding-v4 将描述文本转换为向量;

  • 选取最终输出列:商品 ID、描述文本和向量。

from pyflink.dataframe import col

result = (
    df.with_column("image_bytes", col("image_url").fetch_content())
    .with_column(
        "image_data",
        preprocess_image(col("image_url"), col("image_bytes")),
    )
    .llm.predict(
        "image_data",
        provider="chat",
        model="qwen-vl-plus",
        output_type={
            "image_description": pf.DataType.string(),
        },
    )
    .llm.ai_embed(
        "image_description", provider="embedding", model="text-embedding-v4"
    )
    .select("image_id", "image_description", "embedding")
)
说明

DataFrame API 的详细文档请参见API参考

步骤五:输出结果

将处理结果写入 Parquet 文件。

result.write_parquet(
    f"{OSS_PREFIX}/product_descriptions/"
)

准备自定义 Python 虚拟环境

示例中的 UDF 使用了第三方库 pillow,该依赖未在预装软件包中提供,因此您需要创建自定义的 Python 虚拟环境,并上传到 Flink 工作空间。本示例中的 Python 虚拟环境压缩包已为您准备好,您可根据工作空间的处理器架构,直接下载对应版本并上传到工作空间:

说明

您也可以参考使用Python依赖文档,自行准备 venv.zip 并上传到工作空间。在准备过程中,您需要将上述文档脚本中的 pip install 命令修改为以下内容,以添加 pillow 依赖:

pip install "ververica-flink==11.8.0a1" pillow

作业部署与运行

  1. 部署 Python 作业。部署流程请参见Flink Python作业

  2. 配置作业:

    1. 运维中心 > 作业运维页面,单击目标作业名称。

    2. 部署详情页签基础配置区域引擎版本,选择 vvr-11.8.preview.1-jdk11-flink-1.20。

    3. 部署详情页签基础配置区域Entry Point Main Arguments,输入 --api-key <您的阿里云百炼 API key>

    4. 部署详情页签基础配置区域Python Archives,选择上传的 Python 虚拟环境压缩包 venv.zip。

    5. 部署详情页签运行参数配置区域其他配置项,添加 Python 虚拟环境的配置信息:

      python.executable: venv.zip/venv/bin/python
      python.client.executable: venv.zip/venv/bin/python
  3. 运行作业:

    1. 运维中心 > 作业运维页面,单击目标作业名称操作列中的启动

    2. 选择无状态启动,单击启动,作业启动详情请参见作业启动

      单击启动后,作业状态变为运行中已完成,则代表作业运行正常。如果您部署本文档Python测试文件,作业最终运行状态是已完成状态。

  4. 作业运行完成后,在文件管理页面下载 product_descriptions/ 目录下的 Parquet 文件,验证输出结果。预期输出包含以下列:

    列名

    类型

    说明

    image_id

    STRING

    商品 ID

    image_description

    STRING

    模型生成的商品描述

    embedding

    ARRAY<FLOAT>

    描述文本对应的向量

完整代码

import argparse
import base64
from typing import Optional

import pyflink.dataframe as pf
from pyflink.dataframe import col, udf

OSS_PREFIX = "<your_oss_prefix>"

parser = argparse.ArgumentParser()
parser.add_argument("--api-key", required=True)
args, _ = parser.parse_known_args()

pf.set_provider(
    "chat",
    pf.OpenAICompatProvider(
        endpoint="https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions",
        api_key=args.api_key,
        system_prompt=(
            "你是一个电商商品分析助手,请根据商品图片生成商品描述。"
            "输出内容应仅包含描述本身,不能包含其他内容。"
        ),
        content_type="IMAGE_URL",
        error_handling_strategy="RETRY",
        retry_num=3,
        retry_backoff_strategy="EXPONENTIAL",
    ),
)

pf.set_provider(
    "embedding",
    pf.OpenAICompatProvider(
        endpoint="https://dashscope.aliyuncs.com/compatible-mode/v1/embeddings",
        api_key=args.api_key,
        error_handling_strategy="RETRY",
        retry_num=3,
        retry_backoff_strategy="EXPONENTIAL",
    ),
)


@udf
def preprocess_image(
    image_url: Optional[str], image_bytes: Optional[bytes]
) -> Optional[str]:
    from PIL import Image
    from PIL.ImageOps import exif_transpose
    import io

    if image_bytes is None:
        return None

    # Transpose image according to EXIF orientation
    image = Image.open(io.BytesIO(image_bytes))
    image = exif_transpose(image)

    # Remove metadata by creating a new image and copying pixel data
    fmt = image.format or "JPEG"
    clean = Image.new(image.mode, image.size)
    clean.putdata(list(image.getdata()))
    output = io.BytesIO()
    clean.save(output, format=fmt)

    # Output base64-encoded image data URI
    url = (image_url or "").split("?", 1)[0].lower()
    mime = (
        "image/png"
        if url.endswith(".png")
        else "image/webp" if url.endswith(".webp") else "image/jpeg"
    )
    encoded = base64.b64encode(output.getvalue()).decode("ascii")
    return f"data:{mime};base64,{encoded}"


df = pf.from_dict(
    {
        "image_id": ["1", "2", "3"],
        "image_url": [
            f"{OSS_PREFIX}/1.jpg",
            f"{OSS_PREFIX}/2.jpg",
            f"{OSS_PREFIX}/3.jpg",
        ],
    }
)

result = (
    df.with_column("image_bytes", col("image_url").fetch_content())
    .with_column(
        "image_data",
        preprocess_image(col("image_url"), col("image_bytes")),
    )
    .llm.predict(
        "image_data",
        provider="chat",
        model="qwen-vl-plus",
        output_type={
            "image_description": pf.DataType.string(),
        },
    )
    .llm.ai_embed(
        "image_description", provider="embedding", model="text-embedding-v4"
    )
    .select("image_id", "image_description", "embedding")
)

result.write_parquet(
    f"{OSS_PREFIX}/product_descriptions/"
)