本文为您介绍如何通过Java语言编写UDAF。

UDAF代码结构

您可以通过IntelliJ IDEA(Maven)或MaxCompute Studio工具使用Java语言编写UDAF代码,代码中需要包含如下信息:

  • Java包(Package):可选。

    您可以将定义的Java类打包,为后续查找和使用类提供方便。

  • 继承UDAF类:必选。

    必须携带的UDAF类为com.aliyun.odps.udf.Aggregatorcom.aliyun.odps.udf.annotation.Resolve(对应@Resolve注解)。com.aliyun.odps.udf.UDFException(可选,对应实现Java类初始化和结束的方法)。当您需要使用其他UDAF类或者需要用到复杂数据类型时,请根据MaxCompute UDF概述添加需要的类。

  • @Resolve注解:必选。

    格式为@Resolve(<signature>)signature为函数签名,用于定义函数的输入参数和返回值的数据类型。UDAF无法通过反射分析获取函数签名,只能通过@Resolve注解方式获取函数签名,例如@Resolve("smallint->varchar(10)")。更多@Resolve注解信息,请参见@Resolve注解

  • 自定义Java类:必选。

    UDAF代码的组织单位,定义了实现业务需求的变量及方法。

  • 实现Java类的方法:必选。

    实现Java类需要继承com.aliyun.odps.udf.Aggregator类并实现如下方法。

    import com.aliyun.odps.udf.ContextFunction;
    import com.aliyun.odps.udf.ExecutionContext;
    import com.aliyun.odps.udf.UDFException;
    public abstract class Aggregator implements ContextFunction {
        //初始化方法。
        @Override
        public void setup(ExecutionContext ctx) throws UDFException {
        }
        //结束方法。
        @Override
        public void close() throws UDFException {
        }
        //创建聚合Buffer。
        abstract public Writable newBuffer();
        //iterate方法。
        //buffer为聚合buffer,是指一个阶段性的汇总数据,即在不同的Map任务中,group by后得出的数据(可理解为一个集合),每行执行一次。
        //Writable[]表示一行数据,在代码中指代传入的列。例如writable[0]表示第一列,writable[1]表示第二列。
        //args为SQL中调用UDAF时指定的参数,不能为NULL,但是args里面的元素可以为NULL,代表对应的输入数据是NULL。
        abstract public void iterate(Writable buffer, Writable[] args) throws UDFException;
        //terminate方法。
        abstract public Writable terminate(Writable buffer) throws UDFException;
        //merge方法。
        abstract public void merge(Writable buffer, Writable partial) throws UDFException;
    }

    其中:iteratemergeterminate是最重要的三个方法,UDAF的主要逻辑依赖于这三个方法的实现。此外,还需要您实现自定义的Writable buffer。

    Writable buffer将内存中的对象转换成字节序列(或其他数据传输协议)以便于储存到磁盘(持久化)和网络传输。因为MaxCompute使用分布式计算的方式来处理聚合函数,因此需要知道如何序列化和反序列化数据,以便于数据在不同的设备之间进行传输。

    编写Java UDAF时可以使用Java Type或Java Writable Type,MaxCompute项目支持处理的数据类型与Java数据类型的详细映射关系,请参见数据类型

UDAF代码示例如下。

//将定义的Java类组织在org.alidata.odps.udaf.examples包中。
package org.alidata.odps.udaf.examples;
//继承UDAF类。
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import com.aliyun.odps.io.DoubleWritable;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.udf.Aggregator;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.annotation.Resolve;
//自定义Java类。
//@Resolve注解。
@Resolve("double->double")
public class AggrAvg extends Aggregator {
//实现Java类的方法。
  private static class AvgBuffer implements Writable {
    private double sum = 0;
    private long count = 0;
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeDouble(sum);
      out.writeLong(count);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
      sum = in.readDouble();
      count = in.readLong();
    }
  }
  private DoubleWritable ret = new DoubleWritable();
  @Override
  public Writable newBuffer() {
    return new AvgBuffer();
  }
  @Override
  public void iterate(Writable buffer, Writable[] args) throws UDFException {
    DoubleWritable arg = (DoubleWritable) args[0];
    AvgBuffer buf = (AvgBuffer) buffer;
    if (arg != null) {
      buf.count += 1;
      buf.sum += arg.get();
    }
  }
  @Override
  public Writable terminate(Writable buffer) throws UDFException {
    AvgBuffer buf = (AvgBuffer) buffer;
    if (buf.count == 0) {
      ret.set(0);
    } else {
      ret.set(buf.sum / buf.count);
    }
    return ret;
  }
  @Override
  public void merge(Writable buffer, Writable partial) throws UDFException {
    AvgBuffer buf = (AvgBuffer) buffer;
    AvgBuffer p = (AvgBuffer) partial;
    buf.sum += p.sum;
    buf.count += p.count;
  }
}
说明

上述UDAF代码示例中,iteratemerge方法里的buffer为复用buffer,用于将输入的各行数据通过用户自定义实现的方式聚合到该buffer中。

使用限制

  • 访问外网

    MaxCompute默认不支持通过自定义函数访问外网。如果您需要通过自定义函数访问外网,请根据业务情况填写并提交网络连接申请表单,MaxCompute技术支持团队会及时联系您完成网络开通操作。表单填写指导,请参见网络开通流程

  • 访问VPC网络

    MaxCompute默认不支持通过UDF访问VPC网络。如果您的UDF涉及访问VPC网络中的资源时,需要先创建MaxCompute与目标VPC网络间的网络连接,才可以直接通过UDF访问VPC网络中的资源,操作详情请参见通过UDF访问VPC网络资源

  • 读取表数据

    目前版本不支持使用UDF/UDAF/UDTF读取以下场景的表数据:

    • 做过表结构修改(Schema Evolution)的表数据。

    • 包含复杂数据类型的表数据。

    • 包含JSON数据类型的表数据。

    • Transactional表的表数据。

注意事项

在编写Java UDAF时,您需要注意:

  • 不同UDAF JAR包中不建议存在类名相同但实现逻辑不一样的类。例如UDAF1、UDAF2分别对应资源JAR包udaf1.jar、udaf2.jar,两个JAR包里都包含名称为com.aliyun.UserFunction.class的类但实现逻辑不一样,当同一条SQL语句中同时调用UDAF1和UDAF2时,MaxCompute会随机加载其中一个类,此时会导致UDAF执行结果不符合预期甚至编译失败。

  • Java UDAF中输入或返回值的数据类型是对象,数据类型首字母必须大写,例如String。

  • SQL中的NULL值通过Java中的NULL表示。Java Primitive Type无法表示SQL中的NULL值,不允许使用。

@Resolve注解

@Resolve注解格式如下。

@Resolve(<signature>)

signature为字符串,用于标识输入参数和返回值的数据类型。执行UDAF时,UDAF函数的输入参数和返回值类型要与函数签名指定的类型一致。查询语义解析阶段会检查不符合函数签名定义的用法,检查到类型不匹配时会报错。具体格式如下。

'arg_type_list -> type'

其中:

  • arg_type_list:表示输入参数的数据类型。输入参数可以为多个,用英文逗号(,)分隔。支持的数据类型为BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、CHAR、VARCHAR、复杂数据类型(ARRAY、MAP、STRUCT)或复杂数据类型嵌套。

    arg_type_list还支持星号(*)或为空(''):

    • arg_type_list为星号(*)时,表示输入参数为任意个数。

    • arg_type_list为空('')时,表示无输入参数。

  • type:表示返回值的数据类型。UDAF只返回一列。支持的数据类型为:BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、复杂数据类型(ARRAY、MAP、STRUCT)或复杂数据类型嵌套。

说明

在编写UDAF代码过程中,您可以根据MaxCompute项目的数据类型版本选取合适的数据类型,更多数据类型版本及各版本支持的数据类型信息,请参见数据类型版本说明

合法@Resolve注解示例如下。

@Resolve注解示例

说明

@Resolve('bigint,double->string')

输入参数类型为BIGINT、DOUBLE,返回值类型为STRING。

@Resolve('*->string')

输入任意个参数,返回值类型为STRING。

@Resolve('->double')

无输入参数,返回值类型为DOUBLE。

@Resolve('array<bigint>->struct<x:string, y:int>')

输入参数类型为ARRAY<BIGINT>,返回值类型为STRUCT<x:STRING, y:INT>。

数据类型

在MaxCompute中不同数据类型版本支持的数据类型不同。从MaxCompute 2.0版本开始,扩展了更多的新数据类型,同时还支持ARRAY、MAP、STRUCT等复杂类型。更多MaxCompute数据类型版本信息,请参见数据类型版本说明

为确保编写Java UDAF过程中使用的数据类型与MaxCompute支持的数据类型保持一致,您需要关注二者间的数据类型映射关系。具体映射关系如下。

MaxCompute Type

Java Type

Java Writable Type

TINYINT

java.lang.Byte

ByteWritable

SMALLINT

java.lang.Short

ShortWritable

INT

java.lang.Integer

IntWritable

BIGINT

java.lang.Long

LongWritable

FLOAT

java.lang.Float

FloatWritable

DOUBLE

java.lang.Double

DoubleWritable

DECIMAL

java.math.BigDecimal

BigDecimalWritable

BOOLEAN

java.lang.Boolean

BooleanWritable

STRING

java.lang.String

Text

VARCHAR

com.aliyun.odps.data.Varchar

VarcharWritable

BINARY

com.aliyun.odps.data.Binary

BytesWritable

DATE

java.sql.Date

DateWritable

DATETIME

java.util.Date

DatetimeWritable

TIMESTAMP

java.sql.Timestamp

TimestampWritable

INTERVAL_YEAR_MONTH

不涉及

IntervalYearMonthWritable

INTERVAL_DAY_TIME

不涉及

IntervalDayTimeWritable

ARRAY

java.util.List

不涉及

MAP

java.util.Map

不涉及

STRUCT

com.aliyun.odps.data.Struct

不涉及

说明

当MaxCompute项目采用MaxCompute 2.0数据类型版本时,UDAF的输入或返回值才可以使用Java Writable Type。

使用说明

按照开发流程,完成Java UDAF开发后,您即可通过MaxCompute SQL调用Java UDAF。调用方法如下:

  • 在归属MaxCompute项目中使用自定义函数:使用方法与内建函数类似,您可以参照内建函数的使用方法使用自定义函数。

  • 跨项目使用自定义函数:即在项目A中使用项目B的自定义函数,跨项目分享语句示例:select B:udf_in_other_project(arg0, arg1) as res from table_t;。更多跨项目分享信息,请参见基于Package跨项目访问资源

使用MaxCompute Studio完整开发及调用Java UDAF的操作,请参见使用示例

使用示例

以通过MaxCompute Studio开发计算平均值的UDAF函数AggrAvg为例,实现逻辑如下。

求平均值逻辑

  1. 输入数据分片:MaxCompute会按照MapReduce处理流程对输入数据按照一定的大小进行分片,每片的大小适合一个Worker在适当的时间内完成。

    分片大小需要您通过odps.stage.mapper.split.size参数进行配置。分片逻辑请参见MapReduce流程说明

  2. 计算平均值第一阶段:每个Worker统计分片内数据的个数及汇总值。您可以将每个分片内的数据个数及汇总值视为一个中间结果。

  3. 计算平均值第二阶段:汇总第一阶段中每个分片内的信息。

  4. 最终输出:r.sum/r.count即是所有输入数据的平均值。

开发并调用Java UDAF的操作步骤如下:

  1. 准备工作。

    使用MaxCompute Studio开发调试UDF时,您需要先安装MaxCompute Studio并连接MaxCompute项目,做好UDF开发前准备工作。操作详情请参见:

    1. 安装MaxCompute Studio

    2. 创建MaxCompute项目连接

    3. 创建MaxCompute Java Module

  2. 编写UDAF代码。

    1. Project区域,右键单击Module的源码目录(即src > main > java),选择new > MaxCompute Java新建Java Class

    2. Create new MaxCompute java class对话框,单击UDAF并填写Name后,按Enter键。例如Java Class名称为AggrAvg。创建Java Class

      Name为创建的MaxCompute Java Class名称。如果还没有创建Package,在此处填写packagename.classname,会自动生成Package。

    3. 在代码编写区域写入如下代码。编写代码UDAF代码示例如下。

      import java.io.DataInput;
      import java.io.DataOutput;
      import java.io.IOException;
      import com.aliyun.odps.io.DoubleWritable;
      import com.aliyun.odps.io.Writable;
      import com.aliyun.odps.udf.Aggregator;
      import com.aliyun.odps.udf.UDFException;
      import com.aliyun.odps.udf.annotation.Resolve;
      @Resolve("double->double")
      public class AggrAvg extends Aggregator {
        private static class AvgBuffer implements Writable {
          private double sum = 0;
          private long count = 0;
          @Override
          public void write(DataOutput out) throws IOException {
            out.writeDouble(sum);
            out.writeLong(count);
          }
          @Override
          public void readFields(DataInput in) throws IOException {
            sum = in.readDouble();
            count = in.readLong();
          }
        }
        private DoubleWritable ret = new DoubleWritable();
        @Override
        public Writable newBuffer() {
          return new AvgBuffer();
        }
        @Override
        public void iterate(Writable buffer, Writable[] args) throws UDFException {
          DoubleWritable arg = (DoubleWritable) args[0];
          AvgBuffer buf = (AvgBuffer) buffer;
          if (arg != null) {
            buf.count += 1;
            buf.sum += arg.get();
          }
        }
        @Override
        public Writable terminate(Writable buffer) throws UDFException {
          AvgBuffer buf = (AvgBuffer) buffer;
          if (buf.count == 0) {
            ret.set(0);
          } else {
            ret.set(buf.sum / buf.count);
          }
          return ret;
        }
        @Override
        public void merge(Writable buffer, Writable partial) throws UDFException {
          AvgBuffer buf = (AvgBuffer) buffer;
          AvgBuffer p = (AvgBuffer) partial;
          buf.sum += p.sum;
          buf.count += p.count;
        }
      }
  3. 在本地运行调试UDAF,确保代码可以运行成功。

    更多调试操作,请参见通过本地运行调试UDF

    调试UDAF

    说明

    运行参数可参照图示数据填写。

  4. 将创建的UDAF打包为JAR包,上传至MaxCompute项目并注册函数。例如函数名称为user_udaf

    更多打包操作,请参见操作步骤

    打包

  5. 在MaxCompute Studio的左侧导航栏,单击Project Explorer,在目标MaxCompute项目上单击右键,启动MaxCompute客户端,并执行SQL命令调用新创建的UDAF。

    假设待查询目标表my_table的数据结构如下。

    +------------+------------+
    | col0       | col1       |
    +------------+------------+
    | 1.2        | 2.0        |
    | 1.6        | 2.1        |
    +------------+------------+

    执行如下SQL命令调用UDAF。

    select user_udaf(col0) as c0 from my_table;

    返回结果如下。

    +----+
    | c0 |
    +----+
    | 1.4|
    +----+