全部产品
弹性计算 会员服务 网络 安全 移动云 数加·大数据分析及展现 数加·大数据应用 管理与监控 云通信 阿里云办公 培训与认证 更多
存储与CDN 数据库 域名与网站(万网) 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 智能硬件
MaxCompute

JAVA UDF开发

更新时间:2017-12-28 14:51:05

MaxCompute的UDF包括:UDF,UDAF,UDTF三种函数。通常情况下,此三种函数被统称为UDF。

实现JAVA UDF使用Maven的用户可以从Maven库中搜索”odps-sdk-udf”获取不同版本的Java SDK,相关配置信息:

  1. <dependency>
  2. <groupId>com.aliyun.odps</groupId>
  3. <artifactId>odps-sdk-udf</artifactId>
  4. <version>0.20.7</version>
  5. </dependency>

通常情况下,JAVA UDF的开发可以通过以下几种方式:

本章节中会分别给出UDF,UDAF,UDTF的代码示例,并通过两种方式给出开发UDF完整流程步骤示例(UDAF,UDTF操作步骤与UDF操作步骤一样)。

备注:

  • 关于自定义函数注册和注销、查看函数列表的相关命令语句可以参考文档函数操作
  • Java 和 MaxCompute 的数据类型对应关系,请参见 参数与返回值类型

UDF示例

下面我们将给出实现一个字符小写转换功能的UDF实现示例。

使用MaxCompute Studio开发

需要经过如下几个步骤:

  1. 工具环境准备:这里我们假设已经完成环境准备,包括:安装Studio并在Studio上创建MaxCompute项目链接以及创建MaxCompute Java Module

  2. 代码编写:在配置好的Java Module下创建java文件1

    直接选择MaxCompute java然后name里输入‘package名称.文件名’,Kind选择UDF。 然后编辑代码:

    1. package <package名称>;
    2. import com.aliyun.odps.udf.UDF;
    3. public final class Lower extends UDF {
    4. public String evaluate(String s) {
    5. if (s == null) { return null; }
    6. return s.toLowerCase();
    7. }
    8. }

    若需本地调试java udf,可以参考文档开发和调试UDF

  3. 注册MaxCompute UDF:如下图,右击UDF的java文件,选择‘Deploy to server’,弹框里选择注册到那个MaxCompute project,输入function name,Resource name也可以修改。2填写好后,点击‘OK’注册成功会有提示。

  4. 试用UDF:打开sql脚本,执行代码如select Lower_test(‘ABC’);结果如下图:3

    注意:Studio中编写sql脚本可以参考文档编写 SQL 脚本

使用Eclipse插件开发

需要经过如下几个步骤:

  1. 创建工程(这里我们假设已经在Eclipse插件创建好一个MaxCompute(原名ODPS)工程,具体操作请参考创建MaxCompute工程。)

  2. 代码编写:按照ODPS UDF框架的规定,实现函数功能,并进行编译。下面给出一个简单的代码实现:

    1. package org.alidata.odps.udf.examples;
    2. import com.aliyun.odps.udf.UDF;
    3. public final class Lower extends UDF {
    4. public String evaluate(String s) {
    5. if (s == null) { return null; }
    6. return s.toLowerCase();
    7. }
    8. }

    将这个jar包命名为”my_lower.jar”。

    备注:

  3. 添加资源:在运行UDF之前,必须指定引用的UDF代码。代码通过资源的形式添加到MaxCompute中。Java UDF必须被打成jar包,以jar资源添加到MaxCompute中,UDF框架会自动加载jar包,运行用户自定义的UDF。

    MaxCompute MapReduce也用到了资源这一特有概念,MapReduce文档中对资源的使用也有阐述。

    执行命令:

    1. add jar my_lower.jar;
    2. -- 如果存在同名的资源请将这个jar包重命名,
    3. -- 并注意修改下面示例命令中相关jar包的名字;
    4. -- 又或者直接使用-f选项覆盖原有的jar资源
  4. 注册UDF函数:jar包被上传后,使得MaxCompute有条件自动获取代码并运行。但此时仍然无法使用这个UDF,因为MaxCompute中并没有关于这个UDF的任何信息。因此需要在MaxCompute中注册一个唯一的函数名,并指定这个函数名与哪个jar资源的哪个类对应。

    执行命令:

    1. CREATE FUNCTION test_lower AS org.alidata.odps.udf.examples.Lower USING my_lower.jar;
  5. 在sql中使用此函数进行验证:

    1. select test_lower('A') from my_test_table;

UDAF 示例

UDAF 的注册方式与 UDF 基本相同,使用方式与内建函数中的 聚合函数 相同。计算平均值的 UDAF 的代码示例,如下所示:

  1. package org.alidata.odps.udf.examples;
  2. import com.aliyun.odps.io.LongWritable;
  3. import com.aliyun.odps.io.Text;
  4. import com.aliyun.odps.io.Writable;
  5. import com.aliyun.odps.udf.Aggregator;
  6. import com.aliyun.odps.udf.UDFException;
  7. /**
  8. * project: example_project
  9. * table: wc_in2
  10. * partitions: p2=1,p1=2
  11. * columns: colc,colb,cola
  12. */
  13. public class UDAFExample extends Aggregator {
  14. @Override
  15. public void iterate(Writable arg0, Writable[] arg1) throws UDFException {
  16. LongWritable result = (LongWritable) arg0;
  17. for (Writable item : arg1) {
  18. Text txt = (Text) item;
  19. result.set(result.get() + txt.getLength());
  20. }
  21. }
  22. @Override
  23. public void merge(Writable arg0, Writable arg1) throws UDFException {
  24. LongWritable result = (LongWritable) arg0;
  25. LongWritable partial = (LongWritable) arg1;
  26. result.set(result.get() + partial.get());
  27. }
  28. @Override
  29. public Writable newBuffer() {
  30. return new LongWritable(0L);
  31. }
  32. @Override
  33. public Writable terminate(Writable arg0) throws UDFException {
  34. return arg0;
  35. }
  36. }

UDTF 示例

UDTF 的注册和使用方式与 UDF 相同。代码示例如下:

  1. package org.alidata.odps.udtf.examples;
  2. import com.aliyun.odps.udf.UDTF;
  3. import com.aliyun.odps.udf.UDTFCollector;
  4. import com.aliyun.odps.udf.annotation.Resolve;
  5. import com.aliyun.odps.udf.UDFException;
  6. // TODO define input and output types, e.g., "string,string->string,bigint".
  7. @Resolve({"string,bigint->string,bigint"})
  8. public class MyUDTF extends UDTF {
  9. @Override
  10. public void process(Object[] args) throws UDFException {
  11. String a = (String) args[0];
  12. Long b = (Long) args[1];
  13. for (String t: a.split("\\s+")) {
  14. forward(t, b);
  15. }
  16. }
  17. }

MaxCompute 提供了很多内建函数来满足您的计算需求,同时您还可以通过创建自定义函数来满足不同的计算需求。详情请参见 创建自定义函数

本文导读目录