本文通过一个电商商品图片分析的示例,介绍如何使用 Flink Python DataFrame API 调用多模态大模型,完成从图片读取、预处理、生成商品描述到文本向量化的完整流程。DataFrame API 的更多介绍请参见功能概览。
前提条件
已创建 Flink 工作空间,详情请参见开通实时计算Flink版。
已开通阿里云百炼服务并获取 API Key,详情请参见获取API Key。
(可选)本地开发环境配置:由于 Flink Python DataFrame API 为 VVR 专属功能,暂未在开源版 Flink 中提供,因此您需要使用下面的命令安装 VVR 版 PyFlink 依赖包:
pip install --pre "ververica-flink>11.7"说明pip install 默认安装正式版本的依赖包。PyFlink DataFrame API 目前仅在 preview 版本的 VVR 中提供,因此您需要使用
--pre参数安装最新的 PyFlink 预览版本,或者显式指定版本号:pip install "ververica-flink==11.8.0a1"说明ververica-flink是API-Only包,仅提供类型定义和接口声明,用于本地开发,作业仍需在实时计算Flink版平台提交。
数据准备
在本例中,您需要准备测试图片文件(1.jpg、2.jpg、3.jpg),并通过文件管理功能上传到 Flink 工作空间。上传方法请参见文件管理。
作业开发
创建一个 Python 文件(如 pyflink_dataframe_example.py),按照以下步骤进行作业开发。您可在本文档末尾获取 Python 文件的完整代码。
步骤一:注册 AI 模型 Provider
使用 pf.set_provider 注册模型连接信息。本示例需要注册两个 Provider,分别用于图片理解和文本向量化。
import pyflink.dataframe as pf
parser = argparse.ArgumentParser()
parser.add_argument("--api-key", required=True)
args, _ = parser.parse_known_args()
pf.set_provider(
"chat",
pf.OpenAICompatProvider(
endpoint="https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions",
api_key=args.api_key,
system_prompt=(
"你是一个电商商品分析助手,请根据商品图片生成商品描述。"
"输出内容应仅包含描述本身,不能包含其他内容。"
),
content_type="IMAGE_URL",
error_handling_strategy="RETRY",
retry_num=3,
retry_backoff_strategy="EXPONENTIAL",
),
)
pf.set_provider(
"embedding",
pf.OpenAICompatProvider(
endpoint="https://dashscope.aliyuncs.com/compatible-mode/v1/embeddings",
api_key=args.api_key,
error_handling_strategy="RETRY",
retry_num=3,
retry_backoff_strategy="EXPONENTIAL",
),
)步骤二:定义图片预处理 UDF
本例定义一个 UDF 对图片进行下列预处理操作:
根据 EXIF 信息自动修正图片方向。
通过创建新图片并复制像素数据,去除原始元数据。
根据文件扩展名推断 MIME 类型,输出 Base64 编码的 Data URI。
import base64
from typing import Optional
from pyflink.dataframe import udf
@udf
def preprocess_image(
image_url: Optional[str], image_bytes: Optional[bytes]
) -> Optional[str]:
from PIL import Image
from PIL.ImageOps import exif_transpose
import io
if image_bytes is None:
return None
# Transpose image according to EXIF orientation
image = Image.open(io.BytesIO(image_bytes))
image = exif_transpose(image)
# Remove metadata by creating a new image and copying pixel data
fmt = image.format or "JPEG"
clean = Image.new(image.mode, image.size)
clean.putdata(list(image.getdata()))
output = io.BytesIO()
clean.save(output, format=fmt)
# Output base64-encoded image data URI
url = (image_url or "").split("?", 1)[0].lower()
mime = (
"image/png"
if url.endswith(".png")
else "image/webp" if url.endswith(".webp") else "image/jpeg"
)
encoded = base64.b64encode(output.getvalue()).decode("ascii")
return f"data:{mime};base64,{encoded}"步骤三:准备输入数据
使用 pf.from_dict 构造输入 DataFrame,包含图片 ID 和对应的 OSS 路径。
OSS_PREFIX = "<your_oss_prefix>"
df = pf.from_dict(
{
"image_id": ["1", "2", "3"],
"image_url": [
f"{OSS_PREFIX}/1.jpg",
f"{OSS_PREFIX}/2.jpg",
f"{OSS_PREFIX}/3.jpg",
],
}
)您可以根据 Flink 工作空间的存储类型确定 OSS_PREFIX 的值,详情请参见文件管理:
若存储类型为 OSS bucket,
OSS_PREFIX的值为oss://<您绑定的OSS Bucket名称>/artifacts/namespaces/<项目空间名称>;若存储类型为全托管存储,
OSS_PREFIX的值为oss://flink-fullymanaged-<工作空间ID>/artifacts/namespaces/<项目空间名称>。
您也可以直接在文件管理中复制上传图片的完整路径,以确定OSS_PREFIX的值。
在生产环境中,通常从外部数据源读取数据。DataFrame API 提供了多种数据读写函数,帮助您便捷地操作外部数据,请参见I/O 读写。
步骤四:构建图片处理流水线
将图片读取、预处理、AI 推理和向量化串联为一个完整的处理流水线。该流水线依次执行以下操作:
从 OSS 读取图片二进制数据;
预处理图片并转换为 Base64 Data URI;
调用
qwen-vl-plus多模态模型,根据图片生成商品描述;调用
text-embedding-v4将描述文本转换为向量;选取最终输出列:商品 ID、描述文本和向量。
from pyflink.dataframe import col
result = (
df.with_column("image_bytes", col("image_url").fetch_content())
.with_column(
"image_data",
preprocess_image(col("image_url"), col("image_bytes")),
)
.llm.predict(
"image_data",
provider="chat",
model="qwen-vl-plus",
output_type={
"image_description": pf.DataType.string(),
},
)
.llm.ai_embed(
"image_description", provider="embedding", model="text-embedding-v4"
)
.select("image_id", "image_description", "embedding")
)DataFrame API 的详细文档请参见API参考。
步骤五:输出结果
将处理结果写入 Parquet 文件。
result.write_parquet(
f"{OSS_PREFIX}/product_descriptions/"
)准备自定义 Python 虚拟环境
示例中的 UDF 使用了第三方库 pillow,该依赖未在预装软件包中提供,因此您需要创建自定义的 Python 虚拟环境,并上传到 Flink 工作空间。本示例中的 Python 虚拟环境压缩包已为您准备好,您可根据工作空间的处理器架构,直接下载对应版本并上传到工作空间:
您也可以参考使用Python依赖文档,自行准备 venv.zip 并上传到工作空间。在准备过程中,您需要将上述文档脚本中的 pip install 命令修改为以下内容,以添加 pillow 依赖:
pip install "ververica-flink==11.8.0a1" pillow作业部署与运行
部署 Python 作业。部署流程请参见Flink Python作业。
配置作业:
在页面,单击目标作业名称。
在部署详情页签基础配置区域引擎版本,选择 vvr-11.8.preview.1-jdk11-flink-1.20。
在部署详情页签基础配置区域Entry Point Main Arguments,输入
--api-key <您的阿里云百炼 API key>。在部署详情页签基础配置区域Python Archives,选择上传的 Python 虚拟环境压缩包 venv.zip。
在部署详情页签运行参数配置区域其他配置项,添加 Python 虚拟环境的配置信息:
python.executable: venv.zip/venv/bin/python python.client.executable: venv.zip/venv/bin/python
运行作业:
在页面,单击目标作业名称操作列中的启动。
选择无状态启动,单击启动,作业启动详情请参见作业启动。
单击启动后,作业状态变为运行中或已完成,则代表作业运行正常。如果您部署本文档Python测试文件,作业最终运行状态是已完成状态。
作业运行完成后,在文件管理页面下载
product_descriptions/目录下的 Parquet 文件,验证输出结果。预期输出包含以下列:列名
类型
说明
image_idSTRING
商品 ID
image_descriptionSTRING
模型生成的商品描述
embeddingARRAY<FLOAT>
描述文本对应的向量
完整代码
import argparse
import base64
from typing import Optional
import pyflink.dataframe as pf
from pyflink.dataframe import col, udf
OSS_PREFIX = "<your_oss_prefix>"
parser = argparse.ArgumentParser()
parser.add_argument("--api-key", required=True)
args, _ = parser.parse_known_args()
pf.set_provider(
"chat",
pf.OpenAICompatProvider(
endpoint="https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions",
api_key=args.api_key,
system_prompt=(
"你是一个电商商品分析助手,请根据商品图片生成商品描述。"
"输出内容应仅包含描述本身,不能包含其他内容。"
),
content_type="IMAGE_URL",
error_handling_strategy="RETRY",
retry_num=3,
retry_backoff_strategy="EXPONENTIAL",
),
)
pf.set_provider(
"embedding",
pf.OpenAICompatProvider(
endpoint="https://dashscope.aliyuncs.com/compatible-mode/v1/embeddings",
api_key=args.api_key,
error_handling_strategy="RETRY",
retry_num=3,
retry_backoff_strategy="EXPONENTIAL",
),
)
@udf
def preprocess_image(
image_url: Optional[str], image_bytes: Optional[bytes]
) -> Optional[str]:
from PIL import Image
from PIL.ImageOps import exif_transpose
import io
if image_bytes is None:
return None
# Transpose image according to EXIF orientation
image = Image.open(io.BytesIO(image_bytes))
image = exif_transpose(image)
# Remove metadata by creating a new image and copying pixel data
fmt = image.format or "JPEG"
clean = Image.new(image.mode, image.size)
clean.putdata(list(image.getdata()))
output = io.BytesIO()
clean.save(output, format=fmt)
# Output base64-encoded image data URI
url = (image_url or "").split("?", 1)[0].lower()
mime = (
"image/png"
if url.endswith(".png")
else "image/webp" if url.endswith(".webp") else "image/jpeg"
)
encoded = base64.b64encode(output.getvalue()).decode("ascii")
return f"data:{mime};base64,{encoded}"
df = pf.from_dict(
{
"image_id": ["1", "2", "3"],
"image_url": [
f"{OSS_PREFIX}/1.jpg",
f"{OSS_PREFIX}/2.jpg",
f"{OSS_PREFIX}/3.jpg",
],
}
)
result = (
df.with_column("image_bytes", col("image_url").fetch_content())
.with_column(
"image_data",
preprocess_image(col("image_url"), col("image_bytes")),
)
.llm.predict(
"image_data",
provider="chat",
model="qwen-vl-plus",
output_type={
"image_description": pf.DataType.string(),
},
)
.llm.ai_embed(
"image_description", provider="embedding", model="text-embedding-v4"
)
.select("image_id", "image_description", "embedding")
)
result.write_parquet(
f"{OSS_PREFIX}/product_descriptions/"
)