Java UDTF

采用Java语言编写UDTF函数可有效处理复杂数据处理任务并自定义逻辑,并且通过合理利用Java语言的特性,能更好地满足特定的数据处理需求,提升开发效率和处理性能。本文将介绍UDTF函数的代码结构、使用说明和示例。

UDTF代码结构

您可以通过IntelliJ IDEA(Maven)或MaxCompute Studio工具使用Java语言编写UDTF代码,代码中需要包含如下信息:

  • Java包(Package):可选。

    您可以将定义的Java类打包,为后续查找和使用类提供方便。

  • 继承UDTF类:必选。

    必须携带的UDTF类为com.aliyun.odps.udf.UDTFcom.aliyun.odps.udf.annotation.Resolve(对应@Resolve注解)和com.aliyun.odps.udf.UDFException(对应实现Java类的方法)。当您需要使用其他UDTF类或者需要用到复杂数据类型时,请根据MaxCompute UDF概述添加需要的类。

  • 自定义Java类:必选。

    UDTF代码的组织单位,定义了实现业务需求的变量及方法。

  • @Resolve注解:必选。

    格式为@Resolve(<signature>)signature为函数签名,用于定义函数的输入参数和返回值的数据类型。UDTF无法通过反射分析获取函数签名,只能通过@Resolve注解方式获取函数签名,例如@Resolve("smallint->varchar(10)")。更多@Resolve注解信息,请参见@Resolve注解

  • 实现Java类的方法:必选。

    Java类实现包含如下4个方法,您可以根据实际需要进行选择。

    接口定义

    描述

    public void setup(ExecutionContext ctx) throws UDFException

    初始化方法,在UDTF处理输入的数据前,MaxCompute会调用用户自定义的初始化行为。在每个Workersetup会被先调用一次。

    public void process(Object[] args) throws UDFException

    SQL中每一条记录都会对应调用一次processprocess的参数为SQL语句中指定的UDTF输入参数。输入参数以Object[]的形式传入,输出结果通过forward函数输出。您需要在process函数内自行调用forward,以决定输出数据。

    说明

    当未使用processclose方法触发forward调用时,可能会导致数据丢失,请务必谨慎使用,以避免出现此情况。例如:通过后台线程执行了forward调用,此时一定要确保process方法在forward调用结束后再结束,否则可能导致数据丢失。

    public void close() throws UDFException

    UDTF的结束方法。只会被调用一次,即在处理完最后一条记录之后被调用。

    public void forward(Object …o) throws UDFException

    调用forward方法输出数据,每调用一次forward代表输出一条记录。在SQL查询语句中调用UDTF时,可以通过as子句将forward输出的结果进行重命名。

    编写Java UDTF时可以使用Java TypeJava Writable Type,MaxCompute项目支持处理的数据类型与Java数据类型的详细映射关系,请参见数据类型

UDTF代码示例如下。

//将定义的Java类组织在org.alidata.odps.udtf.examples包中。
package org.alidata.odps.udtf.examples;
//继承UDTF类。
import com.aliyun.odps.udf.UDTF;
import com.aliyun.odps.udf.UDTFCollector;
import com.aliyun.odps.udf.annotation.Resolve;
import com.aliyun.odps.udf.UDFException;
//自定义Java类。  
//@Resolve注解。
@Resolve("string,bigint->string,bigint")
public class MyUDTF extends UDTF {     
     //实现Java类的方法。
     @Override
     public void process(Object[] args) throws UDFException {
         String a = (String) args[0];
         Long b = (Long) args[1];
         for (String t: a.split("\\s+")) {
         forward(t, b);
       }
     }
   }

使用限制

  • 不支持通过自定义函数访问外网。如果您需要通过自定义函数访问外网,请根据业务情况填写并提交表单,MaxCompute技术支持团队会及时联系您完成网络开通操作。表单填写指导,请参见网络开通流程

  • select语句中使用UDTF时,不允许存在其他列或表达式。错误示例如下。

    --查询语句中同时携带了UDTF和其他列。
    select value, user_udtf(key) as mycol ...
  • UDTF不能嵌套使用。错误示例如下。

    --user_udtf1嵌套了user_udtf2,不允许嵌套。
    select user_udtf1(user_udtf2(key)) as mycol...;
  • 不支持在同一个select子句中与group bydistribute bysort by联用。错误示例如下。

    --UDTF不能与group by联用。
    select user_udtf(key) as mycol ... group by mycol;

注意事项

在编写Java UDTF时,您需要注意:

  • 不同UDTF JAR包中不建议存在类名相同但实现逻辑不一样的类。例如UDTF1、UDTF2分别对应资源JARudtf1.jar、udtf2.jar,两个JAR包里都包含名称为com.aliyun.UserFunction.class的类但实现逻辑不一样,当同一条SQL语句中同时调用UDTF1UDTF2时,MaxCompute会随机加载其中一个类,此时会导致UDTF执行结果不符合预期甚至编译失败。

  • Java UDTF中输入或返回值的数据类型是对象,数据类型首字母必须大写,例如String。

  • SQL中的NULL值通过Java中的NULL表示。Java Primitive Type无法表示SQL中的NULL值,不允许使用。

@Resolve注解

@Resolve注解格式如下。

@Resolve(<signature>)

signature为函数签名字符串,用于标识输入参数和返回值的数据类型。执行UDTF时,UDTF函数的输入参数和返回值类型要与函数签名指定的类型一致。查询语义解析阶段会检查不符合函数签名定义的用法,检查到类型不匹配时会报错。具体格式如下。

'arg_type_list -> type_list'

其中:

  • type_list:表示返回值的数据类型。UDTF可以返回多列。支持的数据类型为:BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、复杂数据类型(ARRAY、MAP、STRUCT)或复杂数据类型嵌套。

  • arg_type_list:表示输入参数的数据类型。输入参数可以为多个,用英文逗号(,)分隔。支持的数据类型为BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、CHAR、VARCHAR、复杂数据类型(ARRAY、MAP、STRUCT)或复杂数据类型嵌套。

    arg_type_list还支持星号(*)或为空(''):

    • arg_type_list为星号(*)时,表示输入参数为任意个数。

    • arg_type_list为空('')时,表示无输入参数。

    更多Resolve注解语法扩展详情,请参见UDAFUDTF动态参数说明

合法@Resolve注解示例如下。

@Resolve注解示例

说明

@Resolve('bigint,boolean->string,datetime')

输入参数类型为BIGINT、BOOLEAN,返回值类型为STRING、DATETIME。

@Resolve('*->string, datetime')

输入任意个参数,返回值类型为STRING、DATETIME。

@Resolve('->double, bigint, string')

无输入参数,返回值类型为DOUBLE、BIGINT、STRING。

@Resolve("array<string>,struct<a1:bigint,b1:string>,string->map<string,bigint>,struct<b1:bigint>")

输入参数类型为ARRAY、STRUCT、MAP,返回值类型为MAP、STRUCT。

数据类型

MaxCompute中不同数据类型版本支持的数据类型不同。从MaxCompute 2.0版本开始,扩展了更多的新数据类型,同时还支持ARRAY、MAP、STRUCT等复杂类型。更多MaxCompute数据类型版本信息,请参见数据类型版本说明

为确保编写Java UDTF过程中使用的数据类型与MaxCompute支持的数据类型保持一致,您需要关注二者间的数据类型映射关系。具体映射关系如下。

MaxCompute Type

Java Type

Java Writable Type

TINYINT

java.lang.Byte

ByteWritable

SMALLINT

java.lang.Short

ShortWritable

INT

java.lang.Integer

IntWritable

BIGINT

java.lang.Long

LongWritable

FLOAT

java.lang.Float

FloatWritable

DOUBLE

java.lang.Double

DoubleWritable

DECIMAL

java.math.BigDecimal

BigDecimalWritable

BOOLEAN

java.lang.Boolean

BooleanWritable

STRING

java.lang.String

Text

VARCHAR

com.aliyun.odps.data.Varchar

VarcharWritable

BINARY

com.aliyun.odps.data.Binary

BytesWritable

DATE

java.sql.Date

DateWritable

DATETIME

java.util.Date

DatetimeWritable

TIMESTAMP

java.sql.Timestamp

TimestampWritable

INTERVAL_YEAR_MONTH

不涉及

IntervalYearMonthWritable

INTERVAL_DAY_TIME

不涉及

IntervalDayTimeWritable

ARRAY

java.util.List

不涉及

MAP

java.util.Map

不涉及

STRUCT

com.aliyun.odps.data.Struct

不涉及

说明

MaxCompute项目采用MaxCompute 2.0数据类型版本时,UDTF的输入或返回值才可以使用Java Writable Type。

使用说明

按照开发流程,完成Java UDTF开发后,您即可通过MaxCompute SQL调用Java UDTF。调用方法如下:

  • 在归属MaxCompute项目中使用自定义函数:使用方法与内建函数类似,您可以参照内建函数的使用方法使用自定义函数。

  • 跨项目使用自定义函数:即在项目A中使用项目B的自定义函数,跨项目分享语句示例:select B:udf_in_other_project(arg0, arg1) as res from table_t;。更多跨项目分享信息,请参见基于Package跨项目访问资源

使用MaxCompute Studio完整开发及调用Java UDTF的操作,请参见使用示例

使用示例

以通过MaxCompute Studio开发UDTF为例,开发并调用Java UDTF的操作步骤如下:

  1. 准备工作。

    使用MaxCompute Studio开发调试UDF时,您需要先安装MaxCompute Studio并连接MaxCompute项目,做好UDF开发前准备工作。操作详情请参见:

    1. 安装MaxCompute Studio

    2. 创建MaxCompute项目连接

    3. 创建MaxCompute Java Module

  2. 编写UDTF代码。

    1. Project区域,右键单击Module的源码目录(即src > main > java),选择new > MaxCompute Java新建Java Class

    2. Create new MaxCompute java class对话框,单击UDTF并填写Name后,按Enter键。例如Java Class名称为MyUDTF。选择类型并填写名称

      Name为创建的MaxCompute Java Class名称。如果还没有创建Package,在此处填写packagename.classname,会自动生成Package。

    3. 在代码编写区域写入如下代码。编写UDTF代码UDTF代码示例如下。

      package org.alidata.odps.udtf.examples;
      import com.aliyun.odps.udf.UDTF;
      import com.aliyun.odps.udf.UDTFCollector;
      import com.aliyun.odps.udf.annotation.Resolve;
      import com.aliyun.odps.udf.UDFException;
      // TODO define input and output types, e.g., "string,string->string,bigint".
         @Resolve("string,bigint->string,bigint")
         public class MyUDTF extends UDTF {
           @Override
           public void process(Object[] args) throws UDFException {
             String a = (String) args[0];
             Long b = (Long) args[1];
             for (String t: a.split("\\s+")) {
               forward(t, b);
             }
           }
         }
  3. 在本地运行调试UDTF,确保代码可以运行成功。

    更多调试操作,请参见通过本地运行调试UDF

    本地调试UDTF

    说明

    运行参数可参照图示数据填写。

  4. 将创建的UDTF打包为JAR包,上传至MaxCompute项目并注册函数。例如函数名称为user_udtf

    更多打包操作,请参见操作步骤

    注册函数

  5. MaxCompute Studio的左侧导航栏,单击Project Explorer,在目标MaxCompute项目上单击右键,启动MaxCompute客户端,并执行SQL命令调用新创建的UDTF。

    假设待查询目标表my_table的数据结构如下。

    +------------+------------+
    | col0       | col1       |
    +------------+------------+
    | A B        | 1          |
    | C D        | 2          |
    +------------+------------+

    执行如下SQL命令调用UDTF。

    select user_udtf(col0, col1) as (c0, c1) from my_table;

    返回结果如下。

    +----+------------+
    | c0 | c1         |
    +----+------------+
    | A  | 1          |
    | B  | 1          |
    | C  | 2          |
    | D  | 2          |
    +----+------------+

相关文档

关于Java UDTF的使用示例详情,请参见Java UDTF使用示例