MaxCompute当前支持利用Python 3语言来开发自定义函数(UDF),以满足特定的业务逻辑需求。本文为您介绍如何通过Python 3语言编写UDF。
UDF代码结构
您可以通过MaxCompute Studio工具使用Python 3语言编写UDF代码,代码中需要包含如下信息:
导入模块:必选。
至少要包含
from odps.udf import annotate
,导入函数签名模块,MaxCompute才可以识别后续代码中定义的函数签名。当UDF代码中需要引用文件资源或表资源时,需要包含from odps.distcache import get_cache_file
(文件资源)或from odps.distcache import get_cache_table
(表资源)。函数签名:必选。
格式为
@annotate(<signature>)
,signature
用于定义函数的输入参数和返回值的数据类型。更多函数签名信息,请参见函数签名与数据类型。自定义Python类:必选。
UDF代码的组织单位,定义了实现业务需求的变量及方法。您还可以在代码中引用MaxCompute内置的第三方库或引用文件、表资源。更多信息,请参见第三方库或引用资源。
evaluate
方法:必选。位于自定义的Python类中。
evaluate
方法定义了输入参数和返回值。一个Python类中只能包含一个evaluate
方法。
UDF代码示例如下。
#导入函数签名模块。
from odps.udf import annotate
#函数签名。
@annotate("bigint,bigint->bigint")
#自定义Python类。
class MyPlus(object):
#evaluate方法。
def evaluate(self, arg0, arg1):
if None in (arg0, arg1):
return None
return arg0 + arg1
使用限制
访问外网
MaxCompute默认不支持通过自定义函数访问外网。如果您需要通过自定义函数访问外网,请根据业务情况填写并提交网络连接申请表单,MaxCompute技术支持团队会及时联系您完成网络开通操作。表单填写指导,请参见网络开通流程。
访问VPC网络
MaxCompute默认不支持通过UDF访问VPC网络。如果您的UDF涉及访问VPC网络中的资源时,需要先创建MaxCompute与目标VPC网络间的网络连接,才可以直接通过UDF访问VPC网络中的资源,操作详情请参见通过UDF访问VPC网络资源。
读取表数据
目前版本不支持使用UDF/UDAF/UDTF读取以下场景的表数据:
做过表结构修改(Schema Evolution)的表数据。
包含复杂数据类型的表数据。
包含JSON数据类型的表数据。
Transactional表的表数据。
注意事项
Python 3与Python 2不兼容。在您使用Python 3之前,需要考虑兼容性问题,在一个SQL中不允许同时使用Python 3和Python 2。
Python 2官方已于2020年初停止维护,建议您根据项目类型执行迁移操作:全新项目:新MaxCompute停止维护Python 2,
UDF开发:通用流程
开发UDF时通常需进行准备工作、编写UDF代码、上传并注册UDF、调用调试UDF这几个步骤。同时MaxCompute支持多种工具,以下以常见的MaxCompute Studio、DataWorks、odpscmd三种工具为例,以一个具体的示例为您介绍UDF开发的通用流程。
使用MaxCompute Studio
准备工作。
使用MaxCompute Studio开发调试UDF时,您需要先安装MaxCompute Studio并连接MaxCompute项目,做好UDF开发前准备工作。操作详情请参见:
编写UDF代码。
在Project区域MaxCompute Studio目录下,右键单击scripts,选择 。
在Create new MaxCompute python class对话框中输入类名Name,选择类型为Python UDF,单击OK完成。
在编辑框中编写UDF代码。
from odps.udf import annotate @annotate("string,bigint->string") class GetUrlChar(object): def evaluate(self, url, n): if n == 0: return "" try: index = url.find(".htm") if index < 0: return "" a = url[:index] index = a.rfind("/") b = a[index + 1:] c = b.split("-") if len(c) < n: return "" return c[-n] except Exception: return "Internal error"
说明如果需要本地调试Java UDF,请参见测试UDF。
上传并注册UDF。
右键单击目标Python程序,选择Deploy to server…。配置函数名称后单击ok。操作详情请参见上传及注册。
本示例配置函数名称为UDF_GET_URL_CHAR。
调用UDF。
在左侧导航栏单击Project Explore,在目标MaxCompute项目上单击右键,选择Open Console并在Console区域输入调用UDF的SQL语句,按Enter键运行即可。SQL命令示例如下。
set odps.sql.python.version=cp37; -- python3 UDF需要使用该命令开启python3 select UDF_GET_URL_CHAR("http://www.taobao.com/a.htm", 1);
返回结果如下。
+-----+ | _c0 | +-----+ | a | +-----+
使用DataWorks
准备工作。
使用DataWorks开发调试UDF时,您需要先开通DataWorks并绑定MaxCompute项目,做好UDF开发前准备工作。操作详情请参见使用DataWorks连接。
编写UDF代码。
您可以在任意Python开发工具中开发UDF代码并打包为一个代码包。您可以使用以下UDF代码示例。
from odps.udf import annotate @annotate("string,bigint->string") class GetUrlChar(object): def evaluate(self, url, n): if n == 0: return "" try: index = url.find(".htm") if index < 0: return "" a = url[:index] index = a.rfind("/") b = a[index + 1:] c = b.split("-") if len(c) < n: return "" return c[-n] except Exception: return "Internal error"
上传并注册UDF。
您可以将已打包好的代码包通过DataWorks上传并完成UDF注册,操作详情请参见:
调用UDF。
注册完成UDF后,您可以创建一个ODPS SQL节点,在节点中编写并创建SQL命令来调用调试UDF。创建ODPS SQL节点的操作请参见开发ODPS SQL任务,命令示例如下。
set odps.sql.python.version=cp37; -- python3 UDF需要使用该命令开启python3 select UDF_GET_URL_CHAR("http://www.taobao.com/a.htm", 1);
使用odpscmd
准备工作。
使用odpscmd开发调试UDF时,您需要先下载安装odpscmd工具,并配置config文件连接MaxCompute项目,做好UDF开发前准备工作。操作详情请参见使用本地客户端(odpscmd)连接。
编写UDF代码。
您可以在任意Python开发工具中开发UDF代码并打包为一个代码包。您可以使用以下UDF代码示例。
from odps.udf import annotate @annotate("string,bigint->string") class GetUrlChar(object): def evaluate(self, url, n): if n == 0: return "" try: index = url.find(".htm") if index < 0: return "" a = url[:index] index = a.rfind("/") b = a[index + 1:] c = b.split("-") if len(c) < n: return "" return c[-n] except Exception: return "Internal error"
上传并注册UDF。
您可以将已打包好的代码包通过odpscmd上传并完成UDF注册,操作详情请参见:
调用UDF。
注册完成UDF后,您可以编写并创建SQL命令来调用调试UDF。命令示例如下。
set odps.sql.python.version=cp37; -- python3 UDF需要使用该命令开启python3 select UDF_GET_URL_CHAR("http://www.taobao.com/a.htm", 1);
UDF开发:安装第三方库Numpy
MaxCompute内置的Python 3运行环境中未安装第三方库Numpy。如果您需要使用Numpy的UDF,请手动上传Numpy的WHEEL包。从PyPI或镜像下载Numpy包时,包的文件名为numpy-<版本号>-cp37-cp37m-manylinux1_x86_64.whl。上传包的操作请参见资源操作或Python UDF使用第三方包。
Python 3支持的标准库列表请参见Python 3标准库。
UDF开发:函数签名与数据类型
函数签名格式如下。
@annotate(<signature>)
signature
为字符串,用于标识输入参数和返回值的数据类型。执行UDF时,UDF函数的输入参数和返回值类型要与函数签名指定的类型一致。查询语义解析阶段会检查不符合函数签名定义的用法,检查到类型不匹配时会报错。具体格式如下。
'arg_type_list -> type'
其中:
arg_type_list
:表示输入参数的数据类型。输入参数可以为多个,用英文逗号(,)分隔。支持的数据类型为BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、CHAR、VARCHAR、复杂数据类型(ARRAY、MAP、STRUCT)或复杂数据类型嵌套。arg_type_list
还支持星号(*)或为空(''):当
arg_type_list
为星号(*)时,表示输入参数为任意个数。当
arg_type_list
为空('')时,表示无输入参数。
type
:表示返回值的数据类型。UDF只返回一列。支持的数据类型为:BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、复杂数据类型(ARRAY、MAP、STRUCT)或复杂数据类型嵌套。
在编写UDF代码过程中,您可以根据MaxCompute项目的数据类型版本选取合适的数据类型,更多数据类型版本及各版本支持的数据类型信息,请参见数据类型版本说明。
合法的函数签名示例如下。
函数签名示例 | 说明 |
| 输入参数类型为BIGINT、DOUBLE,返回值类型为STRING。 |
| 输入任意个参数,返回值类型为STRING。 |
| 无输入参数,返回值类型为DOUBLE。 |
| 输入参数类型为ARRAY<BIGINT>,返回值类型为STRUCT<x:STRING, y:INT>。 |
| 无输入参数,返回值类型为MAP<BIGINT, STRING>。 |
为确保编写Python UDF过程中使用的数据类型与MaxCompute支持的数据类型保持一致,您需要关注二者间的数据类型映射关系。具体映射关系如下。
MaxCompute SQL Type | Python 3 Type |
BIGINT | INT |
STRING | UNICODE |
DOUBLE | FLOAT |
BOOLEAN | BOOL |
DATETIME | DATETIME.DATETIME |
FLOAT | FLOAT |
CHAR | UNICODE |
VARCHAR | UNICODE |
BINARY | BYTES |
DATE | DATETIME.DATE |
DECIMAL | DECIMAL.DECIMAL |
ARRAY | LIST |
MAP | DICT |
STRUCT | COLLECTIONS.NAMEDTUPLE |
UDF开发:引用资源
Python UDF可以通过odps.distcache
模块引用资源,支持引用文件资源和表资源。
odps.distcache.get_cache_file(resource_name, mode)
:以指定模式mode
返回指定文件资源的内容。resource_name
支持STRING类型,对应当前MaxCompute项目中已存在的表资源名。如果表资源名非法或者没有相应的表资源,会返回异常。mode
支持STRING类型,默认值为't'
。当mode
为't'
时以文本格式打开文件,当mode
为'b'
时以二进制格式打开文件。返回值为File-like对象。在使用完此对象后,您需要调用
close
方法释放打开的资源文件。
引用文件资源示例如下。
from odps.udf import annotate from odps.distcache import get_cache_file @annotate('bigint->string') class DistCacheExample(object): def __init__(self): cache_file = get_cache_file('test_distcache.txt') kv = {} for line in cache_file: line = line.strip() if not line: continue k, v = line.split() kv[int(k)] = v cache_file.close() self.kv = kv def evaluate(self, arg): return self.kv.get(arg)
odps.distcache.get_cache_table(resource_name)
:返回指定资源表的内容。resource_name
对应当前MaxCompute项目中已存在的表资源名。如果表资源名非法或者没有相应的表资源,会返回异常。支持读取表中BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、FLOAT、CHAR、VARCHAR、BINARY、DATE、DECIMAL、ARRAY、MAP和STRUCT类型数据。返回值为Generator类型,调用者通过遍历获取表的内容,每次遍历得到的是以数组形式存在的表中的一条记录。
引用表资源示例如下。
from odps.udf import annotate
from odps.distcache import get_cache_table
@annotate('->string')
class DistCacheTableExample(object):
def __init__(self):
self.records = list(get_cache_table('udf_test'))
self.counter = 0
self.ln = len(self.records)
def evaluate(self):
if self.counter > self.ln - 1:
return None
ret = self.records[self.counter]
self.counter += 1
return str(ret)
UDF开发完成后:UDF调用说明
按照开发流程,完成Python 3 UDF开发后,您即可通过MaxCompute SQL调用Python 3 UDF。调用方法如下。
开启Python 3
MaxCompute默认使用Python 2,如果您要使用Python 3,可以在Session级别设置如下属性开启Python 3,并与SQL语句一起提交执行。
set odps.sql.python.version=cp37;
调用函数
在归属MaxCompute项目中使用自定义函数:使用方法与内建函数类似,您可以参照内建函数的使用方法使用自定义函数。
跨项目使用自定义函数:即在项目A中使用项目B的自定义函数,跨项目分享语句示例:
select B:udf_in_other_project(arg0, arg1) as res from table_t;
。更多跨项目分享信息,请参见基于Package跨项目访问资源。
Python 2 UDF迁移
Python 2官方已于2020年初停止维护,建议您根据项目类型执行迁移操作:
全新项目:新MaxCompute项目,或第一次使用Python语言编写UDF的MaxCompute项目。建议所有的Python UDF都直接使用Python 3语言编写。
存量项目:创建了大量Python 2 UDF的MaxCompute项目。请您谨慎开启Python 3。如果您计划逐步将所有Python 2 UDF迁移为Python 3 UDF,推荐方法如下:
新作业和新UDF:使用Python 3语言编写,在Session级别开启Python 3。开启Python 3方法,请参见开启Python 3。
Python 2 UDF:改写Python 2 UDF,使其可以同时兼容Python 2和Python 3。改写方法请参见将Python 2代码移植到Python 3。
说明如果您需要编写公共UDF,并为多个MaxCompute项目授权UDF的操作权限,建议UDF同时兼容Python 2和Python 3。