本文将向您介绍编写和使用用户自定义函数(UDF)的方法。
自定义函数(UDF)
如果内置的函数不能满足您的需求,Flink CDC数据摄入作业也支持使用Java语言编写自定义UDF函数,并像内置函数一样调用。
此处类路径对应的JAR包需要在“更多配置”中作为附加依赖文件上传。
Flink CDC UDF定义
如需编写Flink CDC UDF类,您需要在pom.xml
中引入基础依赖包:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-common</artifactId>
<version>${CDC 社区版本号}</version>
<scope>provided</scope>
</dependency>
请参考下表选定使用的社区版本号:
实时计算 VVR 引擎版本 | 对应的社区版本号 |
1.20-vvr-11.0 及更高版本 | 3.4.0 |
1.17-vvr-8.0.11 版本 | 3.3.0 |
1.17-vvr-8.0.10 及更早版本 | 3.2.1 |
满足以下要求的Java类可以作为Flink CDC数据摄入作业UDF函数使用:
实现了
org.apache.flink.cdc.common.udf.UserDefinedFunction
接口。拥有一个公共无参构造器。
至少含有一个名为
eval
的公共方法。
UDF函数类可以通过@Override以下接口来实现更精细的语义控制:
重写
getReturnType
方法来手动指定方法的返回类型。重写
open
和close
方法来插入生命周期函数。
例如,将传入的整型参数增加1后返回的UDF函数定义如下。
public class AddOneFunctionClass implements UserDefinedFunction {
public Object eval(Integer num) {
return num + 1;
}
@Override
public DataType getReturnType() {
// 由于eval函数的返回类型不明确,需要
// 使用getReturnType写明确指定类型
return DataTypes.INT();
}
@Override
public void open() throws Exception {
// ...
}
@Override
public void close() throws Exception {
// ...
}
}
Flink CDC UDF注册
通过在Flink CDC数据摄入作业的pipeline
块中加入如下所示的定义即可注册UDF函数:
pipeline:
user-defined-function:
- name: inc
classpath: org.apache.flink.cdc.udf.examples.java.AddOneFunctionClass
- name: format
classpath: org.apache.flink.cdc.udf.examples.java.FormatFunctionClass
此处类路径对应的JAR包,需要在更多配置中作为附加依赖文件上传。
UDF函数名称可以在此处任意调整,无需与UDF类名一致。
Flink CDC UDF使用
在完成UDF函数注册后,即可在projection和filter语句块中,像内置函数一样直接调用UDF函数。代码示例如下。
transform:
- source-table: db.\.*
projection: "*, inc(inc(inc(id))) as inc_id, format(id, 'id -> %d') as formatted_id"
filter: inc(id) < 100
Flink SQL UDF兼容性
继承自ScalarFunction
的Flink UDF函数也可以直接作为CDC YAML UDF函数注册并使用,但存在以下限制:
不支持带参数的
ScalarFunction
。Flink风格的TypeInformation类型标注会被忽略。
open
和close
生命周期钩子函数不会被调用。