本文档主要介绍了如何在Spark中管理并使用用户自定义函数UDF(User Define Function)。
使用元数据服务管理用户自定义函数UDF
注册UDF
Spark元数据支持UDF使用Hive 1.2.1标准来进行开发,注册UDF的示例如下:
CREATE FUNCTION function_name AS class_name USING resource_location_list;
说明注册UDF之前请先配置任务可以访问数据湖的元数据服务,具体请参考Spark访问数据湖元数据服务。
参数名称
参数说明
function_name
注册方法名,在注册前需要通过
USE DatabaseName
来指定此UDF的作用范围,或者以显式方式指定它的应用范围。class_name
完整的class_name需要携带package信息,它的开发规范可以参考Spark和Hive的
FUNCION
开发规范。resource_location_list
这个方法使用到的JAR包或者文件放置的位置, 需要显式指定依赖的是JAR还是URI
USING (JAR 'oss://test/function.jar',FILE 'oss://test/model.csv'。
查询当前数据库的所有UDF
USE databasename; SHOW USER FUNCTIONS;
说明如果不加
USER
关键词,查询到的是Spark默认的Function,默认Function不允许被删除。
删除UDF
USE databasename; DROP FUNCTION functionname;
说明数据湖元数据管理不支持针对UDF的Alter语法,如果需要修改元数据的一些配置,请DROP对应的UDF后重新创建。
使用UDF
实现UDF。
初始化一个Maven管理工程,并在依赖中加入如下代码:
<dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>1.2.1</version> </dependency>
在Package的
org.test.udf
中实现一个Ten.java
,它会为数据加10然后进行返回。package org.test.udf; import org.apache.hadoop.hive.ql.exec.UDF; public class Ten extends UDF { public long evaluate(long value) { return value + 10; } }
注册UDF到Spark。
编译maven工程为
udf.jar
并上传到OSS中,之后您可以注册这个方法并通过Serverless Spark来执行SQL访问它。from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession.builder.appName("UDF Example").getOrCreate() # create database spark.sql("CREATE DATABASE db LOCATION 'oss://test/db'") # set database active spark.sql("use db") # register the udf UDFspark.sql("CREATE FUNCTION addten as 'org.test.udf.Ten' USING JAR 'oss://test/udf.jar'") # check the udf register success spark.sql("SHOW USER FUNCTIONS").show() # create a new table spark.sql("CREATE TABLE tb(col1 int, col2 string) USING parquet ""LOCATION 'oss://test/db/tb'") spark.sql("INSERT INTO tb VALUES(1,'a'),(2, 'b')") # try to use this udf spark.sql("SELECT addten(col1), * FROM db.tb").show()spark.stop()
在Spark SQL中调用注册到Spark的UDF。
在注册UDF后,您无需反复进行注册,可以直接从元数据服务中调用这个方法。
{ "sqls":[ "use db", "show user functions", "select addten(7)" ], "name":"UDF Example", "conf":{ "spark.driver.resourceSpec":"small", "spark.sql.hive.metastore.version":"dla", "spark.dla.eni.enable":"true", "spark.dla.connectors":"oss", "spark.executor.instances":1, "spark.dla.job.log.oss.uri":"oss://test/spark-logs/", "spark.executor.resourceSpec":"small" } }
由于
addten
这个UDF已经被注册到了数据库db
中,执行完上述的Spark作业后,,可以在日志中看到输出17
。
在文档使用中是否遇到以下问题
更多建议
匿名提交