Kafka处理Protobuf数据格式

Kafka连接器支持读取Protobuf格式的数据。

Protobuf格式

Protobuf(Protocol Buffers)是由Google开发的一种高效、跨语言、结构化的数据序列化格式。相比于 JSON 和 XML,它具有以下显著优势:

  • 体积小:序列化后的数据更紧凑,节省存储与网络传输资源。

  • 速度快:序列化与反序列化效率高,适合高性能场景。

  • 结构化定义:通过 .proto 文件定义数据结构,接口清晰,易于维护。

  • 跨语言支持强 :支持主流编程语言,便于多系统间数据交互。

因此,Protobuf 被广泛应用于高频通信、微服务、实时计算等场景,是 Kafka 中推荐使用的高效数据格式之一。

使用限制

  • 仅支持通过proto2语法定义的Protobuf文件生成的源代码。

  • 仅支持Protocol Buffers 21.7及以下版本。

步骤一:编译Protobuf文件

  1. 创建名为order.protoProtobuf文件。

    syntax = "proto2";
    // proto 的package名称
    package com.aliyun;
    // java package名称,如果不指定会默认用protopackage
    option java_package = "com.aliyun";
    // 是否编译成多个文件
    option java_multiple_files = true;
    // java的封装类的类名
    option java_outer_classname = "OrderProtoBuf";
    message Order {
      optional int32 orderId = 1;
      optional string orderName= 2;
      optional double orderPrice = 3;
      optional int64 orderDate = 4;
    }
  2. 使用Protocol Buffers工具生成源代码。

    创建一个Maven空项目,将Protobuf文件放到resources目录下。

    目录示例

    KafkaProtobuf
    ‒ src
      -java
      -resources 
        -order.proto
    ‒ pom.xml

    pom.xml

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
        <modelVersion>4.0.0</modelVersion>
            <groupId>com.aliyun</groupId>
            <artifactId>KafkaProtobuf</artifactId>
            <version>1.0-SNAPSHOT</version>
    
        <dependencies>
            <dependency>
                <groupId>com.google.protobuf</groupId>
                <artifactId>protobuf-java</artifactId>
                <version>3.21.7</version> <!-- 版本号需与生成代码时使用的 Protobuf 版本一致 -->
            </dependency>
    
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>3.3.1</version> <!-- 请根据你的 Kafka 版本进行调整 -->
            </dependency>
            
            <dependency>
                <groupId>com.github.javafaker</groupId>
                <artifactId>javafaker</artifactId>
                <version>1.0.2</version>
            </dependency>
        </dependencies>
    
        <build>
            <finalName>KafkaProtobuf</finalName>
            <plugins>
                <!-- Java 编译器-->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.13.0</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
                <!-- 我们使用maven-shade 创建一个包含所有必须依赖的 fat jar -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>3.5.3</version>
                </plugin>
            </plugins>
        </build>
    
    
    </project>
    

    java目录下会生成三个类,分别是Order具体类,OrderOrBuilder接口类和OrderProtobuf外部封装类。

    # 终端进入项目目录下使用命令生成源代码
    protoc --java_out=src/main/java src/main/resources/order.proto
  3. 序列化和反序列化测试。

    package com.aliyun;
    
    public class OrderTest {
        public static void main(String[] args) {
            // 创建一个Order对象并设置字段值
            Order order = Order.newBuilder()
                    .setOrderId(8513)
                    .setOrderName("flink")
                    .setOrderPrice(99.99)
                    .setOrderDate(System.currentTimeMillis())
                    .build();
    
            // 序列化为字节数组
            byte[] serializedBytes = order.toByteArray();
            System.out.println("序列化后字节长度:"+ serializedBytes.length);
            // 反序列化为新的Order对象
            Order deserializedOrder;
            try {
                deserializedOrder = Order.parseFrom(serializedBytes);
            } catch (Exception e) {
                System.err.println("反序列化失败: " + e.getMessage());
                return;
            }
    
            System.out.println("原始对象: \n" + order);
            // 验证反序列化后的对象与原始对象的字段值是否一致
            if (order.getOrderId() == deserializedOrder.getOrderId() &&
                    order.getOrderName().equals(deserializedOrder.getOrderName()) &&
                order.getOrderPrice() == deserializedOrder.getOrderPrice() &&
                order.getOrderDate() == deserializedOrder.getOrderDate()) {
                System.out.println("序列化和反序列化测试通过!");
            } else {
                System.out.println("序列化和反序列化测试失败!");
            }
    
        }
    }

步骤二:构建测试数据,写入Kafka

本示例以云消息队列Kafka为操作环境。

  1. 下载SSL根证书。如果是SSL接入点,需下载该证书。

  2. usernamepassword为实例的用户名和密码。

package com.aliyun;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import com.github.javafaker.Faker; // 引入Faker库生成随机测试数据集

public class ProtoBufToKafkaTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        // 设置接入点,请通过Kafka控制台获取对应Topic的接入点
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<bootstrap_servers>");
        // 接入协议,使用SASL_SSL协议接入。
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        // 设置SSL根证书的路径(绝对路径),该文件不能被打包到Jar当中
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "../only.4096.client.truststore.jks");
        // 根证书store的密码,保持不变。
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        // SASL鉴权方式,保持不变。
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        // 云消息队列 Kafka 版消息的序列化方式。key/value的序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        // 请求的最长等待时间。单位毫秒。
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        // 设置客户端内部重试次数。
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        // 设置客户端内部重试间隔。单位毫秒。
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3000);
        // Hostname校验改成空。
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"aliyun_flink\" password=\"123456\";");
        // 构造Producer对象,注意,该对象是线程安全的,一般来说,一个进程内一个Producer对象即可。
        // 如果想提高性能,可以多构造几个对象,但不要太多,最好不要超过5个。
        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
        String topic = "test";

        // 新增:创建一个列表用于存储三条消息
        List<ProducerRecord<String, byte[]>> messages = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            byte[] value = getProtoTestData();
            ProducerRecord<String, byte[]> kafkaMessage = new ProducerRecord<>(topic, value);
            messages.add(kafkaMessage);
        }

        try {
            // 批量发送消息
            List<Future<RecordMetadata>> futures = new ArrayList<>();
            for (ProducerRecord<String, byte[]> message : messages) {
                Future<RecordMetadata> metadataFuture = producer.send(message);
                futures.add(metadataFuture);
            }
            producer.flush();

            // 同步获取Future对象的结果
            for (Future<RecordMetadata> future : futures) {
                try {
                    RecordMetadata recordMetadata = future.get();
                    System.out.println("Produce ok:" + recordMetadata.toString());
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        } catch (Exception e) {
            // 客户端内部重试之后,仍然发送失败,业务要应对此类错误。
            System.out.println("error occurred");
            e.printStackTrace();
        }
    }

    private static byte[] getProtoTestData() {
        // 使用Faker生成随机数据
        Faker faker = new Faker();
        int orderId = faker.number().numberBetween(1000, 9999); // 随机生成订单ID
        String orderName = faker.commerce().productName(); // 随机生成订单名称
        double orderPrice = faker.number().randomDouble(2, 10, 1000); // 随机生成订单价格
        long orderDate = System.currentTimeMillis(); // 当前时间作为订单日期

        // 按照定义的数据结构,创建一个对象。
        Order order = Order.newBuilder()
                .setOrderId(orderId)
                .setOrderName(orderName)
                .setOrderPrice(orderPrice)
                .setOrderDate(orderDate)
                .build();

        // 发送数据序列化:将对象数据转化为字节数据输出
        return order.toByteArray();
    }
}
  1. 运行测试代码,向Kafkatest Topic写入三条Protobuf格式的数据。

    image

步骤三:编译打包上传

需要上传protobuf-java-3.21.7.jar和编译打包好的KafkaProtobuf.jar。

说明

VVR 8.0.9及以上版本使用内置的Protobuf数据格式。如果低于此版本,需要额外添加flink-protobuf-1.17.2.jar依赖文件。

image

步骤四:Flink SQL读取数据

  1. SQL示例参考。

    添加protobuf.message-class-name参数指定消息体对应的message类,更多protobuf参数详情请参见Flink-Protobuf

    CREATE TEMPORARY TABLE KafkaSource (
      orderId INT,
      orderName STRING,
      orderPrice DOUBLE,
      orderDate BIGINT
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'test',
      'properties.group.id' = 'my-group',  -- 消费者组ID
      'properties.bootstrap.servers' = 'alikafka-serverless-cn-gh647pzkq03-1000-vpc.alikafka.aliyuncs.com:9092,alikafka-serverless-cn-gh647pzkq03-2000-vpc.alikafka.aliyuncs.com:9092,alikafka-serverless-cn-gh647pzkq03-3000-vpc.alikafka.aliyuncs.com:9092', // 填入对应的Kafka broker地址。
      'format' = 'protobuf',              -- value部分时使用的数据格式
      'protobuf.message-class-name' = 'com.aliyun.Order',    -- 指定消息体对应的message类                       
      'scan.startup.mode' = 'earliest-offset'  -- 从Kafka最早分区开始读取。
    );
    
    CREATE TEMPORARY TABLE KafkaSink (
      orderId INT,
      orderName STRING,
      orderPrice DOUBLE,
      orderDate BIGINT
    ) WITH (
        'connector' = 'print'
    );
    
    INSERT INTO KafkaSink
    SELECT * FROM KafkaSource
    ;
  2. 附加文件依赖引用。

    image

  3. SQL调试。

    image

  4. 作业部署运行后查看输出。

    image

常见问题

  • 调试检查报错Bad return type

    问题原因:采用了proto3语法编译生成的源代码。

    解决方案:改为proto2语法编译后重新打包上传。

  • 上游读取后,下游写入Kafka其他Topic,日志信息中出现大量warningCORRUPT_MESSAGE

    问题原因:阿里云消息队列Kafka(非专业高写版创建的Local存储的Topic)不支持幂等和事务写入,您将无法使用Kafka结果表提供的精确一次语义exactly-once semantic功能。

    解决方案:需要在结果表中添加配置项properties.enable.idempotence=false以关闭幂等写入功能。

  • 作业运行时日志报错:NoClassDefFoundError

    问题原因:上传的protobuf-javaJar包的版本和Protocol Buffers编译的不一致。

    解决方案:检查附加依赖文件的版本是否一致,是否有文件缺少,编译打包的完整性。

  • 作业检查报错:Could not find any factory for identifier 'protobuf' that implements one of 'org.apache.flink.table.factories.EncodingFormatFactory

    问题原因:仅支持VVR 8.0.9及以上版本使用内置的Protobuf数据格式。

    解决方案:检查是否有添加flink-protobuf依赖。