本文为您介绍如何为Flink自定义聚合函数(UDAF)开发、注册和使用流程。

定义

自定义聚合函数(UDAF),将多条记录聚合成1条记录。其输入与输出是多对一的关系,即将多条输入记录聚合成一条输出值。详情参见User-defined Functions
说明 User-defined Functions和下文中的ASI_UDX_Demo属于第三方搭建的网站,访问时可能会存在无法打开或访问延迟的问题。

UDAF开发

说明 Flink为您提供了UDF示例,便于您快速开发业务。Flink UDF示例中包含UDSF、UDAF和UDTF的实现,示例中已为您配置对应版本的开发环境,您无需进行环境搭建。
  1. 下载并解压ASI_UDX_Demo示例到本地。
    解压完成后,会生成ASI_UDX-main文件夹。其中:
    • pom.xml:项目级别的配置文件,主要描述了项目的Maven坐标、依赖关系、开发者需要遵循的规则、缺陷管理系统,组织和Licenses,以及其他所有的项目相关因素。
    • \ASI_UDX-main\src\main\java\ASI_UDAF\ASI_UDAF.java:自定义聚合函数(UDAF)示例的Java代码。
  2. 在IntelliJ IDEA中,选择file > open,打开刚才解压缩完成的ASI_UDX-main
  3. 双击打开\ASI_UDX-main\后,配置pom.xml
    该示例中,pom.xml文件已配置了Flink 1.12版开发自定义函数需要的最小化依赖信息。如果您的业务:
    • 没有其他依赖:不用配置pom.xml文件,继续下一步。
    • 有其他依赖:在pom.xml文件中添加您所需的依赖信息。
    Flink 1.12版最小化依赖如下。
     <dependencies>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-common</artifactId>
             <version>1.12.7</version>
             <scope>provided</scope>
         </dependency>
    </dependencies>
    说明 在填写version时,建议根据目标作业的VVR版本,填写为对应Flink大版本的最新小版本。关于VVR和Flink的对应关系,详情请参见Flink计算引擎(VVR)和Apache Flink的版本对应关系
  4. 双击打开\ASI_UDX-main\src\main\java\ASI_UDAF后,根据您的业务,配置ASI_UDAF.java
    该示例中,ASI_UDAF.java实现了累积求和的代码,详情如下。
    package ASI_UDAF;
    
    import org.apache.flink.table.functions.AggregateFunction;
    
    import java.util.Iterator;
    
    public class ASI_UDAF {
        public static class AccSum {
            public long sum;
        }
    
        public static class MySum extends AggregateFunction<Long, AccSum> {
    
            @Override
            public Long getValue(AccSum acSum) {
                return acSum.sum;
            }
    
            @Override
            public AccSum createAccumulator() {
                AccSum acCount = new AccSum();
                acCount.sum = 0;
                return acCount;
            }
    
            public void accumulate(AccSum acc, long num) {
                acc.sum += num;
            }
    
            /**
             * Support retract a msg generated by upstream operator.
             */
            public void retract(AccSum acc, long num) {
                acc.sum -= num;
            }
    
            /**
             * Support local-global two stage aggregate optimization.
             */
            public void merge(AccSum acc, Iterable<AccSum> it) {
                Iterator<AccSum> iter = it.iterator();
                while (iter.hasNext()) {
                    AccSum accSum = iter.next();
                    if (null != accSum) {
                        acc.sum += accSum.sum;
                    }
                }
            }
        }
    }
    该UDAF的是一个简单累加和的操作。例如,同一个分组键(GROUP BY字段)的3条输入数据分别为1、2、3,输出结果有以下两种情况:
    • 在未启用MiniBatch优化时,即默认配置,则输出的结果为1、3、6。
    • 在开启了MiniBatch优化时,因为输出的数据条数取决于设置的MiniBatch参数和输入数据的分布情况,所以能确定的是最后输出一条结果为6, 但输出的中间结果条数不确定。
    说明 MiniBatch优化详情,请参见高性能Flink SQL优化技巧
  5. 在下载文件中pom.xml所在目录执行如下打包命令。
    mvn package -Dcheckstyle.skip

    \ASI_UDX-main\target\目录下会出现ASI_UDX-1.0-SNAPSHOT.jar的JAR包,即代表完成了UDAF开发工作。

UDAF使用

您可以通过以下两种方式在SQL作业中使用自定义UDAF:
  • 方式一:先注册UDAF,再在作业中直接使用已注册的UDAF。
    通过该方式进行函数注册的优点为便于后续开发进行代码复用。UDAF注册过程,请参见管理自定义函数(UDF)。如果注册完的函数名称为ASI_UDAF$MySum,则在作业中直接使用的代码示例如下。
    CREATE TEMPORARY TABLE ASI_UDAF_Source (
      a BIGINT NOT NULL
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE ASI_UDAF_Sink (
      sum  BIGINT
    ) WITH (
      'connector' = 'print'
    );
    
    INSERT INTO ASI_UDAF_Sink
    SELECT `ASI_UDAF$MySum`(a)
    FROM ASI_UDAF_Source;
  • 方式二:先将自定义函数JAR包上传至Flink作业开发高级配置页签的附加依赖文件选项,再在作业的SQL语句中添加创建临时函数的语句,并使用该函数。
    上传自定义函数JAR包到附加依赖文件中后,只能在本作业中使用该自定义函数,其他作业中不可使用。如果创建的临时函数名称为mysum,则在作业中使用该函数的代码示例如下。
    CREATE TEMPORARY TABLE ASI_UDAF_Source (
      a   BIGINT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE ASI_UDAF_Sink (
      sum  BIGINT
    ) WITH (
      'connector' = 'print'
    );
    
    CREATE TEMPORARY FUNCTION `mysum` AS 'ASI_UDAF.ASI_UDAF$MySum'; --创建临时函数mysum。
    
    INSERT INTO ASI_UDAF_Sink
    SELECT `mysum`(a)
    FROM ASI_UDAF_Source;
说明 SQL作业开发完成后,需要在作业运维页面,单击目标作业名称操作列的启动。启动成功后,ASI_UDAF_Sink表中就会插入ASI_UDAF_Source表中a字段数据的累加和。