自定义函数迁移指南

本文为您介绍自定义函数从Blink迁移到Flink全托管的迁移说明。

POM依赖

POM中依赖的Flink版本需要和作业使用的引擎版本相同。因此您可以按照以下步骤进行。

  1. 在Flink全托管开发控制台查看VVR镜像中的Flink版本信息。

    数据开发 > ETL页面右侧更多配置引擎版本中,查看VVR镜像版本。例如下图中的引擎版本为vvr-4.0.18-flink-1.13引擎版本

  2. 确认VVR镜像版本为vvr-a-flink-b后,POM中依赖的Flink版本即为b.0。在POM文件中写入如下依赖。

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>1.13.0</version>
      <scope>provided</scope>
    </dependency>
    说明

    如果您对作业中的引擎版本进行了升级,则无需更改依赖并重新编译UDF JAR包。

自定义标量函数(UDF)

  • getParameterTypes与getResultType

    如果您的Blink UDF中实现了getParameterTypes与getResultType接口,则您需要删除这两个方法。目前Flink全托管通过一套全新的注解@DataTypeHint指定输入和输出类型,来实现UDF的类型推导。代码示例如下。

    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.annotation.InputGroup;
    import org.apache.flink.table.functions.ScalarFunction;
    import org.apache.flink.types.Row;
    
    // 有多个eval方法的自定义函数。
    public static class OverloadedFunction extends ScalarFunction {
    
      // no hint required
      public Long eval(long a, long b) {
        return a + b;
      }
    
      // 定义输出的decimal的精度和小数位。
      public @DataTypeHint("DECIMAL(12, 3)") BigDecimal eval(double a, double b) {
        return BigDecimal.valueOf(a + b);
      }
    
      // 定义输出的row的数据类型。
      @DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>")
      public Row eval(int i) {
        return Row.of(String.valueOf(i), Instant.ofEpochSecond(i));
      }
    
      // 允许任意类型的输入,并输出序列化定制后的值。
      @DataTypeHint(value = "RAW", bridgedTo = ByteBuffer.class)
      public ByteBuffer eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
        return MyUtils.serializeToByteBuffer(o);
      }
    }

    关于所有数据类型的名称,请参见list-of-data-types

  • FunctionContext#getJobParameter

    在Blink中,UDF可以通过FunctionContext#getJobParameter方法获取作业的所有配置项,这样就可以通过修改配置项控制UDF的行为。

    但在Flink中,FunctionContext#getJobParameter只能获取pipeline.global-job-parameters这一配置项的值。因此Blink UDF迁移至Flink后,需要在Flink开发控制台运维中心 > 作业运维的目标作业部署详情中,将UDF用到的配置项全部写入pipeline.global-job-parameters中,才能在UDF中通过FunctionContext#getJobParameter获取作业的所有配置项。

    自定义函数中提供了可选的open(FunctionContext context)方法,FunctionContext具备参数传递功能,自定义配置项通过此对象来传递。自定义函数的参数传递操作步骤如下:

    1. 运维中心 > 作业运维页面,单击目标作业名称,单击部署详情页签的运行参数配置区域的编辑,在其他配置中添加pipeline.global-job-parameters配置项,代码示例如下。

      pipeline.global-job-parameters: | 
        'k1:{hi,hello}',
        'k2:"str:ing,str:ing"',
        'k3:"str""ing,str:ing"'

      该配置定义了以下的map。

      key

      value

      k1

      {hi,hello}

      k2

      str:ing,str:ing

      k3

      str"ing,str:ing

      pipeline.global-job-parameters配置项填写的具体操作步骤如下。

      步骤

      动作

      具体操作

      示例

      步骤1

      重新定义key-value。

      将key和value之间通过冒号(:)分隔,并将每一对key-value用单引号(')包围起来。

      说明
      • 如果key或value中含有半角冒号(:),则需要用双引号(")将key或value包围起来。

      • 如果key或value中含有半角冒号(:)和双引号("),则需要通过连写两个双引号("")进行转义。

      • key = k1,value = {hi,hello},则定义为'k1:{hi,hello}'

      • key = k2,value = str:ing,str:ing,则定义为'k2:"str:ing,str:ing"'

      • key = k3,value = str"ing,str:ing,则定义为'k3:"str""ing,str:ing"'

      步骤2

      按照YAML文件的格式,形成最终的pipeline.global-job-parameters

      将不同的key-value放在不同的行里,并将所有key-value用逗号(,)连接。

      说明
      • YAML文件的多行字符串以竖线(| )开始。

      • YAML文件的多行字符串,每一行需要有相同的缩进。

      pipeline.global-job-parameters: | 
        'k1:{hi,hello}',
        'k2:"str:ing,str:ing"',
        'k3:"str""ing,str:ing"'
    2. 在自定义函数代码中,通过FunctionContext#getJobParameter获取map的各项内容。

      代码示例如下。

      context.getJobParameter("k1", null); // 获得字符串 {hi,hello}。
      context.getJobParameter("k2", null); // 获得字符串 str:ing,str:ing。
      context.getJobParameter("k3", null); // 获得字符串 str"ing,str:ing。
      context.getJobParameter("pipeline.global-job-parameters", null); // null,只能获得pipeline.global-job-parameters里定义的内容,而不能获得任意的作业配置项。

自定义聚合函数(UDAF)

  • getResultType

    如果您的Blink UDAF中实现了getResultType方法用于设定返回类型,则您需要删除这个方法并通过@FunctionHint注解设定返回类型。其写法如下。

    @FunctionHint(
        output = @DataTypeHint("DECIMAL(12, 3)")
    )
    public class MyUDAF extends AggregateFunction<BigDecimal, MyAcc> {
    
        @Override
        public BigDecimal getValue(MyAcc acc) { ... }
    }

    关于@DataTypeHint的具体写法,请参见自定义标量函数(UDF)getParameterTypes与getResultType

  • getAccumulatorType

    如果您的Blink UDAF中实现了getAccumulatorType方法用于设定accumulator类型,则您需要删除这个方法并通过@FunctionHint注解指定accumulator类型。其写法如下。

    @FunctionHint(
        accumulator = @DataTypeHint(value = "RAW", bridgedTo = MyAcc.class)
    )
    public class MyUDAF extends AggregateFunction<BigDecimal, MyAcc> {
        void accumulate(MyAcc acc, int in) { ... }
    }

    关于@DataTypeHint的具体写法,请参见自定义标量函数(UDF)getParameterTypes与getResultType

  • getUserDefinedInputTypes

    如果您的Blink UDAF中实现了getUserDefinedInputTypes方法用于设定输入类型,则您需要删除这个方法并通过@DataTypeHint注解指定输入类型,其写法如下。

    public class MyUDAF extends AggregateFunction<BigDecimal, MyAcc> {
        void accumulate(MyAcc acc, @DataTypeHint("DECIMAL(12, 3)") BigDecimal in) { ... }
    }

    关于@DataTypeHint的具体写法,请参见自定义标量函数(UDF)getParameterTypes与getResultType

计算输入的int或decimal的sum,并返回指定精度的decimal的Flink全托管UDAF代码示例如下。

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.AggregateFunction;

import java.math.BigDecimal;

@FunctionHint(
        accumulator = @DataTypeHint(value = "RAW", bridgedTo = MyUDAF.MyAcc.class),
        output = @DataTypeHint("DECIMAL(12, 3)"))
public class MyUDAF extends AggregateFunction<BigDecimal, MyUDAF.MyAcc> {

    public void accumulate(MyAcc acc, int in) {
        acc.add(in);
    }

    public void accumulate(MyAcc acc, @DataTypeHint("DECIMAL(12, 3)") BigDecimal in) {
        acc.add(in.longValue());
    }

    public void retract(MyAcc acc, int in) {
        acc.add(-in);
    }

    public void retract(MyAcc acc, @DataTypeHint("DECIMAL(12, 3)") BigDecimal in) {
        acc.add(-in.longValue());
    }

    public void merge(MyAcc acc, Iterable<MyAcc> it) {
        for (MyAcc other : it) {
            acc.add(other.getValue());
        }
    }

    @Override
    public BigDecimal getValue(MyAcc acc) {
        return BigDecimal.valueOf(acc.getValue());
    }

    @Override
    public MyAcc createAccumulator() {
        return new MyAcc();
    }

    public static class MyAcc {
        private long value = 0;

        public long getValue() {
            return value;
        }

        public void add(long v) {
            value += v;
        }
    }
}

通过以下SQL可以使用上述UDAF。

CREATE TABLE T (
  a INT,
  b DECIMAL(12, 3)
) WITH (
  ...
);

SELECT * FROM T;
/*
+-------------+----------------+
|           a |              b |
+-------------+----------------+
|           1 |         10.000 |
|           2 |         20.000 |
+-------------+----------------+
*/

CREATE FUNCTION my_udaf as 'package.name.MyUDAF';

select my_udaf(a), my_udaf(b) from T;
/*
+----------------+----------------+
|         EXPR$0 |         EXPR$1 |
+----------------+----------------+
|          3.000 |         30.000 |
+----------------+----------------+
*/

自定义表值函数(UDTF)

  • getParameterTypes与getResultType

    如果您的Blink UDTF实现了getParameterTypes与getResultType方法指定输入与输出类型,则您需要删除这两个方法并使用注解指定类型。您可以通过以下任意一种方式进行:

    • 如果每个eval方法的输出类型一致,则可以使用以下写法。

      @FunctionHint(output = @DataTypeHint("ROW<s STRING, i INT>"))
      public static class OverloadedFunction extends TableFunction<Row> {
      
        public void eval(int a, int b) {
          collect(Row.of("Sum", a + b));
        }
      
        public void eval() {
          collect(Row.of("Empty args", -1));
        }
      }
    • 如果同一个eval方法定义多种输入输出类型,则可以使用以下写法。

      // 输入两个int,返回int。
      @FunctionHint(
        input = {@DataTypeHint("INT"), @DataTypeHint("INT")},
        output = @DataTypeHint("INT")
      )
      // 输入两个bigint,返回bigint。
      @FunctionHint(
        input = {@DataTypeHint("BIGINT"), @DataTypeHint("BIGINT")},
        output = @DataTypeHint("BIGINT")
      )
      // 没有入参,返回boolean。
      @FunctionHint(
        input = {},
        output = @DataTypeHint("BOOLEAN")
      )
      public static class OverloadedFunction extends TableFunction<Object> {
      
        public void eval(Object... o) {
          if (o.length == 0) {
            collect(false);
          }
          collect(o[0]);
        }
      }
    • 如果需要指定特定eval方法的输入类型,或需要了解@DataTypeHint的具体写法,详情请参见getParameterTypes与getResultType

  • FunctionContext#getJobParameter

    详情请参见FunctionContext#getJobParameter

根据输入的常量确定返回类型

Blink UDF、UDAF和UDTF提供DataType getResultType(Object[] arguments, Class[] argTypes) 方法。如果输入的第i个参数(从0开始)为常量,则arguments[i] 将会包含这个常量。该接口允许您根据输入的常量不同,而产生不同的返回类型。

Flink全托管不提供该接口,但有一个等价的接口TypeInference getTypeInference(DataTypeFactory typeFactory)。代码示例如下。

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.types.Row;

public static class LiteralFunction extends ScalarFunction {

  // 该eval方法根据type值的不同,返回类型也不同。
  public Object eval(String s, String type) {
    switch (type) {
      case "INT":
        return Integer.valueOf(s);
      case "DOUBLE":
        return Double.valueOf(s);
      case "STRING":
      default:
        return s;
    }
  }

  // 禁用自动的反射式类型推导,使用如下逻辑进行类型推导。
  @Override
  public TypeInference getTypeInference(DataTypeFactory typeFactory) {
    return TypeInference.newBuilder()
      // 指定输入参数的类型,必要时参数会被隐式转换。
      .typedArguments(DataTypes.STRING(), DataTypes.STRING())
      // 描述一个根据输入的字符串常量推导返回类型的strategy。
      .outputTypeStrategy(callContext -> {
        // 检查参数1必须是非NULL常量。
        if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {
          throw callContext.newValidationError("Literal expected for second argument.");
        }
        // 基于参数1的值返回数据类型。
        final String literal = callContext.getArgumentValue(1, String.class).orElse("STRING");
        switch (literal) {
          case "INT":
            return Optional.of(DataTypes.INT().notNull());
          case "DOUBLE":
            return Optional.of(DataTypes.DOUBLE().notNull());
          case "STRING":
          default:
            return Optional.of(DataTypes.STRING());
        }
      })
      .build();
  }
}