UDF开发(Python3)

MaxCompute当前支持利用Python 3语言来开发自定义函数(UDF),以满足特定的业务逻辑需求。本文为您介绍如何通过Python 3语言编写UDF。

UDF代码结构

您可以通过MaxCompute Studio工具使用Python 3语言编写UDF代码,代码中需要包含如下信息:

  • 导入模块:必选。

    至少要包含from odps.udf import annotate,导入函数签名模块,MaxCompute才可以识别后续代码中定义的函数签名。当UDF代码中需要引用文件资源或表资源时,需要包含from odps.distcache import get_cache_file(文件资源)或from odps.distcache import get_cache_table(表资源)。

  • 函数签名:必选。

    格式为@annotate(<signature>)signature用于定义函数的输入参数和返回值的数据类型。更多函数签名信息,请参见函数签名与数据类型

  • 自定义Python类:必选。

    UDF代码的组织单位,定义了实现业务需求的变量及方法。您还可以在代码中引用MaxCompute内置的第三方库或引用文件、表资源。更多信息,请参见第三方库引用资源

  • evaluate方法:必选。

    位于自定义的Python类中。evaluate方法定义了输入参数和返回值。一个Python类中只能包含一个evaluate方法。

UDF代码示例如下。

#导入函数签名模块。
from odps.udf import annotate
#函数签名。
@annotate("bigint,bigint->bigint")
#自定义Python类。
class MyPlus(object):
#evaluate方法。
   def evaluate(self, arg0, arg1):
       if None in (arg0, arg1):
           return None
       return arg0 + arg1

使用限制

  • 访问外网

    MaxCompute默认不支持通过自定义函数访问外网。如果您需要通过自定义函数访问外网,请根据业务情况填写并提交网络连接申请表单,MaxCompute技术支持团队会及时联系您完成网络开通操作。表单填写指导,请参见网络开通流程

  • 访问VPC网络

    MaxCompute默认不支持通过UDF访问VPC网络。如果您的UDF涉及访问VPC网络中的资源时,需要先创建MaxCompute与目标VPC网络间的网络连接,才可以直接通过UDF访问VPC网络中的资源,操作详情请参见通过UDF访问VPC网络资源

  • 读取表数据

    目前版本不支持使用UDF/UDAF/UDTF读取以下场景的表数据:

    • 做过表结构修改(Schema Evolution)的表数据。

    • 包含复杂数据类型的表数据。

    • 包含JSON数据类型的表数据。

    • Transactional表的表数据。

注意事项

Python 3与Python 2不兼容。在您使用Python 3之前,需要考虑兼容性问题,在一个SQL中不允许同时使用Python 3和Python 2。

说明

Python 2官方已于2020年初停止维护,建议您根据项目类型执行迁移操作:全新项目:新MaxCompute停止维护Python 2,

UDF开发:通用流程

开发UDF时通常需进行准备工作、编写UDF代码、上传并注册UDF、调用调试UDF这几个步骤。同时MaxCompute支持多种工具,以下以常见的MaxCompute Studio、DataWorks、odpscmd三种工具为例,以一个具体的示例为您介绍UDF开发的通用流程。

使用MaxCompute Studio

  1. 准备工作。

    使用MaxCompute Studio开发调试UDF时,您需要先安装MaxCompute Studio并连接MaxCompute项目,做好UDF开发前准备工作。操作详情请参见:

    1. 安装MaxCompute Studio

    2. 创建MaxCompute项目连接

    3. 配置Python开发环境

  2. 编写UDF代码。

    1. Project区域MaxCompute Studio目录下,右键单击scripts,选择New > MaxCompute Python

    2. Create new MaxCompute python class对话框中输入类名Name,选择类型为Python UDF,单击OK完成。

    3. 在编辑框中编写UDF代码。

      from odps.udf import annotate
      
      @annotate("string,bigint->string")
      class GetUrlChar(object):
      
          def evaluate(self, url, n):
              if n == 0:
                  return ""
              try:
                  index = url.find(".htm")
                  if index < 0:
                      return ""
                  a = url[:index]
                  index = a.rfind("/")
                  b = a[index + 1:]
                  c = b.split("-")
                  if len(c) < n:
                      return ""
                  return c[-n]
              except Exception:
                  return "Internal error"
      说明

      如果需要本地调试Java UDF,请参见测试UDF

  3. 上传并注册UDF。

    右键单击目标Python程序,选择Deploy to server…。配置函数名称后单击ok操作详情请参见上传及注册

    本示例配置函数名称为UDF_GET_URL_CHAR

  4. 调用UDF。

    在左侧导航栏单击Project Explore,在目标MaxCompute项目上单击右键,选择Open Console并在Console区域输入调用UDF的SQL语句,按Enter键运行即可。SQL命令示例如下。

    set odps.sql.python.version=cp37; -- python3 UDF需要使用该命令开启python3
    select UDF_GET_URL_CHAR("http://www.taobao.com/a.htm", 1);

    返回结果如下。

    +-----+
    | _c0 |
    +-----+
    |  a  |
    +-----+

使用DataWorks

  1. 准备工作。

    使用DataWorks开发调试UDF时,您需要先开通DataWorks并绑定MaxCompute项目,做好UDF开发前准备工作。操作详情请参见使用DataWorks连接

  2. 编写UDF代码。

    您可以在任意Python开发工具中开发UDF代码并打包为一个代码包。您可以使用以下UDF代码示例。

    from odps.udf import annotate
    
    @annotate("string,bigint->string")
    class GetUrlChar(object):
    
        def evaluate(self, url, n):
            if n == 0:
                return ""
            try:
                index = url.find(".htm")
                if index < 0:
                    return ""
                a = url[:index]
                index = a.rfind("/")
                b = a[index + 1:]
                c = b.split("-")
                if len(c) < n:
                    return ""
                return c[-n]
            except Exception:
                return "Internal error"
  3. 上传并注册UDF。

    您可以将已打包好的代码包通过DataWorks上传并完成UDF注册,操作详情请参见:

    1. 创建并使用MaxCompute资源

    2. 创建并使用自定义函数

  4. 调用UDF。

    注册完成UDF后,您可以创建一个ODPS SQL节点,在节点中编写并创建SQL命令来调用调试UDF。创建ODPS SQL节点的操作请参见开发ODPS SQL任务,命令示例如下。

    set odps.sql.python.version=cp37; -- python3 UDF需要使用该命令开启python3
    select UDF_GET_URL_CHAR("http://www.taobao.com/a.htm", 1);

使用odpscmd

  1. 准备工作。

    使用odpscmd开发调试UDF时,您需要先下载安装odpscmd工具,并配置config文件连接MaxCompute项目,做好UDF开发前准备工作。操作详情请参见使用本地客户端(odpscmd)连接

  2. 编写UDF代码。

    您可以在任意Python开发工具中开发UDF代码并打包为一个代码包。您可以使用以下UDF代码示例。

    from odps.udf import annotate
    
    @annotate("string,bigint->string")
    class GetUrlChar(object):
    
        def evaluate(self, url, n):
            if n == 0:
                return ""
            try:
                index = url.find(".htm")
                if index < 0:
                    return ""
                a = url[:index]
                index = a.rfind("/")
                b = a[index + 1:]
                c = b.split("-")
                if len(c) < n:
                    return ""
                return c[-n]
            except Exception:
                return "Internal error"
  3. 上传并注册UDF。

    您可以将已打包好的代码包通过odpscmd上传并完成UDF注册,操作详情请参见:

    1. ADD PY

    2. CREATE FUNCTION

  4. 调用UDF。

    注册完成UDF后,您可以编写并创建SQL命令来调用调试UDF。命令示例如下。

    set odps.sql.python.version=cp37; -- python3 UDF需要使用该命令开启python3
    select UDF_GET_URL_CHAR("http://www.taobao.com/a.htm", 1);

UDF开发:安装第三方库Numpy

MaxCompute内置的Python 3运行环境中未安装第三方库Numpy。如果您需要使用Numpy的UDF,请手动上传Numpy的WHEEL包。从PyPI或镜像下载Numpy包时,包的文件名为numpy-<版本号>-cp37-cp37m-manylinux1_x86_64.whl。上传包的操作请参见资源操作Python UDF使用第三方包

Python 3支持的标准库列表请参见Python 3标准库

UDF开发:函数签名与数据类型

函数签名格式如下。

@annotate(<signature>)

signature为字符串,用于标识输入参数和返回值的数据类型。执行UDF时,UDF函数的输入参数和返回值类型要与函数签名指定的类型一致。查询语义解析阶段会检查不符合函数签名定义的用法,检查到类型不匹配时会报错。具体格式如下。

'arg_type_list -> type'

其中:

  • arg_type_list:表示输入参数的数据类型。输入参数可以为多个,用英文逗号(,)分隔。支持的数据类型为BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、CHAR、VARCHAR、复杂数据类型(ARRAY、MAP、STRUCT)或复杂数据类型嵌套。

    arg_type_list还支持星号(*)或为空(''):

    • arg_type_list为星号(*)时,表示输入参数为任意个数。

    • arg_type_list为空('')时,表示无输入参数。

  • type:表示返回值的数据类型。UDF只返回一列。支持的数据类型为:BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、复杂数据类型(ARRAY、MAP、STRUCT)或复杂数据类型嵌套。

说明

在编写UDF代码过程中,您可以根据MaxCompute项目的数据类型版本选取合适的数据类型,更多数据类型版本及各版本支持的数据类型信息,请参见数据类型版本说明

合法的函数签名示例如下。

函数签名示例

说明

'bigint,double->string'

输入参数类型为BIGINT、DOUBLE,返回值类型为STRING。

'*->string'

输入任意个参数,返回值类型为STRING。

'->double'

无输入参数,返回值类型为DOUBLE。

'array<bigint>->struct<x:string, y:int>'

输入参数类型为ARRAY<BIGINT>,返回值类型为STRUCT<x:STRING, y:INT>。

'->map<bigint, string>'

无输入参数,返回值类型为MAP<BIGINT, STRING>。

为确保编写Python UDF过程中使用的数据类型与MaxCompute支持的数据类型保持一致,您需要关注二者间的数据类型映射关系。具体映射关系如下。

MaxCompute SQL Type

Python 3 Type

BIGINT

INT

STRING

UNICODE

DOUBLE

FLOAT

BOOLEAN

BOOL

DATETIME

DATETIME.DATETIME

FLOAT

FLOAT

CHAR

UNICODE

VARCHAR

UNICODE

BINARY

BYTES

DATE

DATETIME.DATE

DECIMAL

DECIMAL.DECIMAL

ARRAY

LIST

MAP

DICT

STRUCT

COLLECTIONS.NAMEDTUPLE

UDF开发:引用资源

Python UDF可以通过odps.distcache模块引用资源,支持引用文件资源和表资源。

  • odps.distcache.get_cache_file(resource_name, mode):以指定模式mode返回指定文件资源的内容。

    • resource_name支持STRING类型,对应当前MaxCompute项目中已存在的表资源名。如果表资源名非法或者没有相应的表资源,会返回异常。

    • mode支持STRING类型,默认值为't'。当mode't'时以文本格式打开文件,当mode'b'时以二进制格式打开文件。

    • 返回值为File-like对象。在使用完此对象后,您需要调用close方法释放打开的资源文件。

    引用文件资源示例如下。

    from odps.udf import annotate
    from odps.distcache import get_cache_file
    @annotate('bigint->string')
    class DistCacheExample(object):
    def __init__(self):
        cache_file = get_cache_file('test_distcache.txt')
        kv = {}
        for line in cache_file:
            line = line.strip()
            if not line:
                continue
            k, v = line.split()
            kv[int(k)] = v
        cache_file.close()
        self.kv = kv
    def evaluate(self, arg):
        return self.kv.get(arg)
  • odps.distcache.get_cache_table(resource_name):返回指定资源表的内容。

    • resource_name对应当前MaxCompute项目中已存在的表资源名。如果表资源名非法或者没有相应的表资源,会返回异常。支持读取表中BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、FLOAT、CHAR、VARCHAR、BINARY、DATE、DECIMAL、ARRAY、MAP和STRUCT类型数据。

    • 返回值为Generator类型,调用者通过遍历获取表的内容,每次遍历得到的是以数组形式存在的表中的一条记录。

引用表资源示例如下。

from odps.udf import annotate
from odps.distcache import get_cache_table
@annotate('->string')
class DistCacheTableExample(object):
    def __init__(self):
        self.records = list(get_cache_table('udf_test'))
        self.counter = 0
        self.ln = len(self.records)
    def evaluate(self):
        if self.counter > self.ln - 1:
            return None
        ret = self.records[self.counter]
        self.counter += 1
        return str(ret)

UDF开发完成后:UDF调用说明

按照开发流程,完成Python 3 UDF开发后,您即可通过MaxCompute SQL调用Python 3 UDF。调用方法如下。

开启Python 3

MaxCompute默认使用Python 2,如果您要使用Python 3,可以在Session级别设置如下属性开启Python 3,并与SQL语句一起提交执行。

set odps.sql.python.version=cp37;

调用函数

  • 在归属MaxCompute项目中使用自定义函数:使用方法与内建函数类似,您可以参照内建函数的使用方法使用自定义函数。

  • 跨项目使用自定义函数:即在项目A中使用项目B的自定义函数,跨项目分享语句示例:select B:udf_in_other_project(arg0, arg1) as res from table_t;。更多跨项目分享信息,请参见基于Package跨项目访问资源

Python 2 UDF迁移

Python 2官方已于2020年初停止维护,建议您根据项目类型执行迁移操作:

  • 全新项目:新MaxCompute项目,或第一次使用Python语言编写UDF的MaxCompute项目。建议所有的Python UDF都直接使用Python 3语言编写。

  • 存量项目:创建了大量Python 2 UDF的MaxCompute项目。请您谨慎开启Python 3。如果您计划逐步将所有Python 2 UDF迁移为Python 3 UDF,推荐方法如下:

    • 新作业和新UDF:使用Python 3语言编写,在Session级别开启Python 3。开启Python 3方法,请参见开启Python 3

    • Python 2 UDF:改写Python 2 UDF,使其可以同时兼容Python 2和Python 3。改写方法请参见将Python 2代码移植到Python 3

      说明

      如果您需要编写公共UDF,并为多个MaxCompute项目授权UDF的操作权限,建议UDF同时兼容Python 2和Python 3。

UDF示例demo