全部产品

Spark UDF

本文档主要介绍了如何在Spark中管理并使用用户自定义函数UDF(User Define Function)。

使用元数据服务管理用户自定义函数UDF

  • 注册UDF

    Spark元数据支持UDF使用Hive 1.2.1标准来进行开发,注册UDF的示例如下:

    CREATE FUNCTION function_name AS class_name USING resource_location_list;
    说明

    注册UDF之前请先配置任务可以访问数据湖的元数据服务,具体请参考Spark访问数据湖元数据服务

    参数名称

    参数说明

    function_name

    注册方法名,在注册前需要通过USE DatabaseName来指定此UDF的作用范围,或者以显式方式指定它的应用范围。

    class_name

    完整的class_name需要携带package信息,它的开发规范可以参考Spark和Hive的FUNCION开发规范。

    resource_location_list

    这个方法使用到的JAR包或者文件放置的位置, 需要显式指定依赖的是JAR还是URI USING (JAR 'oss://test/function.jar',FILE 'oss://test/model.csv'。

  • 查询当前数据库的所有UDF

    USE databasename;
    SHOW USER FUNCTIONS;
    说明

    如果不加USER关键词,查询到的是Spark默认的Function,默认Function不允许被删除。

  • 删除UDF

    USE databasename;
    DROP FUNCTION functionname;
    说明

    数据湖元数据管理不支持针对UDF的Alter语法,如果需要修改元数据的一些配置,请DROP对应的UDF后重新创建。

使用UDF

  1. 实现UDF。

    初始化一个Maven管理工程,并在依赖中加入如下代码:

    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>1.2.1</version>
    </dependency>

    在Package的org.test.udf中实现一个Ten.java ,它会为数据加10然后进行返回。

    package org.test.udf;
    import org.apache.hadoop.hive.ql.exec.UDF;
    public class Ten extends UDF {
      public long evaluate(long value) {
        return value + 10;
      }
    }
  2. 注册UDF到Spark。

    编译maven工程为udf.jar并上传到OSS中,之后您可以注册这个方法并通过Serverless Spark来执行SQL访问它。

    from pyspark.sql import SparkSession
    if __name__ == "__main__":
      spark = SparkSession.builder.appName("UDF Example").getOrCreate()
      # create database
      spark.sql("CREATE DATABASE db LOCATION 'oss://test/db'")
      # set database active
      spark.sql("use db")
      # register the udf
      UDFspark.sql("CREATE FUNCTION addten as 'org.test.udf.Ten' USING JAR 'oss://test/udf.jar'")
      # check the udf register success 
      spark.sql("SHOW USER FUNCTIONS").show()
      # create a new table
      spark.sql("CREATE TABLE tb(col1 int, col2 string) USING parquet ""LOCATION 'oss://test/db/tb'")
      spark.sql("INSERT INTO tb VALUES(1,'a'),(2, 'b')")
      # try to use this udf
      spark.sql("SELECT addten(col1), * FROM db.tb").show()spark.stop()
  3. 在Spark SQL中调用注册到Spark的UDF。

    在注册UDF后,您无需反复进行注册,可以直接从元数据服务中调用这个方法。

    {
       "sqls":[
          "use db",
          "show user functions",
          "select addten(7)"
       ],
       "name":"UDF Example",
       "conf":{
          "spark.driver.resourceSpec":"small",
          "spark.sql.hive.metastore.version":"dla",
          "spark.dla.eni.enable":"true",
          "spark.dla.connectors":"oss",
          "spark.executor.instances":1,
          "spark.dla.job.log.oss.uri":"oss://test/spark-logs/",
          "spark.executor.resourceSpec":"small"
       }
    }

    由于addten这个UDF已经被注册到了数据库db中,执行完上述的Spark作业后,,可以在日志中看到输出17