MaxCompute基于新一代的SQL引擎推出新功能UDT(User Defined Type)。MaxCompute的UDT功能允许您在SQL中直接调用第三方语言的类使用其方法,或直接使用第三方对象获取其数据内容。

UDT介绍

很多SQL引擎中UDT与MaxCompute的复杂类型STRUCT类似,相比之下,MaxCompute中的UDT与Create Type的概念更类似,Type中包含数据域和方法。MaxCompute不需要用特殊的DDL语法来定义新类型,通过UDT可以在SQL中直接使用新类型。通过如下示例,为您直观地介绍UDT的功能。

例如,在SQL语句中调用Java的java.lang包。您可以使用以下两种方法:

  • 通过UDT功能在SQL语句中直接调用java.lang

    --打开新类型,因为下面的操作会用到INTEGER,即INT类型。
    set odps.sql.type.system.odps2=true;    
    SELECT java.lang.Integer.MAX_VALUE;

    和Java语言一样,java.lang包可以省略,所以上述示例可以简写为如下语句。

    set odps.sql.type.system.odps2=true;
    SELECT Integer.MAX_VALUE;

    输出结果如下。

    +-----------+
    | max_value |
    +-----------+
    | 2147483647 |
    +-----------+
  • 使用UDF在SQL语句中调用java.lang

    1. 代码开发(定义一个UDF的类。)

      package com.aliyun.odps.test;
      public class IntegerMaxValue extends com.aliyun.odps.udf.UDF {
        public Integer evaluate() {
           return Integer.MAX_VALUE;
        } 
      }
    2. 进行打包、上传及注册操作(将上面的UDF编译,并打成JAR包,然后上传JAR包,并创建Function。)

      add jar odps-test.jar;
      create function integer_max_value as 'com.aliyun.odps.test.IntegerMaxValue' using 'odps-test.jar';
    3. 在SQL中调用UDF。

      select integer_max_value();

    由上例可以看出,UDT简化了上述一系列的过程,方便您使用其它语言扩展SQL的功能。

应用场景

UDT的常用场景如下:

  • MaxCompute没有提供,但可以使用其它语言简单实现的功能。

    例如,只需调用一次Java内置类的方法即可实现,但MaxCompute却没有提供简单的方法实现这个功能。如果使用UDF实现,整个过程会过于繁杂。

  • SQL中需要调用第三方库实现相关功能。希望能够在SQL中直接调用,而不需要再Wrap一层UDF。

  • SQL中需要直接调用第三方语言源代码。Select Transform支持把脚本写到SQL语句中,提升可读性和代码易维护性。但是某些语言无法这样使用,例如Java源代码必须经过编译才能执行,通过UDT功能将这些语言也可以直接写入SQL中。

使用限制

目前版本不支持使用UDF/UDAF/UDT读取以下场景的表数据:

  • 做过表结构修改(Schema Evolution)的表数据。

  • 包含复杂数据类型的表数据。

  • 包含JSON数据类型的表数据。

  • Transactional表的表数据。

实现原理

通过以下示例为您介绍UDT的执行过程。

--示例数据。
@table1 := select * from values ('100000000000000000000') as t(x);
@table2 := select * from values (100L) as t(y);
--代码逻辑。
--new创建对象。
@a := select new java.math.BigInteger(x) x from @table1; 
--静态方法调用。         
@b := select java.math.BigInteger.valueOf(y) y from @table2;  
--实例方法调用。    
select /*+mapjoin(b)*/ x.add(y).toString() from @a a join @b b;  

--输出结果如下所示。
100000000000000000100

整个示例的运行过程如下图所示。

该UDT共有三个Stage:M1、R2和J3。如果您熟悉MapReduce原理即可知道,由于Join的存在需要做数据Reshuffle,所以会出现多个Stage。通常,不同的Stage是在不同的进程、不同的物理机器上运行的。

M1只执行new java.math.BigInteger(x)操作。

J3在不同阶段执行了java.math.BigInteger.valueOf(y)x.add(y).toString()操作。这几个操作不仅分阶段执行,而且在不同的进程、不同的物理机器上执行。UDT把这个过程封装起来,将这个过程变得看起来和在同一个JVM中执行的效果几乎一样。

从上述示例中,您可以看到子查询的结果允许UDT类型的列。例如上面变量a的x列是java.math.BigInteger类型,而不是内置类型。UDT类型的数据可以被带到下一个Operator中,再调用其它方法,甚至可以参与数据Shuffle。

功能说明

  • UDT仅支持Java语言,Java SDK的类都是默认可用的。

    说明

    Runtime使用的JDK版本是JDK1.8,可能不支持更新版本的JDK。

  • UDT支持您上传自己的JAR包,并且直接引用。当前提供了一些Flag方便您的使用。

    • set odps.sql.session.resources指定引用的资源,可以指定多个,用逗号隔开,例如set odps.sql.session.resources=foo.sh,bar.txt;

      说明

      这个Flag和select transform中指定资源的Flag相同,所以这个Flag会同时影响两个功能。例如UDT概述中UDF的JAR包,用于UDT使用。

      set odps.sql.type.system.odps2=true;
      set odps.sql.session.resources=odps-test.jar; 
      --指定要引用的JAR。这些JAR需要提前上传至Project,并且需要是JAR类型的资源。
      select new com.aliyun.odps.test.IntegerMaxValue().evaluate();
    • odps.sql.session.java.imports指定默认的Java Package,可以指定多个,用逗号隔开。和Java的import语句类似,可以提供完整类路径,例如java.math.BigInteger,也可以使用*。暂不支持static import

      UDT概述中UDF的JAR包,使用UDT功能还有如下写法。

      set odps.sql.type.system.odps2=true;
      set odps.sql.session.resources=odps-test.jar;
      set odps.sql.session.java.imports=com.aliyun.odps.test.*;  
      -- 指定默认的Package。
      select new IntegerMaxValue().evaluate();
  • UDT支持资源(Resource)的访问,您可以在SQL中通过com.aliyun.odps.udf.impl.UDTExecutionContext.get()静态方法获取ExecutionContext对象,从而访问当前的ExecutionContext,进而访问资源(例如文件资源和表格资源)。

  • UDT支持的操作:

    • 实例化对象的new操作。

    • 实例化数组的new操作,包括使用初始化列表创建数组,例如new Integer[] { 1, 2, 3 }

    • 方法调用,包括静态方法调用。

    • 域访问,包括静态域。

    说明
    • 仅支持公有方法和公有域的访问。

    • UDT中的标识符是大小写敏感的,包括Package、类名、方法名和域名。

    • 暂不支持匿名类和Lambda表达式。

    • 暂不支持无返回值的函数调用(因为UDT均出现在Expression中,没有返回值的函数调用无法嵌入到Expression中,这个问题在后续的版本中会有解决方案)。

  • UDT支持的数据类型:

    • UDT支持类型转换,支持SQL的风格,例如cast(1 as java.lang.Object)。但不支持Java风格的类型转换,例如(Object)1

    • UDT内置类型与特定Java类型有一一映射关系,详情请参见Java UDF中的数据类型映射表,这个映射关系对UDT也有效。

      • 内置类型的数据能够直接调用其映射到的Java类型的方法,例如'123'.length() , 1L.hashCode()

      • UDT类型能够直接参与内置函数或者UDF的运算, 例如chr(Long.valueOf('100')),其中Long.valueOf返回的是java.lang.Long类型的数据,而内置函数Chr接受的数据类型是内置类型BIGINT。

      • Java的PRIMITIVE类型可以自动转化为其BOXING类型,并应用前两条规则。

      说明

      部分内置的新数据类型需要先设置set odps.sql.type.system.odps2=true;才可使用,否则会报错。

    • UDT扩展了类型转换规则:

      • UDT对象可以被隐式类型转换为其基类对象。

      • UDT对象可以被强制类型转换为其基类或子类对象。

      • 没有继承关系的两个对象之间遵守原来的类型转换规则,注意这时可能会导致内容的变化。例如java.lang.Long类型的数据是可以强制转换为java.lang.Integer的,应用的是内置类型的BIGINT强制转换为INT的过程,而这个过程会导致数据内容的变化,甚至可能导致精度的损失。

      说明

      目前除隐式类型转换变成内置类型外,UDT对象不能存储到硬盘,即不能将UDT对象INSERT到表中(实际上DDL不支持UDT,不能创建这样的表)。内置类型支持BINARY,即支持自己实现序列化的过程,将byte[]的数据存储到硬盘,下次读出时再还原回来。因此需要您自己调用序列化反序列化方法,将其转换为BINARY数据类型再存储到硬盘。

      屏显的最终结果不可以是UDT类型。对于屏显的场景,由于所有的Java类都有toString()方法,而java.lang.String类型是合法的。所以Debug时,可以用这种方法观察UDT的内容。

      您也可以设置set odps.sql.udt.display.tostring=true;,MaxCompute会自动帮您把所有以UDT为最终输出的列Wrap上java.util.Objects.toString(...),以方便调试。这个Flag只对屏显语句生效,对INSERT语句不生效,所以它只在调试的过程中使用。

    • UDT支持比较完整的泛型。例如java.util.Arrays.asList(new java.math.BigInteger('1')),编译器可以根据参数类型判断出该方法的返回值是java.util.List<java.math.BigInteger>类型。

      说明

      构造函数需要指定类型参数,否则需要使用java.lang.Object,这点和Java保持一致。

      new java.util.ArrayList(java.util.Arrays.asList('1', '2'))的结果是java.util.ArrayList<Object>类型,而new java.util.ArrayList<String>(java.util.Arrays.asList('1', '2'))的结果是java.util.ArrayList<String>类型。

  • 所有的运算符都是MaxCompute SQL的语义,不是UDT的语义。例如:

    • STRING的相加操作:String.valueOf(1) + String.valueOf(2)的结果是3(STRING隐式转换为DOUBLE,并且DOUBLE相加) ,而不是12(Java中STRING相加是Concatenate的语义)。

    • =操作:SQL中的=不是赋值而是判断相等。而对于Java对象来说,判断相等应该用Equals方法,而非=操作。

  • UDT对同一对象的概念是模糊的,这是由数据的Reshuffle导致的。对象有可能会在不同进程、不同物理机器之间传输。在传输过程中,同一个对象可能分别引用了不同的对象(例如对象先被Shuffle到两台机器,然后下次又Shuffle回一起)。 所以在使用UDT时,应该使用equals方法判断相等,避免使用=判断相等。

    某行某列的对象,其内部包含的各个数据对象的相关性是可以保证的。不同行或者不同列的对象的数据相关性是不保证的。

  • UDT不能用作Shuffle Key,包括JoinGroup ByDistribute BySort ByOrder ByCluster By等结构的Key。

    UDT可以在Expression中间的任意阶段使用,但不能作为最终输出。例如,不可以使用语句group by new java.math.BigInteger('123'),但可以使用语句group by new java.math.BigInteger('123').hashCode()。因为hashCode方法的返回值是int.class类型,可以当做内置类型INT来使用(应用上述内置类型与特定Java类型规则)。

  • UDT不仅可以实现Scalar函数的功能,配合内置函数COLLECT_SET其他函数,UDT还可以实现Aggregator和Table Function功能。

功能优势

UDT的功能优势:

  • 使用简单,无需定义任何函数。

  • 支持JDK的所有功能,扩展了SQL的能力。

  • 代码可与SQL放于同一文件,便于管理。

  • 可直接使用其它类库,代码重用率高。

  • 可以使用面向对象的思想设计某些功能。

后续待完善功能:

  • 支持无返回值的函数调用,或支持(有返回值但忽略返回值)直接取操作数本身的函数调用。例如,调用List的add方法会返回执行完add操作的List。

  • 支持匿名类和Lambda表达式。

  • 支持用作Shuffle Key。

  • 支持Java外的其他语言,例如Python。

性能

因为UDT和UDF的执行过程非常接近,所以UDT与UDF的性能几乎一致。优化后的计算引擎使得UDT在特定场景下的性能更高。

  • UDT对象只有在跨进程时才需要做序列化和反序列化,因此在执行不需要数据Reshuffle的操作(如JOINAGGREGATE)时,UDT可节省序列化和反序列化的开销。

  • 因为UDT的Runtime基于Codegen而非反射实现的,所以不存在反射带来的性能损失。在您使用过程中,连续多个UDT操作会合并在一个FunctionCall里一起执行。例如在之前的示例中,values[x].add(values[y]).divide(java.math.BigInteger.valueOf(2))实际上只会调用一次UDT。所以,UDT操作的单元虽然较小,却并不会因多次函数调用而造成额外的接口开销。

安全性

在安全控制方面,UDT和UDF完全一样,都会受到Java沙箱Policy的限制。因此如果要使用受限的操作,需要打开沙箱隔离,或者申请沙箱白名单。