Schema Registry management

ApsaraMQ for Confluent uses Schema Registry to manage schemas. This topic describes the basic operations of Schema Registry.

Prerequisites

  • An ApsaraMQ for Confluent instance is purchased. For more information, see Purchase and deploy an instance.

  • You have permissions to access the Kafka cluster and the Schema Registry cluster. For more information, see RBAC authorization.

  • Java 8 (1.8) or Java 11 is installed. For more information, see Install JDK.

  • Maven is installed to compile the client demo programs. For more information, see Install Maven.

Set up the client environment

  1. Run the following commands to download the Maven example code, switch to the examples/clients/avro project directory, and check out the 7.1.0-post branch.

    git clone https://github.com/confluentinc/examples.git
    # Use the Maven project in this directory for the demo
    cd examples/clients/avro
    git checkout 7.1.0-post
  2. Create the client configuration file java.config in the $HOME/.confluent/ directory and add the following configuration items to it:

    bootstrap.servers=<your broker access address>
    security.protocol=SASL_SSL
    # The user must be authorized to access the topic
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<user>' password='<secret>';
    sasl.mechanism=PLAIN
    schema.registry.url=<your schema registry access address>
    basic.auth.credentials.source=USER_INFO
    # The user must be authorized to access Schema Registry
    basic.auth.user.info=<user>:<user-secret>
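Before running the samples, it can help to check that java.config is complete. The following minimal sketch uses only the Java standard library: it loads the file with java.util.Properties, as the sample clients do, and reports required keys that are missing, empty, or still contain a <placeholder>. The class name ConfigCheck and the "contains <" placeholder heuristic are illustrative assumptions, not part of the Confluent examples.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class ConfigCheck {

    // Keys from the java.config example that the sample clients rely on.
    static final String[] REQUIRED_KEYS = {
        "bootstrap.servers",
        "security.protocol",
        "sasl.jaas.config",
        "sasl.mechanism",
        "schema.registry.url",
        "basic.auth.credentials.source",
        "basic.auth.user.info"
    };

    // Returns required keys that are missing, empty, or still hold a <placeholder> (simple heuristic).
    static List<String> findProblems(Properties props) {
        List<String> problems = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty() || value.contains("<")) {
                problems.add(key);
            }
        }
        return problems;
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        if (args.length > 0) {
            // For example: java ConfigCheck $HOME/.confluent/java.config
            try (InputStream in = new FileInputStream(args[0])) {
                props.load(in);
            }
        } else {
            // Hypothetical sample content, used only when no file is passed.
            props.load(new StringReader(
                "bootstrap.servers=<your broker access address>\nsecurity.protocol=SASL_SSL\n"));
        }
        List<String> problems = findProblems(props);
        System.out.println(problems.isEmpty() ? "All required keys are set." : "Keys to fix: " + problems);
    }
}
```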

Create a topic

  1. In the left-side navigation pane of the home page, click Topics, and then click Add topic in the upper-right corner.

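  2. On the New Topic page, set the topic name and the number of partitions, and then click Create with defaults. The sample programs below use a topic named transactions.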

  3. On the Topics page, find the topic that you created and click its name to open the topic details page.

  4. On the topic details page, click the Messages tab, and then click Produce a new message to this topic to send test data in JSON format to the topic.

Enable schema validation

  1. On the details page of the target topic, click the Configuration tab.

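  2. Choose Configuration > Edit settings > Switch to expert mode, and set confluent_value_schema_validation to true. With this setting enabled, the broker checks that the value of each message produced to the topic references a valid schema registered in Schema Registry.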

Create a schema

  1. Run the cat command to view the content of the following file in the Maven example project.

    cat src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment.avsc

    Output:

    {
     "namespace": "io.confluent.examples.clients.basicavro",
     "type": "record",
     "name": "Payment",
     "fields": [
         {"name": "id", "type": "string"},
         {"name": "amount", "type": "double"}
     ]
    }
  2. On the topic details page, click Schema, and then click Set a schema.

  3. On the Schema tab, click Avro, paste the content of Payment.avsc shown above into the text box, and click Create.
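As an alternative to the console steps above, a schema can also be registered through the Schema Registry REST API with POST /subjects/{subject}/versions. The sketch below is a hedged illustration, not a verified procedure for this service: it assumes the default TopicNameStrategy (the value schema of topic transactions goes under the subject transactions-value) and HTTP basic authentication with the user:secret pair from basic.auth.user.info. The class RegisterSchemaSketch and its methods are hypothetical. It uses HttpURLConnection so that it also runs on Java 8.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Scanner;

public class RegisterSchemaSketch {

    // Default TopicNameStrategy: the value schema of topic T uses the subject "T-value".
    static String valueSubject(String topic) {
        return topic + "-value";
    }

    // The Avro schema is itself JSON, so it is escaped into a JSON string: {"schema": "..."}.
    static String requestBody(String avroSchema) {
        StringBuilder sb = new StringBuilder("{\"schema\": \"");
        for (char c : avroSchema.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:   sb.append(c);
            }
        }
        return sb.append("\"}").toString();
    }

    // Sends POST {registryUrl}/subjects/{subject}/versions with basic auth and returns the raw response.
    static String register(String registryUrl, String userInfo, String subject, String avroSchema)
            throws IOException {
        URL url = new URL(registryUrl + "/subjects/" + subject + "/versions");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/vnd.schemaregistry.v1+json");
        String token = Base64.getEncoder().encodeToString(userInfo.getBytes(StandardCharsets.UTF_8));
        conn.setRequestProperty("Authorization", "Basic " + token);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(requestBody(avroSchema).getBytes(StandardCharsets.UTF_8));
        }
        try (InputStream in = conn.getInputStream();
             Scanner s = new Scanner(in, "UTF-8").useDelimiter("\\A")) {
            return s.hasNext() ? s.next() : "";
        }
    }

    public static void main(String[] args) throws IOException {
        String schema = "{\"namespace\": \"io.confluent.examples.clients.basicavro\", \"type\": \"record\", "
            + "\"name\": \"Payment\", \"fields\": [{\"name\": \"id\", \"type\": \"string\"}, "
            + "{\"name\": \"amount\", \"type\": \"double\"}]}";
        System.out.println(valueSubject("transactions"));
        System.out.println(requestBody(schema));
        // Contact the registry only when a URL and user:secret are passed on the command line.
        if (args.length == 2) {
            System.out.println(register(args[0], args[1], valueSubject("transactions"), schema));
        }
    }
}
```

Without arguments the program only prints the subject and the request body. When the Schema Registry access address and <user>:<user-secret> are passed, it also sends the request; a successful response is a small JSON object containing the ID that Schema Registry assigned to the schema.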

Kafka producer and consumer client programs

The following examples use Maven to configure the project and manage dependencies. For the complete pom.xml example, see pom.xml.

Create a producer

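In the producer client application, you must set the Kafka-related configuration parameters. For details, see Set up the client environment above.

When you construct the producer, set the value serializer to the KafkaAvroSerializer class and use the Payment class as the message value type.

Sample code: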

package io.confluent.examples.clients.basicavro;

import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.common.errors.SerializationException;

import java.util.Properties;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.io.FileInputStream;
import java.io.InputStream;

public class ProducerExample {

    private static final String TOPIC = "transactions";
    private static final Properties props = new Properties();
    private static String configFile;

    @SuppressWarnings("InfiniteLoopStatement")
    public static void main(final String[] args) throws IOException {

        if (args.length < 1) {
          // Backwards compatibility, assume localhost
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
          props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        } else {
          // Load properties from a local configuration file
          // Create the configuration file (e.g. at '$HOME/.confluent/java.config') with configuration parameters
          // to connect to your Kafka cluster, which can be on your local host, Confluent Cloud, or any other cluster.
          // Documentation at https://docs.confluent.io/platform/current/tutorials/examples/clients/docs/java.html
          configFile = args[0];
          if (!Files.exists(Paths.get(configFile))) {
            throw new IOException(configFile + " not found.");
          } else {
            try (InputStream inputStream = new FileInputStream(configFile)) {
              props.load(inputStream);
            }
          }
        }

        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

        try (KafkaProducer<String, Payment> producer = new KafkaProducer<String, Payment>(props)) {

            for (long i = 0; i < 10; i++) {
                final String orderId = "id" + Long.toString(i);
                final Payment payment = new Payment(orderId, 1000.00d);
                final ProducerRecord<String, Payment> record = new ProducerRecord<String, Payment>(TOPIC, payment.getId().toString(), payment);
                producer.send(record);
                Thread.sleep(1000L);
            }

            producer.flush();
            System.out.printf("Successfully produced 10 messages to a topic called %s%n", TOPIC);

        } catch (final SerializationException e) {
            e.printStackTrace();
        } catch (final InterruptedException e) {
            e.printStackTrace();
        }

    }

}

For more information, see ProducerExample.

Note

pom.xml includes the avro-maven-plugin, so the Payment class is generated automatically from Payment.avsc during compilation.
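To illustrate what the generated Payment class produces when serialized, the following sketch hand-encodes a Payment record in Avro binary format following the Avro specification: fields are written in schema order with no names or separators, a string is a zig-zag varint length followed by UTF-8 bytes, and a double is 8 little-endian IEEE 754 bytes. This is for illustration only; the real serializer uses the Avro library, and the class PaymentAvroEncoding is hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

public class PaymentAvroEncoding {

    // Avro "long": zig-zag encoding, then a base-128 varint (low 7 bits first).
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));
            z >>>= 7;
        }
        out.write((int) z);
    }

    // Avro "string": byte length as an Avro long, then the UTF-8 bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, bytes.length);
        out.write(bytes, 0, bytes.length);
    }

    // Avro "double": 8 bytes, IEEE 754, little-endian.
    static void writeDouble(ByteArrayOutputStream out, double d) {
        byte[] bytes = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putDouble(d).array();
        out.write(bytes, 0, bytes.length);
    }

    // Payment.avsc declares id (string) then amount (double); a record is just its fields in order.
    static byte[] encodePayment(String id, double amount) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, id);
        writeDouble(out, amount);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        StringBuilder hex = new StringBuilder();
        for (byte b : encodePayment("id0", 1000.00d)) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(hex.toString().trim());
    }
}
```

For Payment("id0", 1000.00d), which is the first record the producer sends, main prints 06 69 64 30 00 00 00 00 00 40 8f 40: 0x06 is the zig-zag-encoded length 3, 69 64 30 is "id0", and the last 8 bytes are 1000.0 as a little-endian double.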

Run the producer

Go to the examples/clients/avro directory of the example project and perform the following steps.

  1. Run the following command to compile the project.

    mvn clean compile package
  2. Check the Messages tab on the topic details page. No new messages should appear yet.

  3. Run the producer code.

    mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ProducerExample \
      -Dexec.args="$HOME/.confluent/java.config"

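    The command uses the client configuration file $HOME/.confluent/java.config that you prepared earlier. After the command completes, output similar to the following is returned: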

    ...
    Successfully produced 10 messages to a topic called transactions
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    ...
  4. Check the result. The messages written in the previous step should now appear on the Messages tab.
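The message values written by the producer are not bare Avro bytes. KafkaAvroSerializer registers or looks up the schema in Schema Registry and prefixes each value with the Confluent wire-format header: a magic byte 0, then the 4-byte big-endian schema ID, then the Avro binary payload. The sketch below builds that header with java.nio.ByteBuffer. The class WireFormatFraming is hypothetical, and the schema ID 1 in main is an assumed value; the real ID is assigned by Schema Registry.

```java
import java.nio.ByteBuffer;

public class WireFormatFraming {

    static final byte MAGIC_BYTE = 0x0;

    // Prefixes an Avro payload with the magic byte and the 4-byte schema ID.
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(1 + 4 + avroPayload.length)
            .put(MAGIC_BYTE)
            .putInt(schemaId)   // ByteBuffer writes big-endian by default
            .put(avroPayload)
            .array();
    }

    public static void main(String[] args) {
        // Avro binary encoding of Payment("id0", 1000.00d).
        byte[] payload = {0x06, 0x69, 0x64, 0x30, 0, 0, 0, 0, 0, 0x40, (byte) 0x8f, 0x40};
        // Hypothetical schema ID; the real one comes from Schema Registry.
        byte[] message = frame(1, payload);
        StringBuilder hex = new StringBuilder();
        for (byte b : message) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(hex.toString().trim());
    }
}
```

With these inputs, main prints 00 00 00 00 01 followed by the 12 payload bytes 06 69 64 30 00 00 00 00 00 40 8f 40.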

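Create a consumer

In the consumer client application, you must also set the Kafka-related configuration parameters. For details, see Set up the client environment above. When you construct the consumer, set the value deserializer to the KafkaAvroDeserializer class, use the Payment class as the message value type, and set specific.avro.reader (KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG) to true so that values are returned as Payment objects. Sample code: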

package io.confluent.examples.clients.basicavro;

import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.io.FileInputStream;
import java.io.InputStream;

public class ConsumerExample {

    private static final String TOPIC = "transactions";
    private static final Properties props = new Properties();
    private static String configFile;

    @SuppressWarnings("InfiniteLoopStatement")
    public static void main(final String[] args) throws IOException {

        if (args.length < 1) {
          // Backwards compatibility, assume localhost
          props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
          props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        } else {
          // Load properties from a local configuration file
          // Create the configuration file (e.g. at '$HOME/.confluent/java.config') with configuration parameters
          // to connect to your Kafka cluster, which can be on your local host, Confluent Cloud, or any other cluster.
          // Documentation at https://docs.confluent.io/platform/current/tutorials/examples/clients/docs/java.html
          configFile = args[0];
          if (!Files.exists(Paths.get(configFile))) {
            throw new IOException(configFile + " not found.");
          } else {
            try (InputStream inputStream = new FileInputStream(configFile)) {
              props.load(inputStream);
            }
          }
        }

        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-payments");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); 

        try (final KafkaConsumer<String, Payment> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(TOPIC));

            while (true) {
                final ConsumerRecords<String, Payment> records = consumer.poll(Duration.ofMillis(100));
                for (final ConsumerRecord<String, Payment> record : records) {
                    final String key = record.key();
                    final Payment value = record.value();
                    System.out.printf("key = %s, value = %s%n", key, value);
                }
            }

        }
    }

}

For more information, see ConsumerExample.
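Each value written by KafkaAvroSerializer starts with a 5-byte header in the Confluent wire format: a magic byte 0 followed by the 4-byte big-endian schema ID. KafkaAvroDeserializer reads the schema ID from this header, fetches and caches the writer schema from Schema Registry, and decodes the remaining bytes; because specific.avro.reader is set to true, it returns the generated Payment class rather than a GenericRecord. The sketch below covers only the header-parsing step and does not contact Schema Registry. The class WireFormatParsing is hypothetical, and the message bytes in main are an assumed example.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class WireFormatParsing {

    static final byte MAGIC_BYTE = 0x0;

    // Schema ID and Avro payload split out of one framed message value.
    static final class Framed {
        final int schemaId;
        final byte[] payload;

        Framed(int schemaId, byte[] payload) {
            this.schemaId = schemaId;
            this.payload = payload;
        }
    }

    // Validates the magic byte, reads the big-endian schema ID, and returns the rest as the payload.
    static Framed parse(byte[] message) {
        if (message == null || message.length < 5) {
            throw new IllegalArgumentException("Message is shorter than the 5-byte header");
        }
        ByteBuffer buf = ByteBuffer.wrap(message);
        byte magic = buf.get();
        if (magic != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        int schemaId = buf.getInt();
        byte[] payload = Arrays.copyOfRange(message, 5, message.length);
        return new Framed(schemaId, payload);
    }

    public static void main(String[] args) {
        // Hypothetical message: header with schema ID 1, then the Avro bytes of Payment("id0", 1000.00d).
        byte[] message = {0, 0, 0, 0, 1, 0x06, 0x69, 0x64, 0x30, 0, 0, 0, 0, 0, 0x40, (byte) 0x8f, 0x40};
        Framed framed = parse(message);
        System.out.printf("schema id = %d, payload = %d bytes%n", framed.schemaId, framed.payload.length);
    }
}
```

For this message, main prints schema id = 1, payload = 12 bytes.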

Run the consumer

  1. Run the following command to compile the project.

    mvn clean compile package
  2. Run ConsumerExample.

    mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ConsumerExample \
      -Dexec.args="$HOME/.confluent/java.config"

    After the command runs successfully, check the output:

    ...
    key = id0, value = {"id": "id0", "amount": 1000.0}
    key = id1, value = {"id": "id1", "amount": 1000.0}
    key = id2, value = {"id": "id2", "amount": 1000.0}
    key = id3, value = {"id": "id3", "amount": 1000.0}
    key = id4, value = {"id": "id4", "amount": 1000.0}
    key = id5, value = {"id": "id5", "amount": 1000.0}
    key = id6, value = {"id": "id6", "amount": 1000.0}
    key = id7, value = {"id": "id7", "amount": 1000.0}
    key = id8, value = {"id": "id8", "amount": 1000.0}
    key = id9, value = {"id": "id9", "amount": 1000.0}
    ...

References

For more information about Schema Registry, see Schema Registry Overview.