本文将通过 Java SDK 示例向您展示如何通过消息队列 for Apache Kafka 的 SDK 在公网环境收发消息。

更多详情,请参见消息队列 for Apache Kafka Demo (公网版)

步骤一:准备配置

  1. 添加 Maven 依赖

    添加 Maven 依赖的示例代码如下:

    //消息队列 for Apache Kafka 服务端版本为 0.10.0.0,建议客户端版本为 0.10.2.2
    <dependency>
           <groupId>org.apache.kafka</groupId>
           <artifactId>kafka-clients</artifactId>
           <version>0.10.2.2</version>
    </dependency>
    					
  2. SASL 配置

    消息队列 for Apache Kafka 利用 SASL 机制对客户端进行身份验证。

    1. 创建文本文件 kafka_client_jaas.conf

      请参考以下示例代码进行修改:

      KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="xxxxxxxxxxxxxxxxxxxxx"
        password="xxxxxxxxxxxxxxxxxxxxx";
      };
      							
      注意 其中usernamepassword可以从消息队列 for Apache Kafka 的实例信息中获取。
    2. 设置 kafka_client_jaas.conf 的路径

      kafka_client_jaas.conf 的路径是系统变量,有以下两种办法进行设置(这里假设 kafka_client_jaas.conf 放在/home/admin 下面,实际部署时请注意修改为自己的路径)。

      • 程序启动时,启动 JVM 参数:

        -Djava.security.auth.login.config=/home/admin/kafka_client_jaas.conf

      • 或者在代码中设置参数(需要保证在 Kafka Producer 和 Consumer 启动之前):

        System.setProperty("java.security.auth.login.config", “/home/admin/kafka_client_jaas.conf");

  3. SSL配置

    下载根证书,下载后放入某个目录下,其路径需要直接配置在代码中。

  4. 客户端配置

    1. 准备配置文件:kafka.properties

      请参考以下示例代码进行修改:

      ## 接入点,即控制台的实例详情页显示的 SSL 接入点
      bootstrap.servers=xxxxxxxxxxxxxxxxxxxxx
      
      ## 您在控制台创建的 Topic
      topic=alikafka-topic-demo
      
      ## 您在控制台创建的 Consumer Group
      group.id=CID-alikafka-consumer-group-demo
      
      ## ssl 根证书的路径,demo 中有,请拷贝到自己的某个目录下,不能被打包到 jar 中
      ## 这里假设您的目录为/home/admin,请记得修改为自己的实际目录
      ssl.truststore.location=/home/admin/kafka.client.truststore.jks
      
      ## sasl 路径,demo 中有,请拷贝到自己的某个目录下,不能被打包到 jar 中
      ## 这里假设您的目录为/home/admin,请记得修改为自己的实际目录
      java.security.auth.login.config=/home/admin/kafka_client_jaas.conf
      							
    2. 加载配置文件

      private static Properties properties;
      
      public static void configureSasl() {
         //如果用-D 或者其它方式设置过,这里不再设置
         if (null == System.getProperty("java.security.auth.login.config")) {
             //请注意将 XXX 修改为自己的路径
             //这个路径必须是一个文件系统可读的路径,不能被打包到 jar 中
             System.setProperty("java.security.auth.login.config", getKafkaProperties().getProperty("java.security.auth.login.config"));
         }
      }
      
      public synchronized static Properties getKafkaProperties() {
         if (null != properties) {
             return properties;
         }
         //获取配置文件 kafka.properties 的内容
         Properties kafkaProperties = new Properties();
         try {
             kafkaProperties.load(KafkaProducerDemo.class.getClassLoader().getResourceAsStream("kafka.properties"));
         } catch (Exception e) {
             //没加载到文件,程序要考虑退出
             e.printStackTrace();
         }
         properties = kafkaProperties;
         return kafkaProperties;
      }
      							

步骤二:使用 Java SDK 发送消息

示例代码如下:

//设置 sasl 文件的路径
JavaKafkaConfigurer.configureSasl();

//加载 kafka.properties
Properties kafkaProperties =  JavaKafkaConfigurer.getKafkaProperties();

Properties props = new Properties();
//设置接入点,即控制台的实例详情页显示的“SSL接入点”
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
//设置 SSL 根证书的路径,请记得将 XXX 修改为自己的路径
//与 sasl 路径类似,该文件也不能被打包到 jar 中
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
//根证书 store 的密码,保持不变
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
//接入协议,目前支持使用 SASL_SSL 协议接入
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
//SASL 鉴权方式,保持不变
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
//Kafka 消息的序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//请求的最长等待时间
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);

//构造 Producer 对象,注意,该对象是线程安全的,一般来说,一个进程内一个 Producer 对象即可;
//如果想提高性能,可以多构造几个对象,但不要太多,最好不要超过 5 个
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

//构造一个 Kafka 消息
String topic = kafkaProperties.getProperty("topic"); //消息所属的 Topic,请在控制台申请之后,填写在这里
String value = "this is the message's value"; //消息的内容

ProducerRecord<String, String>  kafkaMessage =  new ProducerRecord<String, String>(topic, value);

try {
    //发送消息,并获得一个 Future 对象
    Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
    //同步获得 Future 对象的结果
    RecordMetadata recordMetadata = metadataFuture.get();
    System.out.println(String.format("Produce ok: topic:%s partition:%d offset:%d value:%s",
            recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), value));
} catch (Exception e) {
    //要考虑重试
    //参考常见报错: https://help.aliyun.com/document_detail/124136.html
    System.out.println("error occurred");
    e.printStackTrace();
}
			
注意

如果 kafka-client 的版本为 2.0.0 及以上,需额外设置以下参数:

props.put("ssl.endpoint.identification.algorithm", "");
				

步骤三:使用 Java SDK 订阅消息

示例代码如下:

//设置 sasl 文件的路径
JavaKafkaConfigurer.configureSasl();

//加载kafka.properties
Properties kafkaProperties =  JavaKafkaConfigurer.getKafkaProperties();

Properties props = new Properties();
//设置接入点,即控制台的实例详情页显示的“SSL接入点”
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
//设置 SSL 根证书的路径,请记得将 XXX 修改为自己的路径
//与 sasl 路径类似,该文件也不能被打包到jar中
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
//根证书 store 的密码,保持不变
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
//接入协议,目前支持使用 SASL_SSL 协议接入
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
//SASL 鉴权方式,保持不变
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
//两次 poll 之间的最大允许间隔
//请不要改得太大,服务器会掐掉空闲连接,不要超过 30000
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 25000);
//每次 poll 的最大数量
//注意该值不要改得太大,如果 poll 太多数据,而不能在下次 poll 之前消费完,则会触发一次负载均衡,产生卡顿
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
//消息的反序列化方式
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//当前消费实例所属的消费组,请在控制台申请之后填写
//属于同一个组的消费实例,会负载消费消息
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
//构造消息对象,也即生成一个消费实例
KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);
//设置消费组订阅的 Topic,可以订阅多个
//如果 GROUP_ID_CONFIG 是一样,则订阅的 Topic 也建议设置成一样
List<String> subscribedTopics =  new ArrayList<String>();
//如果需要订阅多个 Topic,则在这里 add 进去即可
//每个 Topic 需要先在控制台进行创建
subscribedTopics.add(kafkaProperties.getProperty("topic"));
consumer.subscribe(subscribedTopics);

//循环消费消息
while (true){
    try {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        //必须在下次 poll 之前消费完这些数据, 且总耗时不得超过 SESSION_TIMEOUT_MS_CONFIG
        //建议开一个单独的线程池来消费消息,然后异步返回结果
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(String.format("Consume topic:%s partition:%d offset:%d value:%s",
                    record.topic(), record.partition(), record.offset(), record.value()));
        }
    } catch (Exception e) {
        try {
            Thread.sleep(1000);
        } catch (Throwable ignore) {

        }
        //参考常见报错: https://help.aliyun.com/knowledge_detail/124136.html
        e.printStackTrace();
    }
}
			
注意

如果 kafka-client 的版本为 2.0.0 及以上,需额外设置以下参数:

props.put("ssl.endpoint.identification.algorithm", "");