本文介绍MaxFrame提供的包括Session、Input/Output、Execute及Fetch几类特有API,用于在MaxFrame任务中便捷的进行数据处理。
Session
new_session
接口名称:new_session,源码请参见new_session。
new_session( session_id: str = None, default: bool = True, new: bool = True, odps_entry: Optional[ODPS] = None )
功能描述:启动一个MaxFrame任务会话。
入参
参数名称
参数类型
是否必填
描述
session_id
String
否
会话标识符。
用于为新创建的会话指定一个唯一的标识符。若不指定,则MaxFrame会自动生成一个默认的标识符。
default
Boolean
否
是否将创建的Session作为默认Session。
默认值为True。
new
Boolean
否
是否创建新会话。
默认值为True,若设置为False,则根据session_id重用现有会话。
odps_entry
ODPS
是
ODPS入口对象。详情请参见初始化ODPS入口。
返回值
Session对象。
示例
from maxframe import new_session from odps import ODPS # 使用MaxFrame相关账号初始化ODPS o = ODPS( # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID, # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret, # 不建议直接使用AccessKey ID和 AccessKey Secret字符串。 os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='your-default-project', endpoint='your-end-point', ) # 初始化MaxFrame会话 session = new_session(odps_entry=o)
Input/Output
read_odps_table
接口名称:read_odps_table,源码请参见read_odps_table。
read_odps_table( table_name: Union[str, Table], partitions: Union[None, str, List[str]] = None, columns: Optional[List[str]] = None, index_col: Union[None, str, List[str]] = None, odps_entry: ODPS = None, string_as_binary: bool = None, append_partitions: bool = False )
功能描述:从MaxCompute表中读取数据并构建DataFrame。支持指定部分列作为索引,如果不指定,则会生成RangeIndex。
入参
参数名称
参数类型
是否必填
描述
table_name
String/Table
是
待读取的MaxCompute表名或Table对象。
partitions
String/List
否
待读取的表分区或分区列表。
格式为:
<partition_name>=<partition_value>
。若不指定,则读取表中的所有分区数据。columns
List
否
待读取的表列名List。
格式为:
<column1>, <column2>, ...
。若不指定,则读取表中的所有列(不包含分区列)数据。index_col
String/List
否
指定作为索引的列名或列名List。
odps_entry
ODPS
否
ODPS入口对象。详情请参见初始化ODPS入口。
string_as_binary
Boolean
否
是否以二进制形式读取字符串数据。
append_partitions
Boolean
否
是否读取分区列的数据。
默认值为False。若设置为True,在未指定
columns
参数时,将读取所有列(包含分区列)的数据。返回值
DataFrame对象。
示例
import maxframe.dataframe as md df = md.read_odps_table('BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users', index_col='user_id', columns=['age', 'sex']) print(df.execute().fetch()) # 返回值 user_id age sex 1 24 M 2 53 F 3 23 M 4 24 M 5 33 F ... ... ... 939 26 F 940 32 M 941 20 M 942 48 F 943 22 M
read_odps_query
接口名称:read_odps_query,源码请参见read_odps_query。
read_odps_query( query: str, odps_entry: ODPS = None, index_col: Union[None, str, List[str]] = None, string_as_binary: bool = None )
功能描述:从MaxCompute查询中读取数据并构建DataFrame。支持指定部分列作为索引,如果不指定,则会生成RangeIndex。
入参
参数名称
参数类型
是否必填
描述
query
String
是
待读取的MaxCompute SQL语句。
odps_entry
ODPS
否
ODPS入口对象。详情请参见初始化ODPS入口。
index_col
String/List
否
指定作为索引的列名或列名List。
string_as_binary
Boolean
否
是否以二进制形式读取字符串数据。
返回值
DataFrame对象。
示例
import maxframe.dataframe as md df = md.read_odps_query('select user_id, age, sex FROM `BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users`')
to_odps_table
接口名称:to_odps_table,源码请参见to_odps_table。
to_odps_table( table: Union[Table, str], partition: Optional[str] = None, partition_col: Union[None, str, List[str]] = None, overwrite: bool = False, unknown_as_string: Optional[bool] = None, index: bool = True, index_label: Union[None, str, List[str]] = None, lifecycle: Optional[int] = None )
功能描述:将DataFrame对象写入MaxCompute表。若MaxCompute中表不存在,则会自动创建。
入参
参数名称
参数类型
是否必填
描述
table
String/Table
是
待写入DataFrame的表名称或Table对象。
partition
String
否
待写入的分区。
例如
pt1=xxx, pt2=yyy
。partition_col
String/List
否
指定DataFrame中作为分区列的列名或列名List。
overwrite
Boolean
否
当表或分区已经存在时,是否覆盖数据。
默认值为False。
unknown_as_string
Boolean
否
对于无法识别的类型,是否作为String类型进行处理。
默认值为False。如果为True,DataFrame中的对象类型会被视为字符串。可能引发错误。
index
Boolean
否
是否存储索引。
默认值为True。
index_label
String/List
否
为索引指定的列名称。
索引列名称由index_label参数指定。如果不指定,则使用系统默认名称(index)。仅具有一级索引时,名称将默认为index,对于多级索引,名称将为level_x,其中x是索引的级别。
lifecycle
int
否
指定输出表的生命周期。
取值为正整数,若表已经存在,则该设置会覆盖原参数。
返回值
DataFrame对象。
示例
import maxframe.dataframe as md df = md.read_odps_query('select user_id, age, sex FROM `BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users`', index_col='user_id')) ouput_df = df.to_odps_table('output_table', lifecycle = 7)
to_odps_model
接口名称:to_odps_model。
to_odps_model( model_name: str, model_version: str = None, schema: str = None, project: str = None, description: Optional[str] = None, version_description: Optional[str] = None, create_model: bool = True, set_default_version: bool = False )
功能描述:将MaxFrame作业训练生成的Xgboost模型保存成MaxCompute的模型对象。
入参
参数名称
参数类型
是否必填
描述
model_name
String
是
模型名称。
当
project
和schema
已在作业中单独指定,则仅需指定模型名;否则需要按照project.schema.model
格式指定模型名称。
model_version
String
否
模型版本。
若未指定,则由系统自动生成。
schema
String
否
模型归属的schema。
若未指定,则默认schema为“default”。
project
String
否
模型归属的项目。
description
String
否
模型描述。
version_description
String
否
模型版本描述。
create_model
Boolean
否
是否在模型不存在时自动创建。
默认值:True。
set_default_version
Boolean
否
是否将当前版本设为模型的默认版本。
默认值:False。
返回值
Scalar 对象,调用
.execute()
后触发模型保存操作。示例
# 训练 XGBoost 模型 from maxframe.learn.contrib.xgboost import XGBClassifier X_df = md.DataFrame(X, columns=cols) clf = XGBClassifier(n_estimators=10) clf.fit(X_df, y) # 保存模型至 MaxCompute clf.to_odps_model( model_name="my_model", # 指定项目和schema,则model_name的示例格式如下 # model_name="project.schema.my_model" model_version="version1" ).execute()
Execute
execute
接口名称:execute,源码请参见execute
execute( session: SessionType = None )
功能描述:执行execute方法,以启动数据处理任务。
入参
参数名称
参数类型
是否必填
描述
session
Session
否
用于执行数据处理任务的会话。创建方式请参见new_session。
若不指定,则使用通过new_session初始化的全局会话。
返回值
无。
示例
import maxframe.dataframe as md df = md.read_odps_query('select user_id, age, sex FROM BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users', index_col='user_id')) df.execute()
Fetch
fetch
接口名称:fetch,源码请参见fetch。
fetch( session: SessionType = None )
功能描述:获取结果数据到本地环境。
入参
参数名称
参数类型
是否必填
描述
session
Session
否
用于获取结果数据的会话。创建方式请参见new_session。
若不指定,则使用通过new_session初始化的全局会话。
返回值
Pandas的DataFrame或Series。
示例
import maxframe.dataframe as md df = md.read_odps_query('select user_id, age, sex FROM `BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users`', index_col='user_id') res = df.execute().fetch() print(res) # 返回结果 user_id age sex 1 24 M 2 53 F 3 23 M 4 24 M 5 33 F ... ... .. 939 26 F 940 32 M 941 20 M 942 48 F 943 22 M [943 rows x 2 columns]