使用UDF函数

当 Spark SQL 内置函数无法满足您的特定业务需求时,您可以创建自定义函数(UDF)来扩展 Spark 的功能。本文将引导您完成 Python UDF 与 Java/Scala UDF 的完整使用流程。

支持版本

仅以下引擎版本支持本文示例:

  • esr-5.x:esr-5.0.0及之后版本。

  • esr-4.x:esr-4.6.0及之后版本。

  • esr-3.x:esr-3.5.0及之后版本。

  • esr-2.x:esr-2.9.0及之后版本。

Python UDF

示例将演示一个主函数 my_adds.my_add,它通过导入两个独立的 Python 模块 module1 和 module2,分别执行加 0.5 与加 0.3 的操作,并将结果合并返回。

步骤一:下载并上传文件

为便于快速测试与验证,我们提供示例所需的代码文件。请按以下步骤完成文件获取与上传。

  1. 下载示例文件。单击以下链接,下载测试所需的三个文件:

    • module1.tgz:封装了将输入整数加 0.5 并返回浮点数的逻辑。

    • module2.tgz:封装了将输入整数加 0.3 并返回浮点数的功能。

    • my_adds.py:主函数文件,调用 PythonUDF.your_add(a) 和 PythonUDF2.add2(a),并将两个结果相加后返回最终值。

  2. 上传文件。将以下文件上传至您已授权访问的对象存储 Bucket 中。详情请参见简单上传

    • 依赖模块包:module1.tgz

    • 依赖模块包:module2.tgz

    • 主逻辑文件:my_adds.py

说明

如果您的自定义函数(UDF)需要引用自定义包,则需将自定义依赖包解压到SparkPython环境中。以下是my_adds.py的代码示例:

### my_adds.py内容
import sys

def my_add(a: int) -> float:
    # 确保依赖路径已添加到 sys.path
    if not sys.path.__contains__("./module1.tgz"):
        sys.path.insert(0, "./module1.tgz/")
        from module1 import PythonUDF
    if not sys.path.__contains__("./module2.tgz"):
        sys.path.insert(0, "./module2.tgz/")
        from module2 import PythonUDF2

    b = PythonUDF.your_add(a) + PythonUDF2.add2(a)
    return b

步骤二:注册 UDF

在 Spark SQL 中注册函数,使其可以像内置函数一样被调用。您可以根据需要选择注册为永久函数或临时函数。

  1. 进入您的工作空间,创建SparkSQL任务,详情请参见SparkSQL开发快速入门

  2. 在新增的Spark SQL页签中,使用 CREATE FUNCTION 语句注册 UDF。

    • 注册为永久函数:函数信息将持久化保存在数据目录中,可被所有 SQL 会话复用。推荐用于生产环境,便于共享和管理。

      -- AS 后的格式为 "[python文件名].[函数名]"
      -- 推荐使用公共读或已授权访问的OSS Bucket
      CREATE OR REPLACE FUNCTION adds AS "my_adds.my_add"
      USING FILE "oss://<bucket>/demo/udf/my_adds.py",
            FILE "oss://<bucket>/demo/udf/module1.tgz",
            FILE "oss://<bucket>/demo/udf/module2.tgz";
    • 注册为临时函数:函数仅在当前 SQL 会话中生效,会话结束后自动失效。推荐用于开发和测试阶段。

      -- 使用 TEMPORARY 关键字创建临时函数
      CREATE TEMPORARY FUNCTION adds AS "my_adds.my_add"
      USING FILE "oss://<bucket>/demo/udf/my_adds.py",
            FILE "oss://<bucket>/demo/udf/module1.tgz",
            FILE "oss://<bucket>/demo/udf/module2.tgz";

步骤三:使用 UDF

注册成功后,您可以在 SQL 语句中调用该函数。

  1. Spark SQL页签中执行以下 SQL 语句进行测试。

    -- 准备测试数据
    CREATE TABLE IF NOT EXISTS test_tbl (id INT, name STRING);
    TRUNCATE TABLE test_tbl;
    INSERT INTO test_tbl VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Charlie'), (4, 'David');
    
    -- 调用已注册的 Python UDF
    SELECT id, adds(id) AS result FROM test_tbl;
  2. 查看返回结果。image

Java/Scala UDF

本示例提供了一个预编译的 Java UDF 示例 JAR 包,您无需自行开发代码或进行项目构建,即可完成函数的注册与调用。Scala UDF 的流程与本文相似。

步骤一:下载并上传文件

为便于快速测试与验证,我们提供示例所需的JAR包。请按以下步骤完成文件获取与上传。

  1. 下载示例文件。单击以下链接,下载测试所需的文件:

    udf-1.0-SNAPSHOT.jar:封装了将输入字符串追加 ":HelloWorld" 并返回的逻辑。

  2. 上传文件。将下载好的文件上传至您已授权访问的对象存储 Bucket 中。详情请参见简单上传

步骤二:注册 UDF

在 Spark SQL 中,必须先注册 UDF 才能调用。您可以根据使用场景选择注册为永久函数临时函数

  1. 进入您的工作空间,创建SparkSQL任务,详情请参见SparkSQL开发快速入门

  2. 在新增的Spark SQL页签中,使用 CREATE FUNCTION 语句注册 UDF,并通过 USING JAR 指定 JAR 文件的 OSS 路径。

    • 注册为永久函数:函数信息将持久化保存在数据目录中,可被所有 SQL 会话复用。推荐用于生产环境,便于共享和管理。

      # 注册到DLF/DLF1.0/HMS中
      # AS 后面是UDF中创建的类
      CREATE FUNCTION myfunc AS "org.example.MyUDF" 
      USING jar "oss://path/to/udf-1.0-SNAPSHOT.jar";
    • 注册为临时函数:函数仅在当前 SQL 会话中生效,会话结束后自动失效。推荐用于开发和测试阶段。

      # 使用temporary function仅在当前session生效
      CREATE TEMPORARY FUNCTION myfunc AS "org.example.MyUDF" 
      USING jar "oss://path/to/udf-1.0-SNAPSHOT.jar";

步骤三:使用 UDF

注册成功后,即可在 SQL 中调用。

  1. 任务编辑器中执行以下 SQL 语句进行测试。

    SELECT myfunc("abc");
  2. 查看返回结果。image