本文为您介绍一些常用的UDT示例。

使用Java数组示例

set odps.sql.type.system.odps2=true;
set odps.sql.udt.display.tostring=true;
select
    new Integer[10],    --创建一个包含10个元素的数组。
    new Integer[] {c1, c2, c3},  --通过初始化列表创建一个长度为3的数组。
    new Integer[][] { new Integer[] {c1, c2}, new Integer[] {c3, c4} },  --创建多维数组。
    new Integer[] {c1, c2, c3} [2], --通过下标操作访问数组元素。
    java.util.Arrays.asList(c1, c2, c3);    --这个创建了一个List<Integer>,也能当做array<int>来用,所以这是另一个创建内置ARRAY数据的方法。
from values (1,2,3,4) as t(c1, c2, c3, c4);

使用JSON示例

UDT的Runtime自带一个JSON的依赖(2.2.4),因此可以直接使用JSON。
set odps.sql.type.system.odps2=true;
set odps.sql.session.java.imports=java.util.*,java,com.google.gson.*; --同时import多个Package时用逗号隔开。
@a := select new Gson() gson;   --构建gson对象。
select 
gson.toJson(new ArrayList<Integer>(Arrays.asList(1, 2, 3))), --将任意对象转成JSON字符串。
cast(gson.fromJson('["a","b","c"]', List.class) as List<String>) --反序列化JSON字符串, 注意gson的接口,直接反序列化出来是List<Object>类型,所以这里强转成了List<String>,方便后续使用。
from @a;

相比于内置函数GET_JSON_OBJECT,上述用法不仅使用方便,还会在对JSON字符串多个部分做内容提取时,先将JSON字符串反序列成格式化数据,以提升工作效率。

除JSON外,MaxCompute Runtime自带的依赖还包括:commons-logging(1.1.1)commons-lang(2.5)commons-io(2.4)protobuf-java(2.4.1)

复杂类型操作示例

内置类型ARRAY和MAP与java.util.Listjava.util.Map存在映射关系。结果如下:
  • Java中实现了java.util.List或者java.util.Map接口类的对象,都可参与MaxCompute SQL的复杂类型操作。
  • MaxCompute中ARRAY、MAP的数据,能够直接调用List或者MAP的接口。
set odps.sql.type.system.odps2=true;
set odps.sql.session.java.imports=java.util.*;
select
    size(new ArrayList<Integer>()),        --对ArrayList数据调用内置函数Size。
    array(1,2,3).size(),                   --对内置类型ARRAY调用List的方法。
    sort_array(new ArrayList<Integer>()),  --对ArrayList的数据进行排序。
    al[1],                                 --虽然Java的List不支持下标操作,但ARRAY支持。
    Objects.toString(a),        --之前不支持将ARRAY类型cast成STRING,现在有绕过方法了。
    array(1,2,3).subList(1, 2)             --求出subList。
from (select new ArrayList<Integer>(array(1,2,3)) as al, array(1,2,3) as a) t;

聚合操作的实现示例

UDT实现聚合的原理是,先用内置函数COLLECT_SETCOLLECT_LIST函数将数据转变成List,之后对该List应用UDT的标量方法求得这一组数据的聚合值。

下述示例实现对BigInteger求中位数(由于数据是java.math.BigInteger类型的,所以不能直接用内置的MEDIAN函数)。
set odps.sql.session.java.imports=java.math.*;
@test_data := select * from values (1),(2),(3),(5) as t(value);
@a := select collect_list(new BigInteger(value)) values from @test_data;  -- 先把数据聚合成List。
@b := select sort_array(values) as values, values.size() cnt from @a;  -- 求中位数的逻辑,先将数据排序。
@c := select if(cnt % 2 == 1, new BigDecimal(values[cnt div 2]), new BigDecimal(values[cnt div 2 - 1].add(values[cnt div 2])).divide(new BigDecimal(2))) med from @b;
-- 最终结果。
select med.toString() from @c;

由于collect_list会先把所有数据都收集到一块,是没有办法实现Partial Aggregate的,这个做法的效率会比内置的Aggregator或者UDAF低,所以在内置Aggregator能实现的情况下,应尽量使用内置的Aggregator。同时把一个Group的所有数据都收集到一起,会增加数据倾斜的风险。

但是另一方面,如果UDAF本身的逻辑就是要将所有数据收集到一块(例如类似内置函数wm_concat的功能),此时使用上述方法,反而可能比UDAF(注意不是内置Aggregator)效率高。

表值函数的实现示例

表值函数允许输入多行多列数据,输出多行多列数据。可以按照下述原理实现:
  1. 对于输入多行多列数据,可以参考聚合函数实现的示例。
  2. 要实现多行的输出,可以用UDT方法输出一个Collection类型的数据(List或者MAP),然后调用explode函数,将Collections展开成多行。
  3. UDT本身就可以包含多个数据域,通过调用不同的getter方法获取各个域的内容即可展开成多列。
下述示例实现将一个JSON字符串的内容展开的功能。
@a := select '[{"a":"1","b":"2"},{"a":"1","b":"2"}]' str; --示例数据。
@b := select new com.google.gson.Gson().fromJson(str, java.util.List.class) l from @a; --反序列化JSON。
@c := select cast(e as java.util.Map<Object,Object>) m from @b lateral view explode(l) t as e;  --用explode打成多行。
@d := select m.get('a') as a, m.get('b') as b from @c; --展开成多列。
select a.toString() a, b.toString() b from @d; --最终结果输出(注意变量d的输出中a, b两列是Object类型)。

函数重载实现示例

MaxCompute的UDF使用重载evaluate方法的方式来重载函数。这种方式不支持泛型,所以当您需要定义一个可以接受任何数据类型的函数时,必须为每种类型都写一个evaluate函数。但是,这种方法依然无法实现个别输入类型(例如ARRAY)的重载函数。在没有提供Resolve注解(Annotation)的情况下,Python UDF或UDTF会根据参数个数决定输入参数,同时支持变长参数,但这种过于灵活的机制也会导致编译器无法静态找到某些错误。

通过UDT实现函数重载,可以很好地解决以上问题。UDT支持泛型、类继承、变长参数,为您提供灵活的函数定义方式,示例如下。
public class UDTClass {
    // 这个函数接受一个数值类型(可以是TINYINT、SMALLINT、INT、BIGINT、FLOAT、DOUBLE以及任何以Number为基类的UDT),返回DOUBLE。
    public static Double doubleValue(Number input) {
        return input.doubleValue();
    }
    // 这个方法,接受一个数值类型参数和一个任意类型的参数,返回值类型与第二个参数的类型相同。
    public static <T extends Number, R> R nullOrValue(T a, R b) {
        return a.doubleValue() > 0 ? b : null;
    }
    // 这个方法接受一个任意元素类型的ARRAY或List,返回BIGINT。
    public static Long length(java.util.List<? extends Object> input) {
        return input.size();
    }
    // 注意这个在不做强制转换的情况下参数只能接受UDT的java.util.Map<Object, Object>对象。如果需要传入任何MAP对象,例如map<bigint,bigint>可以考虑:
    // 1. 定义函数时使用java.util.Map<? extends Object, ? extends Object>。
    // 2. 调用时强转,例如UDTClass.mapSize(cast(mapObj as java.util.Map<Object, Object>))。
    public static Long mapSize(java.util.Map<Object, Object> input) {
        return input.size();
    }
}

特定场景下,UDF需要通过com.aliyun.odps.udf.ExecutionContext(在setup方法中传入)获取一些上下文;UDT也可以通过com.aliyun.odps.udt.UDTExecutionContext.get()方法获取这样的一个ExecutionContext对象。