MaxFrame分布式技术架构及开发实践

MaxFrame功能介绍

MaxFrame是阿里云MaxCompute推出的分布式Python计算框架,旨在为开发者提供类Pandas风格的编程接口,同时支持分布式处理超大规模数据(TB/PB 级)。

MaxFrame通过无缝集成MaxCompute的弹性资源池与自动分布式执行引擎,突破传统单机Pandas及机器学习算子的内存与性能瓶颈,进而满足大规模数据处理、Python交互式分析及机器学习预处理等场景的需求。

同时,MaxFrame在表示层集成了丰富的AI能力。以AI Function为例,支持调用大模型进行高并发离线推理等场景,进而实现在一套编程框架下完成从数据处理到AI开发的完整流程。

应用场景

  1. 大规模数据分析

    场景:金融风控、汽车自动驾驶、电商用户行为分析等需处理TB/PB级数据的场景。

    优势:相比单机Pandas,MaxFrame可加速数十倍以上,支持全量数据分析。

  2. 交互式查询与调试

    场景:数据科学家通过Jupyter Notebook快速验证数据假设。

    优势:提供类SQL的交互式接口(如 read_odps_query),结合延迟执行减少调试等待时间。

  3. ETL流程优化

    场景:日志清洗、数据格式转换等批处理任务。

    优势:通过apply + UDF实现复杂逻辑,结合MaxCompute资源弹性伸缩,降低开发成本。

  4. 机器学习模型训练推理

    场景:基于XGBoost、LightGBM等常用模型训练、推理。

    优势:兼容XGBoost、LightGBM常用算子,同时支持分布式训练、推理。

核心原理

核心特性

  1. 兼容Pandas API
    MaxFrame提供与Pandas高度兼容的 API,支持GROUP BYJOINApply 等常用DataFrame操作,使得现有代码无需重构即可迁移至分布式环境。

    关键技术

    1. 数据处理模式采用DataFrame分片处理机制,可支持PB级数据并行计算。

    2. 算子优化:兼容并分布式优化GROUP BY/JOIN/MERGE等高频算子。

  2. 分布式资源调度
    基于MaxCompute弹性计算资源池,支持按需扩展计算节点,自动将任务拆分为并行子任务,显著提升大规模数据的处理效率。

  3. 延迟执行与动态优化
    MaxFrame采用Lazy Evaluation模型,仅在调用execute()时触发实际计算。执行引擎自动优化任务计划(如分区裁剪),减少冗余数据扫描与网络传输。

  4. 数据本地化计算
    MaxFrame直接对MaxCompute表数据进行读写,从而避免了将数据迁移至本地环境。同时,它结合了MaxCompute底层的列式存储与分布式缓存技术,以进一步加速计算性能。

    关键技术:使用数据读写模式。通过内部接口直读MaxCompute数据,降低数据传输网络开销及 IO 压力

  5. 灵活的会话管理
    通过new_session()创建独立会话,隔离任务资源与配置(如项目空间、计算引擎版本),支持多任务并行执行。

    关键技术:动态内存分配机制,支持worker并行处理。

流程代码示例

import maxframe.dataframe as md
from maxframe import new_session
from odps import ODPS

o = ODPS(
    # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
    # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
    # 不建议直接使用AccessKey ID和 AccessKey Secret字符串。
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='<your project>',
    endpoint='https://service.cn-<region>.maxcompute.aliyun.com/api',
)

# 初始化会话  
session = new_session(o)

# 打印执行日志
print(session.get_logview_address())

# 读取 MaxCompute 表数据,示例数据为emp,测试时替换项目中已有表即可。
df = mf.read_odps_query("SELECT * FROM emp")  

# 执行分布式聚合  
result = df.groupby("deptno").agg(total_salary=('sal', 'sum')).execute()  

# 分页获取结果  
local_result = result.head(1000).fetch()  # 限制返回 1000 条  

print("聚合结果如下:")
print(local_result)

# 关闭会话  
session.destroy()

传统方案对比

对比项

MaxFrame

PyODPS

Mars

SQL+UDF

开发接口

Pandas兼容

语法和接口与Pandas DataFrame有较大差异。

需要使用SQLPython两套接口。

数据处理

运行时无需将数据拉取至本地处理,减少不必要的本地数据传输,提高作业执行效率。

PyODPSto_pandas方法读取数据,需要将数据拉到本地计算。

仅对部分算子支持分布式;

初始化时需要创建集群,速度慢、稳定性不高。

基于MaxCompute SQL能力支持分布式作业。

计算资源

不受本地资源大小限制,突破Python单机性能瓶颈。

受本地资源大小限制。

受资源大小限制,需要指定Worker、CPUMemory大小。

基于MaxCompute Serverless能力,可实现SQL作业弹性计算。

开发体验

开箱即用的交互式开发环境及离线调度能力。内置常用库,通过注解即可管理三方依赖,无需手动打包。

开箱即用的交互式开发环境及离线调度能力。

需要准备相应的运行环境,并拉起Mars集群。

Python UDF的依赖包需要手工打包和上传。

单机作业改造实践指南

改造原则

  • 接口修改:保持Pandas/XGBoost原生API调用方式。

  • 资源适配:按需配置分布式参数。

  • 数据本地化:优先在MaxCompute集群内部流转数据。

  • 自定义 UDF:若部分接口不兼容,也采用自定义UDF方式实现。

数据准备

如果已有数据或不需要使用该测试数据,该步骤可忽略。

按照下方代码在已有MaxCompute项目中创建测试表并插入数据。

CREATE TABLE emp (
    empno    BIGINT COMMENT '员工编号',
    ename    STRING COMMENT '员工姓名',
    job      STRING COMMENT '工作岗位',
    mgr      BIGINT COMMENT '上级经理的员工编号',
    hiredate DATETIME COMMENT '入职日期',
    sal      DECIMAL(10, 2) COMMENT '薪水',
    comm     DECIMAL(10, 2) COMMENT '奖金/佣金',
    deptno   BIGINT COMMENT '部门编号'
)
COMMENT '员工信息表'
PARTITIONED BY (
    ds STRING COMMENT '数据分区,格式为 YYYYMMDD'
);

-- 插入数据。
INSERT OVERWRITE TABLE emp PARTITION (ds = '20251229')
SELECT * FROM (
    VALUES
        (7369, 'SMITH', 'CLERK', 7902, CAST('1980-12-17 00:00:00' AS DATETIME), 800.00, NULL, 20),
        (7499, 'ALLEN', 'SALESMAN', 7698, CAST('1981-02-20 00:00:00' AS DATETIME), 1600.00, 300.00, 30),
        (7521, 'WARD', 'SALESMAN', 7698, CAST('1981-02-22 00:00:00' AS DATETIME), 1250.00, 500.00, 30),
        (7566, 'JONES', 'MANAGER', 7839, CAST('1981-04-02 00:00:00' AS DATETIME), 2975.00, NULL, 20),
        (7654, 'MARTIN', 'SALESMAN', 7698, CAST('1981-09-28 00:00:00' AS DATETIME), 1250.00, 1400.00, 30),
        (7698, 'BLAKE', 'MANAGER', 7839, CAST('1981-05-01 00:00:00' AS DATETIME), 2850.00, NULL, 30),
        (7782, 'CLARK', 'MANAGER', 7839, CAST('1981-06-09 00:00:00' AS DATETIME), 2450.00, NULL, 10),
        (7788, 'SCOTT', 'ANALYST', 7566, CAST('1987-04-19 00:00:00' AS DATETIME), 3000.00, NULL, 20),
        (7839, 'KING', 'PRESIDENT', NULL, CAST('1981-11-17 00:00:00' AS DATETIME), 5000.00, NULL, 10),
        (7844, 'TURNER', 'SALESMAN', 7698, CAST('1981-09-08 00:00:00' AS DATETIME), 1500.00, 0.00, 30),
        (7876, 'ADAMS', 'CLERK', 7788, CAST('1987-05-23 00:00:00' AS DATETIME), 1100.00, NULL, 20),
        (7900, 'JAMES', 'CLERK', 7698, CAST('1981-12-03 00:00:00' AS DATETIME), 950.00, NULL, 30),
        (7902, 'FORD', 'ANALYST', 7566, CAST('1981-12-03 00:00:00' AS DATETIME), 3000.00, NULL, 20),
        (7934, 'MILLER', 'CLERK', 7782, CAST('1982-01-23 00:00:00' AS DATETIME), 1300.00, NULL, 10),
        (7948, 'JACCKA', 'CLERK', 7782, CAST('1981-04-12 00:00:00' AS DATETIME), 5000.00, NULL, 10),
        (7956, 'WELAN', 'CLERK', 7649, CAST('1982-07-20 00:00:00' AS DATETIME), 2450.00, NULL, 10),
        (7957, 'TEBAGE', 'CLERK', 7748, CAST('1982-12-30 00:00:00' AS DATETIME), 1300.00, NULL, 10)
) AS t (empno, ename, job, mgr, hiredate, sal, comm, deptno);

CREATE TABLE IF NOT EXISTS customer_behavior (
    user_id     BIGINT COMMENT '用户唯一标识',
    action      STRING COMMENT '用户行为: pv (浏览), fav (收藏), cart (加购)',
    age         BIGINT COMMENT '用户年龄',
    gender      BIGINT COMMENT '用户性别: 0-男性, 1-女性',
    price       DOUBLE COMMENT '商品价格',
    category    STRING COMMENT '商品类别',
    buy         BIGINT COMMENT '是否购买: 0-未购买, 1-已购买 (此为模型标签)'
)
COMMENT '客户行为日志表'
PARTITIONED BY (
    ds STRING COMMENT '数据分区, 格式 YYYYMMDD'
);

-- 插入测试
INSERT OVERWRITE TABLE customer_behavior PARTITION (ds = '20251221')
SELECT * FROM (
    VALUES
        -- 一些明确不购买的浏览行为
        (1001, 'pv', 25, 1, 35.50, 'Books', 0),
        (1002, 'pv', 42, 0, 4999.00, 'Electronics', 0),
        (1003, 'pv', 31, 1, 88.00, 'Groceries', 0),
        (1004, 'pv', 19, 0, 120.00, 'Clothing', 0),

        -- 收藏行为,部分可能转化,部分不转化
        (1005, 'fav', 28, 1, 299.00, 'Clothing', 0),
        (1006, 'fav', 35, 0, 8900.00, 'Electronics', 0),
        (1007, 'fav', 22, 1, 85.00, 'Books', 1),  -- 收藏后购买

        -- 加购行为,转化率更高
        (1008, 'cart', 26, 0, 250.00, 'Sports', 0), -- 加购后放弃
        (1009, 'cart', 45, 1, 1500.00, 'Home Goods', 1),
        (1010, 'cart', 33, 1, 55.00, 'Groceries', 1),
        (1011, 'cart', 29, 0, 6800.00, 'Electronics', 1),
        (1012, 'cart', 50, 0, 320.00, 'Home Goods', 1),
        (1013, 'cart', 21, 1, 450.00, 'Clothing', 0), -- 年轻用户加购后放弃

        -- 更多混合数据
        (1014, 'pv', 38, 0, 99.00, 'Books', 0),
        (1015, 'pv', 23, 1, 75.00, 'Groceries', 0),
        (1016, 'cart', 30, 0, 180.00, 'Sports', 1),
        (1017, 'fav', 41, 1, 420.00, 'Home Goods', 0),
        (1018, 'pv', 65, 0, 12000.00, 'Electronics', 0), -- 价格过高,只是看看
        (1019, 'cart', 27, 1, 88.00, 'Clothing', 1),
        (1020, 'pv', 36, 0, 240.00, 'Sports', 0),
        (1021, 'cart', 39, 1, 120.00, 'Groceries', 1),
        (1022, 'fav', 24, 0, 45.00, 'Books', 0),
        (1023, 'cart', 32, 1, 3500.00, 'Electronics', 1),
        (1024, 'pv', 29, 0, 650.00, 'Home Goods', 0),
        (1025, 'cart', 48, 1, 95.00, 'Groceries', 1)

) AS t (user_id, action, age, gender, price, category, buy);

示例一:Pandas数据处理作业改造

假设输入、输出数据都保存在MaxCompute。

原始代码片段

import os
import maxframe.dataframe as md
from odps import ODPS,DataFrame

# 连接 MaxCompute
o = ODPS(
    # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
    # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
    # 不建议直接使用AccessKey ID和 AccessKey Secret字符串。
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='<your project>',
    endpoint='https://service.cn-<region>.maxcompute.aliyun.com/api',
)

# 通过PyODPS 以运行SQL的方式读取MaxCompute表中的数据。
pyodps_pd = o.execute_sql("select * from emp where ds = '20251229'")

# 将PyODPS DataFrame转为Pandas DataFrame,本地执行计算
df_pd = pyodps_pd.to_pandas()

result = df_pd.groupby('deptno').agg({'sal': 'sum'})

# 计算结果写回到 MaxCompute表

pyodps_df = DataFrame(result)

pyodps_df.persist('result_table')

MaxFrame改造方案

import maxframe.dataframe as md
from maxframe import new_session
from odps import ODPS

# 初始化会话  
session = new_session(o)

# 打印执行日志
print(session.get_logview_address())

# 读取 MaxCompute 表数据  
df_pd = md.read_odps_query("select * from emp")  

# 基于MaxCompute资源分布式计算
result = df_pd.groupby('deptno').agg({'sal': 'sum'})

# 计算结果写回到 MaxCompute表
result.to_odps_table("result_table_mf")

# 关闭会话  
session.destroy()

改造效果对比:

指标

单机模式

MaxFrame 分布式模式

数据规模

< x 万行

TB/PB 级

处理时间

小时级

分钟级

资源使用

单机资源,限制在GB

支持数万CU弹性资源

示例二:机器学习模型训练

原始XGBoost代码

import os
import pandas as pd
import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, roc_auc_score
from odps import ODPS

o = ODPS(
    # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
    # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
    # 不建议直接使用AccessKey ID和 AccessKey Secret字符串。
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='<your project>',
    endpoint='https://service.cn-<region>.maxcompute.aliyun.com/api',
)

# --- 数据读取和准备 ---
print("正在从 MaxCompute 读取数据...")
pyodps_pd = o.execute_sql("select * from customer_behavior where ds = ('20251221')")
df_pd = pyodps_pd.to_pandas()
print("数据读取完毕。")

# 定义特征和标签
features = ["action", "age", "gender", "price", "category"]
label = "buy"

x = df_pd[features]
y = df_pd[label]

x_encoded = pd.get_dummies(x, columns=['action', 'category'], drop_first=True)

# --- 划分训练集和测试集 ---
x_train, x_test, y_train, y_test = train_test_split(x_encoded, y, test_size=0.2, random_state=42)

# 创建 DMatrix
dtrain = xgb.DMatrix(x_train, label=y_train)
dtest = xgb.DMatrix(x_test, label=y_test) # 直接创建dtest,用于评估

# --- 模型训练 ---
# 配置XGBoost参数
params = {
    'objective': 'binary:logistic', # 明确指定为二分类任务
    'eval_metric': 'auc',           # 指定评估指标
    'learning_rate': 0.1,
    'colsample_bytree': 0.8,
    'tree_method': 'hist',
    'n_jobs': -1
}

print("开始训练XGBoost模型...")
# 训练模型
model = xgb.train(
    params,
    dtrain,
    num_boost_round=100, # 推荐使用 num_boost_round 而不是 n_estimators
    evals=[(dtest, 'test')],
    early_stopping_rounds=10,
    verbose_eval=10
)
print("模型训练完成。")

# --- 模型评估 ---
print("\n--- 模型评估结果 ---")
# 使用模型对测试集数据进行预测
y_pred_proba = model.predict(dtest, iteration_range=(0, model.best_iteration))

# 将概率转换为0或1的类别
y_pred_class = [1 if prob > 0.5 else 0 for prob in y_pred_proba]

# 计算并打印评估指标
accuracy = accuracy_score(y_test, y_pred_class)
# AUC可以直接从训练过程中的evals获取,也可以重算
auc = roc_auc_score(y_test, y_pred_proba)

print(f"最佳迭代次数: {model.best_iteration}")
print(f"测试集准确率 (Accuracy): {accuracy:.4f}")
print(f"测试集AUC (Area Under Curve): {auc:.4f}")

MaxFrame分布式改造

from maxframe import new_session
import maxframe.dataframe as md
from maxframe import config
from datetime import datetime, timedelta
from maxframe.learn.contrib.xgboost import XGBClassifier

o = ODPS(
    # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
    # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
    # 不建议直接使用AccessKey ID和 AccessKey Secret字符串。
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='<your project>',
    endpoint='https://service.cn-<region>.maxcompute.aliyun.com/api',
)

# 设置数据处理时,UDF 执行所用的镜像
config.options.sql.settings = {"odps.session.image": "common"}

# 创建 Session
session = new_session(o)

# 定义原始 MaxFrame DataFrame 
all_data = md.read_odps_table("customer_behavior")

# 拆分训练和预测数据
# 这里我们假设用80%的数据训练,对全量数据预测
train_data = all_data.sample(frac=0.8, random_state=123)
predict_data = all_data

# 定义特征和标签
features = ["action", "age", "gender", "price", "category"]
label = "buy"

x_train = train_data[features]
y_train = train_data[label]

# 初始化模型
model = XGBClassifier(
    n_estimators=300,
    learning_rate=0.1,
    colsample_bytree=0.8,
    n_jobs=20, # 在MaxCompute上,可以指定较大的核数
    tree_method="hist"
)

# 拟合模型(构建计算图)
model.fit(x_train, y_train)

# 对 predict_data 数据集进行预测(构建计算图)
# model.predict 会返回一个包含预测结果的新 DataFrame
predicted_result_df = model.predict(predict_data[features], predict_proba=True)

# 将原始特征和预测结果合并在一起
final_df = predict_data.merge(predicted_result_df, on=predict_data.index.names)

print("\n--- 触发计算并查看前5条预测结果 ---")
# 为了方便查看,我们只选择部分关键列
columns_to_show = ['user_id', 'action', 'buy', 'prediction_result', 'prediction_detail']
# .head() 会将计算结果拉取到本地并打印
print(final_df[columns_to_show].head(5))
print("---------------------------------------")

# 关闭会话  
session.destroy()

示例三:自定义UDF apply、apply_chunk实现

性能优化方式

  • 避免在 UDF 中引入全局依赖(如未安装的库),需通过 @with_python_requirements 声明依赖 。

  • 按需设置 batch_rows 以平衡内存与并行度。

通过apply接口调用自定义UDF

  • 用于对 行/列 应用函数(如 axis=1 表示逐行处理)。

  • 支持延迟执行(Lazy Evaluation),需调用 .execute() 触发实际计算 。

import os
import numpy as np
import maxframe.dataframe as md
from odps import ODPS
from maxframe.session import new_session

o = ODPS(
    # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
    # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
    # 不建议直接使用AccessKey ID和 AccessKey Secret字符串。
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='<your project>',
    endpoint='https://service.cn-<region>.maxcompute.aliyun.com/api',
)

session = new_session(o)

# 创建分布式 DataFrame
df = md.DataFrame(np.random.randint(0, 100, size=(100, 2)), columns=["a", "b"])

# 定义 UDF:计算列平方
def square_row(row):
    return row ** 2
    
# 应用 apply
result_df = df.apply(square_row, axis=1).execute()
print(result_df.fetch())

session.destroy()

# 输出结果
       a     b
0   4489  7569
1   7225   841
2    441  7921
3   9604     9
4   5329   196
..   ...   ...
95    16  2209
96  6724   784
97   441  1521
98   729     1
99  3249  6889

通过apply_chunk接口调用自定义 UDF 分块处理数据

  • 对数据分块 应用函数(如按batch_rows 划分数据块)。

  • 适用于需要局部聚合或分块优化的场景(如内存敏感操作)。

详情参见apply_chunk API说明

import os
import numpy as np
import maxframe.dataframe as md
from odps import ODPS
from maxframe.session import new_session

o = ODPS(
    # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
    # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
    # 不建议直接使用AccessKey ID和 AccessKey Secret字符串。
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='<your project>',
    endpoint='https://service.cn-<region>.maxcompute.aliyun.com/api',
)

session = new_session(o)

# 创建示例数据
df = md.DataFrame([[4, 9]] * 3, columns=['A', 'B'])

# 按3行分块求和
result = df.mf.apply_chunk(np.sum, batch_rows=3).execute()
print(result.fetch())

session.destroy()

# 输出结果
A    12
B    27
dtype: int64