使用UDAF实现数据排序和聚合

本文提供了一个自定义聚合函数(UDAF),实现将多行数据合并为一行并按照指定列进行排序,并以居民用电户电网终端数据为例,介绍如何在实时计算控制台使用该函数进行数据聚合和排序。

示例数据

居民用电户电网终端数据表electric_info,包括事件标识event_id,用户标识user_id,事件时间event_time,用户终端状态status。需要将用户的终端状态按照事件时间升序排列。

  • electric_info

    event_id

    user_id

    event_time

    status

    1

    1222

    2023-06-30 11:14:00

    LD

    2

    1333

    2023-06-30 11:12:00

    LD

    3

    1222

    2023-06-30 11:11:00

    TD

    4

    1333

    2023-06-30 11:12:00

    LD

    5

    1222

    2023-06-30 11:15:00

    TD

    6

    1333

    2023-06-30 11:18:00

    LD

    7

    1222

    2023-06-30 11:19:00

    TD

    8

    1333

    2023-06-30 11:10:00

    TD

    9

    1555

    2023-06-30 11:16:00

    TD

    10

    1555

    2023-06-30 11:17:00

    LD

  • 预期结果

    user_id

    status

    1222

    TD,LD,TD,TD

    1333

    TD,LD,LD,LD

    1555

    TD,LD

步骤一:准备数据源

本文以云数据库RDS为例。

  1. 快速创建RDS MySQL实例

    说明

    RDS MySQL版实例需要与Flink工作空间处于同一VPC。不在同一VPC下时请参见网络连通性

  2. 创建数据库和账号

    创建名称为electric的数据库,并创建高权限账号或具有数据库electric读写权限的普通账号。

  3. 通过DMS登录RDS MySQL,在electric数据库中创建表electric_info和electric_info_SortListAgg,并插入数据。

    CREATE TABLE `electric_info` (
      event_id bigint NOT NULL PRIMARY KEY COMMENT '事件id',
      user_id bigint NOT NULL COMMENT '用户标识', 
      event_time timestamp NOT NULL COMMENT '事件时间',
      status varchar(10) NOT NULL COMMENT '用户终端状态'
    );
    
    CREATE TABLE `electric_info_SortListAgg` (
      user_id bigint NOT NULL PRIMARY KEY COMMENT '用户标识', 
      status_sort varchar(50) NULL COMMENT '用户终端状态按事件时间升序'
    );
    
    -- 准备数据
    INSERT INTO electric_info VALUES 
    (1,1222,'2023-06-30 11:14','LD'),
    (2,1333,'2023-06-30 11:12','LD'),
    (3,1222,'2023-06-30 11:11','TD'),
    (4,1333,'2023-06-30 11:12','LD'),
    (5,1222,'2023-06-30 11:15','TD'),
    (6,1333,'2023-06-30 11:18','LD'),
    (7,1222,'2023-06-30 11:19','TD'),
    (8,1333,'2023-06-30 11:10','TD'),
    (9,1555,'2023-06-30 11:16','TD'),
    (10,1555,'2023-06-30 11:17','LD');

步骤二:注册UDF

  1. 下载ASI_UDX-1.0-SNAPSHOT.jar

    pom.xml文件已配置了Flink 1.17.1版该自定义函数需要的最小化依赖信息。关于使用自定义函数的更多信息,详情请参见自定义函数

  2. 本示例中ASI_UDAF实现了多行数据合并一行并按照指定列进行排序,详情如下。后续您可以根据实际业务情况进行修改。

    package ASI_UDAF;
    
    import org.apache.commons.lang3.StringUtils;
    import org.apache.flink.table.functions.AggregateFunction;
    
    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    
    public class ASI_UDAF{
    	/**Accumulator class*/
    	public static class AcList {
    		public  List<String> list;
    	}
    
    	/**Aggregate function class*/
    	public static class SortListAgg extends AggregateFunction<String,AcList> {
    		public String getValue(AcList asc) {
    			/**Sort the data in the list according to a specific rule*/
    			asc.list.sort(new Comparator<String>() {
    				@Override
    				public int compare(String o1, String o2) {
    					return Integer.parseInt(o1.split("#")[1]) - Integer.parseInt(o2.split("#")[1]);
    				}
    			});
    			/**Traverse the sorted list, extract the required fields, and join them into a string*/
    			List<String> ret = new ArrayList<String>();
    			Iterator<String> strlist = asc.list.iterator();
    			while (strlist.hasNext()) {
    				ret.add(strlist.next().split("#")[0]);
    			}
    			String str = StringUtils.join(ret, ',');
    			return str;
    		}
    
    		/**Method to create an accumulator*/
    		public AcList createAccumulator() {
    			AcList ac = new AcList();
    			List<String> list = new ArrayList<String>();
    			ac.list = list;
    			return ac;
    		}
    
    		/**Accumulation method: add the input data to the accumulator*/
    		public void accumulate(AcList acc, String tuple1) {
    			acc.list.add(tuple1);
    		}
    
    		/**Retraction method*/
    		public void retract(AcList acc, String num) {
    		}
    	}
    }
  3. 进入注册UDF页面。

    注册UDF方式的优点是便于后续开发进行代码复用。对于Java类型的UDF,您也可以通过依赖文件项进行上传,详情请参见自定义聚合函数(UDAF)

    1. 登录实时计算控制台

    2. 单击目标工作空间操作列下的控制台

    3. 单击数据开发 > ETL

    4. 单击左侧的函数页签,单击注册UDF

      image.png

  4. 选择文件位置上传步骤1中的JAR文件,单击确定

    注册UDF

    说明
    • 当您开通Flink工作空间时绑定了OSS Bucket,则您的UDF JAR文件会被上传到该OSS Bucket的sql-artifacts目录下;当您开通Flink工作空间时选择了全托管存储时,则您的UDF JAR文件会被存储在资源管理页面的资源文件中。

    • 此外,Flink开发控制台会解析您UDF JAR文件中是否使用了Flink UDF、UDAF和UDTF接口的类,并自动提取类名,填充到Function Name字段中。

  5. 管理函数对话框,单击创建函数

    在SQL编辑器页面左侧函数列表,您可以看到已注册成功的UDF。

步骤三:创建Flink作业

  1. 数据开发 > ETL页面,单击新建

    image.png

  2. 单击空白的流作业草稿

  3. 单击下一步

  4. 新建作业草稿对话框,填写作业配置信息。

    作业参数

    说明

    文件名称

    作业的名称。

    说明

    作业名称在当前项目中必须保持唯一。

    存储位置

    指定该作业的存储位置。

    您还可以在现有文件夹右侧,单击新建文件夹图标,新建子文件夹。

    引擎版本

    当前作业使用的Flink的引擎版本。需要与pom中的version一致。

    引擎版本号含义、版本对应关系和生命周期重要时间点详情请参见引擎版本介绍

  5. 编写DDL和DML代码。

    --创建临时表electric_info
    CREATE TEMPORARY TABLE electric_info (
      event_id bigint not null,
      `user_id` bigint not null, 
      event_time timestamp(6) not null,
      status string not null,
      primary key(event_id) not enforced
    ) WITH (
      'connector' = 'mysql',
      'hostname' = 'rm-bp1s1xgll21******.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'your_username',
      'password' = '${secret_values.mysql_pw}',
      'database-name' = 'electric',
      'table-name' = 'electric_info'
    );
    
    CREATE TEMPORARY TABLE electric_info_sortlistagg (
      `user_id` bigint not null, 
      status_sort varchar(50) not null,
      primary key(user_id) not enforced
    ) WITH (
      'connector' = 'mysql',
      'hostname' = 'rm-bp1s1xgll21******.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'your_username',
      'password' = '${secret_values.mysql_pw}',
      'database-name' = 'electric',
      'table-name' = 'electric_info_sortlistagg'
    );
    
    --将electric_info表中的数据聚合并插入到electric_info_sortlistagg表中
    --将status和event_time拼接成的字符串作为参数传递给已注册的自定义函数ASI_UDAF$SortListAgg
    INSERT INTO electric_info_sortlistagg 
    SELECT `user_id`, `ASI_UDAF$SortListAgg`(CONCAT(status,'#',CAST(UNIX_TIMESTAMP(event_time) as STRING)))
    FROM electric_info GROUP BY user_id;

    参数说明如下,您可以根据实际情况进行修改。MySQL连接器更多参数详情请参见MySQL

    参数

    说明

    备注

    connector

    连接器类型。

    本示例固定值为mysql

    hostname

    MySQL数据库的IP地址或者Hostname。

    本文填写为RDS MySQL实例的内网地址。

    username

    MySQL数据库服务的用户名。

    无。

    password

    MySQL数据库服务的密码。

    本示例通过使用名为mysql_pw密钥的方式填写密码值,避免信息泄露,详情请参见变量管理

    database-name

    MySQL数据库名称。

    本示例填写为步骤一:准备数据源中创建的数据库electric。

    table-name

    MySQL表名。

    本示例填写为electric或electric_info_sortlistagg。

    port

    MySQL数据库服务的端口号。

    无。

  6. (可选)单击右上方的深度检查调试,功能详情请参见SQL作业开发

  7. 单击部署,单击确定

  8. 运维中心 > 作业运维页面,单击目标作业名称操作列下的启动,选择无状态启动

步骤四:查询结果

在RDS中使用如下语句查看用户的终端状态按照事件时间升序排列结果。

SELECT * FROM `electric_info_sortlistagg`;

结果如下:

image.png

相关文档