UDF示例:复杂数据类型

本文为您介绍如何在Java UDF和Python UDF中使用复杂数据类型。

命令说明

本示例将注册一个名称为UDF_COMPLEX_DATA的自定义函数。

说明

本示例将介绍array、map、struct三种复杂数据类型的使用。Java UDF通过重载的方式使用同一个自定义函数名。Python UDF需要分别注册UDF_COMPLEX_DATA_ARRAY、UDF_COMPLEX_DATA_MAPUDF_COMPLEX_DATA_STRUCT实现上述三种数据类型的使用。

  • 命令格式:

    array<string> UDF_COMPLEX_DATA(array<bigint> <as>) 
    map<string, string> UDF_COMPLEX_DATA(map<string,bigint> <ms>) 
    struct<output_name:string,output_time:string> UDF_COMPLEX_DATA(<input_name:string,input_timestamp:bigint> <st>) 
  • 命令功能:

    将输入的时间戳转换成yyyy-MM-dd HH:mm:ss格式的时间字符串。其中入参分别使用到arraymapstruct复杂数据类型。

  • 参数说明:

    • as:ARRAY<BIGINT>类型,时间戳列表,必填。

    • ms:MAP<STRING, BIGINT>类型,其中value是时间戳,必填。

    • st:STRUCT类型,其中FIELD为input_timestamp的值是时间戳。

开发和使用步骤

1. 代码开发

Java UDF 代码示例

package com.aliyun; // package名称,可以根据您的情况定义

import com.aliyun.odps.data.Struct;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.annotation.Resolve;

import java.text.SimpleDateFormat;
import java.util.*;

@Resolve("struct<input_name:string, input_timestamp:bigint>->map<string,string>")
public class ComplexDataTypeExample extends UDF{
    private static final String PATTERN = "yyyy-MM-dd HH:mm:ss";

    /**
     * 将时间戳列表转换为时间字符串列表
     * @param timestamps 时间戳列表
     * @return 时间字符串列表
     */
    public List<String> evaluate(List<Long> timestamps) {
        if (timestamps == null) {
            return null;
        }
        List<String> result = new ArrayList<>();
        SimpleDateFormat formatter = new SimpleDateFormat(PATTERN);
        for (Long timestamp : timestamps) {
            Date date = new Date(timestamp < 9999999999L ? timestamp * 1000 : timestamp);
            String dateString = formatter.format(date);
            result.add(dateString);
        }
        return result;
    }

    /**
     * 将时间戳map转换为时间字符串map
     * @param timestamps 时间戳map,其中value为时间戳
     * @return 时间字符串map
     */
    public Map<String, String> evaluate(Map<String, Long> timestamps) {
        if (timestamps == null) {
            return null;
        }
        Map<String, String> result = new HashMap<>(timestamps.size());
        SimpleDateFormat formatter = new SimpleDateFormat(PATTERN);
        for (String key : timestamps.keySet()) {
            Long timestamp = timestamps.get(key);
            Date date = new Date(timestamp < 9999999999L ? timestamp * 1000 : timestamp);
            String dateString = formatter.format(date);
            result.put(key, dateString);
        }
        return result;
    }

    /**
     * 将时间戳转换为时间字符串
     * @param input 时间戳Struct
     * @return 时间字符串Struct
     */
    public Map<String, String> evaluate(Struct input) {
        if (input == null) {
            return null;
        }
        SimpleDateFormat formatter = new SimpleDateFormat(PATTERN);
        String nameValue = (String) input.getFieldValue("input_name");
        Long timestampValue = (Long) input.getFieldValue("input_timestamp");
        Date date = new Date(timestampValue < 9999999999L ? timestampValue * 1000 : timestampValue);
        String dateString = formatter.format(date);
        Map<String, String> result = new HashMap<>(8);
        result.put("output_name", nameValue);
        result.put("output_time", dateString);
        return result;
    }
}

示例代码中,定义了3个重载的evaluate方法。其中:

  • 第一个用ARRAY作为参数,ARRAY对应java.util.List。

  • 第二个用MAP作为参数,MAP对应java.util.Map。

  • 第三个用STRUCT作为参数,STRUCT对应com.aliyun.odps.data.Struct。

    说明

    com.aliyun.odps.data.Struct无法通过反射分析获取到field name和field type,需要辅助使用@Resolve annotation,即如果您需要在UDF中使用STRUCT,要求在UDF class上也标注上@Resolve注解,该注解只会影响参数或返回值中包含com.aliyun.odps.data.Struct的重载。

使用Java语言编写UDF代码必须继承UDF类,本例中evaluate方法定义了三个string类型的入参和string类型的返回值,输入参数和返回值的数据类型将作为SQL语句中UDF的函数签名Signature,其他代码规范和要求请参考:UDF开发规范与通用流程(Java)

Python3 UDF 代码示例

下面示例中输入数据类型为map<string,bigint>,该示例代码在本文中将注册为自定义函数:UDF_COMPLEX_DATA_MAP

from odps.udf import annotate
import datetime
@annotate('map<string,bigint>->map<string,datetime>')
class MapExample:
    def evaluate(self, intput_dict):
        output_dict = dict()
        for key in intput_dict:
            value = intput_dict[key]
            t = datetime.datetime.fromtimestamp(value)
            output_dict[key] = t
        return output_dict

下面示例中输入数据类型为array<bigint>,该示例代码在本文中将注册为自定义函数:UDF_COMPLEX_DATA_ARRAY

from odps.udf import annotate
import datetime
@annotate('array<bigint>->array<datetime>')
class ArrayExample:
    def evaluate(self, intput_list):
        output_list = list()
        for item in intput_list:
            t = datetime.datetime.fromtimestamp(item)
            output_list.append(t)
        return output_list

下面示例中输入数据类型为struct<input_name:string,input_timestamp:bigint>,该示例代码在本文中将注册为自定义函数:UDF_COMPLEX_DATA_STRUCT

from odps.udf import annotate
import datetime, collections
@annotate('struct<input_name:string,input_timestamp:bigint>->struct<output_name:string,output_time:datetime>')
class StructExample:
    def evaluate(self, intput_namedtuple):
        OutputNamedTuple = collections.namedtuple('output_namedtuple', ['output_name', 'output_time'])
        name_val = intput_namedtuple.input_name
        time_val = datetime.datetime.fromtimestamp(intput_namedtuple.input_timestamp)
        output_namedtuple = OutputNamedTuple(name_val, time_val)
        return output_namedtuple

MaxCompute默认使用Python 2,可以在Session级别使用命令set odps.sql.python.version=cp37开启Python 3。更多python3 UDF规范请参考:UDF开发规范与通用流程(Python3)

2. 上传资源和注册函数

完成UDF代码开发和调试之后,将资源上传至MaxCompute并注册函数,本示例注册函数名:UDF_COMPLEX_DATA (Python UDF需要分别注册函数名:UDF_COMPLEX_DATA_ARRAY、UDF_COMPLEX_DATA_MAPUDF_COMPLEX_DATA_STRUCT)。Java UDF上传资源与注册函数详情步骤请参见:打包、上传及注册,Python UDF请参见:上传及注册

3. 使用示例

成功注册UDF后,执行以下命令,将时间戳(array类型)转换为时间字符串。

set odps.sql.python.version=cp37; -- python3 UDF需要使用该命令开启python3
SELECT UDF_COMPLEX_DATA(array(1554047999, 1554047989)); -- Java UDF调用
SELECT UDF_COMPLEX_DATA_ARRAY(array(1554047999, 1554047989)); -- Python UDF调用

执行结果如下:

+---------------------------------------------+
| _c0                                         |
+---------------------------------------------+
| [2019-03-31 23:59:59, 2019-03-31 23:59:49]  |
+---------------------------------------------+

执行以下命令,将时间戳(map类型)转换为时间字符串。

set odps.sql.python.version=cp37; -- python3 UDF需要使用该命令开启python3
SELECT UDF_COMPLEX_DATA_TYPE(map('date1', 1554047989, 'date2', 1554047999)); -- Java UDF调用
SELECT UDF_COMPLEX_DATA_TYPE_MAP(map('date1', 1554047989, 'date2', 1554047999)); -- Python UDF调用

执行结果如下:

+----------------------------------------------------------------+
| _c0                                                            |
+----------------------------------------------------------------+
| {"date1":"2019-03-31 23:59:49","date2":"2019-03-31 23:59:59"}  |
+----------------------------------------------------------------+

执行以下命令,将时间戳(struct类型)转换为时间字符串。

set odps.sql.python.version=cp37; -- python3 UDF需要使用该命令开启python3
SELECT UDF_COMPLEX_DATA_TYPE(struct('date', 1554047989)); -- Java UDF调用
SELECT UDF_COMPLEX_DATA_TYPE_MAP(struct('date', 1554047989)); -- Python UDF调用

执行结果如下:

+-------------------------------------------------------------+
| _c0                                                         |
+-------------------------------------------------------------+
| {"output_name":"date","output_time":"2019-03-31 23:59:49"}  |
+-------------------------------------------------------------+