全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 阿里云办公 培训与认证 物联网
消息队列 MQ

Producer 最佳实践

更新时间:2017-09-29 10:03:17   分享:   

KafkaProducer最佳实践

注意:以下最佳实践基于 Kafka 的 Java 客户端;对于其它语言客户端,其基本概念与思想是通用的,但实现细节可能有差异,仅供参考。

Kafka 的发送非常简单,代码片段如下:

  1. Future<RecordMetadata> metadataFuture = producer.send(new ProducerRecord<String, String>(
  2. topic, \\topic
  3. null, \\分区编号,这里最好为null,交给producer去分配
  4. System.currentTimeMillis(), \\时间戳
  5. String.valueOf(message.hashCode()), \\ key,可以在控制台通过这个Key查找消息,这个key最好唯一;
  6. message)); \\value,消息内容

详细 Demo 可参见 这里

异步发送

需要注意的是这个接口是异步发送的;如果你想得到发送的结果,可以调用metadataFuture.get(timeout, TimeUnit.MILLISECONDS)

线程安全

Producer 是线程安全的,且可以往任何 topic 发送消息。一般一个应用,对应一个 producer 就足够了。

Ack

云上 Kafka 没有考虑这个参数,都认为是“all”,即所有消息同步到 slave 节点后才会返回成功的确认消息给客户端。

重试

在分布式环境下,由于网络等原因,偶尔的发送失败是常见的。这种失败有可能是消息已经发送成功,但是ack失败,也有可能是确实没发送成功。此时,为了保证消息至少发送成功一次,有必要采取重试。Kafka有重试机制,但默认不开启。如果你想借助于Kafka的重试,可以配置以下参数:

  • retries : 配置成大于0的数值即可,建议不要超过3,重试太多影响性能。

云上Kafka是VIP网络架构,会主动掐掉空闲连接(一般30没活动),也就是说,不是一直活跃的客户端会经常收到”connection rest by peer”这样的错误,因此建议把重试机制都开启。非Java客户端,请参考相关文档,如果客户端没有内置实现,可以考虑在代码中手动重试。

Batch

Batch的基本思路是:把消息缓存在内存中,并进行打包发送。Kafka 通过 batch 来提高吞吐,但同时也会增加延迟,生产时应该对两者予以权衡。在构建 producer 时,需要考虑以下两个参数:

  • batch.size : 发往每个 partition 的消息个数缓存量达到这个数值时,就会触发一次网络请求,把消息真正发往服务器。
  • linger.ms : 每个消息待在缓存中的最大时间,超过这个时间,就会忽略 batch.size 的限制,立即把消息发往服务器。

由此可见,Kafka 什么时候把消息真正发往服务器,是通过上面两个参数共同决定的。

batch.size 有助于提高吞吐,linger.ms 有助于控制延迟。您可以根据具体业务进行调整。

OOM

由上可知,Kafka 会缓存消息并打包发送,如果缓存太多,则有可能造成 OOM。

  • buffer.memory : 所有缓存消息的总体大小超过这个数值后,就会触发把消息发往服务器。此时会忽略 batch.sizelinger.ms 的限制。

buffer.memory 的默认数值是32M,对于单个producer来说,可以保证足够的性能。需要注意的是,如果你在同一个JVM中启动多个producer,那么每个producer都有可能占用32M缓存空间,此时便有可能触发OOM。

在生产时,一般没有必要启动多个producer;如果特殊情况需要,则需要考虑buffer.memory的大小,避免触发OOM。

分区顺序

单个分区内,消息是按照发送顺序储存的,是基本有序的。但云上Kafka并不保证单个分区内绝对有序,所以在某些情况下,会发生少量消息乱序,比如:云上Kafka为了提高可用性,某个分区挂掉后把消息failover到其它分区。

其它

如果你有其它使用方面的困惑,可通过以下方式反馈:

本文导读目录
本文导读目录
以上内容是否对您有帮助?