本文基于实际代码案例介绍如何在MaxFrame中高效、安全地挂载和使用阿里云 OSS 作为分布式计算存储。通过MaxFrame with_fs_mount装饰器实现文件系统级挂载,为大规模数据处理提供稳定可靠的外部数据访问能力。
应用场景
需要将MaxFrame作业与持久化对象存储(如 OSS)结合使用的大数据分析场景。例如:
从 OSS 加载原始数据并清洗或处理;
将中间结果写入OSS供下游任务消费;
共享训练后的模型文件、配置文件等静态资源。
传统的读写方式(如 pd.read_csv("oss://..."))受限于 SDK 性能和网络开销,在分布式环境下效率较低。而通过文件系统级挂载(FS Mount),可以在 MaxCompute 中像操作本地磁盘一样访问 OSS 文件,极大提升开发效率。
实践指南
开通服务及授权
开通OSS服务并创建Bucket。
登录对象存储OSS控制台。
在左侧导航栏单击Bucket 列表。
在Bucket 列表页面,单击创建 Bucket。
本实践中Bucket Name为
xxx-oss-test-sh。
创建用于MaxCompute的RAM 角色(Role)并绑定该角色到MaxCompute运行环境。
登录RAM控制台。
在左侧导航栏选择。
在角色页面,单击创建角色。
在创建角色页面的右上角,单击创建服务关联角色。
在创建角色页面,选择信任主体类型为云服务。
信任主体名称选择云原生大数据计算服务MaxCompute。
在权限管理页签,单击新增授权。在弹出的新增授权面板中,选择要授予该角色的权限策略,单击确认新增授权。
权限策略选择:
管理对象存储服务(OSS)权限:AliyunOSSFullAccess
管理大数据计算服务(MaxCompute)的权限:AliyunMaxComputeFullAccess
使用with_fs_mount挂载OSS
推荐用法
from maxframe.udf import with_fs_mount @with_fs_mount( "oss://oss-cn-xxxx-internal.aliyuncs.com/xxx-oss-test-sh/test/", "/mnt/oss_data", storage_options={ "role_arn": "acs:ram::xxx:role/maxframe-oss" }, ) def _process(batch_df): import os if os.path.exists('/mnt/oss_data'): print(f"Mounted files: {os.listdir('/mnt/oss_data')}") else: print("/mnt/oss_data not mounted!") return batch_df * 2不推荐写法
可用于测试用途,不建议用于生产环境。
storage_options={ "oss_access_key_id": "LTAI5t...", "oss_access_key_secret": "Wp9H..." }重要避免硬编码 AccessKey。使用
role_arn可以让系统自动申请临时 STS Token,避免 AK/SK 泄露风险。
结合with_running_options控制资源分配
建议根据任务类型设置合理的 CPU 和内存资源:
from maxframe.udf import with_running_options
@with_running_options(engine="dpe", cpu=2, memory=16)
@with_fs_mount(...)
def _process(batch_df):
...参数 | 建议值 | 说明 |
| 固定 | 当前 FS Mount 仅支持 DPE 引擎。 |
| 1~4 | 若涉及复杂 IO 或解压可适当增加。 |
| 8GB 起 | 大文件加载建议 ≥16GB。 |
使用示例
推荐模式:批量处理(data batch processing)。
在大规模数据处理场景下,可结合MaxFrame apply_chunk功能,对输入数据批量处理。
创建 MaxFrame Session 并启用 SQL 支持
import os
from odps import ODPS
from maxframe import new_session
from maxframe.udf import with_fs_mount
# 初始化 ODPS 客户端
o = ODPS(
# 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
# ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
# 不建议直接使用AccessKey ID和 AccessKey Secret字符串。
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
project='<your project>',
endpoint='https://service.cn-<region>.maxcompute.aliyun.com/api',
)
# 设置镜像(如有自定义依赖)
options.sql.settings = {"odps.session.image": "xxxx"}
# 启动会话
session = new_session(o)
print("LogView:", session.get_logview_address())
print("Session ID:", session.session_id)
@with_fs_mount(
"oss://oss-cn-<region>-internal.aliyuncs.com/wzy-oss-test-sh/test/",
"/mnt/oss_data",
storage_options={
"role_arn": "acs:ram::<uid>:role/maxframe-oss"
},
)
@with_running_options(engine="dpe", cpu=2, memory=16)创建自定义函数
def _process(batch_df):
import pandas as pd
import os
# Step 1: 检查挂载是否成功
mount_point = "/mnt/oss_data"
if not os.path.exists(mount_point):
raise RuntimeError("OSS mount failed!")
# Step 2: 加载数据(如映射表、词典)
mapping_file = os.path.join(mount_point, "category_map.csv")
if os.path.isfile(mapping_file):
mapping_df = pd.read_csv(mapping_file)
# Step 3: 处理当前 chunk
result = batch_df.copy()
result['F'] = result['A'] * 10
return result构建DataFrame并应用自定义函数
data = [[1.0, 2.0, 3.0, 4.0, 5.0], ...]
df = md.DataFrame(data, columns=['A', 'B', 'C', 'D', 'E'])
# 使用 apply_chunk 应用挂载后的函数
result_df = df.mf.apply_chunk(
_process,
skip_infer=True,
output_type="dataframe",
dtypes=df.dtypes,
index=df.index
)
# 执行并获取结果
result = result_df.execute().fetch()skip_infer=True可跳过类型推断,加快执行速度,但需确保 dtypes 和 index 正确传递。
调试技巧
验证挂载状态
可在 _process 函数中加入调试日志:
import os
print("Mount path exists:", os.path.exists("/mnt/oss_data"))
print("Files in mount:", os.listdir("/mnt/oss_data") if os.path.exists("/mnt/oss_data") else [])查看 LogView 输出,确认是否有类似日志:
FS Mount 成功!/mnt/oss_data: ['data.csv', 'config.json', 'model.pkl']
Processing batch with shape: (1000, 5)