MaxFrame为您提供兼容Pandas的API接口,用于数据处理。其中包括筛选、投影、拼接和聚合等基本API,及用于调用自定义函数的高级API(如transform、apply),高级API可以实现特定业务逻辑和数据操作,从而解决标准算子可能无法覆盖复杂场景的问题。同时MaxFrame结合大数据的处理需求,引入了特有的API接口,如读写MaxCompute表格数据(read_odps_table、to_odps_table)、执行延迟计算(execute)等,让您可以更高效地在大数据环境下进行数据分析,不受本地计算资源的限制。
数据准备
本文以MaxCompute公共数据集中的maxframe_ml_100k_users
表为例,为您介绍MaxFrame的使用。测试数据已存放在MaxCompute公开项目BIGDATA_PUBLIC_DATASET下的data_science
Schema中,您可直接使用。
初始化会话
执行MaxFrame作业前,首先需要初始化MaxFrame会话(Session)。在代码的入口处,通过调用new_session
接口初始化整个作业。后续的数据处理将借助所构建的会话对象与后端服务进行交互,以执行各种数据操作。示例如下:
import os
from maxframe import new_session
from odps import ODPS
# 使用MaxFrame相关账号初始化ODPS
o = ODPS(
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
project='your-default-project',
endpoint='your-end-point',
)
# 初始化MaxFrame会话
new_session(odps_entry=o)
更多关于new_session
接口的使用细节,请参见new_session。
构建DataFrame对象
read_odps_table
和read_odps_query
接口允许您基于MaxCompute表创建DataFrame对象,且这些DataFrame对象支持Pandas式的数据操作。MaxFrame还支持使用本地数据初始化DataFrame对象,便于您快速进行测试和开发。
使用MaxCompute表
您可通过
read_odps_table
接口执行以下操作:通过
read_odps_table
接口读取MaxCompute表数据,并将其转换为MaxFrame DataFrame对象。import maxframe.dataframe as md # 从MaxCompute表读取数据,创建DataFrame df = md.read_odps_table('BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users')
通过指定index_col参数,选择表中的某列作为DataFrame的索引。
import maxframe.dataframe as md # 使用数据库的id列作为DataFrame的index df = md.read_odps_table('BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users', index_col="user_id")
通过指定columns参数,选择表中的部分列构建DataFrame对象。
import maxframe.dataframe as md # 择表中的部分列构建DataFrame对象 df = md.read_odps_table('BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users', columns=['user_id', 'age', 'sex'])
更多关于
read_odps_table
API的使用细节,请参见read_odps_table。
使用MaxCompute SQL查询结果
除了直接使用表构建DataFrame,MaxFrame还支持通过
read_odps_query
接口执行SQL查询,并以查询结果作为DataFrame的数据输入。import maxframe.dataframe as md df = md.read_odps_query('select user_id, age, sex FROM `BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users`')
使用index_col参数指定DataFrame的索引:
import maxframe.dataframe as md df = md.read_odps_query('select user_id, age, sex FROM `BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users`', index_col='user_id')
更多关于
read_odps_query
API的使用细节,请参见read_odps_query。使用本地定义的数据
MaxFrame支持类似于Pandas的使用体验,可以基于本地数据直接构建MaxFrame DataFrame对象。示例如下:
import maxframe.dataframe as md d = {'col1': [1, 2], 'col2': [3, 4]} df = md.DataFrame(data=d) print(df.execute().fetch()) # 返回结果 col1 col2 0 1 3 1 2 4 df = md.DataFrame(data=d, dtype=np.int8) print(df.execute().fetch()) # 返回结果 col1 col2 0 1 3 1 2 4 d = {'col1': [0, 1, 2, 3], 'col2': pd.Series([2, 3], index=[2, 3])} df = md.DataFrame(data=d, index=[0, 1, 2, 3]) print(df.execute().fetch()) # 返回结果 col1 col2 0 0 NaN 1 1 NaN 2 2 2.0 3 3 3.0 data = np.array([(1, 2, 3), (4, 5, 6), (7, 8, 9)], dtype=[("a", "i4"), ("b", "i4"), ("c", "i4")]) df = md.DataFrame(data, columns=['c', 'a']) df.execute().fetch() # 返回结果 c a 0 3 1 1 6 4 2 9 7
更多使用细节,请参见DataFrame。
数据处理
MaxFrame提供了一系列兼容Pandas的API接口,覆盖了数据计算、投影、过滤及排序等多种操作,且支持丰富的算子用于常规数据处理。除此之外,MaxFrame还支持UDF,以满足用户使用自定义函数进行数据加工和分析的需求。
数学计算
各种数学计算操作能够直接在DataFrame上进行,包括加法、减法、乘法及除法等。下述示例为您展示如何使用MaxFrame进行基本的算术操作。
示例1:简单的数据加法。
import maxframe.dataframe as md df = md.DataFrame({'angles': [0, 3, 4], 'degrees': [360, 180, 360]}, index=['circle', 'triangle', 'rectangle']) print(df.execute().fetch()) # 返回结果 angles degrees circle 0 360 triangle 3 180 rectangle 4 360 df = df + 1 print(df.execute().fetch()) # 返回结果 angles degrees circle 1 361 triangle 4 181 rectangle 5 361
示例2:DataFrame之间的乘法。
import maxframe.dataframe as md df = md.DataFrame({'angles': [0, 3, 4], 'degrees': [360, 180, 360]}, index=['circle', 'triangle', 'rectangle']) other = md.DataFrame({'angles': [0, 3, 4]}, index=['circle', 'triangle', 'rectangle']) print(df.mul(other, fill_value=0).execute()) # 返回结果 angles degrees circle 0 0.0 triangle 9 0.0 rectangle 16 0.0
更多关于数学运算接口的信息,请参见Binary operator functions和Computations / descriptive stats。
过滤/投影/抽样
过滤
过滤操作允许您根据特定的条件从DataFrame中选择或排除数据。该操作对于处理和分析大型数据集至关重要,允许您集中关注其中最具相关性的信息。
示例1:显示前几行数据。
import maxframe.dataframe as md df = md.DataFrame({'animal': ['alligator', 'bee', 'falcon', 'lion', 'monkey', 'parrot', 'shark', 'whale', 'zebra']}) print(df.head().execute().fetch()) # 返回结果 animal 0 alligator 1 bee 2 falcon 3 lion 4 monkey
示例2:删除指定的列。
import maxframe.dataframe as md df = md.DataFrame(np.arange(12).reshape(3, 4), columns=['A', 'B', 'C', 'D']) print(df.drop(['B', 'C'], axis=1).execute().fetch()) # 返回结果 A D 0 0 3 1 4 7 2 8 11
投影
您可以通过投影重塑DataFrame的结构,包括要展示的列或者重新安排列的顺序。通过投影操作,可以创建数据的简化视图或者调整数据的展示方式,以满足特定的分析目的。
示例:修改列名
import maxframe.dataframe as md
df = md.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
print(df.set_axis(['I', 'II'], axis='columns').execute().fetch())
# 返回结果
I II
0 1 4
1 2 5
2 3 6
抽样
抽样是一种选择DataFrame中随机样本的过程,对于处理大型数据集以及估计数据的统计属性尤为重要。
示例:随机抽样
import maxframe.dataframe as md
df = md.DataFrame({'num_legs': [2, 4, 8, 0],
'num_wings': [2, 0, 0, 0],
'num_specimen_seen': [10, 2, 1, 8]},
index=['falcon', 'dog', 'spider', 'fish'])
print(df['num_legs'].sample(n=3, random_state=1).execute())
# 返回结果
falcon 2
fish 0
dog 4
Name: num_legs, dtype: int64
更多关于过滤、投影和抽样的信息请参见Reindexing / selection / label manipulation。
排序
排序允许您根据一列或多列的值重新排列DataFrame中行的顺序,常用于数据分析。通过排序,可以更容易地观察到数据的模式、趋势和异常点。
示例:按照单列或多列排序
import maxframe.dataframe as md
import numpy as np
df = md.DataFrame({
'col1': ['A', 'A', 'B', np.nan, 'D', 'C'],
'col2': [2, 1, 9, 8, 7, 4],
'col3': [0, 1, 9, 4, 2, 3],
})
res = df.sort_values(by=['col1']).execute()
print(res.fetch())
# 返回结果
col1 col2 col3
0 A 2 0
1 A 1 1
2 B 9 9
5 C 4 3
4 D 7 2
3 None 8 4
res = df.sort_values(by=['col1', 'col2']).execute()
print(res.fetch())
# 返回结果
col1 col2 col3
1 A 1 1
0 A 2 0
2 B 9 9
5 C 4 3
4 D 7 2
3 None 8 4
更多关于排序操作的细节,请参见Reshaping / sorting / transposing。
拼接
拼接操作是数据处理中非常基本且强大的工具,允许您将不同的数据集基于某些公共字段或索引,水平或垂直地结合起来。MaxFrame提供了拼接接口,帮助您轻松完成数据集合拼接操作。
示例:水平拼接
import maxframe.dataframe as md
df = md.DataFrame({'key': ['K0', 'K1', 'K2', 'K3', 'K4', 'K5'],
'A': ['A0', 'A1', 'A2', 'A3', 'A4', 'A5']})
other = md.DataFrame({'key': ['K0', 'K1', 'K2'],
'B': ['B0', 'B1', 'B2']})
print(df.join(other, lsuffix='_caller', rsuffix='_other').execute().fetch())
# 返回结果
key_caller A key_other B
0 K0 A0 K0 B0
1 K1 A1 K1 B1
2 K2 A2 K2 B2
3 K3 A3 None None
4 K4 A4 None None
5 K5 A5 None None
更多关于拼接操作的使用细节和方法,请参见Combining / joining / merging。
聚合/UDF
聚合
聚合操作是将一组值转换为一个单一值的过程。在数据分析中,聚合是基本的数据归纳工具,用于总结、发现数据的统计特征。
示例:多类型聚合
import maxframe.dataframe as md
df = md.DataFrame([[1, 2, 3],
[4, 5, 6],
[7, 8, 9],
[np.nan, np.nan, np.nan]],
columns=['A', 'B', 'C'])
print(df.agg(['sum', 'min']).execute().fetch())
# 返回结果
A B C
min 1.0 2.0 3.0
sum 12.0 15.0 18.0
df.agg({'A' : ['sum', 'min'], 'B' : ['min', 'max']}).execute().fetch()
# 返回结果
A B
max NaN 8.0
min 1.0 2.0
sum 12.0 NaN
UDF
除了标准的数据处理算子外,MaxFrame还支持UDF,以支持在DataFrame对象上执行用户自定义的函数处理。这增加了数据处理的灵活性,允许使用自定义的逻辑对数据集进行更丰富的操作。
执行UDF前,需要在new_session之前通过config.options.sql.settings
参数声明使用common镜像。
示例1:使用transform方法调用自定义函数
import maxframe.dataframe as md from maxframe import config config.options.sql.settings = { "odps.session.image": "common", "odps.sql.type.system.odps2": "true" } session = new_session(o) df = md.DataFrame({'A': range(3), 'B': range(1, 4)}) print(df.transform(lambda x: x + 1).execute().fetch()) # 返回结果 A B 0 1 2 1 2 3 2 3 4
示例2:使用apply方法调用自定义函数
如果您希望UDF执行前后数据的列数发生变化,可以使用apply方法来调用自定义函数。
import maxframe.dataframe as md import numpy as np from maxframe import config config.options.sql.settings = { "odps.session.image": "common", "odps.sql.type.system.odps2": "true" } session = new_session(o) def simple(row): row['is_man'] = row['sex'] == "man" return row df = md.read_odps_table('BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users') new_dtypes = df.dtypes.copy() new_dtypes["is_man"] = np.dtype(np.bool_) df.apply( simple, axis=1, result_type="expand", output_type="dataframe", dtypes=new_dtypes ).execute().fetch() # 返回结果 user_id age sex occupation zip_code is_man 0 1 24 M technician 85711 False 1 2 53 F other 94043 False 2 3 23 M writer 32067 False 3 4 24 M technician 43537 False 4 5 33 F other 15213 False .. ... ... .. ... ... ... 938 939 26 F student 33319 False 939 940 32 M administrator 02215 False 940 941 20 M student 97229 False 941 942 48 F librarian 78209 False 942 943 22 M student 77841 False [943 rows x 6 columns]
对于聚合/UDF操作中,如何选择聚合函数等更多使用细节,请参见Function application / GroupBy / window。
结果存储
数据集被转换后,可以使用to_odps_table
将结果保存至MaxCompute表中。
示例1:将处理后的数据写入MaxCompute表。
# 将处理后的数据写入MaxCompute表 filtered_df.to_odps_table('<table_name>')
示例2:指定存储的周期。
通过指定lifecycle参数,指定结果数据表存活的生命周期。
# 将处理后的数据写入MaxCompute表 filtered_df.to_odps_table('<table_name>', lifecycle = 7)
参数说明:
table_name:待写入数据的目标MaxCompute表名。
更多关于存储操作的细节,请参见to_odps_table。
执行任务并查看执行结果
您可以通过execute()
方法触发数据处理任务,并使用fetch()
方法获取部分执行结果数据。
示例:获取并展示执行结果。
通过追加execute()
和fetch()
方法,完成数据处理流程并查看结果。相比于Pandas,MaxFrame允许有效地处理大规模数据集,并通过延迟计算模式来减少不必要的数据传输。
# 获取执行结果的部分数据
data = filtered_df.execute().fetch()
print(data)