全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网
MaxCompute

编写UDF

更新时间:2017-10-22 14:28:32

MaxCompute 的 UDF 包括:UDF,UDAF,UDTF 三种函数。通常情况下,此三种函数被统称为 UDF。如果您使用 Maven,可以从 Maven 库 中搜索 odps-sdk-udf 获取不同版本的 Java SDK。

相关配置信息如下所示:

  1. <dependency>
  2. <groupId>com.aliyun.odps</groupId>
  3. <artifactId>odps-sdk-udf</artifactId>
  4. <version>0.20.7-public</version>
  5. </dependency>

注意

  • groupId,artifactId,version 信息请以在 Maven 库中查询到的信息为准 。
  • UDF 目前只支持 Java 语言接口,如果您想编写 UDF 程序,可以通过 添加资源 的方式将 UDF 代码上传到项目空间中,使用 注册函数 语句创建 UDF。

  • 本章节中会分别给出 UDF,UDAF,UDTF 的代码示例,运行 UDF 的示例请参见 UDF 开发插件介绍

  • Java 和 MaxCompute 的数据类型对应关系,请参见 参数与返回值类型

UDF 示例

下面将为您介绍实现字符小写转换功能的 UDF 完整流程。

操作步骤

  1. 编写代码。

    按照 MaxCompute UDF 框架的规定,实现函数功能,并进行编译。代码实现,示例如下:

    1. package org.alidata.odps.udf.examples;
    2. import com.aliyun.odps.udf.UDF;
    3. public final class Lower extends UDF {
    4. public String evaluate(String s) {
    5. if (s == null) { return null; }
    6. return s.toLowerCase();
    7. }
    8. }

    将这个 Jar 包命名为 my_lower.jar 。

  2. 添加资源。

    在运行 UDF 之前,必须指定引用的 UDF 代码。您编写的代码通过资源的形式添加到 MaxCompute 中。Java UDF 必须被打成 Jar 包,以 Jar 资源添加到 MaxCompute 中,UDF 框架会自动加载 Jar 包,运行用户自定义的 UDF。 MaxCompute MapReduce 也用到了资源这一特有概念,详情请参见 MapReduce

    执行如下命令:

    1. add jar my_lower.jar;
    2. -- 如果存在同名的资源请将这个 jar 包重命名,
    3. -- 并注意修改下面示例命令中相关 jar 包的名字;
    4. -- 又或者直接使用-f选项覆盖原有的 jar 资源
  3. 注册 UDF 函数。

    您的 Jar 包被上传后,使得 MaxCompute 有条件自动获取代码并运行。但此时仍然无法使用这个 UDF,因为 MaxCompute 中并没有关于这个 UDF 的任何信息。因此需要您在 MaxCompute 中注册一个唯一的函数名,并指定这个函数名与哪个 Jar 资源的哪个函数对应。关于如何注册 UDF,请参见 注册函数

    运行如下命令:

    1. CREATE FUNCTION test_lower AS org.alidata.odps.udf.examples.Lower USING my_lower.jar;

    在 SQL 中使用此函数:

    1. select test_lower('A') from my_test_table;

UDAF 示例

UDAF 的注册方式与 UDF 基本相同,使用方式与内建函数中的 聚合函数 相同。计算平均值的 UDAF 的代码示例,如下所示:

  1. package org.alidata.odps.udf.examples;
  2. import com.aliyun.odps.io.LongWritable;
  3. import com.aliyun.odps.io.Text;
  4. import com.aliyun.odps.io.Writable;
  5. import com.aliyun.odps.udf.Aggregator;
  6. import com.aliyun.odps.udf.UDFException;
  7. /**
  8. * project: example_project
  9. * table: wc_in2
  10. * partitions: p2=1,p1=2
  11. * columns: colc,colb,cola
  12. */
  13. public class UDAFExample extends Aggregator {
  14. @Override
  15. public void iterate(Writable arg0, Writable[] arg1) throws UDFException {
  16. LongWritable result = (LongWritable) arg0;
  17. for (Writable item : arg1) {
  18. Text txt = (Text) item;
  19. result.set(result.get() + txt.getLength());
  20. }
  21. }
  22. @Override
  23. public void merge(Writable arg0, Writable arg1) throws UDFException {
  24. LongWritable result = (LongWritable) arg0;
  25. LongWritable partial = (LongWritable) arg1;
  26. result.set(result.get() + partial.get());
  27. }
  28. @Override
  29. public Writable newBuffer() {
  30. return new LongWritable(0L);
  31. }
  32. @Override
  33. public Writable terminate(Writable arg0) throws UDFException {
  34. return arg0;
  35. }
  36. }

UDTF 示例

UDTF 的注册和使用方式与 UDF 相同。代码示例如下:

  1. package org.alidata.odps.udtf.examples;
  2. import com.aliyun.odps.udf.UDTF;
  3. import com.aliyun.odps.udf.UDTFCollector;
  4. import com.aliyun.odps.udf.annotation.Resolve;
  5. import com.aliyun.odps.udf.UDFException;
  6. // TODO define input and output types, e.g., "string,string->string,bigint".
  7. @Resolve({"string,bigint->string,bigint"})
  8. public class MyUDTF extends UDTF {
  9. @Override
  10. public void process(Object[] args) throws UDFException {
  11. String a = (String) args[0];
  12. Long b = (Long) args[1];
  13. for (String t: a.split("\\s+")) {
  14. forward(t, b);
  15. }
  16. }
  17. }

MaxCompute 提供了很多内建函数来满足您的计算需求,同时您还可以通过创建自定义函数来满足不同的计算需求。详情请参见 创建自定义函数

本文导读目录