本文为您介绍Flink自定义函数开发时的注意事项、自定义函数的分类、定义和参数传递方法。

注意事项

为了避免JAR包依赖冲突,在开发自定义函数时您需要注意以下几点:
  • 作业开发页面选择的Flink版本,请和Pom依赖中的Flink版本保持一致。
  • Flink相关依赖,scope请使用provided,即在依赖中添加<scope>provided</scope>
  • 其他第三方依赖请采用Shade方式打包,Shade打包详情请参见Apache Maven Shade Plugin

Flink 依赖冲突问题,详情请参见如何解决Flink依赖冲突问题?

UDX分类

Flink支持以下3类自定义函数。
UDX分类 描述
UDF(User Defined Scalar Function) 用户自定义标量值函数,将0个、1个或多个标量值映射到一个新的标量值。其输入与输出是一对一的关系,即读入一行数据,写出一条输出值。详情请参见自定义标量函数(UDF)
UDAF(User Defined Aggregation Function) 自定义聚合函数,将多条记录聚合成1条记录。其输入与输出是多对一的关系,即将多条输入记录聚合成一条输出值。详情请参见自定义聚合函数(UDAF)
UDTF(User Defined Table-valued Function) 自定义表值函数,将0个、1个或多个标量值作为输入参数(可以是变长参数)。与自定义的标量函数类似,但与标量函数不同。表值函数可以返回任意数量的行作为输出,而不仅是1个值。返回的行可以由1个或多个列组成。调用一次函数输出多行或多列数据。详情请参见自定义表值函数(UDTF)

自定义函数参数传递

如果您的自定义函数中某些参数值后期需要修改,直接在UDF代码里修改会很麻烦。此时,您可以在Flink全托管控制台更多Flink配置项中配置该参数后,在UDF代码中使用该参数。后续您可以直接在Flink全托管控制台修改该参数值,从而达到快速修改UDF参数值的目的。

自定义函数中提供了可选的open(FunctionContext context)方法,FunctionContext具备参数传递功能,自定义配置项通过此对象来传递。自定义函数的参数传递操作步骤如下:
  1. 在作业开发页面右侧高级配置面板更多Flink配置中添加参数及取值。代码示例如下。
    maxJobCreationAttempts: value
  2. 在UDF代码中通过ConfigOptions.key()方式获取自定义函数参数。代码示例如下。
    public void open(FunctionContext context) throws Exception {
        LOG.info(String.format("maxJobCreationAttempts:%s%n",
        ConfigOptions.key("maxJobCreationAttempts").stringType().noDefaultValue()));
    }