本文将介绍如何基于FlinkKafkaPartitioner实现数据的自定义分区逻辑,将数据按需写入 Kafka 的不同分区。
分区模式
Kafka连接器可以通过设置sink.partitioner
参数来配置合适的分区模式。如果都不满足您的需求,则需要通过自定义分区映射来满足不同的数据写入需求。
模式 | 分区逻辑 |
default(默认值) |
|
fixed | 每一个并发对应一个固定分区。
|
round-robin | Flink并发中的数据将被轮流依次分配至Kafka的各个分区。 |
SQL作业实现自定义分区
步骤一:编写自定义分区器
分区器需要继承FlinkKafkaPartitioner类,重写partition方法。
KafkaSinkPartitioner.java
自定义分区逻辑:从数据中的日期字段(字符串)提取最后两位数字,对分区数取模,确保相同日期的数据落入同一分区(目标分区数为 3)。
package com.aliyun;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.data.GenericRowData;
public class KafkaSinkPartitioner extends FlinkKafkaPartitioner {
@Override
public int partition(Object record, byte[] key, byte[] value, String topic, int[] partitionSize) {
// 在FlinkSQL中,record的实际类型是GenericRowData
if (record instanceof GenericRowData){
GenericRowData grData = (GenericRowData) record;
// 可以通过如下类的方法获取该行所有数据
for (int i = 0; i < grData.getArity(); i++){
Object field = grData.getField(i);
System.out.println("index: " + i + " :" + field);
}
// 根据日期,取日期最后两位,将相同日期的数据写入到同一个分区
String dateString = grData.getString(2).toString();
int date = Integer.parseInt(dateString.substring(dateString.length() - 2));
return date % 3;
}
else {
throw new IllegalArgumentException("record is not GenericRowData");
}
}
}
步骤二:编写SQL作业
需要在SQL作业中配置sink.partitioner
参数来使用。本示例使用阿里云消息队列Kafka进行样例演示。
CREATE TEMPORARY TABLE KafkaSource (
order_id STRING,
order_name STRING,
dt STRING
) WITH (
'connector' = 'kafka',
'topic' = 'source',
'properties.group.id' = 'test-group', -- 消费者组ID
'properties.bootstrap.servers' = '<bootstrap.servers>', -- 填入对应的Kafka broker地址。
'format' = 'csv', -- value部分时使用的数据格式
'scan.startup.mode' = 'earliest-offset' -- 从Kafka最早分区开始读取。
);
CREATE TEMPORARY TABLE kafkaSink (
order_id STRING,
order_name STRING,
dt STRING
) WITH (
'connector' = 'kafka',
'topic' = 'sink',
'properties.bootstrap.servers' ='<bootstrap.servers>',
'format' = 'csv',
'properties.enable.idempotence' = 'false',
'sink.partitioner' = 'com.aliyun.KafkaSinkPartitioner'
);
INSERT INTO kafkaSink
SELECT * FROM KafkaSource
;
pom.xml
VVR Kafka连接器的其他版本已经放置在Maven中央仓库。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.aliyun</groupId>
<artifactId>KafkaPartitionerSQL</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!--Kafka连接器-->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-kafka</artifactId>
<version>1.17-vvr-8.0.11-1</version>
</dependency>
<!--flink 依赖-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.17.2</version>
</dependency>
</dependencies>
<build>
<finalName>KafkaPartitionerSQL</finalName>
<plugins>
<!-- Java 编译器-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.13.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!-- 我们使用maven-shade 创建一个包含所有必须依赖的 fat jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.5.3</version>
</plugin>
</plugins>
</build>
</project>
阿里云消息队列Kafka不支持幂等和事务写入,因此无法实现精确一次语义(Exactly-Once Semantics),需要在结果表添加
properties.enable.idempotence=false
以关闭幂等写入功能。sink.partitioner
需要填写继承FlinkKafkaPartitioner
类的完整引用路径,如com.aliyun.KafkaSinkPartitioner
。更多参数请参见Kafka连接器WITH参数。
步骤三:打包并上传自定义分区器
使用文件管理功能将编译好的JAR包上传至实时计算控制台。
步骤四:作业引用
将JAR包作为附加依赖文件引用后,请在WITH参数中设置sink.partitioner
参数,参数值为分区器完整的类路径,如org.mycompany.MyPartitioner
。
步骤五:验证测试
使用阿里云消息队列Kafka的快速体验消息收发进行测试。
登录消息队列Kafka控制台。
在概览页面的资源分布区域,选择地域。
在实例列表页面,单击目标实例名称。
在Topic管理页面,选择对应的Topic,单击体验发送消息进行测试。
yun001,flink,20250501 yun002,flink,20250505 yun003,flink,20250505 yun004,flink,20250505
发送四条消息后,可以在下游的topic查看消息写入分区的情况。
可以看到相同日期的数据都被写入到了同一分区。
JAR作业实现自定义分区
步骤一:编写JAR作业
使用VVR Kafka连接器中的setPartitioner
指定自定义分区逻辑类。
KafkaPartitionerDataStream.java
自定义分区逻辑:从数据中的日期字段(字符串)提取最后两位数字,对分区数取模,确保相同日期的数据落入同一分区(目标分区数为 3)。
package com.aliyun;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.StringSerializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
public class KafkaPartitionerDataStream {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 构建Kafka Source
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("<BootstrapServers>")
.setTopics("source")
.setGroupId("test-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build();
// 采用无水印策略
DataStreamSource<String> source = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
// 构建Kafka Sink
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers("<BootstrapServers>")
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic("sink")
// 自定义分区
.setPartitioner(new KafkaSinkPartitioner())
.setKafkaValueSerializer(StringSerializer.class)
.build())
.build();
source.sinkTo(sink).name("Kafka Sink");
env.execute();
}
static class KafkaSinkPartitioner extends FlinkKafkaPartitioner<String> {
// 根据日期,取日期最后两位,将相同日期的数据写入到同一个分区
@Override
public int partition(String record, byte[] key, byte[] value, String topic, int[] partitionSize) {
String[] s = record.split(",");
int date = Integer.parseInt(s[2].substring(s[2].length() - 2));
return date % 3;
}
}
}
pom.xml
VVR Kafka连接器的其他版本已经放置在Maven中央仓库。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.aliyun</groupId>
<artifactId>KafkaPartitionerDataStream</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!--VVR Kafka Connector-->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-kafka</artifactId>
<version>1.17-vvr-8.0.11-1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>1.17.2</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<finalName>KafkaPartitionerDS</finalName>
<plugins>
<!-- Java 编译器-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.13.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!-- 我们使用maven-shade 创建一个包含所有必须依赖的 fat jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.5.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.aliyun.KafkaPartitionerDataStream</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
步骤二:打包作业后上传
使用文件管理功能将编译好的JAR包上传至实时计算控制台。连接器依赖建议声明为provided
作用域,作为附加依赖文件引入,便于版本管理,更轻体量的JAR包方便上传,详情请参见连接器依赖和使用。
步骤三:部署作业
部署作业更多参数说明详见部署JAR作业。
步骤四:验证测试
使用阿里云消息队列Kafka的快速体验消息收发进行测试。
登录消息队列Kafka控制台。
在概览页面的资源分布区域,选择地域。
在实例列表页面,单击目标实例名称。
在Topic管理页面,选择对应的Topic,单击体验发送消息进行测试。
yun001,flink,20250501 yun002,flink,20250505 yun003,flink,20250505 yun004,flink,20250505
发送四条消息后,可以在下游的topic查看消息写入分区的情况。
可以看到相同日期的数据都被写入到了同一分区。
相关文档
更多JAR作业开发流程请参考JAR作业开发。
如果遇到阿里云Kafka相关的报错问题,请参考使用云消息队列 Kafka 版时客户端的报错及解决方案。