MaxFrame可以在分布式环境下使用与Pandas相同的API来分析数据,通过MaxFrame,您能够以高于开源Pandas数十倍的性能在MaxCompute上快速完成数据分析和计算工作。本文为您介绍如何通过MaxFrame使用常用的Pandas算子。
前提条件
已安装MaxFrame,详情请参见准备工作。
数据准备
在安装了MaxFrame的Python环境下运行如下脚本,准备测试表和测试数据。
from odps import ODPS from maxframe.session import new_session import maxframe.dataframe as md import pandas as pd import os o = ODPS( # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID, # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret, # 不建议直接使用AccessKey ID和 AccessKey Secret字符串。 os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='your-default-project', endpoint='your-end-point', ) data_sets = [{ "table_name": "product", "table_schema" : "index bigint, product_id bigint, product_name string, current_price bigint", "source_type": "records", "records" : [ [1, 100, 'Nokia', 1000], [2, 200, 'Apple', 5000], [3, 300, 'Samsung', 9000] ], }, { "table_name" : "sales", "table_schema" : "index bigint, sale_id bigint, product_id bigint, user_id bigint, year bigint, quantity bigint, price bigint", "source_type": "records", "records" : [ [1, 1, 100, 101, 2008, 10, 5000], [2, 2, 300, 101, 2009, 7, 4000], [3, 4, 100, 102, 2011, 9, 4000], [4, 5, 200, 102, 2013, 6, 6000], [5, 8, 300, 102, 2015, 10, 9000], [6, 9, 100, 102, 2015, 6, 2000] ], "lifecycle": 5 }] def prepare_data(o: ODPS, data_sets, suffix="", drop_if_exists=False): for index, data in enumerate(data_sets): table_name = data.get("table_name") table_schema = data.get("table_schema") source_type = data.get("source_type") if not table_name or not table_schema or not source_type: raise ValueError(f"Dataset at index {index} is missing one or more required keys: 'table_name', 'table_schema', or 'source_type'.") lifecycle = data.get("lifecycle", 5) table_name += suffix print(f"Processing {table_name}...") if drop_if_exists: print(f"Deleting {table_name}...") o.delete_table(table_name, if_exists=True) o.create_table(name=table_name, table_schema=table_schema, lifecycle=lifecycle, if_not_exists=True) if source_type == "local_file": file_path = data.get("file") if not file_path: raise ValueError(f"Dataset at index {index} with source_type 'local_file' is missing the 'file' key.") sep = data.get("sep", ",") pd_df = pd.read_csv(file_path, sep=sep) ODPSDataFrame(pd_df).persist(table_name, drop_table=True) elif source_type == 'records': records = data.get("records") if not records: raise ValueError(f"Dataset at index {index} with source_type 'records' is missing the 'records' key.") with o.get_table(table_name).open_writer() as writer: writer.write(records) else: raise ValueError(f"Unknown data set source_type: {source_type}") print(f"Processed {table_name} Done") prepare_data(o, data_sets, "_maxframe_demo", True)
参数说明:
ALIBABA_CLOUD_ACCESS_KEY_ID:需将该环境变量设置为具备目标MaxCompute项目中待操作对象相关MaxCompute权限的AccessKey ID。您可以进入AccessKey管理页面获取AccessKey ID。
ALIBABA_CLOUD_ACCESS_KEY_SECRET:需将该环境变量设置为AccessKey ID对应的AccessKey Secret。
your-default-project:使用的MaxCompute项目名称。您可以登录MaxCompute控制台,在左侧导航栏选择工作区>项目管理,查看MaxCompute项目名称。
your-end-point:目标MaxCompute项目所在地域的Endpoint,可根据网络连接方式自行选择,例如
http://service.cn-chengdu.maxcompute.aliyun.com/api
。详情请参见Endpoint。
查询sales_maxframe_demo表和product_maxframe_demo表的数据,SQL命令如下。
--查询sales_maxframe_demo表 SELECT * FROM sales_maxframe_demo; --返回 +------------+------------+------------+------------+------------+------------+------------+ | index | sale_id | product_id | user_id | year | quantity | price | +------------+------------+------------+------------+------------+------------+------------+ | 1 | 1 | 100 | 101 | 2008 | 10 | 5000 | | 2 | 2 | 300 | 101 | 2009 | 7 | 4000 | | 3 | 4 | 100 | 102 | 2011 | 9 | 4000 | | 4 | 5 | 200 | 102 | 2013 | 6 | 6000 | | 5 | 8 | 300 | 102 | 2015 | 10 | 9000 | | 6 | 9 | 100 | 102 | 2015 | 6 | 2000 | +------------+------------+------------+------------+------------+------------+------------+ --查询product_maxframe_demo表数据 SELECT * FROM product_maxframe_demo; --返回 +------------+------------+--------------+---------------+ | index | product_id | product_name | current_price | +------------+------------+--------------+---------------+ | 1 | 100 | Nokia | 1000 | | 2 | 200 | Apple | 5000 | | 3 | 300 | Samsung | 9000 | +------------+------------+--------------+---------------+
使用MaxFrame进行数据分析
场景1:使用merge方法连接两张数据表,以获取sales_maxframe_demo表中所有sale_id
对应的product_name
以及该产品的所有year
和price
示例代码
from odps import ODPS from maxframe.session import new_session import maxframe.dataframe as md import os o = ODPS( # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID, # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret, # 不建议直接使用 Access Key ID / Access Key Secret 字符串 os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='your-default-project', endpoint='your-end-point', ) session = new_session(o) #session id是一串用于关联MaxFrame task的字符,对于调试和追踪任务状态有重要的作用。 print(session.session_id) sales = md.read_odps_table("sales_maxframe_demo", index_col="index") product = md.read_odps_table("product_maxframe_demo", index_col="product_id") #这里的df并不会立即执行,除非您使用df.execute()来触发。 #这意味着所有的计算都将最终完全在MaxCompute集群完成,避免了中间所不必要的数据传输和阻塞。 df = sales.merge(product, left_on="product_id", right_index=True) df = df[["product_name", "year", "price"]] print(df.execute().fetch()) #保存结果到MaxCompute表中,并销毁Session md.to_odps_table(df, "result_df", overwrite=True).execute() session.destroy()
返回结果:
index product_name year price 1 Nokia 2008 5000 2 Samsung 2009 4000 3 Nokia 2011 4000 4 Apple 2013 6000 5 Samsung 2015 9000 6 Nokia 2015 2000
性能对比
在sales表数据量为5000W条(size:1.96 GB),product表数据量为10W条(size:3 MB)的数据样本中进行运算,可以得到如下耗时对比结果:
环境
耗时(单位:秒)
本地Pandas(版本为1.3.5)
65.8
MaxFrame
22
场景2:选出每个出售过的产品第一年销售的产品ID、年份、数量和价格
示例代码:
from odps import ODPS from maxframe.session import new_session import maxframe.dataframe as md import os o = ODPS( # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID, # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret, # 不建议直接使用 Access Key ID / Access Key Secret 字符串 os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='your-default-project', endpoint='your-end-point', ) session = new_session(o) #session id是一串用于关联MaxFrame task的字符,对于调试和追踪任务状态有重要的作用。 print(session.session_id) # 聚合获取每个产品的第一个年份 min_year_df = md.read_odps_table("sales_maxframe_demo", index_col="index") min_year_df = min_year_df.groupby('product_id', as_index=False).agg(first_year=('year', 'min')) # join 找到对应的销售记录 sales = md.read_odps_table("sales_maxframe_demo", index_col=['product_id', 'year']) result_df = md.merge(sales, min_year_df, left_index=True, right_on=['product_id','first_year'], how='inner') #这里的result_df并不会立即执行,除非您使用 result_df.execute()来触发。 #这意味着所有的计算都将最终完全在MaxCompute中集群完成,避免了中间所不必要的数据传输和阻塞。 result_df = result_df[['product_id', 'first_year', 'quantity', 'price']] print(result_df.execute().fetch()) #销毁 Session session.destroy()
返回结果:
product_id first_year quantity price 100 100 2008 10 5000 300 300 2009 7 4000 200 200 2013 6 6000
性能对比:
在sales表数据量为5000W条(size:1.96 GB),product表数据量为10W条(size:3 MB)的数据样本中进行运算,可以得到如下耗时对比结果:
环境
耗时(单位:秒)
本地Pandas(版本为1.3.5)
186
MaxFrame
21
场景3:为每个用户获取其消费最多的产品ID
该场景将演示多次groupby、join、drop_duplicates和sort_values操作。
示例代码
from odps import ODPS from maxframe.session import new_session import maxframe.dataframe as md import os o = ODPS( os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='your-default-project', endpoint='your-end-point', ) session = new_session(o) #session id 是一串用于关联 MaxFrame task 的字符,对于调试和追踪任务状态有重要的作用。 print(session.session_id) sales = md.read_odps_table("sales_maxframe_demo", index_col="index") product = md.read_odps_table("product_maxframe_demo", index_col="product_id") sales['total'] = sales['price'] * sales['quantity'] product_cost_df = sales.groupby(['product_id', 'user_id'], as_index=False).agg(user_product_total=('total','sum')) product_cost_df = product_cost_df.merge(product, left_on="product_id", right_index=True, how='right') user_cost_df = product_cost_df.groupby('user_id').agg(max_total=('user_product_total', 'max')) merge_df = product_cost_df.merge(user_cost_df, left_on='user_id', right_index=True) #这里的 result_df 并不会立即执行,除非您使用 result_df.execute()来触发。 #这意味着所有的计算都将最终完全在 MaxCompute 中集群完成,避免了中间所不必要的数据传输和阻塞。 result_df = merge_df[merge_df['user_product_total'] == merge_df['max_total']][['user_id', 'product_id']].drop_duplicates().sort_values(['user_id'], ascending = [1]) print(result_df.execute().fetch()) #销毁 Session session.destroy()
返回结果:
user_id product_id 100 101 100 300 102 300
性能对比
在sales表数据量为5000W条(size:1.96 GB),product表数据量为10W条(size:3 MB)的数据样本中进行计算,可以得到如下耗时对比结果:
环境
耗时(单位:秒)
本地Pandas(版本为1.3.5)
176
MaxFrame
85
结论
MaxFrame兼容Pandas接口且自动进行分布式处理,在保证强大数据处理能力的同时,可以大幅度提高数据处理规模及计算效率。