当 Spark SQL 内置函数无法满足您的特定业务需求时,您可以创建自定义函数(UDF)来扩展 Spark 的功能。本文将引导您完成 Python UDF 与 Java/Scala UDF 的完整使用流程。
支持版本
仅以下引擎版本支持本文示例:
esr-5.x:esr-5.0.0及之后版本。
esr-4.x:esr-4.6.0及之后版本。
esr-3.x:esr-3.5.0及之后版本。
esr-2.x:esr-2.9.0及之后版本。
Python UDF
示例将演示一个主函数 my_adds.my_add,它通过导入两个独立的 Python 模块 module1 和 module2,分别执行加 0.5 与加 0.3 的操作,并将结果合并返回。
步骤一:下载并上传文件
为便于快速测试与验证,我们提供示例所需的代码文件。请按以下步骤完成文件获取与上传。
下载示例文件。单击以下链接,下载测试所需的三个文件:
module1.tgz:封装了将输入整数加
0.5并返回浮点数的逻辑。module2.tgz:封装了将输入整数加
0.3并返回浮点数的功能。my_adds.py:主函数文件,调用
PythonUDF.your_add(a)和PythonUDF2.add2(a),并将两个结果相加后返回最终值。
上传文件。将以下文件上传至您已授权访问的对象存储 Bucket 中。详情请参见简单上传。
依赖模块包:
module1.tgz依赖模块包:
module2.tgz主逻辑文件:
my_adds.py
如果您的自定义函数(UDF)需要引用自定义包,则需将自定义依赖包解压到Spark的Python环境中。以下是my_adds.py的代码示例:
### my_adds.py内容
import sys
def my_add(a: int) -> float:
# 确保依赖路径已添加到 sys.path
if not sys.path.__contains__("./module1.tgz"):
sys.path.insert(0, "./module1.tgz/")
from module1 import PythonUDF
if not sys.path.__contains__("./module2.tgz"):
sys.path.insert(0, "./module2.tgz/")
from module2 import PythonUDF2
b = PythonUDF.your_add(a) + PythonUDF2.add2(a)
return b步骤二:注册 UDF
在 Spark SQL 中注册函数,使其可以像内置函数一样被调用。您可以根据需要选择注册为永久函数或临时函数。
进入您的工作空间,创建SparkSQL任务,详情请参见SparkSQL开发快速入门。
在新增的Spark SQL页签中,使用
CREATE FUNCTION语句注册 UDF。注册为永久函数:函数信息将持久化保存在数据目录中,可被所有 SQL 会话复用。推荐用于生产环境,便于共享和管理。
-- AS 后的格式为 "[python文件名].[函数名]" -- 推荐使用公共读或已授权访问的OSS Bucket CREATE OR REPLACE FUNCTION adds AS "my_adds.my_add" USING FILE "oss://<bucket>/demo/udf/my_adds.py", FILE "oss://<bucket>/demo/udf/module1.tgz", FILE "oss://<bucket>/demo/udf/module2.tgz";注册为临时函数:函数仅在当前 SQL 会话中生效,会话结束后自动失效。推荐用于开发和测试阶段。
-- 使用 TEMPORARY 关键字创建临时函数 CREATE TEMPORARY FUNCTION adds AS "my_adds.my_add" USING FILE "oss://<bucket>/demo/udf/my_adds.py", FILE "oss://<bucket>/demo/udf/module1.tgz", FILE "oss://<bucket>/demo/udf/module2.tgz";
步骤三:使用 UDF
注册成功后,您可以在 SQL 语句中调用该函数。
在Spark SQL页签中执行以下 SQL 语句进行测试。
-- 准备测试数据 CREATE TABLE IF NOT EXISTS test_tbl (id INT, name STRING); TRUNCATE TABLE test_tbl; INSERT INTO test_tbl VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Charlie'), (4, 'David'); -- 调用已注册的 Python UDF SELECT id, adds(id) AS result FROM test_tbl;查看返回结果。

Java/Scala UDF
本示例提供了一个预编译的 Java UDF 示例 JAR 包,您无需自行开发代码或进行项目构建,即可完成函数的注册与调用。Scala UDF 的流程与本文相似。
步骤一:下载并上传文件
为便于快速测试与验证,我们提供示例所需的JAR包。请按以下步骤完成文件获取与上传。
下载示例文件。单击以下链接,下载测试所需的文件:
udf-1.0-SNAPSHOT.jar:封装了将输入字符串追加
":HelloWorld"并返回的逻辑。上传文件。将下载好的文件上传至您已授权访问的对象存储 Bucket 中。详情请参见简单上传。
步骤二:注册 UDF
在 Spark SQL 中,必须先注册 UDF 才能调用。您可以根据使用场景选择注册为永久函数或临时函数。
进入您的工作空间,创建SparkSQL任务,详情请参见SparkSQL开发快速入门。
在新增的Spark SQL页签中,使用
CREATE FUNCTION语句注册 UDF,并通过USING JAR指定 JAR 文件的 OSS 路径。注册为永久函数:函数信息将持久化保存在数据目录中,可被所有 SQL 会话复用。推荐用于生产环境,便于共享和管理。
# 注册到DLF/DLF1.0/HMS中 # AS 后面是UDF中创建的类 CREATE FUNCTION myfunc AS "org.example.MyUDF" USING jar "oss://path/to/udf-1.0-SNAPSHOT.jar";注册为临时函数:函数仅在当前 SQL 会话中生效,会话结束后自动失效。推荐用于开发和测试阶段。
# 使用temporary function仅在当前session生效 CREATE TEMPORARY FUNCTION myfunc AS "org.example.MyUDF" USING jar "oss://path/to/udf-1.0-SNAPSHOT.jar";
步骤三:使用 UDF
注册成功后,即可在 SQL 中调用。
在任务编辑器中执行以下 SQL 语句进行测试。
SELECT myfunc("abc");查看返回结果。
