分布式 AI 计算引擎 MaxFrame 提供分布式多模态数据处理能力,支持对海量多模态数据(如图片、视频、音频等)进行高效分布式处理,包括图像解码、视频切帧、语音识别等操作,为 AI 大模型训练和推理提供数据预处理支持。
功能概述
面临挑战
在大规模 AI 模型训练场景中,多模态数据(图像、音频等)的处理面临以下挑战:
处理效率低:单机处理无法应对大规模多模态数据,处理周期长。
运维复杂:需要搭建和维护数据处理服务集群
开发割裂:结构化、多模态数据处理以及 AI 推理链路分离,增加开发复杂度。
核心特性
MaxFrame 提供内置的多模态数据处理算子,结合大规模数据处理及 AI Function 离线大模型推理能力,集成多模态处理与 AI 推理。
分布式处理
基于 MaxCompute 分布式计算引擎,支持海量多模态数据并行处理
OSS 原生集成
支持直接访问 OSS 上的多模态数据,无需数据迁移
丰富算子
覆盖图片(解码、属性提取、缩放、裁剪、格式转换)、音频(解码、转录、语种检测、VAD)等处理能力
声明式 API
类 Pandas API 设计,链式调用,降低学习成本
安全认证
支持 RAM 角色(
role_arn)访问 OSS,无需硬编码 AK/SK
适用场景
图像数据清洗:过滤无效的并筛选符合规格的图像
图像预处理:为 AI 模型准备训练数据,如缩放、裁剪、格式转换
音频语音转录:批量将音频转录为文本,支持多语种识别
多模态数据集构建:构建大规模图像/音频数据集,支持模型训练
运行环境
Python 版本:Python 3.11 及以上版本
SDK 版本:MaxFrame SDK 2.6.0及以上版本
# 版本检查
python -c "import maxframe; print(maxframe.__version__)"
# SDK 升级
pip install --upgrade maxframe算子说明
图片算子
图片算子
算子 | 说明 | 示例 |
| 从 URL/OSS 下载文件,返回二进制字节流 |
|
| 解码图像字节为图像对象 |
|
| 获取图像宽度(像素) |
|
| 获取图像高度(像素) |
|
| 获取图像文件大小(字节) |
|
| 获取图像格式(JPEG、PNG 等) |
|
| 获取图像色彩模式(RGB、RGBA、L 等) |
|
音频算子
音频算子
算子 | 说明 | 示例 |
| 从 URL/OSS 下载音频文件,返回二进制字节流 |
|
| 解码音频字节流为 AudioObject 对象 |
|
| 获取音频声道数 |
|
| 获取音频文件大小(字节) |
|
| 获取音频采样率(Hz) |
|
| 获取音频时长(秒) |
|
| 获取音频编码格式 |
|
| 自动检测音频语言类型 |
|
| 语音转录为文本(自动识别语种) |
|
| 语音转录为文本(指定语种) |
|
| 语音活动检测(VAD),返回有效语音片段 |
|
OSS 访问认证
OSS 访问认证
url.download() 通过 storage_options 参数配置 OSS 访问凭证:
# RAM 角色授权(推荐,自动申请临时 STS Token)
df["img_bytes"] = df["oss_path"].url.download(
storage_options={"role_arn": "acs:ram::[account-id]:role/[role-name]"}
)应用场景
以下示例展示多模态算子的典型使用方式。所有示例均假设已完成 Session 创建和引擎相关配置。
建议通过 options.dag.settings = {"engine_order": ["DPE", "MCSQL", "SPE"]} 将 DPE 设为首选引擎,以获得最佳的多模态数据处理性能。场景一:图像属性提取与过滤
从 OSS 批量下载图像,提取属性信息,按条件过滤后写入结果表。
from maxframe import dataframe as md
# 读取包含图像 OSS 路径的数据表
df = md.read_odps_table("image_src_table")
# 下载 → 解码 → 属性提取(链式调用)
df["img_bytes"] = df["oss_path"].url.download(
storage_options={"role_arn": ROLE_ARN}
)
df["img_obj"] = df["img_bytes"].image.decode()
df["width"] = df["img_obj"].image.width
df["height"] = df["img_obj"].image.height
df["size"] = df["img_obj"].image.size
df["format"] = df["img_obj"].image.format
df["mode"] = df["img_obj"].image.mode
# 按宽高范围过滤
df_filtered = df[
df["width"].between(1000, 5000) &
df["height"].between(2000, 6000)
]
# 写入结果表
df_filtered[
["id", "oss_path", "width", "height", "size", "format", "mode"]
].to_odps_table("image_sink_table", overwrite=True).execute()场景二:图像预处理(缩放、裁剪与格式转换)
对图像进行缩放、裁剪和格式转换,为 AI 模型准备训练数据。
df = md.read_odps_table("image_src_table")
df["img_bytes"] = df["oss_path"].url.download(
storage_options={"role_arn": ROLE_ARN}
)
df["img_obj"] = df["img_bytes"].image.decode()
# 缩放图像到 224x224(常用于模型输入)
df["img_resized"] = df["img_obj"].image.resize((224, 224))
# 裁剪图像指定区域(左, 上, 右, 下)
df["img_cropped"] = df["img_obj"].image.crop((100, 100, 500, 500))
# 转换色彩模式(如 RGBA → RGB)
df["img_rgb"] = df["img_obj"].image.convert("RGB")
注意事项
项目 | 说明 |
引擎配置 | 多模态算子需要 DPE 引擎,请确保 |
OSS 路径格式 | 使用完整的 OSS 内网地址: |
惰性执行 | 所有算子操作为惰性构建,需调用 |
安全认证 | 推荐使用 RAM 角色( |
资源清理 | 使用完毕后请调用 |
常见问题
Q1: 图像下载失败如何排查?
检查 OSS 路径格式是否正确(需使用内网 Endpoint)
检查 RAM 角色是否已授予 OSS 访问权限
查看 Logview 中的错误日志获取详细信息
Q2: 如何处理大规模数据?
多模态算子基于 MaxFrame DPE 引擎自动并行执行,底层自动完成数据分片与任务调度,无需手动配置并行度。如需自定义处理逻辑,可结合 MaxFrame apply_chunk 算子进行批量处理。