MaxCompute基于新一代的SQL引擎推出新功能UDT(User Defined Type)。MaxCompute的UDT功能允许您在SQL中直接调用第三方语言的类使用其方法、或直接使用第三方对象获取其数据内容。

UDT介绍

很多SQL引擎中UDT与MaxCompute的复杂类型STRUCT类似,相比之下,MaxCompute中的UDT与Create Type的概念更类似,Type中包含数据域和方法。MaxCompute不需要用特殊的DDL语法来定义新类型,通过UDT可以在SQL中直接使用新类型。通过如下示例,为您直观地介绍UDT的功能。

例如,在SQL语句中调用Java的java.lang包。您可以使用以下两种方法:
  • 通过UDT功能在SQL语句中直接调用java.lang
    --打开新类型,因为下面的操作会用到INTEGER,即INT类型。
    set odps.sql.type.system.odps2=true;    
    SELECT java.lang.Integer.MAX_VALUE;
    和Java语言一样,java.lang包可以省略,所以上述示例可以简写为如下语句。
    set odps.sql.type.system.odps2=true;
    SELECT Integer.MAX_VALUE;
    输出结果如下。
    +-----------+
    | max_value |
    +-----------+
    | 2147483647 |
    +-----------+
  • 使用UDF在SQL语句中调用java.lang
    1. 定义一个UDF的类。
      package com.aliyun.odps.test;
      public class IntegerMaxValue extends com.aliyun.odps.udf.UDF {
        public Integer evaluate() {
           return Integer.MAX_VALUE;
        } 
      }
    2. 将上面的UDF编译,并打成Jar包,然后上传Jar包,并创建Function。
      add jar odps-test.jar;
      create function integer_max_value as 'com.aliyun.odps.test.IntegerMaxValue' using 'odps-test.jar';
    3. 在SQL中使用。
      select integer_max_value();

    由上例可以看出,UDT简化了上述一系列的过程,方便您使用其它语言扩展SQL的功能。

应用场景

UDT的常用场景如下:
  • MaxCompute没有提供,但可以使用其它语言简单实现的功能。

    例如,只需调用一次Java内置类的方法即可实现,但MaxCompute却没有提供简单的方法实现的功能。如果使用UDF实现,整个过程会过于繁杂。

  • SQL中需要调用第三方库实现相关功能。希望能够在SQL中直接调用,而不需要再Wrap一层UDF。
  • SQL中需要直接调用第三方语言源代码。Select Transform支持把脚本写到SQL语句中,提升可读性和代码易维护性。但是某些语言无法这样使用,例如Java源代码必须经过编译才能执行,通过UDT功能将这些语言也可以直接写入SQL中。

实现原理

通过以下示例为您介绍UDT的执行过程。
--示例数据。
@table1 := select * from values ('100000000000000000000') as t(x);
@table2 := select * from values (100L) as t(y);
--代码逻辑。
--new创建对象。
@a := select new java.math.BigInteger(x) x from @table1; 
--静态方法调用。         
@b := select java.math.BigInteger.valueOf(y) y from @table2;  
--实例方法调用。    
select /*+mapjoin(b)*/ x.add(y).toString() from @a a join @b b;  

--输出结果如下所示。
100000000000000000100
整个示例的运行过程如下图所示。

该UDT共有三个Stage:M1、R2和J3。如果您熟悉MapReduce原理即可知道,由于Join的存在需要做数据Reshuffle,所以会出现多个Stage。通常,不同的Stage是在不同的进程、不同的物理机器上运行的。

M1只执行new java.math.BigInteger(x)操作。

J3在不同阶段执行了java.math.BigInteger.valueOf(y)x.add(y).toString()操作。这几个操作不仅分阶段执行,而且在不同的进程、不同的物理机器上执行。UDT把这个过程封装起来,将这个过程变得看起来和在同一个JVM中执行的效果几乎一样。

从上述示例中,您可以看到子查询的结果允许UDT类型的列。例如上面变量a的x列是java.math.BigInteger类型,而不是内置类型。UDT类型的数据可以被带到下一个Operator中,再调用其它方法,甚至可以参与数据Shuffle。

功能说明

  • UDT仅支持Java语言,Java SDK的类都是默认可用的。
    说明 目前Runtime使用的JDK版本是JDK1.8,可能不支持更新版本的JDK。
  • UDT支持您上传自己的Jar包,并且直接引用。当前提供了一些Flag方便您的使用。
    • set odps.sql.session.resources指定引用的资源,可以指定多个,用逗号隔开,例如set odps.sql.session.resources=foo.sh,bar.txt;
      说明 这个Flag和select transform中指定资源的Flag相同,所以这个Flag会同时影响两个功能。例如UDT概述中UDF的Jar包,用UDT使用。
      set odps.sql.type.system.odps2=true;
      set odps.sql.session.resources=odps-test.jar; 
      --指定要引用的Jar。这些Jar需要提前上传至Project,并且需要是Jar类型的资源。
      select new com.aliyun.odps.test.IntegerMaxValue().evaluate();
    • odps.sql.session.java.imports指定默认的Java Package,可以指定多个,用逗号隔开。和Java的import语句类似,可以提供完整类路径,例如java.math.BigInteger,也可以使用*。暂不支持static import
      UDT概述中UDF的Jar包,使用UDT功能还有如下写法。
      set odps.sql.type.system.odps2=true;
      set odps.sql.session.resources=odps-test.jar;
      set odps.sql.session.java.imports=com.aliyun.odps.test.*;  
      -- 指定默认的Package。
      select new IntegerMaxValue().evaluate();
  • UDT支持资源(Resource)的访问,您可以在SQL中通过com.aliyun.odps.udf.impl.UDTExecutionContext.get()静态方法获取ExecutionContext对象,从而访问当前的ExecutionContext,进而访问资源(例如文件资源和表格资源)。
  • UDT支持的操作:
    • 实例化对象的new操作。
    • 实例化数组的new操作,包括使用初始化列表创建数组,例如new Integer[] { 1, 2, 3 }
    • 方法调用,包括静态方法调用。
    • 域访问,包括静态域。
    说明
    • 仅支持公有方法和公有域的访问。
    • UDT中的标识符是大小写敏感的,包括Package、类名、方法名和域名。
    • 暂不支持匿名类和Lambda表达式。
    • 暂不支持无返回值的函数调用(因为UDT均出现在Expression中,没有返回值的函数调用无法嵌入到Expression中,这个问题在后续的版本中会有解决方案)。
  • UDT支持的数据类型:
    • UDT支持类型转换,支持SQL的风格,例如cast(1 as java.lang.Object)。但不支持Java风格的类型转换,例如(Object)1
    • UDT内置类型与特定Java类型有一一映射关系,详情请参见Java UDF中的数据类型映射表,这个映射关系对UDT也有效。
      • 内置类型的数据能够直接调用其映射到的Java类型的方法,例如'123'.length() , 1L.hashCode()
      • UDT类型能够直接参与内置函数或者UDF的运算, 例如chr(Long.valueOf('100')),其中Long.valueOf返回的是java.lang.Long类型的数据,而内置函数Chr接受的数据类型是内置类型BIGINT。
      • Java的PRIMITIVE类型可以自动转化为其BOXING类型,并应用前两条规则。
      说明 部分内置的新数据类型需要先设置set odps.sql.type.system.odps2=true;才可使用,否则会报错。
    • UDT扩展了类型转换规则:
      • UDT对象可以被隐式类型转换为其基类对象。
      • UDT对象可以被强制类型转换为其基类或子类对象。
      • 没有继承关系的两个对象之间遵守原来的类型转换规则,注意这时可能会导致内容的变化。例如java.lang.Long类型的数据是可以强制转换为java.lang.Integer的,应用的是内置类型的BIGINT强制转换为INT的过程,而这个过程会导致数据内容的变化,甚至可能导致精度的损失。
      说明 目前除隐式类型转换变成内置类型外,UDT对象不能存储到硬盘,即不能将UDT对象INSERT到表中(实际上DDL不支持UDT,不能创建这样的表)。内置类型支持BINARY,即支持自己实现序列化的过程,将byte[]的数据存储到硬盘,下次读出时再还原回来。因此需要您自己调用序列化反序列化方法,变成BINARY数据类型再存储到硬盘。

      屏显的最终结果不可以是UDT类型。对于屏显的场景,由于所有的Java类都有toString()方法,而java.lang.String类型是合法的。所以Debug时,可以用这种方法观察UDT的内容。

      您也可以设置set odps.sql.udt.display.tostring=true;,MaxCompute会自动帮您把所有以UDT为最终输出的列Wrap上java.util.Objects.toString(...),以方便调试。这个Flag只对屏显语句生效,对INSERT语句不生效,所以它只在调试的过程中使用。

    • UDT支持比较完整的泛型。例如java.util.Arrays.asList(new java.math.BigInteger('1')),编译器可以根据参数类型判断出该方法的返回值是java.util.List<java.math.BigInteger>类型。
      说明 构造函数需要指定类型参数,否则需要使用java.lang.Object,这点和Java保持一致。

      new java.util.ArrayList(java.util.Arrays.asList('1', '2'))的结果是java.util.ArrayList<Object>类型,而new java.util.ArrayList<String>(java.util.Arrays.asList('1', '2'))的结果是java.util.ArrayList<String>类型。

  • 所有的运算符都是MaxCompute SQL的语义,不是UDT的语义。例如:
    • STRING的相加操作:String.valueOf(1) + String.valueOf(2)的结果是3(STRING隐式转换为DOUBLE,并且DOUBLE相加) ,而不是12(Java中STRING相加是Concatenate的语义)。
    • =操作:SQL中的=不是赋值而是判断相等。而对于Java对象来说,判断相等应该用Equals方法,而非=操作。
  • UDT对同一对象的概念是模糊的,这是由数据的Reshuffle导致的。对象有可能会在不同进程、不同物理机器之间传输。在传输过程中,同一个对象可能分别引用了不同的对象(例如对象先被Shuffle到两台机器,然后下次又Shuffle回一起)。 所以在使用UDT时,应该使用equals方法判断相等,避免使用=判断相等。

    某行某列的对象,其内部包含的各个数据对象的相关性是可以保证的。不同行或者不同列的对象的数据相关性是不保证的。

  • UDT不能用作Shuffle Key,包括JoinGroup ByDistribute BySort ByOrder ByCluster By等结构的Key。

    UDT可以在Expression中间的任意阶段使用,但不能作为最终输出。例如,不可以使用语句group by new java.math.BigInteger('123'),但可以使用语句group by new java.math.BigInteger('123').hashCode()。因为hashCode方法的返回值是int.class类型,可以当做内置类型INT来使用(应用上述内置类型与特定Java类型规则)。

  • UDT不仅可以实现Scalar函数的功能,配合内置函数COLLECT_SETEXPLODE,UDT还可以实现Aggregator和Table Function功能。

功能优势

UDT在功能优势:
  • 使用简单,无需定义任何Function。
  • 支持JDK的所有功能,扩展了SQL的能力。
  • 代码可与SQL放于同一文件,便于管理。
  • 可直接使用其它类库,代码重用率高。
  • 可以使用面向对象的思想设计某些功能。
后续待完善功能:
  • 支持无返回值的函数调用,或支持(有返回值但忽略返回值)直接取操作数本身的函数调用。例如,调用List的add方法会返回执行完add操作的List。
  • 支持匿名类和Lambda表达式。
  • 支持用作Shuffle Key。
  • 支持Java外的其他语言,例如Python。

性能

因为UDT和UDF的执行过程非常接近,所以UDT与UDF的性能几乎一致。优化后的计算引擎使得UDT在特定场景下的性能更高。
  • UDT对象只有在跨进程时才需要做序列化和反序列化,因此在执行不需要数据Reshuffle的操作(如JOINGGREGATE)时,UDT可节省序列化和反序列化的开销。
  • 因为UDT的Runtime基于Codegen而非反射实现的,所以不存在反射带来的性能损失。在您使用过程中,连续多个UDT操作会合并在一个FunctionCall里一起执行。例如在之前的示例中,values[x].add(values[y]).divide(java.math.BigInteger.valueOf(2))实际上只会调用一次UDT。所以,UDT操作的单元虽然较小,却并不会因多次函数调用而造成接口额外开销。

安全性

在安全控制方面,UDT和UDF完全一样,都会受到Java沙箱Policy的限制。因此如果要使用受限的操作,需要打开沙箱隔离,或者申请沙箱白名单。