本文介绍消息队列Kafka版发布者的最佳实践,帮助您减少发送消息出错的可能性。本文最佳实践基于Java客户端。对于其它语言的客户端,其基本概念与思想是相通的,但实现细节可能存在差异。

发送消息

发送消息的示例代码如下:

Future<RecordMetadata> metadataFuture = producer.send(new ProducerRecord<String, String>(
        topic,   //消息主题。
        null,   //分区编号。建议为null,由Producer分配。
        System.currentTimeMillis(),   //时间戳。
        String.valueOf(value.hashCode()),   //消息键。
        value   //消息值。
));

完整示例代码请参见SDK概述

Key和Value

0.10.2.2版本的Kafka的消息字段只有两个:Key和Value。

  • Key:消息的标识。
  • Value:消息内容。

为了便于追踪,请为消息设置一个唯一的Key。您可以通过Key追踪某消息,打印发送日志和消费日志,了解该消息的发送和消费情况。

失败重试

分布式环境下,由于网络等原因偶尔发送失败是常见的。导致这种失败的原因可能是消息已经发送成功,但是Ack失败,也有可能是确实没发送成功。

消息队列Kafka版是VIP网络架构,会主动断开空闲连接(30秒没活动),因此,不是一直活跃的客户端会经常收到 "connection rest by peer" 错误,建议重试消息发送。

您可以根据业务需求,设置以下重试参数:
  • retries,重试次数,建议设置为3
  • retry.backoff.ms,重试间隔,建议设置为1000

异步发送

发送接口是异步的,如果您想得到发送的结果,可以调用metadataFuture.get(timeout, TimeUnit.MILLISECONDS)

线程安全

Producer是线程安全的,且可以往任何Topic发送消息。通常情况下,一个应用对应一个Producer就足够了。

Acks

Acks的说明如下:

  • acks=0:无需服务端的Response、性能较高、丢数据风险较大。
  • acks=1:服务端主节点写成功即返回Response、性能中等、丢数据风险中等、主节点宕机可能导致数据丢失。

  • acks=all:服务端主节点写成功且备节点同步成功才返回Response、性能较差、数据较为安全、主节点和备节点都宕机才会导致数据丢失。

一般建议选择acks=1,重要的服务可以设置acks=all

Batch

Batch的基本思路是:把消息缓存在内存中,并进行打包发送。Kafka通过Batch来提高吞吐,但同时也会增加延迟,生产时应该对两者予以权衡。 在构建Producer时,需要考虑以下两个参数:

  • batch.size : 发往每个分区(Partition)的消息缓存量(消息内容的字节数之和,不是条数)达到这个数值时,就会触发一次网络请求,然后客户端把消息真正发往服务器。
  • linger.ms : 每条消息待在缓存中的最长时间。若超过这个时间,就会忽略batch.size的限制,然后客户端立即把消息发往服务器。

因此,Kafka客户端什么时候把消息真正发往服务器是由batch.sizelinger.ms共同决定的。您可以根据具体业务需求进行调整。

OOM

结合Kafka的Batch设计思路,Kafka会缓存消息并打包发送,如果缓存太多,则有可能造成OOM(Out of Memory)。

  • buffer.memory : 所有缓存消息的总体大小超过这个数值后,就会触发把消息发往服务器。此时会忽略batch.sizelinger.ms的限制。
  • buffer.memory的默认数值是32 MB,对于单个Producer来说,可以保证足够的性能。 需要注意的是,如果您在同一个JVM中启动多个Producer,那么每个Producer都有可能占用32 MB缓存空间,此时便有可能触发OOM。
  • 在生产时,一般没有必要启动多个Producer;如果特殊情况需要,则需要考虑buffer.memory的大小,避免触发OOM。

分区顺序

单个分区(Partition)内,消息是按照发送顺序储存的,是基本有序的。

默认情况下,消息队列Kafka版为了提升可用性,并不保证单个分区内绝对有序,在升级或者宕机时,会发生少量消息乱序(某个分区挂掉后把消息Failover到其它分区)。

对于包年包月计费模式下的专业版实例,如果业务要求分区保证严格有序,请在创建Topic时指定保序。