MaxCompute使用的Python 2版本为2.7。本文为您介绍如何通过Python 2语言编写UDF。
UDF代码结构
您可以通过MaxCompute Studio工具使用Python 2语言编写UDF代码,代码中需要包含如下信息:
编码声明:可选。
固定声明格式为
#coding:utf-8
或# -*- coding: utf-8 -*-
,二者等效。当Python 2代码中出现中文字符时,运行程序会报错,必须在代码头部增加编码声明。导入模块:必选。
至少要包含
from odps.udf import annotate
,导入函数签名模块,MaxCompute才可以识别后续代码中定义的函数签名。当UDF代码中需要引用文件资源或表资源时,需要包含from odps.distcache import get_cache_file
(文件资源)或from odps.distcache import get_cache_table
(表资源)。函数签名。必选。
格式为
@annotate(<signature>)
,signature
用于定义函数的输入参数和返回值的数据类型。更多函数签名信息,请参见函数签名及数据类型。自定义Python类:必选。
UDF代码的组织单位,定义了实现业务需求的变量及方法。您还可以在代码中引用MaxCompute内置的第三方库或引用文件、表资源。更多信息,请参见第三方库或引用资源。
evaluate
方法:必选。位于自定义的Python类中。
evaluate
方法定义了输入参数和返回值。一个Python类中只能包含一个evaluate
方法。
UDF代码示例如下。
#coding:utf-8
#导入函数签名模块。
from odps.udf import annotate
#函数签名。
@annotate("bigint,bigint->bigint")
#自定义Python类。
class MyPlus(object):
#evaluate方法。
def evaluate(self, arg0, arg1):
if None in (arg0, arg1):
return None
return arg0 + arg1
使用限制
MaxCompute Python 2 UDF使用的Python版本为2.7,并以沙箱模式执行用户代码,即代码是在一个受限的运行环境中执行的。在该环境中,以下行为会被禁止:
读写本地文件。
启动子进程。
启动线程。
使用Socket通信。
其他系统调用。
基于上述原因,您上传的代码都必须通过标准Python实现。Python标准库中涉及到上述功能的模块或C扩展模块都会被禁止使用。具体标准库的可用模块说明如下:
所有基于标准Python实现(不依赖扩展模块)的模块都可用。
C扩展模块中下列模块可用:
array、audioop
binascii、bisect
cmath、_codecs_cn、_codecs_hk、_codecs_iso2022、_codecs_jp、_codecs_kr、_codecs_tw、_collections、cStringIO
datetime
_functools、future_builtins、
_heapq、_hashlib
itertools
_json
_locale、_lsprof
math、_md5、_multibytecodec
operator
_random
_sha256、_sha512、_sha、_struct、strop
time
unicodedata
_weakref
cPickle
沙箱限制了您的代码最多可向标准输出和标准错误输出写入数据的大小为20 KB,即
sys.stdout/sys.stderr
最多能写入20 KB数据,多余的字符会被忽略。
第三方库
MaxCompute的Python 2运行环境中安装了除Python标准库外比较常用的第三方库,作为标准库的补充,例如Numpy。
使用第三方库存在限制,例如禁止本地访问、网络I/O受限,因此第三方库中涉及到相关功能的API也被禁止使用。
函数签名及数据类型
函数签名格式如下。
@annotate(<signature>)
signature
为字符串,用于标识输入参数和返回值的数据类型。执行UDF时,UDF函数的输入参数和返回值类型要与函数签名指定的类型一致。查询语义解析阶段会检查不符合函数签名定义的用法,检查到类型不匹配时会报错。具体格式如下。
'arg_type_list -> type'
其中:
arg_type_list
:表示输入参数的数据类型。输入参数可以为多个,用英文逗号(,)分隔。支持的数据类型为BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、CHAR、VARCHAR、复杂数据类型(ARRAY、MAP、STRUCT)或复杂数据类型嵌套。arg_type_list
还支持星号(*)或为空(''):当
arg_type_list
为星号(*)时,表示输入参数为任意个数。当
arg_type_list
为空('')时,表示无输入参数。
type
:表示返回值的数据类型。UDF只返回一列。支持的数据类型为:BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、复杂数据类型(ARRAY、MAP、STRUCT)或复杂数据类型嵌套。
在编写UDF代码过程中,您可以根据MaxCompute项目的数据类型版本选取合适的数据类型,更多数据类型版本及各版本支持的数据类型信息,请参见数据类型版本说明。
合法的函数签名示例如下。
函数签名示例 | 说明 |
| 输入参数类型为BIGINT、DOUBLE,返回值类型为STRING。 |
| 输入任意个参数,返回值类型为STRING。 |
| 无输入参数,返回值类型为DOUBLE。 |
| 输入参数类型为ARRAY<BIGINT>,返回值类型为STRUCT<x:STRING, y:INT>。 |
| 无输入参数,返回值类型为MAP<BIGINT, STRING>。 |
为确保编写Python UDF过程中使用的数据类型与MaxCompute支持的数据类型保持一致,您需要关注二者间的数据类型映射关系。具体映射关系如下。
MaxCompute SQL Type | Python 2 Type |
BIGINT | INT |
STRING | STR |
DOUBLE | FLOAT |
BOOLEAN | BOOL |
DATETIME | INT |
FLOAT | FLOAT |
CHAR | STR |
VARCHAR | STR |
BINARY | BYTEARRAY |
DATE | INT |
DECIMAL | DECIMAL.DECIMAL |
ARRAY | LIST |
MAP | DICT |
STRUCT | COLLECTIONS.NAMEDTUPLE |
DATETIME类型对应的Python类型是INT,值为Epoch UTC Time起至今的毫秒数。您可以通过Python标准库中的DATETIME模块处理日期时间类型。
odps.udf.int(value,[silent=True])
增加了参数silent
。当silent
为True时,如果value
无法转为INT,则会返回None(不会返回异常)。NULL值对应Python的None。
引用资源
Python UDF可以通过odps.distcache
模块引用资源,支持引用文件资源和表资源。
odps.distcache.get_cache_file(resource_name)
:返回指定文件资源的内容。resource_name
为STRING类型,对应当前MaxCompute项目中已存在的文件资源名。如果文件资源名非法或者没有相应的文件资源,会返回异常。说明使用UDF访问资源,在创建UDF时需要声明引用的资源,否则会报错。
返回值为File-like对象。在使用完此对象后,您需要调用
close
方法释放打开的资源文件。
引用文件资源示例如下。
from odps.udf import annotate from odps.distcache import get_cache_file @annotate('bigint->string') class DistCacheExample(object): def __init__(self): cache_file = get_cache_file('test_distcache.txt') kv = {} for line in cache_file: line = line.strip() if not line: continue k, v = line.split() kv[int(k)] = v cache_file.close() self.kv = kv def evaluate(self, arg): return self.kv.get(arg)
odps.distcache.get_cache_table(resource_name)
:返回指定表资源的内容。resource_name
支持STRING类型,对应当前MaxCompute项目中已存在的表资源名。如果表资源名非法或者没有相应的表资源,会返回异常。返回值为GENERATOR类型,调用者以遍历方式获取表的内容,每次遍历可得到以数组形式存在的表中的一条记录。
引用表资源示例如下。
from odps.udf import annotate from odps.distcache import get_cache_table @annotate('->string') class DistCacheTableExample(object): def __init__(self): self.records = list(get_cache_table('udf_test')) self.counter = 0 self.ln = len(self.records) def evaluate(self): if self.counter > self.ln - 1: return None ret = self.records[self.counter] self.counter += 1 return str(ret)
UDF开发:通用流程
开发UDF时通常需进行准备工作、编写UDF代码、上传并注册UDF、调用调试UDF这几个步骤。同时MaxCompute支持多种工具,例如常见的MaxCompute Studio、DataWorks、odpscmd。
使用各类工具开发Python2 UDF的流程与Python3的流程一致,通用流程示例请参见Python3的相关文档UDF开发:通用流程。
使用MaxCompute Studio完整开发及调用Python 2 UDF的示例也可参见开发Python UDF。
UDF开发完成后:UDF调用说明
完成Python 2 UDF开发后,您即可通过MaxCompute SQL调用Python 2 UDF。调用方法如下:
在归属MaxCompute项目中使用自定义函数:使用方法与内建函数类似,您可以参照内建函数的使用方法使用自定义函数。
跨项目使用自定义函数:即在项目A中使用项目B的自定义函数,跨项目分享语句示例:
select B:udf_in_other_project(arg0, arg1) as res from table_t;
。更多跨项目分享信息,请参见基于Package跨项目访问资源。