OSS挂载及使用实践

本文基于实际代码案例介绍如何在MaxFrame中高效、安全地挂载和使用阿里云 OSS 作为分布式计算存储。通过MaxFrame with_fs_mount装饰器实现文件系统级挂载,为大规模数据处理提供稳定可靠的外部数据访问能力。

应用场景

需要将MaxFrame作业与持久化对象存储(如 OSS)结合使用的大数据分析场景。例如:

  • 从 OSS 加载原始数据并清洗或处理;

  • 将中间结果写入OSS供下游任务消费;

  • 共享训练后的模型文件、配置文件等静态资源。

传统的读写方式(如 pd.read_csv("oss://..."))受限于 SDK 性能和网络开销,在分布式环境下效率较低。而通过文件系统级挂载(FS Mount),可以在 MaxCompute 中像操作本地磁盘一样访问 OSS 文件,极大提升开发效率。

实践指南

开通服务及授权

  1. 开通OSS服务并创建Bucket。

    1. 登录对象存储OSS控制台

    2. 在左侧导航栏单击Bucket 列表

    3. Bucket 列表页面,单击创建 Bucket

      本实践中Bucket Namexxx-oss-test-sh

  2. 创建用于MaxComputeRAM 角色(Role)并绑定该角色到MaxCompute运行环境。

    1. 登录RAM控制台

    2. 在左侧导航栏选择身份管理 > 角色

    3. 角色页面,单击创建角色

    4. 创建角色页面的右上角,单击创建服务关联角色

      1. 创建角色页面,选择信任主体类型云服务

      2. 信任主体名称选择云原生大数据计算服务MaxCompute

      3. 权限管理页签,单击新增授权。在弹出的新增授权面板中,选择要授予该角色的权限策略,单击确认新增授权

        权限策略选择:

使用with_fs_mount挂载OSS

  1. 推荐用法

    from maxframe.udf import with_fs_mount
    
    @with_fs_mount(
        "oss://oss-cn-xxxx-internal.aliyuncs.com/xxx-oss-test-sh/test/",
        "/mnt/oss_data",
        storage_options={
            "role_arn": "acs:ram::xxx:role/maxframe-oss"
        },
    )
    def _process(batch_df):
        import os
        if os.path.exists('/mnt/oss_data'):
            print(f"Mounted files: {os.listdir('/mnt/oss_data')}")
        else:
            print("/mnt/oss_data not mounted!")
        return batch_df * 2
        
  2. 不推荐写法

    可用于测试用途,不建议用于生产环境。

    storage_options={
        "oss_access_key_id": "LTAI5t...",
        "oss_access_key_secret": "Wp9H..."
    }
    重要

    避免硬编码 AccessKey。使用 role_arn 可以让系统自动申请临时 STS Token,避免 AK/SK 泄露风险

结合with_running_options控制资源分配

建议根据任务类型设置合理的 CPU 和内存资源:

from maxframe.udf import with_running_options
@with_running_options(engine="dpe", cpu=2, memory=16)
@with_fs_mount(...)
def _process(batch_df):
    ...

参数

建议值

说明

engine="dpe"

固定

当前 FS Mount 仅支持 DPE 引擎。

cpu

1~4

若涉及复杂 IO 或解压可适当增加。

memory

8GB 起

大文件加载建议 ≥16GB。

使用示例

推荐模式:批量处理(data batch processing)。

在大规模数据处理场景下,可结合MaxFrame apply_chunk功能,对输入数据批量处理。

创建 MaxFrame Session 并启用 SQL 支持

import os
from odps import ODPS
from maxframe import new_session
from maxframe.udf import with_fs_mount

# 初始化 ODPS 客户端
o = ODPS(
    # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
    # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
    # 不建议直接使用AccessKey ID和 AccessKey Secret字符串。
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='<your project>',
    endpoint='https://service.cn-<region>.maxcompute.aliyun.com/api',
)

# 设置镜像(如有自定义依赖)
options.sql.settings = {"odps.session.image": "xxxx"}

# 启动会话
session = new_session(o)

print("LogView:", session.get_logview_address())
print("Session ID:", session.session_id)



@with_fs_mount(
    "oss://oss-cn-<region>-internal.aliyuncs.com/wzy-oss-test-sh/test/",
    "/mnt/oss_data",
    storage_options={
        "role_arn": "acs:ram::<uid>:role/maxframe-oss"
    },
)
@with_running_options(engine="dpe", cpu=2, memory=16)

创建自定义函数

def _process(batch_df):
    import pandas as pd
    import os

    # Step 1: 检查挂载是否成功
    mount_point = "/mnt/oss_data"
    if not os.path.exists(mount_point):
        raise RuntimeError("OSS mount failed!")

    # Step 2: 加载数据(如映射表、词典)
    mapping_file = os.path.join(mount_point, "category_map.csv")
    if os.path.isfile(mapping_file):
        mapping_df = pd.read_csv(mapping_file)

    # Step 3: 处理当前 chunk
    result = batch_df.copy()
    result['F'] = result['A'] * 10

    return result

构建DataFrame并应用自定义函数

data = [[1.0, 2.0, 3.0, 4.0, 5.0], ...]
df = md.DataFrame(data, columns=['A', 'B', 'C', 'D', 'E'])

# 使用 apply_chunk 应用挂载后的函数
result_df = df.mf.apply_chunk(
    _process,
    skip_infer=True,
    output_type="dataframe",
    dtypes=df.dtypes,
    index=df.index
)

# 执行并获取结果
result = result_df.execute().fetch()

skip_infer=True可跳过类型推断,加快执行速度,但需确保 dtypes 和 index 正确传递。

调试技巧

验证挂载状态

可在 _process 函数中加入调试日志:

import os
print("Mount path exists:", os.path.exists("/mnt/oss_data"))
print("Files in mount:", os.listdir("/mnt/oss_data") if os.path.exists("/mnt/oss_data") else [])

查看 LogView 输出,确认是否有类似日志:

FS Mount 成功!/mnt/oss_data: ['data.csv', 'config.json', 'model.pkl']
Processing batch with shape: (1000, 5)