本文为您介绍如何在Kafka集群运维中对Kafka运维流量进行限制,以避免由于运维流量影响到正常的业务流量。本文以EMR Kafka 2.4.1版本为例。

背景信息

由于运维操作而出现的IO流量称为运维流量。在以下运维场景中需要对运维流量进行限制:
  • Partition Reassign场景。
  • 节点内副本移动到不同目录的场景。
  • 集群Broker恢复时,副本数据同步的场景。

注意事项

  • 由于Kafka流量构成不同、业务场景不同和运维场景不同,您需要综合判断是否需要对运维流量进行限制。
  • 限流的阈值需要根据具体的业务场景来确定。通常运维流量阈值过小将导致运维操作无法完成,运维流量阈值过大将造成IO争抢或带宽满载等问题,从而影响到正常业务流量,您应该合理的评估限流阈值。
  • 限流阈值设定需要考虑Topic业务流量的大小、业务可以承受的延迟、业务场景是否允许Kafka服务中断、Kafka集群自身的磁盘IO与网络IO的带宽能力等因素。
  • 通常情况下,建议您在业务低峰期间进行此类运维操作。

Kafka运维流量限制

Kafka限流相关参数

参数描述
leader.replication.throttled.replicasTopic级别,表示需要限流的分区leader副本列表。

格式为[PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:... 或者用星号(*)表示该Topic的所有leader副本限流。

follower.replication.throttled.replicasTopic级别,表示需要限流的分区follower副本列表。

格式为[PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:... 或者用星号(*)表示该Topic的所有follwer副本限流。

leader.replication.throttled.rateBroker级别,leader节点复制读流量。
follower.replication.throttled.rateBroker级别,follower节点复制写流量。

限流参数查看方式

您可以通过kafka-configs.sh命令来查看限流参数的值。

  • 查看指定节点的Broker参数。
    kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --entity-name <your broker id> --describe
  • 查看指定Topic的参数。
    kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type topics --entity-name <your topic name> --describe

Partition Reassign场景限流

重要
  • 限流速度不能过小,如果限流速度过小,将不能触发实际的reassign复制过程。
  • 限流参数不会对正常的副本fetch流量进行限速。
  • 任务完成后,您需要通过verify参数移除TopicBroker上的限速参数配置。
  • 如果刚开始已经设置了throttle参数,则可以通过execute命令再次修改throttle参数。
  • 如果刚开始没有设置throttle参数,则需要使用kafka-configs.sh命令修改Topic上的leader.replication.throttled.replicasfollower.replication.throttled.replicas参数、修改Broker上的leader.replication.throttled.ratefollower.replication.throttled.rate参数。

通常使用kafka-reassign-partitions.sh工具来进行Partition Reassign操作,通过使用throttle参数来设置限流的大小。示例如下所示:

  1. 创建测试Topic。
    1. SSH方式登录到Kafka集群的Master节点,详情请参见登录集群
    2. 执行以下命令,创建测试Topic。
      kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --create
      您可以通过以下命令查看Topic详情。
      kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe
  2. 执行以下命令,模拟数据写入。
    kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1  linger.ms=0 bootstrap.servers=core-1-1:9092
  3. 设置throttle参数并执行reassign操作。
    1. 创建reassignment-json-file文件reassign.json,写入如下内容。
      {"version":1,"partitions":[{"topic":"test-throttled","partition":0,"replicas":[2,0,3],"log_dirs":["any","any","any"]}]}
    2. 执行reassign操作。
      由于模拟的写入速度为10 Mbit/s,所以将reassign限流速度设置为30 Mbit/s。
      kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --throttle 30000000 --execute
  4. 查看限流参数。
    • 查看指定节点的Broker参数。
      kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --entity-name 2 --describe
    • 查看指定Topic的参数。
      kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type topics --entity-name test-throttled --describe
  5. 查看reassign任务执行情况。
    kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --verify
    说明 任务完成后,您需要重复执行上述命令以移除限流参数。

节点内副本移动到不同目录的场景限流

通过kafka-reassign-partitions.sh工具可以进行Broker节点内的副本迁移,参数replica-alter-log-dirs-throttle可以对节点内的迁移IO进行限制。示例如下所示:

  1. 创建测试Topic。
    1. SSH方式登录到Kafka集群的Master节点,详情请参见登录集群
    2. 执行以下命令,创建测试Topic。
      kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --create
      您可以通过以下命令查看Topic详情。
      kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe
  2. 执行以下命令,模拟数据写入。
    kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1  linger.ms=0 bootstrap.servers=core-1-1:9092
  3. 设置参数replica-alter-log-dirs-throttle并执行reassign操作。
    1. 创建文件reassign.json,将目标目录写入reassignment文件中,内容如下。
      {"version":1,"partitions":[{"topic":"test-throttled","partition":0,"replicas":[2,0,3],"log_dirs":["any","/mnt/disk1/kafka/log","any"]}]}
    2. 执行replicas movement操作。
      kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --replica-alter-log-dirs-throttle 30000000 --execute
  4. 查看限流参数。
    Broker节点内目录间移动副本会在Broker上配置限流参数,参数名为Brokerreplica.alter.log.dirs.io.max.bytes.per.second。
    执行以下命令,查看指定节点的Broker参数。
    kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --describe --entity-name 0
  5. 查看reassign任务执行情况。
    kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --verify
    说明 任务完成后,您需要重复执行上述命令以移除限流参数。

集群Broker恢复时,副本数据同步场景限流

重要
  • 限流速度不能过小,如果限流速度过小,将不能触发实际的reassign复制过程。
  • 限流参数不会对正常的副本fetch流量进行限速。
  • 数据恢复完成后,需要使用kafka-configs.sh命令删除相应的参数。

Broker重启时,需要从leader副本进行副本数据的同步。在Broker节点迁移、坏盘修复重新上线等场景时,由于之前的副本数据完全丢失、副本数据恢复会产生大量的同步流量,有必要对恢复过程进行限流避免恢复流量过大影响正常流量。示例如下所示:

  1. 创建测试Topic。
    1. SSH方式登录到Kafka集群的Master节点,详情请参见登录集群
    2. 执行以下命令,创建测试Topic。
      kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --create
      您可以通过以下命令查看Topic详情。
      kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe
  2. 执行以下命令,写入测试数据。
    kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1  linger.ms=0 bootstrap.servers=core-1-1:9092
  3. 通过kafka-configs.sh命令设置限流参数。
    //设置Topic上的限流参数。
    kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type topics --entity-name test-throttled --alter --add-config "leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*"
    //设置Broker上的限流参数。
    kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 0
    kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 1
    kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 2
    ......
  4. EMR控制台停止Broker 1节点。
  5. 删除Broker 1上的副本数据,模拟数据丢失的场景。
    rm -rf /mnt/disk2/kafka/log/test-throttled-0/
  6. EMR控制台启动Broker 1节点,观察限流参数是否起作用。
  7. Broker 1相应的副本恢复到ISR列表后,使用kafka-configs.sh命令删除限流参数的配置。
    //删除Topic上的限流参数。
    kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type topics --alter --delete-config 'leader.replication.throttled.replicas,follower.replication.throttled.replicas' --entity-name test-throttled
    //删除Broker上的限流参数
    kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --alter --delete-config 'leader.replication.throttled.replicas,follower.replication.throttled.replicas,leader.replication.throttled.rate,follower.replication.throttled.rate' --entity-name 0
    ......