全部产品
MaxCompute

Java UDF

更新时间:2017-09-22 10:12:13   分享:   

MaxCompute 的 UDF 包括:UDF,UDAF 和 UDTF 三种函数,本文将重点介绍如何通过 Java 实现这三种函数。

参数与返回值类型

MaxCompute2.0 版本升级后,Java UDF 支持的数据类型从原来的 Bigint,String,Double,Boolean 扩展了更多基本的数据类型,同时还扩展支持了 ARRAY,MAP,STRUCT 等复杂类型。

  • Java UDF 使用新基本类型的方法,如下所示:

    • UDAF 和 UDTF 通过 @Resolve 注解来获取 signature,如:@Resolve("smallint->varchar(10)")

    • UDF 通过反射分析 evaluate 来获取 signature,此时 MaxCompute 内置类型与 Java 类型符合一一映射关系。

  • JAVA UDF 使用复杂类型的方法,如下所示:

    • UDAF 和 UDTF 通过 @Resolve annotation 来指定 sinature,在 MaxCompute2.0 上线后,您即可在 Resolve annotation 中。如:@Resolve("array<string>,struct<a1:bigint,b1:string>,string->map<string,bigint>,struct<b1:bigint>")

    • UDF 通过 evaluate 方法的 signature 来映射 UDF 的输入输出类型,此时参考 MaxCompute 类型与 Java 类型的映射关系。其中 array 对应 java.util.List,map 对应 java.util.Map,struct 对应 com.aliyun.odps.data.Struct。

      注意:

      • com.aliyun.odps.data.Struct 从反射看不出 field name 和 field type,所以需要用 @Resolve annotation 来辅助。即如果需要在 UDF 中使用 struct,要求在 UDF class 上也标注上 @Resolve 注解,这个注解只会影响参数或返回值中包含 com.aliyun.odps.data.Struct 的重载。

      • 目前 class 上只能提供一个 @Resolve annotation,因此一个 UDF 中带有 struct 参数或返回值的重载只能有一个。

MaxCompute 数据类型与 Java 类型的对应关系,如下所示:

MaxCompute Type Java Type
Tinyint java.lang.Byte
Smallint java.lang.Short
Int java.lang.Integer
Bigint java.lang.Long
Float java.lang.Float
Double java.lang.Double
Decimal java.math.BigDecimal
Boolean java.lang.Boolean
String java.lang.String
Varchar com.aliyun.odps.data.Varchar
Binary com.aliyun.odps.data.Binary
Datetime java.util.Date
Timestamp java.sql.Timestamp
Array java.util.List
Map java.util.Map
Struct com.aliyun.odps.data.Struct

注意:

  • Java 中对应的数据类型以及返回值数据类型是对象,首字母请务必大写。
  • SQL 中的 NULL 值通过 Java 中的 NULL 引用表示,因此 Java primitive type 是不允许使用的,因为无法表示 SQL 中的 NULL 值。
  • 此处 Array 类型对应的 Java 类型是 List,而不是数组。

UDF

实现 UDF 需要继承 com.aliyun.odps.udf.UDF 类,并实现 evaluate 方法。evaluate 方法必须是非 static 的 public 方法 。Evaluate 方法的参数和返回值类型将作为 SQL 中 UDF 的函数签名。这意味着您可以在 UDF 中实现多个 evaluate 方法,在调用 UDF 时,框架会依据 UDF 调用的参数类型匹配正确的 evaluate 方法 。

UDF 的示例如下:

  1. package org.alidata.odps.udf.examples;
  2. import com.aliyun.odps.udf.UDF;
  3. public final class Lower extends UDF {
  4. public String evaluate(String s) {
  5. if (s == null) { return null; }
  6. return s.toLowerCase();
  7. }
  8. }

可以通过实现void setup(ExecutionContext ctx)void close()来分别实现 UDF 的初始化和结束代码。

UDF 的使用方式与 MaxCompute SQL 中普通的内建函数相同,详情请参见 内建函数

UDAF

实现 Java UDAF 类需要继承 com.aliyun.odps.udf.Aggregator,并实现如下几个接口:

  1. public abstract class Aggregator implements ContextFunction {
  2. @Override
  3. public void setup(ExecutionContext ctx) throws UDFException {
  4. }
  5. @Override
  6. public void close() throws UDFException {
  7. }
  8. /**
  9. * 创建聚合Buffer
  10. * @return Writable 聚合buffer
  11. */
  12. abstract public Writable newBuffer();
  13. /**
  14. * @param buffer 聚合buffer
  15. * @param args SQL中调用UDAF时指定的参数
  16. * @throws UDFException
  17. */
  18. abstract public void iterate(Writable buffer, Writable[] args) throws UDFException;
  19. /**
  20. * 生成最终结果
  21. * @param buffer
  22. * @return Object UDAF的最终结果
  23. * @throws UDFException
  24. */
  25. abstract public Writable terminate(Writable buffer) throws UDFException;
  26. abstract public void merge(Writable buffer, Writable partial) throws UDFException;
  27. }

其中最重要的是 iterate,merge 和 terminate 三个接口,UDAF 的主要逻辑依赖于这三个接口的实现。此外,还需要您实现自定义的 Writable buffer。

以实现求平均值 avg 为例,下图简要说明了在 MaxCompute UDAF 中这一函数的实现逻辑及计算流程:

在上图中,输入数据被按照一定的大小进行分片(有关分片的描述请参见 MapReduce),每片的大小适合一个 worker 在适当的时间内完成。这个分片大小的设置需要您手动配置完成。

UDAF 的计算过程分为两个阶段:

  • 第一阶段:每个 worker 统计分片内数据的个数及汇总值,您可以将每个分片内的数据个数及汇总值视为一个中间结果。

  • 第二阶段:worker 汇总上一个阶段中每个分片内的信息。在最终输出时,r.sum / r.count 即是所有输入数据的平均值。

计算平均值的 UDAF 的代码示例,如下所示:

  1. import java.io.DataInput;
  2. import java.io.DataOutput;
  3. import java.io.IOException;
  4. import com.aliyun.odps.io.DoubleWritable;
  5. import com.aliyun.odps.io.Writable;
  6. import com.aliyun.odps.udf.Aggregator;
  7. import com.aliyun.odps.udf.UDFException;
  8. import com.aliyun.odps.udf.annotation.Resolve;
  9. @Resolve({"double->double"})
  10. public class AggrAvg extends Aggregator {
  11. private static class AvgBuffer implements Writable {
  12. private double sum = 0;
  13. private long count = 0;
  14. @Override
  15. public void write(DataOutput out) throws IOException {
  16. out.writeDouble(sum);
  17. out.writeLong(count);
  18. }
  19. @Override
  20. public void readFields(DataInput in) throws IOException {
  21. sum = in.readDouble();
  22. count = in.readLong();
  23. }
  24. }
  25. private DoubleWritable ret = new DoubleWritable();
  26. @Override
  27. public Writable newBuffer() {
  28. return new AvgBuffer();
  29. }
  30. @Override
  31. public void iterate(Writable buffer, Writable[] args) throws UDFException {
  32. DoubleWritable arg = (DoubleWritable) args[0];
  33. AvgBuffer buf = (AvgBuffer) buffer;
  34. if (arg != null) {
  35. buf.count += 1;
  36. buf.sum += arg.get();
  37. }
  38. }
  39. @Override
  40. public Writable terminate(Writable buffer) throws UDFException {
  41. AvgBuffer buf = (AvgBuffer) buffer;
  42. if (buf.count == 0) {
  43. ret.set(0);
  44. } else {
  45. ret.set(buf.sum / buf.count);
  46. }
  47. return ret;
  48. }
  49. @Override
  50. public void merge(Writable buffer, Writable partial) throws UDFException {
  51. AvgBuffer buf = (AvgBuffer) buffer;
  52. AvgBuffer p = (AvgBuffer) partial;
  53. buf.sum += p.sum;
  54. buf.count += p.count;
  55. }
  56. }

注意:

  • UDAF 在 SQL 中的使用语法与普通的内建聚合函数相同,详情请参见 聚合函数
  • 关于如何运行 UDTF 的方法与 UDF 类似,详情请参见 运行 UDF

UDTF

Java UDTF 需要继承 com.aliyun.odps.udf.UDTF 类。这个类需要实现 4 个接口,如下表所示:

接口定义 描述
public void setup(ExecutionContext ctx) throws UDFException 初始化方法,在UDTF处理输入数据前,调用用户自定义的初始化行为。在每个Worker内setup会被先调用一次。
public void process(Object[] args) throws UDFException 这个方法由框架调用,SQL中每一条记录都会对应调用一次process,process的参数为SQL语句中指定的UDTF输入参数。输入参数以Object[]的形式传入,输出结果通过forward函数输出。用户需要在process函数内自行调用forward,以决定输出数据。
public void close() throws UDFException UDTF的结束方法,此方法由框架调用,并且只会被调用一次,即在处理完最后一条记录之后。
public void forward(Object …o) throws UDFException 用户调用forward方法输出数据,每次forward代表输出一条记录。对应SQL语句UDTF的as子句指定的列。

UDTF 的程序示例,如下所示:

  1. package org.alidata.odps.udtf.examples;
  2. import com.aliyun.odps.udf.UDTF;
  3. import com.aliyun.odps.udf.UDTFCollector;
  4. import com.aliyun.odps.udf.annotation.Resolve;
  5. import com.aliyun.odps.udf.UDFException;
  6. // TODO define input and output types, e.g., "string,string->string,bigint".
  7. @Resolve({"string,bigint->string,bigint"})
  8. public class MyUDTF extends UDTF {
  9. @Override
  10. public void process(Object[] args) throws UDFException {
  11. String a = (String) args[0];
  12. Long b = (Long) args[1];
  13. for (String t: a.split("\\s+")) {
  14. forward(t, b);
  15. }
  16. }
  17. }

注意:

以上只是程序示例,关于如何在 MaxCompute 中运行 UDTF 的方法与 UDF 类似,详情请参见:运行 UDF

在 SQL 中可以这样使用这个 UDTF,假设在 MaxCompute 上创建 UDTF 时注册函数名为 user_udtf:

  1. select user_udtf(col0, col1) as (c0, c1) from my_table;

假设 my_table 的 col0,col1 的值如下所示:

  1. +------+------+
  2. | col0 | col1 |
  3. +------+------+
  4. | A B | 1 |
  5. | C D | 2 |
  6. +------+------+

则 select 出的结果,如下所示:

  1. +----+----+
  2. | c0 | c1 |
  3. +----+----+
  4. | A | 1 |
  5. | B | 1 |
  6. | C | 2 |
  7. | D | 2 |
  8. +----+----+

使用说明

UDTF 在 SQL 中的常用方式如下:

  1. select user_udtf(col0, col1, col2) as (c0, c1) from my_table;
  2. select user_udtf(col0, col1, col2) as (c0, c1) from
  3. (select * from my_table distribute by key sort by key) t;
  4. select reduce_udtf(col0, col1, col2) as (c0, c1) from
  5. (select col0, col1, col2 from
  6. (select map_udtf(a0, a1, a2, a3) as (col0, col1, col2) from my_table) t1
  7. distribute by col0 sort by col0, col1) t2;

但使用 UDTF 有如下使用限制:

  • 同一个 SELECT 子句中不允许有其他表达式。

    1. select value, user_udtf(key) as mycol ...
  • UDTF 不能嵌套使用。

    1. select user_udtf1(user_udtf2(key)) as mycol...
  • 不支持在同一个 select 子句中与 group by / distribute by / sort by 联用。

    1. select user_udtf(key) as mycol ... group by mycol

其他 UDTF 示例

在 UDTF 中,您可以读取 MaxCompute 的 资源。利用 UDTF 读取 MaxCompute 资源的示例,如下所示:

  1. 编写 UDTF 程序,编译成功后导出 jar 包(udtfexample1.jar)。

    1. package com.aliyun.odps.examples.udf;
    2. import java.io.BufferedReader;
    3. import java.io.IOException;
    4. import java.io.InputStream;
    5. import java.io.InputStreamReader;
    6. import java.util.Iterator;
    7. import com.aliyun.odps.udf.ExecutionContext;
    8. import com.aliyun.odps.udf.UDFException;
    9. import com.aliyun.odps.udf.UDTF;
    10. import com.aliyun.odps.udf.annotation.Resolve;
    11. /**
    12. * project: example_project
    13. * table: wc_in2
    14. * partitions: p2=1,p1=2
    15. * columns: colc,colb
    16. */
    17. @Resolve({ "string,string->string,bigint,string" })
    18. public class UDTFResource extends UDTF {
    19. ExecutionContext ctx;
    20. long fileResourceLineCount;
    21. long tableResource1RecordCount;
    22. long tableResource2RecordCount;
    23. @Override
    24. public void setup(ExecutionContext ctx) throws UDFException {
    25. this.ctx = ctx;
    26. try {
    27. InputStream in = ctx.readResourceFileAsStream("file_resource.txt");
    28. BufferedReader br = new BufferedReader(new InputStreamReader(in));
    29. String line;
    30. fileResourceLineCount = 0;
    31. while ((line = br.readLine()) != null) {
    32. fileResourceLineCount++;
    33. }
    34. br.close();
    35. Iterator<Object[]> iterator = ctx.readResourceTable("table_resource1").iterator();
    36. tableResource1RecordCount = 0;
    37. while (iterator.hasNext()) {
    38. tableResource1RecordCount++;
    39. iterator.next();
    40. }
    41. iterator = ctx.readResourceTable("table_resource2").iterator();
    42. tableResource2RecordCount = 0;
    43. while (iterator.hasNext()) {
    44. tableResource2RecordCount++;
    45. iterator.next();
    46. }
    47. } catch (IOException e) {
    48. throw new UDFException(e);
    49. }
    50. }
    51. @Override
    52. public void process(Object[] args) throws UDFException {
    53. String a = (String) args[0];
    54. long b = args[1] == null ? 0 : ((String) args[1]).length();
    55. forward(a, b, "fileResourceLineCount=" + fileResourceLineCount + "|tableResource1RecordCount="
    56. + tableResource1RecordCount + "|tableResource2RecordCount=" + tableResource2RecordCount);
    57. }
    58. }
  2. 添加资源到 MaxCompute。

    1. Add file file_resource.txt;
    2. Add jar udtfexample1.jar;
    3. Add table table_resource1 as table_resource1;
    4. Add table table_resource2 as table_resource2;
  3. 在 MaxCompute 中创建 UDTF 函数(my_udtf)。

    1. create function mp_udtf as com.aliyun.odps.examples.udf.UDTFResource using 'udtfexample1.jar, file_resource.txt, table_resource1, table_resource2';
  4. 在 MaxCompute 创建资源表 table_resource1、table_resource2 和物理表 tmp1,并插入相应的数据。

  5. 运行该 UDTF。

    1. select mp_udtf("10","20") as (a, b, fileResourceLineCount) from table_resource1;
    2. 返回:
    3. +-------+------------+-------+
    4. | a | b | fileResourceLineCount |
    5. +-------+------------+-------+
    6. | 10 | 2 | fileResourceLineCount=3|tableResource1RecordCount=0|tableResource2RecordCount=0 |
    7. | 10 | 2 | fileResourceLineCount=3|tableResource1RecordCount=0|tableResource2RecordCount=0 |
    8. +-------+------------+-------+

复杂数据类型示例

如以下代码,定义了一个有三个 overloads 的 UDF,其中第一个用了 array 作为参数,第二个用了 map 作为参数,第三个用了 struct。由于第三个 overloads 用了 struct 作为参数或者返回值,因此要求必须要对 UDF class 打上 @Resolve annotation,来指定 struct 的具体类型。

  1. @Resolve("struct<a:bigint>,string->string")
  2. public class UdfArray extends UDF {
  3. public String evaluate(List<String> vals, Long len) {
  4. return vals.get(len.intValue());
  5. }
  6. public String evaluate(Map<String,String> map, String key) {
  7. return map.get(key);
  8. }
  9. public String evaluate(Struct struct, String key) {
  10. return struct.getFieldValue("a") + key;
  11. }
  12. }

您可以直接将复杂类型传入 UDF 中,如下所示:

  1. create function my_index as 'UdfArray' using 'myjar.jar';
  2. select id, my_index(array('red', 'yellow', 'green'), colorOrdinal) as color_name from colors;
本文导读目录
本文导读目录
以上内容是否对您有帮助?