MaxFrame apply_chunk算子使用实践

本文介绍如何使用MaxFrame DataFrame.mf.apply_chunk算子实现大规模数据处理,内容涵盖其在MaxFrame常见场景下的核心用法、常见误区及生产级最佳实践。

apply_chunk功能介绍

在分布式计算框架中:

  • 标准pandas.DataFrame.apply()

    单机操作,无法扩展。

  • MaxFrame df.apply()

    • 支持分布式的逐行/逐块映射

    • apply() 默认按“行”执行计算,对于大规模数据处理效率不高。

MaxFrame apply_chunk()可以满足如下功能:

  • 按批处理,显式控制每个chunk的大小,即batch_rows

  • 支持自定义输出类型与结构(如:output_typedtypesindex_value)。

  • 支持结合UDF 装饰器(如 @with_python_requirements),实现复杂任务开发。

对性能敏感的任务,建议优先使用apply_chunk

签名解析

DataFrame.mf.apply_chunk(
    func,
    batch_rows=None,
    output_type=None,
    dtypes=None,
    index=None,
    index_value=None,
    columns=None,
    elementwise=None,
    sort=False,
    **kwds
)

参数说明

参数

类型

说明

func

callable

用户自定义函数,接收 Pandas DataFrame 并输出 Pandas DataFrame/Series。

该函数的输入Pandas DataFrame 是DataFrame的一个区块,可视为一批行数据。

batch_rows

int

每个chunk的最大行数。

output_type

str

输出类型。例如"dataframe"、"series"等。

dtypes

pd.Series

输出列的数据类型。

index

Index

输出索引对象。

index_value

IndexValue

分布式索引元信息,建议从原df获取。

sort

bool

groupby场景下,是否对group内部排序。

使用示例

import os
import pyarrow as pa
import pandas as pd
import maxframe.dataframe as md
from maxframe.lib.dtypes_extension import dict_
from maxframe import new_session
from odps import ODPS

o = ODPS(
    # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
    # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
    # 不建议直接使用AccessKey ID和 AccessKey Secret字符串。
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='<your project>',
    endpoint='https://service.cn-<your region>.maxcompute.aliyun.com/api',
)

session = new_session(o)

# 构建测试数据(Pandas DataFrame)
col_a = pd.Series(
    data=[[("k1", 1), ("k2", 2)], [("k1", 3)], None],
    index=[1, 2, 3],
    dtype=dict_(pa.string(), pa.int64()),
)
col_b = pd.Series(
    data=["A", "B", "C"],
    index=[1, 2, 3],
)
df = md.DataFrame({"A": col_a, "B": col_b})
df.execute()


# 自定义函数
def custom_set_item(df):
    for name, value in df["A"].items():
        if value is not None:
            df["A"][name]["x"] = 100
    return df


# 调用 apply_chunk
result_df = df.mf.apply_chunk(
    custom_set_item,
    output_type="dataframe",
    dtypes=df.dtypes.copy(),
    batch_rows=2,
    skip_infer=True,
    index=df.index,
).execute()

session.destroy()

执行及性能调优建议

显式声明 output_type 和 dtypes

  • 不建议依赖类型推断,可能会导致运行失败或性能下降:

    result_df = df.mf.apply_chunk(<process>)  # 缺少 dtypes!
  • 推荐显示声明

    重要

    不能直接修改原dfdtypes,应使用.copy() 或重新构造。

    在上文代码中进一步修改:

    def process(df_chunk):
        # 这里可以有逻辑,也可以简单地直接返回,关键是它的输出结构
        return df_chunk.copy() # 返回一个包含 'A' 和 'B' 的 DataFrame
        
    def get_incorrect_dtypes(df):
        # 返回一个不完整的 dtypes,故意与 UDF 的输出不符
        return df.dtypes.drop('A')
    
    incorrect_dtypes = get_incorrect_dtypes(df)
          
    print("\n--- 正在尝试使用不匹配的 dtypes 调用 apply_chunk (预期会报错) ---")
    try:
        result_df = df.mf.apply_chunk(
            process,
            output_type="dataframe",
            dtypes=incorrect_dtypes  # 使用了错误的 dtypes
        ).execute()
    except Exception as e:
        print("成功捕获到预期错误!错误信息如下:")
        print(f"错误类型: {type(e)}")
        print(f"错误详情: {e}\n")
        # 错误通常会是类似 'ValueError: forward method expected 1 arguments, but get 2'
        # 因为 dtypes 说只有1列 ('B'),但 UDF 实际返回了2列 ('A', 'B')
    
    print("--- 正确使用方式如下 ---")
    #    正确的方式是,dtypes 必须精确描述 UDF(process) 的输出
    #    因为 process 返回了完整的 DataFrame,所以正确的 dtypes 就是原始的 dtypes
    correct_dtypes = df.dtypes.copy()
    
    result_df = df.mf.apply_chunk(
        custom_set_item,
        output_type="dataframe",
        dtypes=correct_dtypes,
        index=df.index
    )
    
    final_result = result_df.execute().fetch()
    
    print("\n使用正确的 dtypes 后,执行结果如下:")
    print(final_result)
    
    session.destroy()

合理设置batch_rows控制内存与并发

  • 避免单个task OOM。

  • 提高并行度。

  • 更好地配合@with_running_options(memory=...)使用资源。

调试技巧:打印中间结果 & 异常捕获

由于 UDF 运行在远程 MaxCompute Worker 上,标准 print 日志可通过 LogView 查看。

设置flush=True 确保日志及时输出,便于排查。

def process(chunk):
    try:
        print(f"Processing chunk with shape: {chunk.shape}", flush=True)
        print(f"Columns: {list(chunk.columns)}", flush=True)
        result = chunk.sort_values("B")
        print("Success.", flush=True)
        return result
    except Exception as e:
        print(f"[ERROR] Failed to process chunk: {str(e)}", flush=True)
        raise
        

性能调优建议

  • 批次大小:batch_rows根据数据量大小及资源合理设置,避免过大。

  • 输出列数:尽量只返回必要字段。

  • 函数复杂度:避免在 UDF 中做 heavy compute。

  • 外部依赖:使用@with_running_options(memory=16)提升内存。

常见问题

TypeError: cannot determine dtype

  • 问题原因

    未提供dtypes

  • 解决方案

    显式传入pd.Series类型。

输出为空或丢失列

  • 问题原因

    dtypes不匹配。

  • 解决方案

    检查函数返回值列名是否一致。

执行超时或卡住

  • 问题原因

    batch_rows过大。

  • 解决方案

    减小批次,增加资源。