本文介绍云消息队列 Kafka 版发布者的最佳实践,帮助您降低发送消息的错误率。本文最佳实践基于Java客户端。对于其他语言的客户端,其基本概念与思想是相通的,但实现细节可能存在差异。
发送消息
发送消息的示例代码如下:
Future<RecordMetadata> metadataFuture = producer.send(new ProducerRecord<String, String>(
topic, //消息主题。
null, //分区编号。建议为null,由Producer分配。
System.currentTimeMillis(), //时间戳。
String.valueOf(value.hashCode()), //消息键。
value //消息值。
));
完整示例代码,请参见SDK概述。
Key和Value
0.10.2版本的云消息队列 Kafka 版的消息有以下两个字段:
Key:消息的标识。
Value:消息内容。
为了便于追踪,请为消息设置一个唯一的Key。您可以通过Key追踪某消息,打印发送日志和消费日志,了解该消息的发送和消费情况。
如果消息发送量较大,建议不要设置Key,并使用黏性分区策略。黏性分区策略详情,请参见黏性分区策略。
在0.11.0以及之后的版本,云消息队列 Kafka 版开始支持headers,如果您需要使用headers,需要将服务端升级至2.2.0版本。
失败重试
分布式环境下,由于网络等原因偶尔发送失败是常见的。导致这种失败的原因可能是消息已经发送成功,但是ACK失败,也有可能是确实没发送成功。
云消息队列 Kafka 版是VIP网络架构,长时间不进行通信连接会被主动断开,因此,不是一直活跃的客户端会经常收到connection reset by peer
错误,建议重试消息发送。
您可以根据业务需求,设置以下重试参数:
retries
:消息发送失败时的重试次数。retry.backoff.ms
,消息发送失败的重试间隔,建议设置为1000
,单位:毫秒。
异步发送
发送接口是异步的,如果您想接收发送的结果,可以调用metadataFuture.get(timeout, TimeUnit.MILLISECONDS)。
线程安全
Producer是线程安全的,且可以往任何Topic发送消息。通常情况下,一个应用对应一个Producer。
Acks
Acks的说明如下:
acks=0
:无需服务端的Response、性能较高、丢数据风险较大。acks=1
:服务端主节点写成功即返回Response、性能中等、丢数据风险中等、主节点宕机可能导致数据丢失。acks=all
:服务端主节点写成功且备节点同步成功才返回Response、性能较差、数据较为安全、主节点和备节点都宕机才会导致数据丢失。
为了提升发送性能, 建议设置为acks=1
。
提升发送性能(减少碎片化发送请求)
一般情况下,一个云消息队列 Kafka 版Topic会有多个分区。云消息队列 Kafka 版Producer客户端在向服务端发送消息时,需要先确认往哪个Topic的哪个分区发送。我们给同一个分区发送多条消息时,Producer客户端将相关消息打包成一个Batch,批量发送到服务端。Producer客户端在处理Batch时,是有额外开销的。一般情况下,小Batch会导致Producer客户端产生大量请求,造成请求队列在客户端和服务端的排队,并造成相关机器的CPU升高,从而整体推高了消息发送和消费延迟。一个合适的Batch大小,可以减少发送消息时客户端向服务端发起的请求次数,在整体上提高消息发送的吞吐和延迟。
Batch机制,云消息队列 Kafka 版Producer端主要通过两个参数进行控制:
batch.size
: 发往每个分区(Partition)的消息缓存量(消息内容的字节数之和,不是条数)。达到设置的数值时,就会触发一次网络请求,然后Producer客户端把消息批量发往服务器。如果batch.size
设置过小,有可能影响发送性能和稳定性。建议保持默认值16384。单位:字节。linger.ms
: 每条消息在缓存中的最长时间。若超过这个时间,Producer客户端就会忽略batch.size
的限制,立即把消息发往服务器。建议根据业务场景, 设置linger.ms
在100~1000之间。单位:毫秒。
因此,云消息队列 Kafka 版Producer客户端什么时候把消息批量发送至服务器是由batch.size
和linger.ms
共同决定的。您可以根据具体业务需求进行调整。为了提升发送的性能,保障服务的稳定性, 建议您设置batch.size=16384
和linger.ms=1000
。
黏性分区策略
只有发送到相同分区的消息,才会被放到同一个Batch中,因此决定一个Batch如何形成的一个因素是云消息队列 Kafka 版Producer端设置的分区策略。云消息队列 Kafka 版Producer允许通过设置Partitioner的实现类来选择适合自己业务的分区。在消息指定Key的情况下,云消息队列 Kafka 版Producer的默认策略是对消息的Key进行哈希,然后根据哈希结果选择分区,保证相同Key的消息会发送到同一个分区。
在消息没有指定Key的情况下,云消息队列 Kafka 版2.4版本之前的默认策略是循环使用主题的所有分区,将消息以轮询的方式发送到每一个分区上。但是,这种默认策略Batch的效果会比较差,在实际使用中,可能会产生大量的小Batch,从而使得实际的延迟增加。鉴于该默认策略对无Key消息的分区效率低问题,云消息队列 Kafka 版在2.4版本引入了黏性分区策略(Sticky Partitioning Strategy)。
黏性分区策略主要解决无Key消息分散到不同分区,造成小Batch问题。其主要策略是如果一个分区的Batch完成后,就随机选择另一个分区,然后后续的消息尽可能地使用该分区。这种策略在短时间内看,会将消息发送到同一个分区,如果拉长整个运行时间,消息还是可以均匀地发布到各个分区上的。这样可以避免消息出现分区倾斜,同时还可以降低延迟,提升服务整体性能。
如果您使用的云消息队列 Kafka 版Producer客户端是2.4及以上版本,默认的分区策略就采用黏性分区策略。如果您使用的Producer客户端版本小于2.4,可以根据黏性分区策略原理,自行实现分区策略,然后通过参数partitioner.class
设置指定的分区策略。
关于黏性分区策略实现,您可以参考如下Java版代码实现。该代码的实现逻辑主要是根据一定的时间间隔,切换一次分区。
public class MyStickyPartitioner implements Partitioner {
// 记录上一次切换分区时间。
private long lastPartitionChangeTimeMillis = 0L;
// 记录当前分区。
private int currentPartition = -1;
// 分区切换时间间隔,可以根据实际业务选择切换分区的时间间隔。
private long partitionChangeTimeGap = 100L;
public void configure(Map<String, ?> configs) {}
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 获取所有分区信息。
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
int availablePartitionSize = availablePartitions.size();
// 判断当前可用分区。
if (availablePartitionSize > 0) {
handlePartitionChange(availablePartitionSize);
return availablePartitions.get(currentPartition).partition();
} else {
handlePartitionChange(numPartitions);
return currentPartition;
}
} else {
// 对于有key的消息,根据key的哈希值选择分区。
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
private void handlePartitionChange(int partitionNum) {
long currentTimeMillis = System.currentTimeMillis();
// 如果超过分区切换时间间隔,则切换下一个分区,否则还是选择之前的分区。
if (currentTimeMillis - lastPartitionChangeTimeMillis >= partitionChangeTimeGap
|| currentPartition < 0 || currentPartition >= partitionNum) {
lastPartitionChangeTimeMillis = currentTimeMillis;
currentPartition = Utils.toPositive(ThreadLocalRandom.current().nextInt()) % partitionNum;
}
}
public void close() {}
}
OOM
结合云消息队列 Kafka 版的Batch设计思路,云消息队列 Kafka 版会缓存消息并打包发送,如果缓存太多,则有可能造成OOM(Out of Memory)。
buffer.memory
: 发送的内存池大小。如果内存池设置过小,则有可能导致申请内存耗时过长,从而影响发送性能,甚至导致发送超时。建议buffer.memory ≧ batch.size * 分区数 * 2
。单位:字节。buffer.memory
的默认数值是32 MB,对于单个Producer而言,可以保证足够的性能。重要如果您在同一个JVM中启动多个Producer,那么每个Producer都有可能占用32 MB缓存空间,此时便有可能触发OOM。
在生产时,一般没有必要启动多个Producer;如有特殊情况需要,则需要考虑
buffer.memory
的大小,避免触发OOM。
分区顺序
单个分区(Partition)内,消息是按照发送顺序储存的,是基本有序的。
默认情况下,云消息队列 Kafka 版为了提升可用性,并不保证单个分区内绝对有序,在升级或者宕机时,会发生少量消息乱序(某个分区挂掉后把消息Failover到其它分区)。
如果业务要求分区保证严格有序,请在创建Topic时选择使用Local存储。