基于MaxFrame实现分布式Pandas处理

MaxFrame可以在分布式环境下使用与Pandas相同的API来分析数据,通过MaxFrame,您能够以高于开源Pandas数十倍的性能在MaxCompute上快速完成数据分析和计算工作。本文为您介绍如何通过MaxFrame使用常用的Pandas算子。

前提条件

已安装MaxFrame,详情请参见准备工作

数据准备

  1. 在安装了MaxFrame的Python环境下运行如下脚本,准备测试表和测试数据。

    from odps import ODPS
    from maxframe.session import new_session
    import maxframe.dataframe as md
    import pandas as pd
    import os
    
    o = ODPS(
        # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
        # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
        # 不建议直接使用AccessKey ID和 AccessKey Secret字符串。
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
        project='your-default-project',
        endpoint='your-end-point',
    )
    
    data_sets = [{
        "table_name": "product",
        "table_schema" : "index bigint, product_id bigint, product_name string, current_price bigint",
        "source_type": "records",
        "records" : [
            [1, 100, 'Nokia', 1000],
            [2, 200, 'Apple', 5000],
            [3, 300, 'Samsung', 9000]
        ],
    },
    {
        "table_name" : "sales",
        "table_schema" : "index bigint, sale_id bigint, product_id bigint, user_id bigint, year bigint, quantity bigint, price bigint",
        "source_type": "records",
        "records" : [
            [1, 1, 100, 101, 2008, 10, 5000],
            [2, 2, 300, 101, 2009, 7, 4000],
            [3, 4, 100, 102, 2011, 9, 4000],
            [4, 5, 200, 102, 2013, 6, 6000],
            [5, 8, 300, 102, 2015, 10, 9000],
            [6, 9, 100, 102, 2015, 6, 2000]
        ],
        "lifecycle": 5
    }]
    
    def prepare_data(o: ODPS, data_sets, suffix="", drop_if_exists=False):
        for index, data in enumerate(data_sets):
            table_name = data.get("table_name")
            table_schema = data.get("table_schema")
            source_type = data.get("source_type")
            
            if not table_name or not table_schema or not source_type:
                raise ValueError(f"Dataset at index {index} is missing one or more required keys: 'table_name', 'table_schema', or 'source_type'.")
    
            lifecycle = data.get("lifecycle", 5)
            table_name += suffix
            
            print(f"Processing {table_name}...")
            if drop_if_exists:
                print(f"Deleting {table_name}...")
                o.delete_table(table_name, if_exists=True)
            
            o.create_table(name=table_name, table_schema=table_schema, lifecycle=lifecycle, if_not_exists=True)
    
            if source_type == "local_file":
                file_path = data.get("file")
                if not file_path:
                    raise ValueError(f"Dataset at index {index} with source_type 'local_file' is missing the 'file' key.")
                sep = data.get("sep", ",")
                pd_df = pd.read_csv(file_path, sep=sep)
                ODPSDataFrame(pd_df).persist(table_name, drop_table=True)
            elif source_type == 'records':
                records = data.get("records")
                if not records:
                    raise ValueError(f"Dataset at index {index} with source_type 'records' is missing the 'records' key.")
                with o.get_table(table_name).open_writer() as writer:
                    writer.write(records)
            else:
                raise ValueError(f"Unknown data set source_type: {source_type}")
            
            print(f"Processed {table_name} Done")
    
    prepare_data(o, data_sets, "_maxframe_demo", True)

    参数说明:

    • ALIBABA_CLOUD_ACCESS_KEY_ID:需将该环境变量设置为具备目标MaxCompute项目中待操作对象相关MaxCompute权限的AccessKey ID。您可以进入AccessKey管理页面获取AccessKey ID。

    • ALIBABA_CLOUD_ACCESS_KEY_SECRET:需将该环境变量设置为AccessKey ID对应的AccessKey Secret。

    • your-default-project:使用的MaxCompute项目名称。您可以登录MaxCompute控制台,在左侧导航栏选择工作区>项目管理,查看MaxCompute项目名称。

    • your-end-point:目标MaxCompute项目所在地域的Endpoint,可根据网络连接方式自行选择,例如http://service.cn-chengdu.maxcompute.aliyun.com/api。详情请参见Endpoint

  2. 查询sales_maxframe_demo表和product_maxframe_demo表的数据,SQL命令如下。

    --查询sales_maxframe_demo表
    SELECT * FROM sales_maxframe_demo;
    
    --返回
    +------------+------------+------------+------------+------------+------------+------------+
    | index      | sale_id    | product_id | user_id    | year       | quantity   | price      |
    +------------+------------+------------+------------+------------+------------+------------+
    | 1          | 1          | 100        | 101        | 2008       | 10         | 5000       |
    | 2          | 2          | 300        | 101        | 2009       | 7          | 4000       |
    | 3          | 4          | 100        | 102        | 2011       | 9          | 4000       |
    | 4          | 5          | 200        | 102        | 2013       | 6          | 6000       |
    | 5          | 8          | 300        | 102        | 2015       | 10         | 9000       |
    | 6          | 9          | 100        | 102        | 2015       | 6          | 2000       |
    +------------+------------+------------+------------+------------+------------+------------+
    
    
    --查询product_maxframe_demo表数据
    SELECT * FROM product_maxframe_demo;
    
    --返回
    +------------+------------+--------------+---------------+
    | index      | product_id | product_name | current_price |
    +------------+------------+--------------+---------------+
    | 1          | 100        | Nokia        | 1000          |
    | 2          | 200        | Apple        | 5000          |
    | 3          | 300        | Samsung      | 9000          |
    +------------+------------+--------------+---------------+

使用MaxFrame进行数据分析

场景1:使用merge方法连接两张数据表,以获取sales_maxframe_demo表中所有sale_id对应的product_name以及该产品的所有yearprice

  • 示例代码

    from odps import ODPS
    from maxframe.session import new_session
    import maxframe.dataframe as md
    import os
    
    o = ODPS(
        # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
        # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
        # 不建议直接使用 Access Key ID / Access Key Secret 字符串
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
        project='your-default-project',
        endpoint='your-end-point',
    )
    
    session = new_session(o)
    
    #session id是一串用于关联MaxFrame task的字符,对于调试和追踪任务状态有重要的作用。
    print(session.session_id)
    
    sales = md.read_odps_table("sales_maxframe_demo", index_col="index")
    product = md.read_odps_table("product_maxframe_demo", index_col="product_id")
    
    #这里的df并不会立即执行,除非您使用df.execute()来触发。
    #这意味着所有的计算都将最终完全在MaxCompute集群完成,避免了中间所不必要的数据传输和阻塞。
    df = sales.merge(product, left_on="product_id", right_index=True)
    df = df[["product_name", "year", "price"]]
    
    print(df.execute().fetch())
    
    #保存结果到MaxCompute表中,并销毁Session
    md.to_odps_table(df, "result_df", overwrite=True).execute()
    
    session.destroy()

    返回结果:

    index product_name  year  price                   
    1            Nokia  2008   5000
    2          Samsung  2009   4000
    3            Nokia  2011   4000
    4            Apple  2013   6000
    5          Samsung  2015   9000
    6            Nokia  2015   2000
  • 性能对比

    在sales表数据量为5000W条(size:1.96 GB),product表数据量为10W条(size:3 MB)的数据样本中进行运算,可以得到如下耗时对比结果:

    环境

    耗时(单位:秒)

    本地Pandas(版本为1.3.5)

    65.8

    MaxFrame

    22

场景2:选出每个出售过的产品第一年销售的产品ID、年份、数量和价格

  • 示例代码:

    from odps import ODPS
    from maxframe.session import new_session
    import maxframe.dataframe as md
    import os
    
    o = ODPS(
        # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
        # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
        # 不建议直接使用 Access Key ID / Access Key Secret 字符串
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
        project='your-default-project',
        endpoint='your-end-point',
    )
    
    session = new_session(o)
    
    #session id是一串用于关联MaxFrame task的字符,对于调试和追踪任务状态有重要的作用。
    print(session.session_id)
    
    # 聚合获取每个产品的第一个年份
    min_year_df = md.read_odps_table("sales_maxframe_demo", index_col="index")
    min_year_df = min_year_df.groupby('product_id', as_index=False).agg(first_year=('year', 'min'))
    
    # join 找到对应的销售记录
    sales = md.read_odps_table("sales_maxframe_demo", index_col=['product_id', 'year'])
    result_df = md.merge(sales, min_year_df, 
                            left_index=True, 
                            right_on=['product_id','first_year'],
                            how='inner')
    
    #这里的result_df并不会立即执行,除非您使用 result_df.execute()来触发。
    #这意味着所有的计算都将最终完全在MaxCompute中集群完成,避免了中间所不必要的数据传输和阻塞。
    result_df = result_df[['product_id', 'first_year', 'quantity', 'price']]
    
    print(result_df.execute().fetch())
    
    #销毁 Session
    session.destroy()

    返回结果:

        product_id  first_year  quantity  price
    100         100        2008        10   5000
    300         300        2009         7   4000
    200         200        2013         6   6000
  • 性能对比:

    在sales表数据量为5000W条(size:1.96 GB),product表数据量为10W条(size:3 MB)的数据样本中进行运算,可以得到如下耗时对比结果:

    环境

    耗时(单位:秒)

    本地Pandas(版本为1.3.5)

    186

    MaxFrame

    21

场景3:为每个用户获取其消费最多的产品ID

说明

该场景将演示多次groupby、join、drop_duplicates和sort_values操作。

  • 示例代码

    from odps import ODPS
    from maxframe.session import new_session
    import maxframe.dataframe as md
    import os
    
    o = ODPS(
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
        project='your-default-project',
        endpoint='your-end-point',
    )
    
    session = new_session(o)
    
    #session id 是一串用于关联 MaxFrame task 的字符,对于调试和追踪任务状态有重要的作用。
    print(session.session_id)
    
    sales = md.read_odps_table("sales_maxframe_demo", index_col="index")
    product = md.read_odps_table("product_maxframe_demo", index_col="product_id")
    
    sales['total'] = sales['price'] * sales['quantity']
    
    product_cost_df = sales.groupby(['product_id', 'user_id'], as_index=False).agg(user_product_total=('total','sum'))
    product_cost_df = product_cost_df.merge(product, left_on="product_id", right_index=True, how='right')
    
    user_cost_df = product_cost_df.groupby('user_id').agg(max_total=('user_product_total', 'max'))
    merge_df = product_cost_df.merge(user_cost_df, left_on='user_id', right_index=True)
    
    #这里的 result_df 并不会立即执行,除非您使用 result_df.execute()来触发。
    #这意味着所有的计算都将最终完全在 MaxCompute 中集群完成,避免了中间所不必要的数据传输和阻塞。
    result_df = merge_df[merge_df['user_product_total'] == merge_df['max_total']][['user_id', 'product_id']].drop_duplicates().sort_values(['user_id'], ascending = [1])
    
    print(result_df.execute().fetch())
    
    #销毁 Session
    
    session.destroy()

    返回结果:

         user_id  product_id
    100      101         100
    300      102         300
  • 性能对比

    在sales表数据量为5000W条(size:1.96 GB),product表数据量为10W条(size:3 MB)的数据样本中进行计算,可以得到如下耗时对比结果:

    环境

    耗时(单位:秒)

    本地Pandas(版本为1.3.5)

    176

    MaxFrame

    85

结论

MaxFrame兼容Pandas接口且自动进行分布式处理,在保证强大数据处理能力的同时,可以大幅度提高数据处理规模及计算效率。