Python 2官方即将停止维护,因此MaxCompute的Python UDF提供了Python 3支持。MaxCompute使用的Python 3版本为CPython-3.7.3。本文主要介绍如何实现Python 3的UDF ,包括UDF、UDAF和UDTF三种函数。

使用限制

Python 3与Python 2不兼容。由于在一个SQL作业中,只能选定Python 3或Python 2版本来执行,所以在您使用Python 3之前,需要考虑兼容性的问题,在一个SQL里不允许同时使用Python 3 UDF和 Python 2 UDF 。

开启Python 3

Python 3目前只支持作业级别开启。在执行Python 3 UDF的SQL语句前增加如下语句一起执行,即可开启Python 3。
set odps.sql.python.version=cp37;

第三方库

MaxCompute Python UDF支持第三方库,Python 2运行环境中除了安装了标准库之外还安装了比较常用的第三方库Numpy,做为标准库的补充。

MaxCompute内置的Python 3运行环境中未安装第三方库Numpy。如果您需要使用Numpy的UDF,需要手动上传Numpy的Wheel包。从PyPI或镜像下载Numpy包时,包的文件名为numpy-<版本号>-cp37-cp37m-manylinux1_x86_64.whl

Python2 UDF迁移

Python 2官方即将停止维护,因此建议您:
  • 全新项目:如果是在一个新的项目空间,或者您的项目空间中第一次使用Python编写UDF ,建议所有的Python UDF都直接使用Python 3编写。
  • 存量项目:如果您的项目中使用了大量的Python 2 UDF,那么开启Python 3时需要谨慎。 因为在一个SQL中不允许同时使用Python 3 UDF和 Python 2 UDF。如果您打算逐渐将所有Python 2 UDF迁移成Python 3 UDF,推荐的方法如下:
    • 对于新作业和新UDF,使用Python 3编写,作业级别设置开启Python 3。
    • 对于旧的Python 2 UDF进行改写,使其可以同时兼容Python 2和Python 3(改写方法请参见将Python2代码移植到Python3)。
      说明 如果您写的UDF是一个公共UDF,授权给了多个项目空间使用,建议您的UDF同时兼容Python 2和Python 3。

参数与返回值

参数与返回值的指定方式,如下所示。
@odps.udf.annotate(signature)

Python UDF目前支持的MaxCompute SQL数据类型包括BIGINT、STRING、DOUBLE、BOOLEAN和DATETIME。SQL语句在执行之前,必须确定所有函数的参数类型和返回值类型。因此对于Python这一动态类型语言,需要通过对UDF类加Decorator的方式指定函数签名。

函数签名Signature通过字符串指定,命令格式如下。
arg_type_list '->' type_list
arg_type_list: type_list | '*' | ''
type_list: [type_list ','] type
type: 'bigint' | 'string' | 'double' | 'boolean' | 'datetime'
命令说明
  • 箭头左边表示参数类型,右边表示返回值类型。
  • 只有UDTF的返回值可以是多列,UDF和UDAF只能返回一列。
  • 星号(*)代表变长参数。使用变长参数时,UDF、UDTF、UDAF可以匹配任意输入参数。
合法的Signature示例如下。
'bigint,double->string'            # 参数为bigint、double,返回值为string。
'bigint,boolean->string,datetime'  # UDTF参数为bigint、boolean,返回值为string,datetime。
'*->string'                        # 变长参数,输入参数任意,返回值为string。
'->double'                         # 参数为空,返回值为double。

查询语义解析阶段会检查不符合函数签名的用法,抛出错误,禁止执行此函数。执行时,UDF函数的参数会以函数签名指定的类型传入。返回值类型也要与函数签名指定的类型一致,否则检查到类型不匹配时也会报错。

MaxCompute SQL数据类型对应的Python类型如下。
MaxCompute SQL Type Python Type
BIGINT INT
STRING STR
DOUBLE FLOAT
BOOLEAN BOOL
DATETIME INT
说明
  • DATETIME类型对应的Python类型是INT,值为Epoch UTC Time起至今的毫秒数。您可以通过Python标准库中的DATETIME模块处理日期时间类型。
  • 此外,odps.udf.int(value,[silent=True])增加了参数silent 。当silent为True时,如果value无法转为INT,则会返回None(不会抛出异常)。
  • NULL值对应Python里的None。

UDF

定义一个new-style class,并实现evaluate方法,即可实现Python UDF。
from odps.udf import annotate
@annotate("bigint,bigint->bigint")
class MyPlus(object):
   def evaluate(self, arg0, arg1):
       if None in (arg0, arg1):
           return None
       return arg0 + arg1
说明 Python UDF必须通过annotate指定函数签名。

Python UDF的使用示例请参考使用MaxCompute分析IP来源最佳实践

UDAF

  • class odps.udf.BaseUDAF:继承此类实现Python UDAF。
  • BaseUDAF.new_buffer():实现此方法返回聚合函数的中间值的bufferbuffer必须是marshallable对象(例如LIST、DICT),并且buffer的大小不应该随数据量递增。在极限情况下,buffer 在执行对象序列化后的大小不应该超过2MB。
  • BaseUDAF.iterate(buffer[, args, ...]):实现此方法将args聚合到中间值buffer中。
  • BaseUDAF.merge(buffer, pbuffer):实现此方法将两个中间值buffer聚合到一起,即将pbuffer合并到buffer中。
  • BaseUDAF.terminate(buffer):实现此方法将中间值buffer转换为MaxCompute SQL的基本类型。
示例如下,使用UDAF求平均值。
@annotate('double->double')
class Average(BaseUDAF):
    def new_buffer(self):
        return [0, 0]
    def iterate(self, buffer, number):
        if number is not None:
            buffer[0] += number
            buffer[1] += 1
    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]
    def terminate(self, buffer):
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]

UDTF

  • class odps.udf.BaseUDTF:Python UDTF的基类。您可以继承此类实现processclose等方法。
  • BaseUDTF.__init__():初始化方法。继承类如果需要实现这个方法,必须在一开始调用基类的初始化方法super(BaseUDTF, self).__init__() init方法在整个UDTF生命周期中只会被调用一次,即在处理第一条记录之前。当UDTF需要保存内部状态时,可以在这个方法中初始化所有状态。
  • BaseUDTF.process([args, ...]):这个方法由MaxCompute SQL框架调用,SQL中每一条记录都会对应调用一次processprocess的参数为SQL语句中指定的UDTF输入参数。
  • BaseUDTF.forward([args, ...]):UDTF的输出方法。此方法由用户代码调用。每调用一次forward,便会输出一条记录。 forward的参数为SQL语句中指定的UDTF的输出参数。
  • BaseUDTF.close():UDTF的结束方法。此方法由MaxCompute SQL框架调用,并且只会被调用一次,即在处理完最后一条记录之后。
UDTF的示例如下。
#coding:utf-8
# explode.py
from odps.udf import annotate
from odps.udf import BaseUDTF
@annotate('string -> string')
class Explode(BaseUDTF):
   """将string按逗号分隔输出成多条记录。
   def process(self, arg):
       props = arg.split(',')
       for p in props:
           self.forward(p)
说明 Python UDTF也可以不加annotate指定参数类型和返回值类型。这样,函数在SQL中使用时可以匹配任意输入参数,但返回值类型无法推导,所有输出参数都将被视为STRING类型。因此在调用forward时,就必须将所有输出值转成STRING类型。

引用资源

Python UDF可以通过odps.distcache模块引用资源文件,目前支持引用文件资源和表资源。

  • odps.distcache.get_cache_file(resource_name):返回指定名字的资源内容。
    • resource_name为STRING类型,对应当前项目空间中已存在的资源名。如果资源名非法或者没有相应的资源,则会抛出异常。
    • 返回值为File-like对象。在使用完这个对象后,调用者应该调用close方法释放打开的资源文件。
    示例如下。
    @annotate('bigint->string')
    class DistCacheExample(object):
    def __init__(self):
        cache_file = get_cache_file('test_distcache.txt')
        kv = {}
        for line in cache_file:
            line = line.strip()
            if not line:
                continue
            k, v = line.split()
            kv[int(k)] = v
        cache_file.close()
        self.kv = kv
    def evaluate(self, arg):
        return self.kv.get(arg)
  • odps.distcache.get_cache_table(resource_name): 返回指定资源表的内容。
    • resource_name为STRING类型,对应当前Project中已存在的资源表名。如果资源名非法或者没有相应的资源,会抛出异常。
    • 返回值为Generator类型,调用者通过遍历获取表的内容,每次遍历得到的是以数组形式存在的表中的一条记录。
    示例如下。
    from odps.udf import annotate
    from odps.distcache import get_cache_table
    @annotate('->string')
    class DistCacheTableExample(object):
        def __init__(self):
            self.records = list(get_cache_table('udf_test'))
            self.counter = 0
            self.ln = len(self.records)
        def evaluate(self):
            if self.counter > self.ln - 1:
                return None
            ret = self.records[self.counter]
            self.counter += 1
            return str(ret)