快速开始

MaxFrame为您提供兼容Pandas的API接口,用于数据处理。其中包括筛选、投影、拼接和聚合等基本API,及用于调用自定义函数的高级API(如transform、apply),高级API可以实现特定业务逻辑和数据操作,从而解决标准算子可能无法覆盖复杂场景的问题。同时MaxFrame结合大数据的处理需求,引入了特有的API接口,如读写MaxCompute表格数据(read_odps_table、to_odps_table)、执行延迟计算(execute)等,让您可以更高效地在大数据环境下进行数据分析,不受本地计算资源的限制。

数据准备

本文以MaxCompute公共数据集中的maxframe_ml_100k_users表为例,为您介绍MaxFrame的使用。测试数据已存放在MaxCompute公开项目BIGDATA_PUBLIC_DATASET下的data_science Schema中,您可直接使用。

初始化会话

执行MaxFrame作业前,首先需要初始化MaxFrame会话(Session)。在代码的入口处,通过调用new_session 接口初始化整个作业。后续的数据处理将借助所构建的会话对象与后端服务进行交互,以执行各种数据操作。示例如下:

import os
from maxframe import new_session
from odps import ODPS

# 使用MaxFrame相关账号初始化ODPS
o = ODPS(
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='your-default-project',
    endpoint='your-end-point',
)

# 初始化MaxFrame会话
new_session(odps_entry=o)

更多关于new_session接口的使用细节,请参见new_session

构建DataFrame对象

read_odps_tableread_odps_query接口允许您基于MaxCompute表创建DataFrame对象,且这些DataFrame对象支持Pandas式的数据操作。MaxFrame还支持使用本地数据初始化DataFrame对象,便于您快速进行测试和开发。

  • 使用MaxCompute表

    您可通过read_odps_table接口执行以下操作:

    • 通过read_odps_table接口读取MaxCompute表数据,并将其转换为MaxFrame DataFrame对象。

      import maxframe.dataframe as md
      
      # 从MaxCompute表读取数据,创建DataFrame
      df = md.read_odps_table('BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users')
    • 通过指定index_col参数,选择表中的某列作为DataFrame的索引。

      import maxframe.dataframe as md
      
      # 使用数据库的id列作为DataFrame的index
      df = md.read_odps_table('BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users', index_col="user_id")
    • 通过指定columns参数,选择表中的部分列构建DataFrame对象。

      import maxframe.dataframe as md
      
      # 择表中的部分列构建DataFrame对象
      df = md.read_odps_table('BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users', columns=['user_id', 'age', 'sex'])

      更多关于read_odps_tableAPI的使用细节,请参见read_odps_table

  • 使用MaxCompute SQL查询结果

    除了直接使用表构建DataFrame,MaxFrame还支持通过read_odps_query接口执行SQL查询,并以查询结果作为DataFrame的数据输入。

    import maxframe.dataframe as md
    
    df = md.read_odps_query('select user_id, age, sex FROM `BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users`')

    使用index_col参数指定DataFrame的索引:

    import maxframe.dataframe as md
    
    df = md.read_odps_query('select user_id, age, sex FROM `BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users`', index_col='user_id')

    更多关于read_odps_queryAPI的使用细节,请参见read_odps_query

  • 使用本地定义的数据

    MaxFrame支持类似于Pandas的使用体验,可以基于本地数据直接构建MaxFrame DataFrame对象。示例如下:

    import maxframe.dataframe as md
    
    d = {'col1': [1, 2], 'col2': [3, 4]}
    df = md.DataFrame(data=d)
    print(df.execute().fetch())
    # 返回结果
       col1  col2
    0     1     3
    1     2     4
    
    df = md.DataFrame(data=d, dtype=np.int8)
    print(df.execute().fetch())
    # 返回结果
       col1  col2
    0     1     3
    1     2     4
    
    d = {'col1': [0, 1, 2, 3], 'col2': pd.Series([2, 3], index=[2, 3])}
    df = md.DataFrame(data=d, index=[0, 1, 2, 3])
    print(df.execute().fetch())
    # 返回结果
       col1  col2
    0     0   NaN
    1     1   NaN
    2     2   2.0
    3     3   3.0
    
    data = np.array([(1, 2, 3), (4, 5, 6), (7, 8, 9)],
                    dtype=[("a", "i4"), ("b", "i4"), ("c", "i4")])
    df = md.DataFrame(data, columns=['c', 'a'])
    df.execute().fetch()
    # 返回结果
       c  a
    0  3  1
    1  6  4
    2  9  7

    更多使用细节,请参见DataFrame

数据处理

MaxFrame提供了一系列兼容Pandas的API接口,覆盖了数据计算、投影、过滤及排序等多种操作,且支持丰富的算子用于常规数据处理。除此之外,MaxFrame还支持UDF,以满足用户使用自定义函数进行数据加工和分析的需求。

数学计算

各种数学计算操作能够直接在DataFrame上进行,包括加法、减法、乘法及除法等。下述示例为您展示如何使用MaxFrame进行基本的算术操作。

  • 示例1:简单的数据加法。

    import maxframe.dataframe as md
    
    df = md.DataFrame({'angles': [0, 3, 4],
                       'degrees': [360, 180, 360]},
                      index=['circle', 'triangle', 'rectangle'])
    print(df.execute().fetch())
    # 返回结果
               angles  degrees
    circle          0      360
    triangle        3      180
    rectangle       4      360
    
    df = df + 1
    print(df.execute().fetch())
    # 返回结果
               angles  degrees
    circle          1      361
    triangle        4      181
    rectangle       5      361
  • 示例2:DataFrame之间的乘法。

    import maxframe.dataframe as md
    
    df = md.DataFrame({'angles': [0, 3, 4],
                       'degrees': [360, 180, 360]},
                      index=['circle', 'triangle', 'rectangle'])
    
    other = md.DataFrame({'angles': [0, 3, 4]},
                         index=['circle', 'triangle', 'rectangle'])
    
    print(df.mul(other, fill_value=0).execute())
    
    # 返回结果
               angles  degrees
    circle          0      0.0
    triangle        9      0.0
    rectangle      16      0.0

更多关于数学运算接口的信息,请参见Binary operator functionsComputations / descriptive stats

过滤/投影/抽样

过滤

过滤操作允许您根据特定的条件从DataFrame中选择或排除数据。该操作对于处理和分析大型数据集至关重要,允许您集中关注其中最具相关性的信息。

  • 示例1:显示前几行数据。

    import maxframe.dataframe as md
    
    df = md.DataFrame({'animal': ['alligator', 'bee', 'falcon', 'lion',
                       'monkey', 'parrot', 'shark', 'whale', 'zebra']})
    print(df.head().execute().fetch())
    
    # 返回结果
          animal 
    0  alligator 
    1        bee 
    2     falcon 
    3       lion 
    4     monkey 
  • 示例2:删除指定的列。

    import maxframe.dataframe as md                                    
    
    df = md.DataFrame(np.arange(12).reshape(3, 4), columns=['A', 'B', 'C', 'D'])
    print(df.drop(['B', 'C'], axis=1).execute().fetch())
    
    # 返回结果
       A   D
    0  0   3
    1  4   7
    2  8  11

投影

您可以通过投影重塑DataFrame的结构,包括要展示的列或者重新安排列的顺序。通过投影操作,可以创建数据的简化视图或者调整数据的展示方式,以满足特定的分析目的。

示例:修改列名

import maxframe.dataframe as md
df = md.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
print(df.set_axis(['I', 'II'], axis='columns').execute().fetch())

# 返回结果
   I  II 
0  1   4 
1  2   5 
2  3   6 

抽样

抽样是一种选择DataFrame中随机样本的过程,对于处理大型数据集以及估计数据的统计属性尤为重要。

示例:随机抽样

import maxframe.dataframe as md

df = md.DataFrame({'num_legs': [2, 4, 8, 0],
                        'num_wings': [2, 0, 0, 0],
                        'num_specimen_seen': [10, 2, 1, 8]},
                       index=['falcon', 'dog', 'spider', 'fish'])
print(df['num_legs'].sample(n=3, random_state=1).execute())

# 返回结果
falcon    2
fish      0
dog       4
Name: num_legs, dtype: int64

更多关于过滤、投影和抽样的信息请参见Reindexing / selection / label manipulation

排序

排序允许您根据一列或多列的值重新排列DataFrame中行的顺序,常用于数据分析。通过排序,可以更容易地观察到数据的模式、趋势和异常点。

示例:按照单列或多列排序

import maxframe.dataframe as md
import numpy as np
df = md.DataFrame({                            
    'col1': ['A', 'A', 'B', np.nan, 'D', 'C'], 
    'col2': [2, 1, 9, 8, 7, 4],                
    'col3': [0, 1, 9, 4, 2, 3],                
})            
res = df.sort_values(by=['col1']).execute()
print(res.fetch())

# 返回结果
   col1  col2  col3
0     A     2     0
1     A     1     1
2     B     9     9
5     C     4     3
4     D     7     2
3  None     8     4


res =  df.sort_values(by=['col1', 'col2']).execute()
print(res.fetch())

# 返回结果
     col1 col2 col3
 1   A    1    1   
 0   A    2    0   
 2   B    9    9   
 5   C    4    3   
 4   D    7    2   
 3   None  8    4   

更多关于排序操作的细节,请参见Reshaping / sorting / transposing

拼接

拼接操作是数据处理中非常基本且强大的工具,允许您将不同的数据集基于某些公共字段或索引,水平或垂直地结合起来。MaxFrame提供了拼接接口,帮助您轻松完成数据集合拼接操作。

示例:水平拼接

import maxframe.dataframe as md
df = md.DataFrame({'key': ['K0', 'K1', 'K2', 'K3', 'K4', 'K5'],
                        'A': ['A0', 'A1', 'A2', 'A3', 'A4', 'A5']})

other = md.DataFrame({'key': ['K0', 'K1', 'K2'],
                       'B': ['B0', 'B1', 'B2']})

print(df.join(other, lsuffix='_caller', rsuffix='_other').execute().fetch())

# 返回结果
  key_caller   A key_other     B
0         K0  A0        K0    B0
1         K1  A1        K1    B1
2         K2  A2        K2    B2
3         K3  A3      None  None
4         K4  A4      None  None
5         K5  A5      None  None

更多关于拼接操作的使用细节和方法,请参见Combining / joining / merging

聚合/UDF

聚合

聚合操作是将一组值转换为一个单一值的过程。在数据分析中,聚合是基本的数据归纳工具,用于总结、发现数据的统计特征。

示例:多类型聚合

import maxframe.dataframe as md

df = md.DataFrame([[1, 2, 3],
                   [4, 5, 6],
                   [7, 8, 9],
                   [np.nan, np.nan, np.nan]],
                  columns=['A', 'B', 'C'])
print(df.agg(['sum', 'min']).execute().fetch())

# 返回结果
        A     B     C
min   1.0   2.0   3.0
sum  12.0  15.0  18.0

df.agg({'A' : ['sum', 'min'], 'B' : ['min', 'max']}).execute().fetch()

# 返回结果
        A    B
max   NaN  8.0
min   1.0  2.0
sum  12.0  NaN

UDF

除了标准的数据处理算子外,MaxFrame还支持UDF,以支持在DataFrame对象上执行用户自定义的函数处理。这增加了数据处理的灵活性,允许使用自定义的逻辑对数据集进行更丰富的操作。

重要

执行UDF前,需要在new_session之前通过config.options.sql.settings参数声明使用common镜像。

  • 示例1:使用transform方法调用自定义函数

    import maxframe.dataframe as md
    
    from maxframe import config
    config.options.sql.settings = {
        "odps.session.image": "common",
        "odps.sql.type.system.odps2": "true"
    }
    session = new_session(o)
    
    df = md.DataFrame({'A': range(3), 'B': range(1, 4)})
    print(df.transform(lambda x: x + 1).execute().fetch())
    
    # 返回结果
       A  B
    0  1  2
    1  2  3
    2  3  4
  • 示例2:使用apply方法调用自定义函数

    如果您希望UDF执行前后数据的列数发生变化,可以使用apply方法来调用自定义函数。

    import maxframe.dataframe as md
    import numpy as np
    
    from maxframe import config
    config.options.sql.settings = {
        "odps.session.image": "common",
        "odps.sql.type.system.odps2": "true"
    }
    session = new_session(o)
    
    def simple(row):
        row['is_man'] = row['sex'] == "man"
        return row
    
    df = md.read_odps_table('BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users')
    new_dtypes = df.dtypes.copy()
    new_dtypes["is_man"] = np.dtype(np.bool_)
    df.apply(
                simple,
                axis=1,
                result_type="expand",
                output_type="dataframe",
                dtypes=new_dtypes
            ).execute().fetch()
    
    # 返回结果
         user_id  age sex     occupation zip_code  is_man
    0          1   24   M     technician    85711   False
    1          2   53   F          other    94043   False
    2          3   23   M         writer    32067   False
    3          4   24   M     technician    43537   False
    4          5   33   F          other    15213   False
    ..       ...  ...  ..            ...      ...     ...
    938      939   26   F        student    33319   False
    939      940   32   M  administrator    02215   False
    940      941   20   M        student    97229   False
    941      942   48   F      librarian    78209   False
    942      943   22   M        student    77841   False
    
    [943 rows x 6 columns]

对于聚合/UDF操作中,如何选择聚合函数等更多使用细节,请参见Function application / GroupBy / window

结果存储

数据集被转换后,可以使用to_odps_table将结果保存至MaxCompute表中。

  • 示例1:将处理后的数据写入MaxCompute表。

    # 将处理后的数据写入MaxCompute表
    filtered_df.to_odps_table('<table_name>')
  • 示例2:指定存储的周期。

    通过指定lifecycle参数,指定结果数据表存活的生命周期。

    # 将处理后的数据写入MaxCompute表
    filtered_df.to_odps_table('<table_name>', lifecycle = 7)

参数说明:

table_name:待写入数据的目标MaxCompute表名。

更多关于存储操作的细节,请参见to_odps_table

执行任务并查看执行结果

您可以通过execute()方法触发数据处理任务,并使用fetch()方法获取部分执行结果数据。

示例:获取并展示执行结果。

通过追加execute()fetch()方法,完成数据处理流程并查看结果。相比于Pandas,MaxFrame允许有效地处理大规模数据集,并通过延迟计算模式来减少不必要的数据传输。

# 获取执行结果的部分数据
data = filtered_df.execute().fetch()
print(data)

更多关于任务执行和结果获取的细节,请参见executefetch