步骤一:准备示例代码
执行以下命令,克隆示例代码,并切换到7.9.0-post
分支。
git clone https://github.com/confluentinc/examples.git
cd examples/clients/avro
git checkout 7.9.0-post
在$HOME/.confluent/
路径下创建客户端配置文件java.config
。其中 $HOME
为您的用户主目录。在配置文件中,配置如下配置项。
# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers={{ BROKER_ENDPOINT }}
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips
# Best practice for higher availability in Apache Kafka clients prior to 3.0
session.timeout.ms=45000
# Best practice for Kafka producer to prevent data loss
acks=all
# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url=https://{{ SR_ENDPOINT }}
basic.auth.credentials.source=USER_INFO
basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}
参数 | 描述 | 示例值 |
BROKER_ENDPOINT | KAFKA服务的链接地址。 服务地址在云消息队列 Confluent 版控制台访问链接和接口页面查看。若需要使用公网访问,则需要开启公网,其他安全访问配置请参见网络访问与安全设置。 | pub-kafka-xxxxxxxxxxx.csp.aliyuncs.com:9092 |
CLUSTER_API_KEY | 云消息队列 Confluent 版控制台用户管理页面中LDAP用户名和密码。 在测试过程中,可以暂时使用root账号及其密码。如需使用其他用户,则需在云消息队列 Confluent 版控制台中创建该用户,并为其授予Kafka cluster相应的权限。创建用户和授权,请参见用户管理和授权。 | root |
CLUSTER_API_SECRET | ****** |
SR_ENDPOINT | SCHEMA_REGISTRY服务的链接地址。 服务地址在云消息队列 Confluent 版控制台访问链接和接口页面查看。若需要使用公网访问,则需要开启公网,其他安全访问配置请参见网络访问与安全设置。 | pub-schemaregistry-xxxxxxxxxxx.csp.aliyuncs.com:443 |
SR_API_KEY | 云消息队列 Confluent 版控制台用户管理页面中LDAP用户名和密码。 在测试过程中,可以暂时使用root账号及其密码。如需使用其他用户,则需在云消息队列 Confluent 版控制台中创建该用户,并为其授予Schema Registry相应的权限。创建用户和授权,请参见用户管理和授权。 | root |
SR_API_SECRET | ****** |
步骤二:创建Topic
说明 示例代码中的Topic参数值设定为transactions
。在测试时,可以直接创建名为transactions
的Topic。如果需要使用其他的Topic,则需相应更改代码中的参数值。
登录Control Center控制台,在Home页面单击controlcenter.clusterk卡片,进入到Cluster overview页面。

在左侧导航栏,单击Topics,然后在Topic列表页面单击+ Add topic。

在New topic页面,设置Topic名称和分区数,然后单击Create with defaults。

创建完成后,进入到Topic详情页面。

步骤三:开启Schema格式校验
在Topic详情页面,单击Configuration页签,然后单击Edit settings。

然后单击Switch to expert mode。

将confluent_value_schema_validation字段设置为true,然后单击Save changes,启用Schema验证消息内容格式。启用后发送和消费数据时将进行格式校验。

步骤四:创建Schema
进入项目的examples/clients/avro目录下,执行以下命令,查看Payment.avsc
文件内容。
cat src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment.avsc
返回结果
{
"namespace": "io.confluent.examples.clients.basicavro",
"type": "record",
"name": "Payment",
"fields": [
{"name": "id", "type": "string"},
{"name": "amount", "type": "double"}
]
}
在Control Center控制台Topic详情页面,单击Schema,然后单击Set a schema。
在Schema页签,单击Avro,将上述Payment.avsc文本填入文本框,单击Create。

步骤五:发送/消费消息
发送消息
创建Schema的校验格式为Avro,则需要在发送消息时将消息序列化方式指定为KafkaAvroSerializer类,并将消息值类配置为Payment类。
示例代码如下:
生产消息示例代码
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.common.errors.SerializationException;
import java.util.Properties;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.io.FileInputStream;
import java.io.InputStream;
public class ProducerExample {
private static final String TOPIC = "transactions";
private static final Properties props = new Properties();
private static String configFile;
@SuppressWarnings("InfiniteLoopStatement")
public static void main(final String[] args) throws IOException {
if (args.length < 1) {
// Backwards compatibility, assume localhost
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
} else {
// Load properties from a local configuration file
// Create the configuration file (e.g. at '$HOME/.confluent/java.config') with configuration parameters
// to connect to your Kafka cluster, which can be on your local host, Confluent Cloud, or any other cluster.
// Documentation at https://docs.confluent.io/platform/current/tutorials/examples/clients/docs/java.html
configFile = args[0];
if (!Files.exists(Paths.get(configFile))) {
throw new IOException(configFile + " not found.");
} else {
try (InputStream inputStream = new FileInputStream(configFile)) {
props.load(inputStream);
}
}
}
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
try (KafkaProducer<String, Payment> producer = new KafkaProducer<String, Payment>(props)) {
for (long i = 0; i < 10; i++) {
final String orderId = "id" + Long.toString(i);
final Payment payment = new Payment(orderId, 1000.00d);
final ProducerRecord<String, Payment> record = new ProducerRecord<String, Payment>(TOPIC, payment.getId().toString(), payment);
producer.send(record);
Thread.sleep(1000L);
}
producer.flush();
System.out.printf("Successfully produced 10 messages to a topic called %s%n", TOPIC);
} catch (final SerializationException e) {
e.printStackTrace();
} catch (final InterruptedException e) {
e.printStackTrace();
}
}
}
发送消息的操作如下:
进入项目的examples/clients/avro目录下,执行以下命令编译项目。
mvn clean compile package
编译完成后,执行以下代码,发送消息。
mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ProducerExample \
-Dexec.args="$HOME/.confluent/java.config"
执行发送命令后,如下结果则表明发送成功。
...
Successfully produced 10 messages to a topic called transactions
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
...
在Control Center控制台可以查看到已发送的消息。

消费消息
创建Schema的校验格式为Avro,则需要在消费消息时将消息反序列化方式指定为KafkaAvroDeSerializer类,并将消息值类配置为Payment类。
示例代码如下:
消费消息示例代码
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.io.FileInputStream;
import java.io.InputStream;
public class ConsumerExample {
private static final String TOPIC = "transactions";
private static final Properties props = new Properties();
private static String configFile;
@SuppressWarnings("InfiniteLoopStatement")
public static void main(final String[] args) throws IOException {
if (args.length < 1) {
// Backwards compatibility, assume localhost
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
} else {
// Load properties from a local configuration file
// Create the configuration file (e.g. at '$HOME/.confluent/java.config') with configuration parameters
// to connect to your Kafka cluster, which can be on your local host, Confluent Cloud, or any other cluster.
// Documentation at https://docs.confluent.io/platform/current/tutorials/examples/clients/docs/java.html
configFile = args[0];
if (!Files.exists(Paths.get(configFile))) {
throw new IOException(configFile + " not found.");
} else {
try (InputStream inputStream = new FileInputStream(configFile)) {
props.load(inputStream);
}
}
}
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-payments");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
try (final KafkaConsumer<String, Payment> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList(TOPIC));
while (true) {
final ConsumerRecords<String, Payment> records = consumer.poll(Duration.ofMillis(100));
for (final ConsumerRecord<String, Payment> record : records) {
final String key = record.key();
final Payment value = record.value();
System.out.printf("key = %s, value = %s%n", key, value);
}
}
}
}
}
消费消息的操作如下:
进入项目的examples/clients/avro目录下,执行以下命令编译项目。
mvn clean compile package
执行以下代码,消费消息。
mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ConsumerExample \
-Dexec.args="$HOME/.confluent/java.config"
运行消费命令后,如下结果则表明消息已被成功消费。
...
key = id0, value = {"id": "id0", "amount": 1000.0}
key = id1, value = {"id": "id1", "amount": 1000.0}
key = id2, value = {"id": "id2", "amount": 1000.0}
key = id3, value = {"id": "id3", "amount": 1000.0}
key = id4, value = {"id": "id4", "amount": 1000.0}
key = id5, value = {"id": "id5", "amount": 1000.0}
key = id6, value = {"id": "id6", "amount": 1000.0}
key = id7, value = {"id": "id7", "amount": 1000.0}
key = id8, value = {"id": "id8", "amount": 1000.0}
key = id9, value = {"id": "id9", "amount": 1000.0}
...