Spark UDF

本文档主要介绍了如何在Spark中管理并使用用户自定义函数UDF(User Define Function)。

重要

云原生数据湖分析(DLA)产品已退市,云原生数据仓库 AnalyticDB MySQL 版湖仓版支持DLA已有功能,并提供更多的功能和更好的性能。AnalyticDB for MySQL相关使用文档,请参见Spark SQL开发介绍

使用元数据服务管理用户自定义函数UDF

  • 注册UDF

    Spark元数据支持UDF使用Hive 1.2.1标准来进行开发,注册UDF的示例如下:

    CREATE FUNCTION function_name AS class_name USING resource_location_list;

    参数名称

    参数说明

    function_name

    注册方法名,在注册前需要通过USE DatabaseName来指定此UDF的作用范围,或者以显式方式指定它的应用范围。

    class_name

    完整的class_name需要携带package信息,它的开发规范可以参考Spark和Hive的FUNCTION开发规范。

    resource_location_list

    这个方法使用到的JAR包或者文件放置的位置, 需要显式指定依赖的是JAR还是FILE: USING JAR 'oss://test/function.jar',FILE 'oss://test/model.csv'。

  • 查询当前数据库的所有UDF

    USE databasename;
    SHOW USER FUNCTIONS;
    说明

    如果不加USER关键词,查询到的是Spark默认的Function,默认Function不允许被删除。

  • 删除UDF

    USE databasename;
    DROP FUNCTION functionname;
    说明

    数据湖元数据管理不支持针对UDF的Alter语法,如果需要修改元数据的一些配置,请DROP对应的UDF后重新创建。

使用UDF

  1. 实现UDF。

    初始化一个Maven管理工程,并在依赖中加入如下代码:

    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>1.2.1</version>
    </dependency>

    在Package的org.test.udf中实现一个Ten.java ,它会为数据加10然后进行返回。

    package org.test.udf;
    import org.apache.hadoop.hive.ql.exec.UDF;
    public class Ten extends UDF {
      public long evaluate(long value) {
        return value + 10;
      }
    }
  2. 注册UDF到Spark并使用。

    编译Maven工程为udf.jar并上传到OSS中,之后您可以注册这个方法并通过Serverless Spark来执行SQL访问它。

    -- here is the spark conf
    set spark.driver.resourceSpec=medium;
    set spark.executor.instances=5;
    set spark.executor.resourceSpec=medium;
    set spark.app.name=sparksqltest;
    set spark.sql.hive.metastore.version=dla;
    set spark.dla.connectors=oss;
    -- here is your sql statement
    use db;
    CREATE FUNCTION addten as 'com.aliyun.dla.udf.Ten' USING JAR 'oss://path/to/your/udf.jar';
    select addten(7);
  3. 检查结果

    在注册UDF后,您无需反复进行注册,可以直接调用这个方法。

    由于addten这个UDF已经被注册到了数据库db中,执行完上述的Spark作业后,可以在日志中看到输出17