MaxCompute使用的Python 2版本为2.7。本文为您介绍如何通过Python 2语言编写UDTF。

UDTF代码结构

您可以通过MaxCompute Studio工具使用Python 2语言编写UDTF代码,代码中需要包含如下信息:
  • 编码声明:可选。

    固定声明格式为#coding:utf-8# -*- coding: utf-8 -*-,二者等效。当Python 2代码中出现中文字符时,运行程序会报错,您需要在代码头部增加编码声明。

  • 导入模块:必选。

    至少要包含from odps.udf import annotatefrom odps.udf import BaseUDTFfrom odps.udf import annotate用于导入函数签名模块,MaxCompute才可以识别后续代码中定义的函数签名。from odps.udf import BaseUDTF为Python UDTF的基类,您需要通过此类在派生类中实现processclose等方法。

    当UDTF代码中需要引用文件资源或表资源时,需要包含from odps.distcache import get_cache_file(文件资源)或from odps.distcache import get_cache_table(表资源)。

  • 函数签名:可选。

    格式为@annotate(<signature>)signature用于定义函数的输入参数和返回值的数据类型。如果不指定函数签名,在SQL中调用UDTF时,可以匹配任意类型的输入参数,但返回值类型无法推导,所有输出参数都将会是STRING类型。更多函数签名信息,请参见函数签名及数据类型

  • 自定义Python类(派生类):必选。

    UDTF代码的组织单位,定义了实现业务需求的变量及方法。您还可以在代码中引用MaxCompute内置的第三方库或引用文件、表资源。更多信息,请参见第三方库引用资源

  • 实现Python类的方法:必选。

    Python类实现包含如下4个方法,您可以根据实际需要进行选择。

    方法定义 描述
    BaseUDTF.init() 初始化方法。派生类如果需要实现此方法,必须在一开始调用基类的初始化方法super(BaseUDTF, self).init()init方法在整个UDTF生命周期中只会被调用一次,即在处理第一条记录之前。当UDTF需要保存内部状态时,可以通过此方法初始化所有状态。
    BaseUDTF.process([args, ...]) SQL中每一条记录都会对应调用一次processprocess的参数为SQL语句中指定的UDTF输入参数。
    BaseUDTF.forward([args, ...]) UDTF的输出方法。此方法由用户代码调用。每调用一次forward,就会输出一条记录。forward的参数为SQL语句中指定的UDTF的输出参数。

    如果Python代码中未指定函数签名,在调用forward方法时,必须将所有输出值转换为STRING类型。

    BaseUDTF.close() UDTF的结束方法。只会被调用一次,即在处理完最后一条记录之后被调用。
UDTF代码示例如下。
#coding:utf-8
#导入函数签名模块及基类。
from odps.udf import annotate
from odps.udf import BaseUDTF
#函数签名。
@annotate('string -> string')
#自定义Python类。
class Explode(BaseUDTF):
#实现Python类的方法。
   def process(self, arg):
       props = arg.split(',')
       for p in props:
           self.forward(p)

使用限制

MaxCompute Python 2 UDTF使用的Python版本为2.7,并以沙箱模式执行用户代码,即代码是在一个受限的运行环境中执行的。在该环境中,以下行为会被禁止:
  • 读写本地文件。
  • 启动子进程。
  • 启动线程。
  • 使用Socket通信。
  • 其他系统调用。
基于上述原因,您上传的代码都必须通过标准Python实现。Python标准库中涉及到上述功能的模块或C扩展模块都会被禁止使用。具体标准库的可用模块说明如下:
  • 所有基于标准Python实现(不依赖扩展模块)的模块都可用。
  • C扩展模块中下列模块可用:
    • array、audioop
    • binascii、bisect
    • cmath、_codecs_cn、_codecs_hk、_codecs_iso2022、_codecs_jp、_codecs_kr、_codecs_tw、_collections、cStringIO
    • datetime
    • _functools、future_builtins、
    • _heapq、_hashlib
    • itertools
    • _json
    • _locale、_lsprof
    • math、_md5、_multibytecodec
    • operator
    • _random
    • _sha256、_sha512、_sha、_struct、strop
    • time
    • unicodedata
    • _weakref
    • cPickle
  • 沙箱限制了您的代码最多可向标准输出和标准错误输出写入数据的大小为20 KB,即sys.stdout/sys.stderr最多能写入20 KB数据,多余的字符会被忽略。

第三方库

MaxCompute的Python 2运行环境中安装了除Python标准库外比较常用的第三方库,作为标准库的补充,例如Numpy。
说明 使用第三方库存在限制,例如禁止本地访问、网络I/O受限,因此第三方库中涉及到相关功能的API也被禁止使用。

函数签名及数据类型

函数签名格式如下。
@annotate(<signature>)
signature为函数签名字符串,用于标识输入参数和返回值的数据类型。执行UDTF时,UDTF函数的输入参数和返回值类型要与函数签名指定的类型一致。查询语义解析阶段会检查不符合函数签名定义的用法,检查到类型不匹配时会报错。具体格式如下。
'arg_type_list -> type_list'
其中:
  • type_list:表示返回值的数据类型。UDTF可以返回多列。支持的数据类型为:BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、复杂数据类型(ARRAY、MAP、STRUCT)或复杂数据类型嵌套。
  • arg_type_list:表示输入参数的数据类型。输入参数可以为多个,用英文逗号(,)分隔。支持的数据类型为BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、CHAR、VARCHAR、复杂数据类型(ARRAY、MAP、STRUCT)或复杂数据类型嵌套。
    arg_type_list还支持星号(*)或为空(''):
    • arg_type_list为星号(*)时,表示输入参数为任意个数。
    • arg_type_list为空('')时,表示无输入参数。
说明 在编写UDTF代码过程中,您可以根据MaxCompute项目的数据类型版本选取合适的数据类型,更多数据类型版本及各版本支持的数据类型信息,请参见数据类型版本说明
合法的函数签名示例如下。
函数签名示例 说明
@annotate('bigint,boolean->string,datetime') 输入参数类型为BIGINT、BOOLEAN,返回值类型为STRING、DATETIME。
@annotate('*->string, datetime') 输入任意个参数,返回值类型为STRING、DATETIME。
@annotate('->double, bigint, string') 无输入参数,返回值类型为DOUBLE、BIGINT、STRING。
@annotate("array<string>,struct<a1:bigint,b1:string>,string->map<string,bigint>,struct<b1:bigint>") 输入参数类型为ARRAY、STRUCT、MAP,返回值类型为MAP、STRUCT。

为确保编写Python UDTF过程中使用的数据类型与MaxCompute支持的数据类型保持一致,您需要关注二者间的数据类型映射关系。具体映射关系如下。

MaxCompute SQL Type Python 2 Type
BIGINT INT
STRING STR
DOUBLE FLOAT
BOOLEAN BOOL
DATETIME INT
FLOAT FLOAT
CHAR STR
VARCHAR STR
BINARY BYTEARRAY
DATE INT
DECIMAL DECIMAL.DECIMAL
ARRAY LIST
MAP DICT
STRUCT COLLECTIONS.NAMEDTUPLE
说明
  • DATETIME类型对应的Python类型是INT,值为Epoch UTC Time起至今的毫秒数。您可以通过Python标准库中的DATETIME模块处理日期时间类型。
  • odps.udf.int(value,[silent=True])增加了参数silent。当silent为True时,如果value无法转为INT,则会返回None(不会返回异常)。
  • NULL值对应Python的None。

引用资源

Python UDTF可以通过odps.distcache模块引用资源,支持引用文件资源和表资源。

  • odps.distcache.get_cache_file(resource_name):返回指定文件资源的内容。
    • resource_name为STRING类型,对应当前MaxCompute项目中已存在的文件资源名。如果文件资源名非法或者没有相应的文件资源,会返回异常。
      说明 使用UDTF访问资源,在创建UDTF时需要声明引用的资源,否则会报错。
    • 返回值为File-like对象。在使用完此对象后,您需要调用close方法释放打开的资源文件。
  • odps.distcache.get_cache_table(resource_name):返回指定表资源的内容。
    • resource_name支持STRING类型,对应当前MaxCompute项目中已存在的表资源名。如果表资源名非法或者没有相应的表资源,会返回异常。
    • 返回值为GENERATOR类型,调用者以遍历方式获取表的内容,每次遍历可得到以数组形式存在的表中的一条记录。
引用文件资源和表资源的代码示例如下。
# -*- coding: utf-8 -*-
from odps.udf import annotate
from odps.udf import BaseUDTF
from odps.distcache import get_cache_file
from odps.distcache import get_cache_table
@annotate('string -> string, bigint')
class UDTFExample(BaseUDTF):
    """读取资源文件和资源表里的pageid、adid,生成dict
    """
    def __init__(self):
        import json
        cache_file = get_cache_file('test_json.txt')
        self.my_dict = json.load(cache_file)
        cache_file.close()
        records = list(get_cache_table('table_resource1'))
        for record in records:
            self.my_dict[record[0]] = [record[1]]
    """输入pageid,输出pageid以及它对应的所有adid
    """
    def process(self, pageid):
        for adid in self.my_dict[pageid]:
            self.forward(pageid, adid)

使用说明

按照开发流程,完成Python 2 UDTF开发后,您即可通过MaxCompute SQL调用Python 2 UDTF。调用方法如下:
  • 在归属MaxCompute项目中使用自定义函数:使用方法与内建函数类似,您可以参照内建函数的使用方法使用自定义函数。
  • 跨项目使用自定义函数:即在项目A中使用项目B的自定义函数,跨项目分享语句示例:select B:udf_in_other_project(arg0, arg1) as res from table_t;。更多跨项目分享信息,请参见基于Package的跨项目空间资源访问

使用MaxCompute Studio完整开发及调用Python 2 UDTF的操作,请参见开发Python UDF