本文介绍了如何通过使用Flink内部数据类型(Internal Data Types)来编写高效的 UDF,从而减少类型转换开销、降低GC压力,并提升整体作业性能。
背景与概述
Apache Flink提供了强大的SQL接口支持,用户可通过自定义函数(User Defined Function, UDF)扩展其功能。然而,传统的UDF实现方式往往依赖Java原生类型(如Map、List),在运行时需频繁转换为Flink引擎使用的内部数据格式,造成不必要的性能损耗。
Flink 数据类型体系简介
类型 | 名称 | 描述 |
External Data Type | 外部数据类型 | 面向用户的Java类型,如Map,List,String等。 |
Internal Data Type | 内部数据类型 | Flink引擎优化后的二进制表示形式,如MapData,ArrayData,RowData。 |
在 UDF 执行过程中,外部类型需被转换为内部类型进行处理,这一过程会带来额外的CPU和内存开销。
高效UDF实现的核心原则
直接使用 Internal Data Type 作为输入参数与返回值类型
避免在UDF中使用Java原生对象,改用Flink内部类型以减少序列化/反序列化开销。
自定义类型推断逻辑
确保输入输出类型都声明为Internal Data Type,便于Flink进行优化。
避免创建临时对象
Internal Data Type支持更高效的访问方式,避免在循环或高频调用中创建新对象。
UDF示例
功能说明
该UDF用于从Map类型字段中提取所有键并以数组形式返回。
输入 | 输出 |
SELECT mapkey(MAP['A',1,'B',2]); | [A, B] |
SELECT mapkey(STR_TO_MAP('a=1,b=2')); | [a, b] |
完整代码示例
Java
package com.aliyun.example;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.table.types.inference.TypeInference.newBuilder;
import org.apache.flink.table.types.inference.InputTypeStrategy;
import org.apache.flink.table.types.inference.ConstantArgumentCount;
import org.apache.flink.table.types.KeyValueDataType;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.table.api.DataTypes;
import java.util.Optional;
import java.util.List;
public class MapKeyUDF extends ScalarFunction {
public ArrayData eval(MapData input) {
if (input == null) return null;
return input.keyArray();
}
@Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
return newBuilder()
.inputTypeStrategy(MAP)
.outputTypeStrategy(nullableIfArgs(MAP_KEYS))
.build();
}
private static final InputTypeStrategy MAP = new InputTypeStrategy() {
@Override
public ArgumentCount getArgumentCount() {
return ConstantArgumentCount.of(1);
}
@Override
public Optional<List<DataType>> inferInputTypes(CallContext callContext, boolean throwOnFailure) {
return Optional.of(
callContext.getArgumentDataTypes().stream()
.map(DataTypeUtils::toInternalDataType)
.collect(Collectors.toList()));
}
@Override
public List<Signature> getExpectedSignatures(FunctionDefinition definition) {
return null;
}
};
private static final TypeStrategy MAP_KEYS = callContext ->
Optional.of(
DataTypeUtils.toInternalDataType(DataTypes.ARRAY(
((KeyValueDataType) callContext.getArgumentDataTypes().get(0)).getKeyDataType()
))
);
private static final TypeStrategy nullableIfArgs(TypeStrategy strategy) {
return callContext -> strategy.infer(callContext).map(DataType::copy);
}
}
依赖配置
<!-- Flink Table Runtime -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Table Common -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Table API Java Bridge -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
关键实现点分析
使用 Internal Data Type 作为参数和返回值
使用
MapData
作为数据接入,而不是Java的Map
,避免类型转换。返回
ArrayData
作为数据输出,而不是Java的List
,进一步减少了内存开销。
public ArrayData eval(MapData input) { if (input == null) { return null; } return input.keyArray(); }
输入类型策略
确保输入的参数被转换为
Internal Data Type
。限制函数只接受一个参数。
private static final InputTypeStrategy MAP = new InputTypeStrategy() { @Override public ArgumentCount getArgumentCount() { // 限制函数只接受一个参数。 return ConstantArgumentCount.of(1); } @Override public Optional<List<DataType>> inferInputTypes(CallContext callContext, boolean throwOnFailure) { // 确保输入的参数被转换为Internal Data Type return Optional.of( callContext.getArgumentDataTypes().stream() .map(DataTypeUtils::toInternalDataType) .collect(Collectors.toList())); } @Override public List<Signature> getExpectedSignatures(FunctionDefinition definition) { // 无签名返回,一般内部可见 return null; } };
输出类型策略
明确指定输出类型为数组,并且其元素类型与输入
Map
的键类型一致。通过确保使用内部类型表示
private static final TypeStrategy MAP_KEYS = callContext -> Optional.of( DataTypeUtils.toInternalDataType(DataTypes.ARRAY( ((KeyValueDataType) callContext.getArgumentDataTypes().get(0)).getKeyDataType() )) );
性能优势
使用Internal Data Type实现UDF可带来以下性能优势:
减少类型转换开销 :避免 Java 类型与 Flink 内部类型的频繁转换。
降低 GC 压力 :减少对象创建,降低垃圾回收频率。
提高数据处理效率 :利用 Flink 引擎对内部数据类型的专门优化。
减少内存占用 :相比 Java 对象,内部数据类型通常占用更少内存。
注意事项
尽管使用 Internal Data Type 能显著提升性能,但也需要注意以下几点:
操作受限:内部数据类型支持的操作可能比Java类型少。
学习曲线 :需要熟悉Flink内部数据类型的API。
代码可读性 :相比标准Java类型,代码可能不够直观。
调试难度 :内部数据类型的调试可能更复杂。
最佳实践建议
通过使用 Internal Data Type 编写 Flink UDF,可以显著提升性能,特别是在处理大规模数据时效果尤为明显。虽然这种方式有一定的学习和使用成本,但在性能敏感的场景下,这种投入是值得的。尽管如此,也不能一味地追求采用内部数据结构处理,需要有一定评估与测试。
评估性能需求:对于性能要求高的UDF,优先考虑使用Internal Data Type。
熟悉内部类型API:深入了解
ArrayData
、MapData
、RowData
等类的使用方法。正确实现类型推断:确保正确覆写
getTypeInference
方法,明确输入/输出类型。测试验证:通过性能测试对比不同实现方式的效果
代码注释:为使用内部数据类型的代码添加充分的注释,提高可维护性。
相关文档
主要路径:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/data
内部数据类型示例:
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/