Schema Registry管理

更新时间:

流数据服务Confluent使用Schema Registry管理Schema。本文将向您介绍Schema Registry的基本操作。更多关于Schema Registry的信息请参见官方文档

前提条件

  • 请确保您已经购买流数据服务Confluent,详情请参见集群开通

  • 请确保获取Kafka集群和Schema Registry就群相关的访问权限,详情请参见Control Center页面进行RBAC授权

  • 确保安装Java 1.8 or 1.11环境。

  • 确保安装Maven以编译客户端演示程序。

一、设置客户端环境

  • 安装maven example示例代码,并使用examples/clients/avro作为演示的项目路径。

git clone https://github.com/confluentinc/examples.git
#使用该路径下的maven项目进行演示
cd examples/clients/avro
git checkout 7.1.0-post
  • 创建客户端配置文件

在$HOME/.confluent/java.config配置文件中,配置如下配置项。

bootstrap.servers=<your broker access address>
security.protocol=SASL_SSL
#your user should have the authority to access the topic
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<user>' password='<secret>';
sasl.mechanism=PLAIN
schema.registry.url=<your schema registry access address>
basic.auth.credentials.source=USER_INFO
#your user should have the authority to access the schema registry
basic.auth.user.info=<user>:<user-secret>

二、使用Schema Registry

1. 创建Topic

  • 登录Control Center控制台。

imageimage
  • 选择Topics,选择Add a topic。

  • 使用默认配置创建transactions演示用的topic。

  • 修改confluent.value.schema.validation=true,启用schema验证消息内容格式。

2. 定义Schema

  • 使用cat命令查看maven example项目如下路径的文件内容,以下使用该avro格式内容创建schema。

cat src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment.avsc

返回结果:
{
 "namespace": "io.confluent.examples.clients.basicavro",
 "type": "record",
 "name": "Payment",
 "fields": [
     {"name": "id", "type": "string"},
     {"name": "amount", "type": "double"}
 ]
}
  • 点击Set a schema按钮。

  • 选择Avro格式,将上述Payment.avsc文本填入文本框,点击Create按钮,会弹出Schema Updated提示信息,表明schema已经创建成功。

image

3. Kafka Producer/Consumer Client程序

以下示例使用 Maven 来配置项目和管理依赖项。完整的pom.xml示例,请参考这里pom.xml

创建Producer

在Producer客户端应用程序中,需要设置Kafka相关配置参数。具体的配置参见设置客户端环境章节。

构造生产者时,需要将消息序列化方式指定为KafkaAvroSerializer类,并将消息值类配置为Payment类。

示例代码如下:

...
import io.confluent.kafka.serializers.KafkaAvroSerializer;
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
...
KafkaProducer<String, Payment> producer = new KafkaProducer<String, Payment>(props));
final Payment payment = new Payment(orderId, 1000.00d);
final ProducerRecord<String, Payment> record = new ProducerRecord<String, Payment>(TOPIC, payment.getId().toString(), payment);
producer.send(record);
...

完整Java 生产者示例代码,请参见ProducerExample

说明

pom.xml包含avro-maven-plugin,Payment类二进制文件是在编译期间自动生成的。

运行Producer

进入example项目的examples/clients/avro目录下,执行以下步骤:

  1. 编译项目。

mvn clean compile package
  1. 查看transactions topic的Messages页面。此时,应该没有消息记录。

  1. 执行Producer代码。

    mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ProducerExample \
      -Dexec.args="$HOME/.confluent/java.config"

    该命令使用了之前准备的客户端配置文件$HOME/.confluent/java.config。命令执行完后,会输出如下结果:

    ...
    Successfully produced 10 messages to a topic called transactions
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    ...
  1. 查看结果。此时,您应该可以在Messages页面查看到上一步骤写入到消息。

创建Consumer

在Consumer客户端应用程序中,同样需要设置Kafka相关配置参数。具体的配置详情请参见设置客户端环境。构造消费者时,需要将消息反序列化方式指定为KafkaAvroDeSerializer类,并将消息值类配置为Payment类。示例代码如下:

...
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
...
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
...
KafkaConsumer<String, Payment> consumer = new KafkaConsumer<>(props));
consumer.subscribe(Collections.singletonList(TOPIC));
while (true) {
  ConsumerRecords<String, Payment> records = consumer.poll(100);
  for (ConsumerRecord<String, Payment> record : records) {
    String key = record.key();
    Payment value = record.value();
  }
}
...

完整Java消费者示例代码,请参见ConsumerExample.。

运行Consumer代码

  1. 编译项目。

mvn clean compile package
  1. 运行ConsumerExample(您应先执行ProducerExample代码)。

mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ConsumerExample \
  -Dexec.args="$HOME/.confluent/java.config"

执行成功后,查看输出:

...
key = id0, value = {"id": "id0", "amount": 1000.0}
key = id1, value = {"id": "id1", "amount": 1000.0}
key = id2, value = {"id": "id2", "amount": 1000.0}
key = id3, value = {"id": "id3", "amount": 1000.0}
key = id4, value = {"id": "id4", "amount": 1000.0}
key = id5, value = {"id": "id5", "amount": 1000.0}
key = id6, value = {"id": "id6", "amount": 1000.0}
key = id7, value = {"id": "id7", "amount": 1000.0}
key = id8, value = {"id": "id8", "amount": 1000.0}
key = id9, value = {"id": "id9", "amount": 1000.0}
...
  1. 按Ctrl+C停止执行Consumer代码。