全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 钉钉智能硬件
消息队列 MQ

Producer 最佳实践

更新时间:2017-12-15 20:40:09

注意:以下最佳实践基于 Kafka 的 Java 客户端;对于其它语言客户端,其基本概念与思想是通用的,但实现细节可能有差异,仅供参考。

Kafka 的发送非常简单,代码片段如下:

 Future<RecordMetadata> metadataFuture = producer.send(new ProducerRecord<String, String>(
            topic,   \\topic
            null,    \\分区编号,这里最好为 null,交给 producer 去分配
            System.currentTimeMillis(), \\时间戳
            String.valueOf(message.hashCode()), \\ key,可以在控制台通过这个 Key 查找消息,这个 key 最好唯一;
            message)); \\value,消息内容

详细 Demo 可参见 这里

Key 和 Value

Kafka 0.10 的消息字段只有两个:Key 和 Value。为了便于追踪,重要消息最好都设置一个唯一的 Key。通过 Key 追踪某消息,打印发送日志和消费日志,了解该消息的发送和消费情况;更重要的是,您可以在控制台可以根据 Key 查询消息的内容。

异步发送

需要注意的是这个接口是异步发送的; 如果你想得到发送的结果,可以调用metadataFuture.get(timeout, TimeUnit.MILLISECONDS)

线程安全

Producer 是线程安全的,且可以往任何 topic 发送消息。 一般一个应用,对应一个 producer 就足够了。

Ack

云上 Kafka 没有考虑这个参数,都认为是“all”,即所有消息同步到 slave 节点后才会返回成功的确认消息给客户端。

重试

在分布式环境下,由于网络等原因,偶尔的发送失败是常见的。这种失败有可能是消息已经发送成功,但是 ack 失败,也有可能是确实没发送成功。 此时,为了保证消息至少发送成功一次,有必要采取重试。 Kafka 有重试机制,但默认不开启。如果你想借助于 Kafka 的重试,可以配置以下参数:

  • retries : 配置成大于0的数值即可,建议不要超过3,重试太多影响性能。

云上 Kafka 是 VIP 网络架构,会主动掐掉空闲连接(一般30秒没活动),也就是说,不是一直活跃的客户端会经常收到"connection rest by peer"这样的错误,因此建议把重试机制都开启。非 Java 客户端,请参考相关文档,如果客户端没有内置实现,可以考虑在代码中手动重试。

Batch

Batch 的基本思路是:把消息缓存在内存中,并进行打包发送。 Kafka 通过 batch 来提高吞吐,但同时也会增加延迟,生产时应该对两者予以权衡。 在构建 producer 时,需要考虑以下两个参数:

  • batch.size : 发往每个 partition 的消息个数缓存量达到这个数值时,就会触发一次网络请求,把消息真正发往服务器。
  • linger.ms : 每个消息待在缓存中的最大时间,超过这个时间,就会忽略 batch.size 的限制,立即把消息发往服务器。

由此可见,Kafka 什么时候把消息真正发往服务器,是通过上面两个参数共同决定的。

batch.size 有助于提高吞吐,linger.ms 有助于控制延迟。您可以根据具体业务进行调整。

OOM

由上可知,Kafka 会缓存消息并打包发送,如果缓存太多,则有可能造成 OOM。

  • buffer.memory : 所有缓存消息的总体大小超过这个数值后,就会触发把消息发往服务器。此时会忽略 batch.sizelinger.ms 的限制。

buffer.memory 的默认数值是32M,对于单个 producer 来说,可以保证足够的性能。 需要注意的是,如果你在同一个 JVM 中启动多个 producer,那么每个 producer 都有可能占用32M 缓存空间,此时便有可能触发 OOM。

在生产时,一般没有必要启动多个 producer;如果特殊情况需要,则需要考虑buffer.memory的大小,避免触发 OOM。

分区顺序

单个分区内,消息是按照发送顺序储存的,是基本有序的。 但云上 Kafka 并不保证单个分区内绝对有序,所以在某些情况下,会发生少量消息乱序,比如: 云上 Kafka 为了提高可用性,某个分区挂掉后把消息 failover 到其它分区。

其它

如果你有其它使用方面的困惑,可通过以下方式反馈:

本文导读目录