Flink CDC自定义函数

本文将向您介绍编写和使用用户自定义函数(UDF)的方法。

自定义函数(UDF)

如果内置的函数不能满足您的需求,Flink CDC数据摄入作业也支持使用Java语言编写自定义UDF函数,并像内置函数一样调用。

说明

此处类路径对应的JAR包需要在“更多配置”中作为附加依赖文件上传。

Flink CDC UDF定义

如需编写Flink CDC UDF类,您需要在pom.xml中引入基础依赖包:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cdc-common</artifactId>
    <version>${CDC 社区版本号}</version>
    <scope>provided</scope>
</dependency>

请参考下表选定使用的社区版本号:

实时计算 VVR 引擎版本

对应的社区版本号

1.20-vvr-11.0 及更高版本

3.4.0

1.17-vvr-8.0.11 版本

3.3.0

1.17-vvr-8.0.10 及更早版本

3.2.1

满足以下要求的Java类可以作为Flink CDC数据摄入作业UDF函数使用:

  • 实现了org.apache.flink.cdc.common.udf.UserDefinedFunction接口。

  • 拥有一个公共无参构造器。

  • 至少含有一个名为eval的公共方法。

UDF函数类可以通过@Override以下接口来实现更精细的语义控制:

  • 重写getReturnType方法来手动指定方法的返回类型。

  • 重写openclose方法来插入生命周期函数。

例如,将传入的整型参数增加1后返回的UDF函数定义如下。

public class AddOneFunctionClass implements UserDefinedFunction {
    
    public Object eval(Integer num) {
        return num + 1;
    }
    
    @Override
    public DataType getReturnType() {
        // 由于eval函数的返回类型不明确,需要
        // 使用getReturnType写明确指定类型
        return DataTypes.INT();
    }
    
    @Override
    public void open() throws Exception {
        // ...
    }

    @Override
    public void close() throws Exception {
        // ...
    }
}

Flink CDC UDF注册

通过在Flink CDC数据摄入作业的pipeline块中加入如下所示的定义即可注册UDF函数:

pipeline:
  user-defined-function:
    - name: inc
      classpath: org.apache.flink.cdc.udf.examples.java.AddOneFunctionClass
    - name: format
      classpath: org.apache.flink.cdc.udf.examples.java.FormatFunctionClass
说明
  • 此处类路径对应的JAR包,需要在更多配置中作为附加依赖文件上传。

  • UDF函数名称可以在此处任意调整,无需与UDF类名一致。

Flink CDC UDF使用

在完成UDF函数注册后,即可在projectionfilter语句块中,像内置函数一样直接调用UDF函数。代码示例如下。

transform:
  - source-table: db.\.*
    projection: "*, inc(inc(inc(id))) as inc_id, format(id, 'id -> %d') as formatted_id"
    filter: inc(id) < 100

Flink SQL UDF兼容性

继承自ScalarFunctionFlink UDF函数也可以直接作为CDC YAML UDF函数注册并使用,但存在以下限制:

  • 不支持带参数的ScalarFunction

  • Flink风格的TypeInformation类型标注会被忽略。

  • openclose生命周期钩子函数不会被调用。