本文为您介绍自定义函数从Blink迁移到Flink全托管的迁移说明。
POM依赖
POM中依赖的Flink版本需要和作业使用的引擎版本相同。因此您可以按照以下步骤进行。
在Flink全托管开发控制台查看VVR镜像中的Flink版本信息。
在
页面右侧更多配置的引擎版本中,查看VVR镜像版本。例如下图中的引擎版本为vvr-4.0.18-flink-1.13。确认VVR镜像版本为vvr-a-flink-b后,POM中依赖的Flink版本即为b.0。在POM文件中写入如下依赖。
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>1.13.0</version> <scope>provided</scope> </dependency>
说明如果您对作业中的引擎版本进行了升级,则无需更改依赖并重新编译UDF JAR包。
自定义标量函数(UDF)
getParameterTypes与getResultType
如果您的Blink UDF中实现了getParameterTypes与getResultType接口,则您需要删除这两个方法。目前Flink全托管通过一套全新的注解@DataTypeHint指定输入和输出类型,来实现UDF的类型推导。代码示例如下。
import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.annotation.InputGroup; import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.types.Row; // 有多个eval方法的自定义函数。 public static class OverloadedFunction extends ScalarFunction { // no hint required public Long eval(long a, long b) { return a + b; } // 定义输出的decimal的精度和小数位。 public @DataTypeHint("DECIMAL(12, 3)") BigDecimal eval(double a, double b) { return BigDecimal.valueOf(a + b); } // 定义输出的row的数据类型。 @DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>") public Row eval(int i) { return Row.of(String.valueOf(i), Instant.ofEpochSecond(i)); } // 允许任意类型的输入,并输出序列化定制后的值。 @DataTypeHint(value = "RAW", bridgedTo = ByteBuffer.class) public ByteBuffer eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) { return MyUtils.serializeToByteBuffer(o); } }
关于所有数据类型的名称,请参见list-of-data-types。
FunctionContext#getJobParameter
在Blink中,UDF可以通过FunctionContext#getJobParameter方法获取作业的所有配置项,这样就可以通过修改配置项控制UDF的行为。
但在Flink中,FunctionContext#getJobParameter只能获取pipeline.global-job-parameters这一配置项的值。因此Blink UDF迁移至Flink后,需要在Flink开发控制台
的目标作业部署详情中,将UDF用到的配置项全部写入pipeline.global-job-parameters中,才能在UDF中通过FunctionContext#getJobParameter获取作业的所有配置项。自定义函数中提供了可选的open(FunctionContext context)方法,FunctionContext具备参数传递功能,自定义配置项通过此对象来传递。自定义函数的参数传递操作步骤如下:
在pipeline.global-job-parameters配置项,代码示例如下。
页面,单击目标作业名称,单击部署详情页签的运行参数配置区域的编辑,在其他配置中添加pipeline.global-job-parameters: | 'k1:{hi,hello}', 'k2:"str:ing,str:ing"', 'k3:"str""ing,str:ing"'
该配置定义了以下的map。
key
value
k1
{hi,hello}
k2
str:ing,str:ing
k3
str"ing,str:ing
pipeline.global-job-parameters配置项填写的具体操作步骤如下。
步骤
动作
具体操作
示例
步骤1
重新定义key-value。
将key和value之间通过冒号(:)分隔,并将每一对key-value用单引号(')包围起来。
说明如果key或value中含有半角冒号(:),则需要用双引号(")将key或value包围起来。
如果key或value中含有半角冒号(:)和双引号("),则需要通过连写两个双引号("")进行转义。
当
key = k1,value = {hi,hello}
,则定义为'k1:{hi,hello}'
。当
key = k2,value = str:ing,str:ing
,则定义为'k2:"str:ing,str:ing"'
当
key = k3,value = str"ing,str:ing
,则定义为'k3:"str""ing,str:ing"'
步骤2
按照YAML文件的格式,形成最终的pipeline.global-job-parameters。
将不同的key-value放在不同的行里,并将所有key-value用逗号(,)连接。
说明YAML文件的多行字符串以竖线(| )开始。
YAML文件的多行字符串,每一行需要有相同的缩进。
pipeline.global-job-parameters: | 'k1:{hi,hello}', 'k2:"str:ing,str:ing"', 'k3:"str""ing,str:ing"'
在自定义函数代码中,通过FunctionContext#getJobParameter获取map的各项内容。
代码示例如下。
context.getJobParameter("k1", null); // 获得字符串 {hi,hello}。 context.getJobParameter("k2", null); // 获得字符串 str:ing,str:ing。 context.getJobParameter("k3", null); // 获得字符串 str"ing,str:ing。 context.getJobParameter("pipeline.global-job-parameters", null); // null,只能获得pipeline.global-job-parameters里定义的内容,而不能获得任意的作业配置项。
自定义聚合函数(UDAF)
getResultType
如果您的Blink UDAF中实现了getResultType方法用于设定返回类型,则您需要删除这个方法并通过@FunctionHint注解设定返回类型。其写法如下。
@FunctionHint( output = @DataTypeHint("DECIMAL(12, 3)") ) public class MyUDAF extends AggregateFunction<BigDecimal, MyAcc> { @Override public BigDecimal getValue(MyAcc acc) { ... } }
关于@DataTypeHint的具体写法,请参见自定义标量函数(UDF)getParameterTypes与getResultType。
getAccumulatorType
如果您的Blink UDAF中实现了getAccumulatorType方法用于设定accumulator类型,则您需要删除这个方法并通过@FunctionHint注解指定accumulator类型。其写法如下。
@FunctionHint( accumulator = @DataTypeHint(value = "RAW", bridgedTo = MyAcc.class) ) public class MyUDAF extends AggregateFunction<BigDecimal, MyAcc> { void accumulate(MyAcc acc, int in) { ... } }
关于@DataTypeHint的具体写法,请参见自定义标量函数(UDF)getParameterTypes与getResultType。
getUserDefinedInputTypes
如果您的Blink UDAF中实现了getUserDefinedInputTypes方法用于设定输入类型,则您需要删除这个方法并通过@DataTypeHint注解指定输入类型,其写法如下。
public class MyUDAF extends AggregateFunction<BigDecimal, MyAcc> { void accumulate(MyAcc acc, @DataTypeHint("DECIMAL(12, 3)") BigDecimal in) { ... } }
关于@DataTypeHint的具体写法,请参见自定义标量函数(UDF)getParameterTypes与getResultType。
计算输入的int或decimal的sum,并返回指定精度的decimal的Flink全托管UDAF代码示例如下。
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.AggregateFunction;
import java.math.BigDecimal;
@FunctionHint(
accumulator = @DataTypeHint(value = "RAW", bridgedTo = MyUDAF.MyAcc.class),
output = @DataTypeHint("DECIMAL(12, 3)"))
public class MyUDAF extends AggregateFunction<BigDecimal, MyUDAF.MyAcc> {
public void accumulate(MyAcc acc, int in) {
acc.add(in);
}
public void accumulate(MyAcc acc, @DataTypeHint("DECIMAL(12, 3)") BigDecimal in) {
acc.add(in.longValue());
}
public void retract(MyAcc acc, int in) {
acc.add(-in);
}
public void retract(MyAcc acc, @DataTypeHint("DECIMAL(12, 3)") BigDecimal in) {
acc.add(-in.longValue());
}
public void merge(MyAcc acc, Iterable<MyAcc> it) {
for (MyAcc other : it) {
acc.add(other.getValue());
}
}
@Override
public BigDecimal getValue(MyAcc acc) {
return BigDecimal.valueOf(acc.getValue());
}
@Override
public MyAcc createAccumulator() {
return new MyAcc();
}
public static class MyAcc {
private long value = 0;
public long getValue() {
return value;
}
public void add(long v) {
value += v;
}
}
}
通过以下SQL可以使用上述UDAF。
CREATE TABLE T (
a INT,
b DECIMAL(12, 3)
) WITH (
...
);
SELECT * FROM T;
/*
+-------------+----------------+
| a | b |
+-------------+----------------+
| 1 | 10.000 |
| 2 | 20.000 |
+-------------+----------------+
*/
CREATE FUNCTION my_udaf as 'package.name.MyUDAF';
select my_udaf(a), my_udaf(b) from T;
/*
+----------------+----------------+
| EXPR$0 | EXPR$1 |
+----------------+----------------+
| 3.000 | 30.000 |
+----------------+----------------+
*/
自定义表值函数(UDTF)
getParameterTypes与getResultType
如果您的Blink UDTF实现了getParameterTypes与getResultType方法指定输入与输出类型,则您需要删除这两个方法并使用注解指定类型。您可以通过以下任意一种方式进行:
如果每个eval方法的输出类型一致,则可以使用以下写法。
@FunctionHint(output = @DataTypeHint("ROW<s STRING, i INT>")) public static class OverloadedFunction extends TableFunction<Row> { public void eval(int a, int b) { collect(Row.of("Sum", a + b)); } public void eval() { collect(Row.of("Empty args", -1)); } }
如果同一个eval方法定义多种输入输出类型,则可以使用以下写法。
// 输入两个int,返回int。 @FunctionHint( input = {@DataTypeHint("INT"), @DataTypeHint("INT")}, output = @DataTypeHint("INT") ) // 输入两个bigint,返回bigint。 @FunctionHint( input = {@DataTypeHint("BIGINT"), @DataTypeHint("BIGINT")}, output = @DataTypeHint("BIGINT") ) // 没有入参,返回boolean。 @FunctionHint( input = {}, output = @DataTypeHint("BOOLEAN") ) public static class OverloadedFunction extends TableFunction<Object> { public void eval(Object... o) { if (o.length == 0) { collect(false); } collect(o[0]); } }
如果需要指定特定eval方法的输入类型,或需要了解@DataTypeHint的具体写法,详情请参见getParameterTypes与getResultType。
FunctionContext#getJobParameter
根据输入的常量确定返回类型
Blink UDF、UDAF和UDTF提供DataType getResultType(Object[] arguments, Class[] argTypes) 方法。如果输入的第i个参数(从0开始)为常量,则arguments[i] 将会包含这个常量。该接口允许您根据输入的常量不同,而产生不同的返回类型。
Flink全托管不提供该接口,但有一个等价的接口TypeInference getTypeInference(DataTypeFactory typeFactory)。代码示例如下。
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.types.Row;
public static class LiteralFunction extends ScalarFunction {
// 该eval方法根据type值的不同,返回类型也不同。
public Object eval(String s, String type) {
switch (type) {
case "INT":
return Integer.valueOf(s);
case "DOUBLE":
return Double.valueOf(s);
case "STRING":
default:
return s;
}
}
// 禁用自动的反射式类型推导,使用如下逻辑进行类型推导。
@Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
return TypeInference.newBuilder()
// 指定输入参数的类型,必要时参数会被隐式转换。
.typedArguments(DataTypes.STRING(), DataTypes.STRING())
// 描述一个根据输入的字符串常量推导返回类型的strategy。
.outputTypeStrategy(callContext -> {
// 检查参数1必须是非NULL常量。
if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {
throw callContext.newValidationError("Literal expected for second argument.");
}
// 基于参数1的值返回数据类型。
final String literal = callContext.getArgumentValue(1, String.class).orElse("STRING");
switch (literal) {
case "INT":
return Optional.of(DataTypes.INT().notNull());
case "DOUBLE":
return Optional.of(DataTypes.DOUBLE().notNull());
case "STRING":
default:
return Optional.of(DataTypes.STRING());
}
})
.build();
}
}