MaxCompute提供用户自定义函数(UDF)及Python(PyODPS和MaxFrame)开发能力,本文为您介绍如何在MaxCompute UDF、PyODPS及MaxFrame作业开发中使用镜像。
在SQL UDF开发中使用镜像
以下以使用Pandas实现一个列求和的UDF为例,为您介绍如何在SQL UDF开发中使用镜像。
编写Python UDF脚本,并将其保存为
sum_pandas.py
文件。脚本示例如下:from odps.udf import annotate import pandas as pd @annotate("string, string -> string") class SumColumns(object): def evaluate(self, arg1, arg2): # 将输入参数转换为pandas DataFrame df = pd.DataFrame({'col1': arg1.split(','), 'col2': arg2.split(',')}) # 使用pandas进行数据处理操作 # 这里以计算两列的和为例 df['sum'] = df['col1'].astype(int) + df['col2'].astype(int) # 将处理结果转换为字符串并返回 result = ','.join(df['sum'].astype(str).values) return result
将
sum_pandas.py
脚本以资源形式上传至MaxCompute项目空间,详情请参见添加资源。命令示例如下:ADD PY sum_pandas.py -f;
将已上传的
sum_pandas.py
脚本注册为自定义函数SumColumns,具体操作请参见注册函数。命令示例如下:CREATE FUNCTION SumColumns AS 'sum_pandas.SumColumns' USING 'sum_pandas.py';
准备测试表
testsum
及测试数据。CREATE TABLE testsum (col1 string, col2 string); INSERT INTO testsum VALUES ('1,2,3','1,2,3'),('1,2,3','3,2,1'),('1,2,3','4,5,6');
调用UDF函数时通过Flag指定已存在的镜像。
set odps.sql.python.version=cp37; set odps.session.image = <镜像名称>; SELECT SumColumns(col1,col2) AS result FROM testsum;
返回结果:
+------------+ | result | +------------+ | 2,4,6 | | 4,4,4 | | 5,7,9 | +------------+
在PyODPS开发中使用镜像
以下以实现scipy包中的psi函数为例,为您介绍如何在PyODPS中使用镜像。
准备测试表
test_float_col
及测试数据。CREATE TABLE test_float_col (col1 double); INSERT INTO test_float_col VALUES (3.75),(2.51);
编写PyODPS代码,计算psi(col1)的值,并保存为psi_col.py文件执行。代码示例如下:
import os from odps import ODPS, options def my_psi(v): from scipy.special import psi return float(psi(v)) # 如果 Project 开启了 Isolation,下面的选项不是必需的 options.sql.settings = {"odps.isolation.session.enable": True} o = ODPS( # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID, # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret, # 不建议直接使用AccessKey ID和 AccessKey Secret字符串。 os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='your-default-project', endpoint='your-end-point' ) df = o.get_table("test_float_col").to_df() # 直接执行并取得结果 df.col1.map(my_psi).execute(image='scipy') # 保存到另一张表 df.col1.map(my_psi).persist("result_table", image='scipy')
参数说明:
ALIBABA_CLOUD_ACCESS_KEY_ID:需将该环境变量设置为具备目标MaxCompute项目中待操作对象相关MaxCompute权限的AccessKey ID。您可以进入AccessKey管理页面获取AccessKey ID。
ALIBABA_CLOUD_ACCESS_KEY_SECRET:需将该环境变量设置为AccessKey ID对应的AccessKey Secret。
your-default-project:使用的MaxCompute项目名称。您可以登录MaxCompute控制台,在左侧导航栏选择工作区>项目管理,查看MaxCompute项目名称。
your-end-point:目标MaxCompute项目所在地域的Endpoint,可根据网络连接方式自行选择,例如
http://service.cn-chengdu.maxcompute.aliyun.com/api
。详情请参见Endpoint。
查看结果表result_table。
SELECT * FROM result_table
返回结果:
+------------+ | col1 | +------------+ | 1.1825373886117962 | | 0.7080484451910534 | +------------+
在MaxFrame开发中使用镜像
以下以实现scipy包中的psi函数为例,为您介绍如何在MaxFrame作业中使用镜像。
准备测试表
test_float_col
及测试数据。CREATE TABLE test_float_col (col1 double); INSERT INTO test_float_col VALUES (3.75),(2.51);
编写MaxFrame代码,计算psi(col1)的值,并保存为psi_col.py文件执行。代码示例如下:
import os from odps import ODPS, options from maxframe.session import new_session import maxframe.dataframe as md from maxframe.config import options from maxframe import config # 引用内置scipy镜像 config.options.sql.settings = { "odps.session.image": "scipy" } def my_psi(v): from scipy.special import psi return float(psi(v)) o = ODPS( # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID, # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret, # 不建议直接使用AccessKey ID和 AccessKey Secret字符串。 os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='your-default-project', endpoint='your-end-point' ) # 创建MaxFrame Session session = new_session(o) df = md.read_odps_table('test_float_col') # 直接执行并取得结果 print(df.col1.map(my_psi).execute().fetch())
参数说明:
ALIBABA_CLOUD_ACCESS_KEY_ID:需将该环境变量设置为具备目标MaxCompute项目中待操作对象相关MaxCompute权限的AccessKey ID。您可以进入AccessKey管理页面获取AccessKey ID。
ALIBABA_CLOUD_ACCESS_KEY_SECRET:需将该环境变量设置为AccessKey ID对应的AccessKey Secret。
your-default-project:使用的MaxCompute项目名称。您可以登录MaxCompute控制台,在左侧导航栏选择工作区>项目管理,查看MaxCompute项目名称。
your-end-point:目标MaxCompute项目所在地域的Endpoint,可根据网络连接方式自行选择,例如
http://service.cn-chengdu.maxcompute.aliyun.com/api
。详情请参见Endpoint。
返回结果:
0 1.182537 1 0.708048 Name: col1, dtype: float64