本文以具体数据及开发场景为例,为您介绍在DataWorks数据开发的PyODPS 3节点中,如何创建和操作MaxFrame中的DataFrame对象,以及使用DataFrame完成基本的数据处理,帮助您在十分钟内快速使用MaxFrame进行开发。
数据准备
本文以movielens 100K进行举例,测试数据已存放在MaxCompute公开项目BIGDATA_PUBLIC_DATASET下的data_science
Schema中,您可直接进行使用,表信息如下:
maxframe_ml_100k_movie(电影相关数据)
maxframe_ml_100k_users(用户相关数据)
maxframe_ml_100k_ratings(评分相关数据)
操作步骤
进入DataWorks数据开发模块,创建PyODPS 3节点。详情请参见开发PyODPS 3任务。
在PyODPS 3节点创建MaxFrame会话对象,代码如下。
import os from odps import ODPS from maxframe import new_session # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID, # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret, # 不建议直接使用AccessKey ID和 AccessKey Secret字符串。 o = ODPS( os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='your-default-project', endpoint='your-end-point', ) session = new_session(o)
参数说明:
ALIBABA_CLOUD_ACCESS_KEY_ID:需将该环境变量设置为具备目标MaxCompute项目中待操作对象相关MaxCompute权限的AccessKey ID。您可以进入AccessKey管理页面获取AccessKey ID。
ALIBABA_CLOUD_ACCESS_KEY_SECRET:需将该环境变量设置为AccessKey ID对应的AccessKey Secret。
your-default-project:使用的MaxCompute项目名称。您可以登录MaxCompute控制台,在左侧导航栏选择工作区>项目管理,查看MaxCompute项目名称。
your-end-point:目标MaxCompute项目所在地域的Endpoint。详情请参见Endpoint。
使用read_odps_table方法创建一个DataFrame对象。
import maxframe.dataframe as md users = md.read_odps_table('BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users',index_col="zip_code")
您可以通过
dtypes
属性查看这个DataFrame的字段名及字段类型。print(users.dtypes)
返回结果:
user_id int32 age int32 sex object occupation object dtype: object
只需查看对象的标识就会自动显示DataFrame的第一行和最后一行。
print(users.execute().fetch())
返回结果:
zip_code user_id age sex occupation 85711 1 24 M technician 94043 2 53 F other 32067 3 23 M writer 43537 4 24 M technician 15213 5 33 F other ... ... ... .. ... 33319 939 26 F student 02215 940 32 M administrator 97229 941 20 M student 78209 942 48 F librarian 77841 943 22 M student
您可以使用
head
方法获取前N条数据,方便快捷地进行数据预览。print(users.head(10).execute().fetch())
返回结果:
zip_code user_id age sex occupation 85711 1 24 M technician 94043 2 53 F other 32067 3 23 M writer 43537 4 24 M technician 15213 5 33 F other 98101 6 42 M executive 91344 7 57 M administrator 05201 8 36 M administrator 01002 9 29 M student 90703 10 53 M lawyer
如果您不需要查看所有字段,则可以进行如下操作:
从中筛选出一部分字段。
print(users[['user_id', 'age']].head(5).execute().fetch())
返回结果:
zip_code user_id age 85711 1 24 94043 2 53 32067 3 23 43537 4 24 15213 5 33
删除几列。
users = users.drop(columns=['sex', 'age']) print(users.head(5).execute().fetch())
返回结果:
zip_code user_id occupation 85711 1 technician 94043 2 other 32067 3 writer 43537 4 technician 15213 5 other
排除掉一些字段,通过计算得到一些新的列。例如:性别为男,则添加
sex_bool
属性并将其设置为True。否则,将其设置为False。users["sex_bool"] = users.sex == "M" users = users.drop(columns=['sex']) print(users.head(5).execute().fetch())
返回结果:
zip_code user_id age occupation sex_bool 85711 1 24 technician True 85711 1 24 technician True 94043 2 53 other False 94043 2 53 other True 94043 2 53 other True
将用户按职业划分,并按照用户数量从高到低进行排序。
df = users.groupby("occupation").agg({"user_id": "count"}) result_df = df.sort_values("user_id", ascending=False) print(result_df.execute().fetch())
返回结果:
occupation user_id student 196 other 105 educator 95 administrator 79 engineer 67 programmer 66 librarian 51 writer 45 executive 32 scientist 31 artist 28 technician 27 marketing 26 entertainment 18 healthcare 16 retired 14 lawyer 12 salesman 12 none 9 homemaker 7 doctor 7
使用
join
将这三张表联合起来,然后保存为一张新的表maxframe_ml_100k_lens。o.delete_table('maxframe_ml_100k_lens', if_exists=True) lens = lens = movies.join(ratings.add_prefix('mean_')).join(users.add_prefix('meana_')) md.to_odps_table(lens, "maxframe_ml_100k_lens").execute() print(lens.dtypes)
返回结果:
movie_id int32 title object release_date object video_release_date object imdb_url object mean_user_id int32 mean_item_id int32 mean_rating int32 mean_unix_timestamp int32 meana_user_id float64 meana_age float64 meana_sex object meana_occupation object dtype: object
作业执行
MaxFrame采用延迟执行策略,当您在MaxFrame中编写数据处理命令时,这些命令不会立即被执行,只有在您显式要求结果(例如需要在客户端本地查看数据)时才会发生。触发作业立即执行的条件如下:
通过execute()触发。
DataFrame和Series的所有绘图函数,包括maxframe.dataframe.DataFrame.plot()、maxframe.dataframe()等。
示例:
df.head(10)
不会触发作业立即执行。df.head(10).execute()
会触发作业立即执行。
- 本页导读