本文为您介绍使用Kafka Rebalancer工具的注意事项、常用参数以及使用示例。本文以EMR Kafka 2.4.1版本为例。
背景信息
在使用Kafka集群过程中,常常会碰到以下问题:
leader分区不均衡:导致各个Broker负载不均衡,读写吞吐下降。
Borker分区数据量的不均衡:导致部分机器的磁盘利用率明显高于集群平均值,增加Broker宕机的风险。
节点内磁盘间的利用率不均衡:部分磁盘的利用率明显高于节点磁盘的平均利用率,增加了副本offline甚至Broker宕机的风险。
热点Topic:导致相应地磁盘load不均衡。
当出现上述问题时,通常需要进行负载均衡操作。进行负载均衡通常需要进行leader的重新分配和分区reassign等操作。Kafka提供的kafka-preferred-replica-election.sh和kafka-reassign-partitions.sh等工具可以进行此类负载均衡操作,但是这些工具需要进行较多配置操作,增加了运维工作量和难度。
EMR Kafka提供的Rebalancer工具封装了kafka-preferred-replica-election.sh和kafka-reassign-partitions.sh等工具,简化了运维工作量和难度,不会影响相应被封装工具的使用,您仍然可以使用相关工具进行运维工作。
注意事项
在使用工具的时候,需要对运维流量进行限制。
由于Rebalancer工具根据相应的选项生成reassignment的JSON文件,需要对生成的文件进行确认以确保与预期的分配结果一致。
对于较多的分区副本复制和移动操作,您应该评估运维时长以决定是否使用该工具进行运维。如果运维时间过长,您可以直接使用kafka-reassign-partitions.sh工具对运维任务进行拆分以便分时段进行运维。
需要借助kafka-reassign-partitions.sh工具对运维的过程进行监控,因此需要注意手工保存reassignment的JSON文件,该文件将用作kafka-reassign-partitions.sh的verify输入参数。
Rebalancer工具功能
在EMR集群Broker的ECS实例上,可以直接输入kafka-rebalancer.sh来查看该脚本工具的功能。
preferred-election:均衡leadership,需要输入Topic参数、对指定的Topic进行preferred election,详情请参见preferred-election。
该功能对kafka-preferred-replica-election.sh工具进行了封装。
balance-disks:基于磁盘空间使用率均衡节点内的磁盘分区副本分配,详情请参见balance-disks。
rebalance:基于磁盘空间使用率均衡集群节点之间的磁盘分区副本分配,详情请参见rebalance。
remove-broker-ids:移除指定Broker列表的所有分区副本。移除副本后,可以对Broker进行下线操作,详情请参见remove-broker-ids。
preferred-election
当分区副本leader不在preferred的Broker节点上时,有可能造成节点负载不均衡,此时需要均衡leadership。
重要参数
参数 | 描述 |
topics | 待preferred election的topics。 |
zookeeper | Kafka使用的ZooKeeper地址。EMR 5.x系列版本,由于不再直接支持以ZooKeeper地址方式与Kafka集群连接,因此无需使用--zookeeper参数,直接使用--bootstrap-server即可。 |
示例
触发preferred-election操作:
创建测试Topic。
kafka-topics.sh --create --topic elelction-topic --bootstrap-server core-1-1:9092 --replication-factor 2 --partitions 50
对测试Topic进行选举操作。
kafka-rebalancer.sh --zookeeper master-1-1:2181/emr-kafka --preferred-election --bootstrap-server core-1-1:9092 --topics elelction-topic
balance-disks
该功能用于均衡节点内的磁盘分区副本分配,是对kafka-reassign-partitions.sh相应功能的封装。与kafka-reassign-partitions.sh不同的地方在于,Rebalancer工具会基于磁盘空间使用率自动生成同一Broker节点内的分区副本分配文件。
重要参数
参数 | 描述 |
replica-alter-log-dirs-throttle | 限流参数,用来限制Broker内副本log目录迁移过程的迁移流量。 说明 您在使用该功能时,请设置合理的限流参数,避免因为资源争抢导致正常业务流量受到影响。 |
threshold | 磁盘使用率偏移阈值,只有当Broker内部的各个磁盘使用率的差异大于该阈值时,才会触发实际的副本Broker内部磁盘迁移操作,默认值为0.1。 |
zookeeper | Kafka使用的ZooKeeper地址。EMR 5.x系列版本,由于不再直接支持以ZooKeeper地址方式与Kafka集群连接,因此无需使用--zookeeper参数,直接使用--bootstrap-server即可。 |
示例
使用kafka-rebalancer.sh触发balance disks操作。
创建测试Topic。
kafka-topics.sh --create --topic balance-disks-topic --bootstrap-server core-1-1:9092 --replication-factor 2 --partitions 50
移动Broker 0 Topic分区副本在磁盘的位置,模拟磁盘不均衡的场景。
mv /mnt/disk1/kafka/log/balance-disks-topic-* /mnt/disk2/kafka/log/ mv /mnt/disk3/kafka/log/balance-disks-topic-* /mnt/disk4/kafka/log/
执行完上述模拟命令后,您需要重启对应节点以使模拟操作生效。
写入测试数据。
kafka-producer-perf-test.sh --producer-props bootstrap.servers=core-1-1:9092 --num-records 70000000 --throughput 200000 --record-size 1000 --topic balance-disks-topic
均衡Broker 0上的磁盘。
kafka-rebalancer.sh --bootstrap-server core-1-1:9092 --zookeeper master-1-1:2181/emr-kafka --balance-disks 0 --replica-alter-log-dirs-throttle 50000000 --threshold 0.1
保存打印在“Current partition replica movement”之后的JSON字符串到文件move.json中。
重要需要保存打印在“Current partition replica movement”之后的JSON字符串到文件move.json中,用作之后校验过程的reassignment-json-file的参数值。
由于使用了限流参数,迁移完成后,需要用kafka-reassign-partitions.sh工具verify选项去除设置在Topic以及Broker上的限流config参数。
监控检查balance disk过程。
您可以使用kafka-configs.sh工具进行限流参数确认,使用kafka-reassign-partitions.sh工具对迁移的进度进行确认。
查看限流参数是否生效。
kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --entity-name 0 --describe
查看balance disk任务。
kafka-reassign-partitions.sh --verify --bootstrap-server core-1-1:9092 --zookeeper master-1-1:2181/emr-kafka --reassignment-json-file move.json
说明输入的参数move.json是步骤1中保存的move.json文件。
rebalance
该功能基于磁盘空间使用率,均衡集群节点之间的磁盘分区副本分配,是对kafka-reassign-partitions.sh相应功能的封装。与kafka-reassign-partitions.sh不同的地方在于,Rebalancer会基于磁盘的空间使用率自动生成分区副本的节点间分配文件。
重要参数
参数 | 描述 |
throttle | 限流参数,用来限制reassign过程的迁移流量。 说明 您在使用该功能时,请设置合理的限流参数,避免因为资源争抢导致正常业务流量受到影响。 |
threshold | 磁盘使用率偏移阈值,只有当Broker内部的各个磁盘使用率的差异大于该阈值时,才会触发实际的rebalance操作,默认值为10%。 |
zookeeper | Kafka使用的ZooKeeper地址。EMR 5.x系列版本,由于不再直接支持以ZooKeeper地址方式与Kafka集群连接,因此无需使用--zookeeper参数,直接使用--bootstrap-server即可。 |
示例
使用kafka-rebalancer.sh触发rebalance操作。
创建测试Topic。
kafka-topics.sh --create --topic rebalance-topic --bootstrap-server core-1-1:9092 --replica-assignment 0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1
写入测试数据。
kafka-producer-perf-test.sh --topic rebalance-topic --num-records 7000000 --throughput 200000 --producer-props bootstrap.servers=core-1-1:9092 --record-size 1000
执行rebalance任务。
kafka-rebalancer.sh --bootstrap-server core-1-1:9092 --zookeeper master-1-1:2181/emr-kafka --rebalance --throttle 100000000 --threshold 0.1
保存打印在“Current partition replica movement”之后的JSON字符串到文件move.json中。
重要需要保存打印在“Current partition replica movement”之后的JSON字符串到文件move.json中,用作之后校验过程的reassignment-json-file的参数值。
由于使用了限流参数,迁移完成后,需要用kafka-reassign-partitions.sh工具verify选项去除设置在Topic以及Broker上的限流config参数。
监控检查rebalance过程。
您可以使用kafka-configs.sh工具进行限流参数确认,使用kafka-reassign-partitions.sh工具对迁移的进度进行确认。
查看限流参数是否生效。
kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --entity-name 0 --describe kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type topics --entity-name rebalance-topic --describe
查看reassign任务。
kafka-reassign-partitions.sh --verify --bootstrap-server core-1-1:9092 --zookeeper master-1-1:2181/emr-kafka --reassignment-json-file move.json
说明输入的参数move.json是步骤1中保存的move.json文件。
remove-broker-ids
该功能用于移除指定Broker列表的所有分区副本。当您想要下线某个节点的时候,可以使用该功能,先将该节点上的所有分区副本移到其他Broker节点上。
通常Topic分区会被设置成3副本,因此一个集群通过需要保留3个Broker节点,不建议您在4个以下Broker节点规模的Kafka集群上执行下线Broker节点的操作。
重要参数
参数 | 描述 |
throttle | 限流参数,用来限制reassign过程的迁移流量。 说明 您在使用该功能时,请设置合理的限流参数,避免因为资源争抢导致正常业务流量受到影响。 |
zookeeper | Kafka使用的ZooKeeper地址。EMR 5.x系列版本,由于不再直接支持以ZooKeeper地址方式与Kafka集群连接,因此无需使用--zookeeper参数,直接使用--bootstrap-server即可。 |
示例
使用kafka-rebalancer.sh触发remove Broker操作。
创建测试Topic。
kafka-topics.sh --create --topic decommission-topic --partitions 50 --replication-factor 2 --bootstrap-server core-1-1:9092
写入测试数据。
kafka-producer-perf-test.sh --topic decommission-topic --num-records 70000000 --throughput 200000 --producer-props bootstrap.servers=core-1-1:9092 --record-size 1000
将Broker 1上的分区副本都移除掉。
kafka-rebalancer.sh --bootstrap-server core-1-1:9092 --zookeeper master-1-1:2181/emr-kafka --remove-broker-ids 1 --throttle 50000000
保存打印在“Current partition replica movement”之后的JSON字符串到文件move.json中。
重要需要保存打印在“Current partition replica movement”之后的JSON字符串到文件move.json中,用作之后校验过程的reassignment-json-file的参数值。
由于使用了限流参数,迁移完成后,需要用kafka-reassign-partitions.sh工具verify选项去除设置在Topic以及Broker上的限流config参数。
监控检查remove过程。
您可以使用kafka-configs.sh工具进行限流参数确认,使用kafka-reassign-partitions.sh工具检查分区副本迁移过程,使用kafka-log-dirs.sh工具查看待remove的Broker节点上的分区副本是否已经都迁移到其他节点。
查看限流参数是否生效。
kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --entity-name 0 --describe kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type topics --entity-name decommission-topic --describe
查看reassign任务。
kafka-reassign-partitions.sh --verify --bootstrap-server core-1-1:9092 --zookeeper master-1-1:2181/emr-kafka --reassignment-json-file move.json
说明输入的参数move.json是步骤1中保存的move.json文件。
remove完成后,确保对应Broker上已经没有分区副本。
kafka-log-dirs.sh --bootstrap-server core-1-1:9092 --broker-list 1 --describe