使用Partitioner实现自定义分区写入

本文将介绍如何基于FlinkKafkaPartitioner实现数据的自定义分区逻辑,将数据按需写入 Kafka 的不同分区。

分区模式

Kafka连接器可以通过设置sink.partitioner参数来配置合适的分区模式。如果都不满足您的需求,则需要通过自定义分区映射来满足不同的数据写入需求。

模式

分区逻辑

default(默认值)

  • 消息带有Key:具有相同Key的消息会被分配到同一个分区,保证分区内的顺序性。

  • 消息不带Key:采用轮询策略,均匀分布消息以实现负载均衡。

fixed

每一个并发对应一个固定分区。

  • Kafka分区为3,并发数为3,则每个并发任务会固定消费一个Kafka分区。

  • Kafka分区为4,并发数为3,某些分区会被分配给同一个并发任务处理。

  • Kafka分区为2,并发数为3,某些并发任务会空闲,无法分配到任何分区。

round-robin

Flink并发中的数据将被轮流依次分配至Kafka的各个分区。

SQL作业实现自定义分区

步骤一:编写自定义分区器

分区器需要继承FlinkKafkaPartitioner类,重写partition方法。

KafkaSinkPartitioner.java

自定义分区逻辑:从数据中的日期字段(字符串)提取最后两位数字,对分区数取模,确保相同日期的数据落入同一分区(目标分区数为 3)。

package com.aliyun;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.data.GenericRowData;

public class KafkaSinkPartitioner extends FlinkKafkaPartitioner {

    @Override
    public int partition(Object record, byte[] key, byte[] value, String topic, int[] partitionSize) {
    // 在FlinkSQL中,record的实际类型是GenericRowData
        if (record instanceof GenericRowData){
            GenericRowData grData  = (GenericRowData) record;
            // 可以通过如下类的方法获取该行所有数据
            for (int i = 0; i < grData.getArity(); i++){
                Object field = grData.getField(i);
                System.out.println("index: " + i + " :" + field);
            }
            // 根据日期,取日期最后两位,将相同日期的数据写入到同一个分区
            String dateString = grData.getString(2).toString();
            int date = Integer.parseInt(dateString.substring(dateString.length() - 2));
            return date % 3;
        }
        else {
            throw new IllegalArgumentException("record is not GenericRowData");
        }
    }
}

步骤二:编写SQL作业

需要在SQL作业中配置sink.partitioner参数来使用。本示例使用阿里云消息队列Kafka进行样例演示。

CREATE TEMPORARY TABLE KafkaSource (
  order_id STRING,
  order_name STRING,
  dt STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'source',
  'properties.group.id' = 'test-group',  -- 消费者组ID
  'properties.bootstrap.servers' = '<bootstrap.servers>', -- 填入对应的Kafka broker地址。
  'format' = 'csv',                        -- value部分时使用的数据格式
  'scan.startup.mode' = 'earliest-offset'  -- 从Kafka最早分区开始读取。
);

CREATE TEMPORARY TABLE kafkaSink (
  order_id STRING,
  order_name STRING,
  dt STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'sink',
  'properties.bootstrap.servers' ='<bootstrap.servers>',
  'format' = 'csv',
  'properties.enable.idempotence' = 'false',
  'sink.partitioner' = 'com.aliyun.KafkaSinkPartitioner'
);

INSERT INTO kafkaSink
SELECT * FROM KafkaSource
;

pom.xml

VVR Kafka连接器的其他版本已经放置在Maven中央仓库

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.aliyun</groupId>
    <artifactId>KafkaPartitionerSQL</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
    <!--Kafka连接器-->
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-kafka</artifactId>
            <version>1.17-vvr-8.0.11-1</version>
        </dependency>
    <!--flink 依赖-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.17.2</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>KafkaPartitionerSQL</finalName>
        <plugins>
            <!-- Java 编译器-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.13.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!-- 我们使用maven-shade 创建一个包含所有必须依赖的 fat jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.5.3</version>
            </plugin>
        </plugins>
    </build>
</project>
重要
  • 阿里云消息队列Kafka不支持幂等和事务写入,因此无法实现精确一次语义(Exactly-Once Semantics),需要在结果表添加properties.enable.idempotence=false以关闭幂等写入功能。

  • sink.partitioner需要填写继承FlinkKafkaPartitioner类的完整引用路径,如com.aliyun.KafkaSinkPartitioner

  • 更多参数请参见Kafka连接器WITH参数

步骤三:打包并上传自定义分区器

使用文件管理功能将编译好的JAR包上传至实时计算控制台。

image

步骤四:作业引用

JAR包作为附加依赖文件引用后,请在WITH参数中设置sink.partitioner参数,参数值为分区器完整的类路径,如org.mycompany.MyPartitioner

image

步骤五:验证测试

使用阿里云消息队列Kafka的快速体验消息收发进行测试。

  1. 登录消息队列Kafka控制台

  2. 概览页面的资源分布区域,选择地域。

  3. 实例列表页面,单击目标实例名称。

  4. Topic管理页面,选择对应的Topic,单击体验发送消息进行测试。

    image

    yun001,flink,20250501
    yun002,flink,20250505
    yun003,flink,20250505
    yun004,flink,20250505
  5. 发送四条消息后,可以在下游的topic查看消息写入分区的情况。

    可以看到相同日期的数据都被写入到了同一分区。

    image

JAR作业实现自定义分区

步骤一:编写JAR作业

使用VVR Kafka连接器中的setPartitioner指定自定义分区逻辑类。

KafkaPartitionerDataStream.java

自定义分区逻辑:从数据中的日期字段(字符串)提取最后两位数字,对分区数取模,确保相同日期的数据落入同一分区(目标分区数为 3)。

package com.aliyun;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.StringSerializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

public class KafkaPartitionerDataStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 构建Kafka Source
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("<BootstrapServers>")
                .setTopics("source")
                .setGroupId("test-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                .build();
        // 采用无水印策略
        DataStreamSource<String> source = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
        // 构建Kafka Sink
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("<BootstrapServers>")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("sink")
                                // 自定义分区
                                .setPartitioner(new KafkaSinkPartitioner())      
                                .setKafkaValueSerializer(StringSerializer.class) 
                                .build())
                .build();
        source.sinkTo(sink).name("Kafka Sink");
        env.execute();
    }
    
    static class KafkaSinkPartitioner extends FlinkKafkaPartitioner<String> {
    // 根据日期,取日期最后两位,将相同日期的数据写入到同一个分区 
        @Override
        public int partition(String  record, byte[] key, byte[] value, String topic, int[] partitionSize) {
            String[] s = record.split(",");
            int date = Integer.parseInt(s[2].substring(s[2].length() - 2));
            return date % 3;
        }
    }

}

pom.xml

VVR Kafka连接器的其他版本已经放置在Maven中央仓库

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.aliyun</groupId>
    <artifactId>KafkaPartitionerDataStream</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!--VVR Kafka Connector-->
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-kafka</artifactId>
            <version>1.17-vvr-8.0.11-1</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>1.17.2</version>
            <scope>provided</scope>
        </dependency>

    </dependencies>

    <build>
        <finalName>KafkaPartitionerDS</finalName>
        <plugins>
            <!-- Java 编译器-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.13.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!-- 我们使用maven-shade 创建一个包含所有必须依赖的 fat jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.5.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.aliyun.KafkaPartitionerDataStream</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>
</project>

步骤二:打包作业后上传

使用文件管理功能将编译好的JAR包上传至实时计算控制台。连接器依赖建议声明为provided作用域,作为附加依赖文件引入,便于版本管理,更轻体量的JAR包方便上传,详情请参见连接器依赖和使用

image

步骤三:部署作业

部署作业更多参数说明详见部署JAR作业

image

步骤四:验证测试

使用阿里云消息队列Kafka的快速体验消息收发进行测试。

  1. 登录消息队列Kafka控制台

  2. 概览页面的资源分布区域,选择地域。

  3. 实例列表页面,单击目标实例名称。

  4. Topic管理页面,选择对应的Topic,单击体验发送消息进行测试。

    image

    yun001,flink,20250501
    yun002,flink,20250505
    yun003,flink,20250505
    yun004,flink,20250505
  5. 发送四条消息后,可以在下游的topic查看消息写入分区的情况。

    可以看到相同日期的数据都被写入到了同一分区。

    image

相关文档