本文为您介绍MaxFrame提供的几类特有API,包括Session、Input/Output、Execute及Fetch,用于在MaxFrame任务中进行数据处理。
Session
new_session
接口名称:new_session,源码请参见new_session。
new_session( session_id: str = None, default: bool = True, new: bool = True, odps_entry: Optional[ODPS] = None )
功能描述:启动一个MaxFrame任务会话。
入参
参数名称
参数类型
是否必填
描述
session_id
String
否
会话标识符。
用于为新创建的会话指定一个唯一的标识符。若不指定,则MaxFrame会自动生成一个默认的标识符。
default
Boolean
否
是否将创建的Session作为默认Session。
默认值为True。
new
Boolean
否
是否创建新会话。
默认值为True,若设置为False,则根据session_id重用现有会话。
odps_entry
ODPS
是
ODPS入口对象。详情请参见创建ODPS入口。
返回值
Session对象。
示例
from maxframe import new_session from odps import ODPS # 使用MaxFrame相关账号初始化ODPS o = ODPS( # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID, # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret, # 不建议直接使用AccessKey ID和 AccessKey Secret字符串。 os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='your-default-project', endpoint='your-end-point', ) # 初始化MaxFrame会话 session = new_session(odps_entry=o)
Input/Output
read_odps_table
接口名称:read_odps_table,源码请参见read_odps_table。
read_odps_table( table_name: Union[str, Table], partitions: Union[None, str, List[str]] = None, columns: Optional[List[str]] = None, index_col: Union[None, str, List[str]] = None, odps_entry: ODPS = None, string_as_binary: bool = None, append_partitions: bool = False )
功能描述:从MaxCompute表中读取数据并构建DataFrame。支持指定部分列作为索引,如果不指定,则会生成RangeIndex。
入参
参数名称
参数类型
是否必填
描述
table_name
String/Table
是
待读取的MaxCompute表名或Table对象。
partitions
String/List
否
待读取的表分区或分区列表。
格式为:
<partition_name>=<partition_value>
。若不指定,则读取表中的所有分区数据。columns
List
否
待读取的表列名List。
格式为:
<column1>, <column2>, ...
。若不指定,则读取表中的所有列(不包含分区列)数据。index_col
String/List
否
指定作为索引的列名或列名List。
odps_entry
ODPS
否
ODPS入口对象。详情请参见创建ODPS入口。
string_as_binary
Boolean
否
是否以二进制形式读取字符串数据。
append_partitions
Boolean
否
是否读取分区列的数据。
默认值为False。若设置为True,在未指定
columns
参数时,将读取所有列(包含分区列)的数据。返回值
DataFrame对象。
示例
import maxframe.dataframe as md df = md.read_odps_table('BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users', index_col='user_id', columns=['age', 'sex']) print(df.execute().fetch()) # 返回值 user_id age sex 1 24 M 2 53 F 3 23 M 4 24 M 5 33 F ... ... ... 939 26 F 940 32 M 941 20 M 942 48 F 943 22 M
read_odps_query
接口名称:read_odps_query,源码请参见read_odps_query。
read_odps_query( query: str, odps_entry: ODPS = None, index_col: Union[None, str, List[str]] = None, string_as_binary: bool = None )
功能描述:从MaxCompute查询中读取数据并构建DataFrame。支持指定部分列作为索引,如果不指定,则会生成RangeIndex。
入参
参数名称
参数类型
是否必填
描述
query
String
是
待读取的MaxCompute SQL语句。
odps_entry
ODPS
否
ODPS入口对象。详情请参见创建ODPS入口。
index_col
String/List
否
指定作为索引的列名或列名List。
string_as_binary
Boolean
否
是否以二进制形式读取字符串数据。
返回值
DataFrame对象。
示例
import maxframe.dataframe as md df = md.read_odps_query('select user_id, age, sex FROM `BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users`')
to_odps_table
接口名称:to_odps_table,源码请参见to_odps_table。
to_odps_table( table: Union[Table, str], partition: Optional[str] = None, partition_col: Union[None, str, List[str]] = None, overwrite: bool = False, unknown_as_string: Optional[bool] = None, index: bool = True, index_label: Union[None, str, List[str]] = None, lifecycle: Optional[int] = None )
功能描述:将DataFrame对象写入MaxCompute表。若MaxCompute中表不存在,则会自动创建。
入参
参数名称
参数类型
是否必填
描述
table
String/Table
是
待写入DataFrame的表名称或Table对象。
partition
String
否
待写入的分区。
例如
pt1=xxx, pt2=yyy
。partition_col
String/List
否
指定DataFrame中作为分区列的列名或列名List。
overwrite
Boolean
否
当表或分区已经存在时,是否覆盖数据。
默认值为False。
unknown_as_string
Boolean
否
对于无法识别的类型,是否作为String类型进行处理。
默认值为False。如果为True,DataFrame中的对象类型会被视为字符串。可能引发错误。
index
Boolean
否
是否存储索引。
默认值为True。
index_label
String/List
否
为索引指定的列名称。
索引列名称由index_label参数指定。如果不指定,则使用系统默认名称(index)。仅具有一级索引时,名称将默认为index,对于多级索引,名称将为level_x,其中x是索引的级别。
lifecycle
int
否
指定输出表的生命周期。
取值为正整数,若表已经存在,则该设置会覆盖原参数。
返回值
DataFrame对象。
示例
import maxframe.dataframe as md df = md.read_odps_query('select user_id, age, sex FROM `BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users`', index_col='user_id')) ouput_df = df.to_odps_table('output_table', lifecycle = 7)
Execute
execute
接口名称:execute,源码请参见execute
execute( session: SessionType = None )
功能描述:执行execute方法,以启动数据处理任务。
入参
参数名称
参数类型
是否必填
描述
session
Session
否
用于执行数据处理任务的会话。创建方式请参见new_session。
若不指定,则使用通过new_session初始化的全局会话。
返回值
无。
示例
import maxframe.dataframe as md df = md.read_odps_query('select user_id, age, sex FROM BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users', index_col='user_id')) df.execute()
Fetch
fetch
接口名称:fetch,源码请参见fetch。
fetch( session: SessionType = None )
功能描述:获取结果数据到本地环境。
入参
参数名称
参数类型
是否必填
描述
session
Session
否
用于获取结果数据的会话。创建方式请参见new_session。
若不指定,则使用通过new_session初始化的全局会话。
返回值
Pandas的DataFrame或Series。
示例
import maxframe.dataframe as md df = md.read_odps_query('select user_id, age, sex FROM `BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users`', index_col='user_id') res = df.execute().fetch() print(res) # 返回结果 user_id age sex 1 24 M 2 53 F 3 23 M 4 24 M 5 33 F ... ... .. 939 26 F 940 32 M 941 20 M 942 48 F 943 22 M [943 rows x 2 columns]