EMR Kafka磁盘故障运维

磁盘故障及其运维通常伴随着磁盘上的数据销毁。在进行磁盘运维时,您应考虑数据是否需要迁移备份。对于Kafka集群,您还需要考虑Topic分区副本数据是否可以从其他Broker节点分区副本同步恢复。本文以EMR Kafka 2.4.1版本为例,介绍Kafka磁盘故障运维的操作。

业务场景

Kafka将日志数据存储到磁盘中,当磁盘出现故障时,会导致磁盘IO能力下降、集群吞吐量下降、消息读写延时或日志目录offline等问题。这些情况有可能影响到线上业务平稳运行、数据丢失、Kafka集群容错能力下降,单块盘故障甚至有可能因为IO处理能力下降导致集群出现雪崩效应、引起重大生产事故。因此需要对磁盘故障进行有效监控以便及时发现故障。当磁盘发生故障时,应及时完成相关故障的处理,及时恢复集群的容错能力。

磁盘运维概述

本文从磁盘监控和磁盘故障恢复角度来介绍磁盘运维策略。

磁盘监控

以下内容从Kafka服务层面以及ECS系统层面来简单了解一下磁盘的监控策略。

  • Kafka服务层面:可以在云监控系统中设置EMR Kafka集群的OfflineLogDirectoryCount和UnderReplicatedPartitions等指标告警,及时发现相关指标的异常。

  • ECS系统层面:可以在云监控中设置相应ECS实例的I/O wait和mbps等指标,来监控磁盘的健康状态。ECS后台也会自动的检测磁盘状态,当发现问题时,会自动为您推送相关的磁盘事件。

磁盘故障恢复

当出现log directory offline、Under Replicated Partition时,需要尽快定位是否是由于磁盘故障导致的。

当出现磁盘故障时,需要根据故障原因、故障影响程度、业务需求(是否接受数据丢失、是否允许服务较长时间不可用)、集群状态等综合考虑恢复采取的策略。

  • 如果业务优先保证服务可用,但允许丢失部分数据,则应考虑在可能会丢失数据的情况下,先恢复服务可用性。

  • 如果业务不允许数据丢失,但能容忍服务较长时间不可用,则需要考虑如何尽可能的避免丢失数据的情况出现。

  • 如果业务需要高可用与数据不丢失,则您需要通过合理的集群配置、使用Kafka方式提高系统容错能力,以避免出现一块盘故障就导致数据丢失的情况出现。

  • 如果发现因为故障盘IO性能下降导致集群整体性能下降,影响业务,则应快速隔离故障盘来进行业务止损。

当检测或定位到磁盘故障时,可以考虑如下运维策略:

  • 节点间分区迁移:将故障磁盘中的分区副本迁移到其他Broker,详情请参见节点间分区迁移方式

  • 节点内分区迁移:将故障磁盘中的分区副本迁移到当前Broker的其他磁盘,详情请参见节点内分区迁移方式

  • 原Broker数据恢复:将异常磁盘从log.dirs中移除,重启Broker后会自动恢复丢失的Partition数据到本节点的其他目录,详情请参见原Broker数据恢复方式

使用限制

本文档仅适用于EMR-5.8.0及后续版本,EMR-3.42.0及后续版本。

注意事项

磁盘故障运维应注意以下信息:

  • 选择修复策略:综合各种因素考虑选择磁盘维修的策略。

  • 注意限流:在磁盘运维过程当中,如果涉及到数据复制迁移,需要注意限制运维流量,避免对正常业务流量造成影响。

  • 限流阈值:应根据磁盘IO能力和正常业务流量来评估迁移时的限流阈值。

  • 分区副本迁移:物理盘维修完毕上线之后,应将Topic分区数据迁移回原磁盘,leader负载迁移回原Broker节点。

  • 数据迁移时长:在磁盘运维过程当中,如果涉及到数据复制迁移,需要评估数据迁移时长对业务的影响。

节点间分区迁移方式

方案描述

当发现磁盘故障隐患或者由于其他原因需要替换磁盘时,可以将故障磁盘上的分区副本数据迁移到其他磁盘。故障磁盘维修更换期间,分区副本个数不会变化、集群处于正常容错状态。故障盘维修更换完毕之后将分区数据迁回到原磁盘。此方案的优点在于磁盘运维周期,集群处于正常容错状态,各Broker负载较为均衡;缺点是当故障盘上的分区没有其他ISR副本时,有可能丢失数据。

适用场景

故障盘所在Broker磁盘容量不足场景,或者故障盘所在Broker负载过高的场景。

注意事项

  • 注意限流:分区数据迁移将产生较大的数据迁移流量,需要对流量进行限制以避免对正常业务产生影响。

  • 迁移时长:如果需要迁移的数据较多,相应的迁移时长也会加大。

  • 限流阈值:应根据磁盘IO能力、正常业务流量来评估迁移时的限流阈值。

  • 磁盘空间:需要确保集群有足够的磁盘容量来存放迁移的数据。

  • 备份原始assignment文件。分区迁移时,需要注意保存原来的assignment文件,便于磁盘修复后,将分区迁移回原来的磁盘目录。

  • 由于disk1存储了Kafka服务的日志文件,所有涉及disk1的坏盘修复操作必须遵循以下步骤:

    1. 下线disk1前:务必先将日志目录迁移至其他存储介质。

      # 创建新的日志目录,确保目录权限与/mnt/disk1/log/保持一致。
      sudo mkdir /mnt/disk2/log
      
      # 检查软链接是否正确指向disk2。
      sudo ln -sf /mnt/disk2/log /var/log/taihao-apps
      
      # 迁移原日志文件至新目录。
      sudo mv /mnt/disk1/log/* /mnt/disk2/log/
    2. 上线disk1后:确保将日志文件目录恢复到原位。

      # 创建disk1的日志目录。
      sudo mkdir /mnt/disk1/log
      
      # 检查软链接是否正确指向disk1。
      sudo ln -sf /mnt/disk1/log /var/log/taihao-apps
      
      # 移动日志文件回disk1。
      sudo mv /mnt/disk2/log/* /mnt/disk1/log/

操作步骤

本示例以Broker 0的坏盘/mnt/disk7为例,介绍如何采用节点间分区迁移的方式进行故障盘运维。

  1. 分区迁移。

    1. 以SSH方式登录到源Kafka集群的Master节点,详情请参见登录集群

    2. 可选:待维修磁盘隔离。如果磁盘已经发生故障,您需要执行以下命令快速隔离该磁盘。

      chmod 000 /mnt/disk7
    3. 执行以下命令,将待维修磁盘上的分区迁移到其他Broker节点。

      kafka-reassign-partitions.sh --zookeeper  master-1-1:2181/emr-kafka --reassignment-json-file reassign.json --throttle 30000000 --execute
      说明

      reassign.json包含了待迁移磁盘上需要迁移的分区副本。

      执行完reassign命令后,会输出原始的assignment,可以将该输出保存起来,便于后续恢复。

    4. 执行以下命令,确认故障盘上数据已经迁移完毕。

      ls -lrt /mnt/disk7/kafka/log

      通过回显信息,查看日志目录下是否有日志文件,没有日志文件表示数据已经迁移完毕。

    5. 可选:如果前面步骤中没有隔离待维修磁盘,则需要执行以下命令隔离待维修磁盘。

      chmod 000 /mnt/disk7
  2. 卸载故障磁盘目录。

    重要

    编辑fstab条目时,直接删除而不是注释掉故障盘条目。

    1. 执行以下命令,编辑/etc/fstab

      vim /etc/fstab
    2. 删除disk7的信息。

    3. 执行以下命令,验证目录句柄是否释放。

      lsof +D /mnt/disk7

      如果发现句柄长时间(半个小时以上)无法释放,则可以通过控制台重启对应Broker服务。

    4. 执行以下命令,卸载故障磁盘目录。

      umount /mnt/disk7
  3. 修改log.dirs配置并重启服务。

    1. 在EMR控制台,修改恢复后的Broker节点级别的log.dirs配置项,移除故障磁盘对应的分区目录。

      本示例需要移除的目录为/mnt/disk7/kafka/log

    2. 在EMR控制台重启Broker服务,详情请参见操作步骤

      说明

      如果ECS的修复磁盘事件运维流程中需要重启ECS实例,则可以将重启Broker服务与重启ECS实例结合起来完成。

  4. 通过修复磁盘事件,在ECS控制台进行后续磁盘修复工作。

    此过程可能需要的时间周期为几天。

  5. 磁盘上线,详情请参见磁盘上线

    您需要将修复后的磁盘重新mount到原来的目录。本示例为/mnt/disk7目录。

  6. 在修复后的磁盘目录上创建kafka日志目录。

    //创建原始日志目录。本例子为/mnt/disk7/kafka/log。
    sudo mkdir -p /mnt/disk7/kafka/log
    sudo chown -R kafka:hadoop /mnt/disk7/kafka/log
  7. 修改log.dirs配置并重启服务。

    1. 在EMR控制台,修改恢复后的Broker节点级别的log.dirs配置项,添加修复后磁盘对应的分区目录。

      本示例需要增加的目录为/mnt/disk7/kafka/log

    2. 在EMR控制台重启Broker服务,详情请参见操作步骤

  8. 迁回分区副本。

    使用步骤1保存的原始assignment文件,将原来故障盘上的分区迁回到修复后的磁盘。

原Broker数据恢复方式

方案描述

当磁盘故障时,如果磁盘IO性能已经明显下降,则需要快速隔离故障磁盘避免因单点故障影响集群性能。

磁盘隔离之后,对应kafka日志目录处于offline状态。此时,如果分区存在ISR副本或者允许分区数据丢失,可以直接将故障磁盘从log.dirs中删除下线,然后重启Broker。重启Broker后,如果其他节点存在新的leader副本,原有故障盘分区副本将在其所在Broker进行恢复。磁盘修复上线后,您可以通过reassign工具将原有数据迁移回修复后的磁盘。

本方案的缺点在于:当故障盘上的分区没有ISR副本时,有可能丢失数据。

适用场景

  • 故障磁盘IO性能已经明显下降,需要快速进行故障盘隔离的场景。

  • 故障磁盘上的日志目录已经offline的场景。

  • 故障盘上的分区在其他节点已经没有ISR副本时,允许丢失分区数据。

注意事项

  • 流量限制:由于数据在同一个节点恢复,会产生副本恢复流量,应注意限流。

  • 磁盘空间:需要注意本Broker的其他节点是否有足够的空间容纳异常磁盘的数据。

  • 数据迁移:磁盘修复重新上线后,通常需要将数据从其他磁盘挪回到修复后的磁盘以保证磁盘负载均衡。

操作步骤

以下示例以Broker 0的/mnt/disk7坏盘为例,介绍如何采用原地数据恢复方式进行故障盘运维。

  1. 故障盘隔离。

    1. 以SSH方式登录到源Kafka集群的Master节点,详情请参见登录集群

    2. 执行以下命令,隔离故障盘。

      chmod 000 /mnt/disk7
  2. 可选:备份故障盘上Kafka日志分区目录名称。

    通过备份故障盘目录名称,当磁盘修复上线后,您可以将原有分区迁移回修复后的磁盘中。

  3. 卸载故障磁盘目录。

    重要

    编辑fstab条目时,直接删除而不是注释掉故障盘条目。

    1. 执行以下命令,编辑/etc/fstab

      vim /etc/fstab
    2. 删除disk7的信息。

    3. 执行以下命令,验证目录句柄是否释放。

      lsof +D /mnt/disk7

      如果发现句柄长时间(半个小时以上)无法释放,则可以通过控制台重启对应Broker服务。

    4. 执行以下命令,卸载故障磁盘目录。

      umount /mnt/disk7
  4. 修改故障Broker节点的log.dirs配置并限制恢复流量。

    1. 在EMR控制台,修改恢复后的Broker节点级别的log.dirs配置项,移除故障磁盘对应的分区目录。

      本示例需要移除的目录为/mnt/disk7/kafka/log

    2. 可选:通过设置reassign限流参数的方式来限制Broker上分区数据同步恢复时候的流量带宽,详情请参见限制Kafka服务端运维流量

  5. 在EMR控制台重启Broker服务,详情请参见操作步骤

    说明

    如果ECS的修复磁盘事件运维流程中需要重启ECS实例,则可以将重启Broker服务与重启ECS实例结合起来完成。

    在重启的过程中,Kafka将会恢复本Broker故障盘上缺失的副本分区到其他磁盘,待Kafka恢复好分区数据后,如果前一步骤设置了限流参数,则需要将限流参数去除掉。

  6. 通过修复磁盘事件,在ECS控制台进行后续磁盘修复工作。

    此过程可能需要的时间周期为几天。

  7. 磁盘上线,详情请参见磁盘上线

    您需要将修复后的磁盘重新mount到原来的目录。本示例为/mnt/disk7目录。

  8. 创建Kafka日志目录并重启服务。

    1. 在修复后的磁盘目录上创建Kafka日志目录。

      //创建原始日志目录。本例子为/mnt/disk7/kafka/log。
      sudo mkdir -p /mnt/disk7/kafka/log
      sudo chown -R kafka:hadoop /mnt/disk7/kafka/log
    2. 在EMR控制台,修改恢复后的Broker节点级别的log.dirs配置项,添加修复后磁盘对应的分区目录。

      log.dirs

      本示例需要增加的目录为/mnt/disk7/kafka/log

    3. 在EMR控制台重启Broker服务,详情请参见操作步骤

  9. 迁回分区副本。

    使用步骤2保存的分区副本目录,通过reassign工具迁移分区副本broker内部目录的方式迁回到修复后的磁盘。

节点内分区迁移方式

方案描述

当发现磁盘故障时,如果磁盘IO性能已经明显下降,需要快速隔离故障磁盘避免因单点故障影响集群性能。

磁盘隔离之后,对应的kafka日志目录处于offline状态。此时,如果故障盘上存在无法切换到其他Broker的副本且业务数据不能丢失,则可以将分区副本数据迁移到本Broker的其他磁盘后,再进行后续的磁盘运维流程。本方案用于处理如果使用其他运维方案会丢失数据,但业务不允许数据丢失的场景。

适用场景

故障盘上的分区在其他节点已经没有ISR副本,并且业务不允许数据丢失的场景。

注意事项

需要评估OS系统层面的mv操作产生的IO热点的影响。

操作步骤

本示例以Broker 0的坏盘/mnt/disk7为例,介绍如何采用节点内分区迁移的方式进行故障盘运维。

  1. 故障盘隔离。

    1. 以SSH方式登录到源Kafka集群的Master节点,详情请参见登录集群

    2. 执行以下命令,隔离故障盘。

      chmod 000 /mnt/disk7
  2. 可选:备份故障盘上Kafka日志分区目录名称。

    通过备份故障盘目录名称,当磁盘修复上线后,您可以将原有分区迁移回修复后的磁盘中。

  3. 迁移节点内分区。

    /mnt/disk7/kafka/log下的所有数据以分区为单位,按照分区占用磁盘容量的大小,均匀的迁移到当前节点的其他目录,详情请参见节点内分区迁移方式恢复

  4. 卸载故障磁盘目录。

    重要

    编辑fstab条目时,直接删除而不是注释掉故障盘条目。

    1. 执行以下命令,编辑/etc/fstab

      vim /etc/fstab
    2. 删除disk7的信息。

    3. 执行以下命令,验证目录句柄是否释放。

      lsof +D /mnt/disk7

      如果发现句柄长时间(半个小时以上)无法释放,则可以通过控制台重启对应Broker服务。

    4. 执行以下命令,卸载故障磁盘目录。

      umount /mnt/disk7
  5. 在EMR控制台重启Broker服务,详情请参见操作步骤

    说明

    如果ECS的修复磁盘事件运维流程中需要重启ECS实例,则可以将重启Broker服务与重启ECS实例结合起来完成。

  6. 通过修复磁盘事件,在ECS控制台进行后续磁盘修复工作。

    此过程可能需要的时间周期为几天。

  7. 磁盘上线,详情请参见磁盘上线

    您需要将修复后的磁盘重新mount到原来的目录。本示例为/mnt/disk7目录。

  8. 创建Kafka日志目录并重启服务。

    1. 在修复后的磁盘目录上创建kafka日志目录。

      //创建原始日志目录。本例子为/mnt/disk7/kafka/log。
      sudo mkdir -p /mnt/disk7/kafka/log
      sudo chown -R kafka:hadoop /mnt/disk7/kafka/log
    2. 在EMR控制台,修改恢复后的Broker节点级别的log.dirs配置项,添加修复后磁盘对应的分区目录。log.dirs

    3. 在EMR控制台重启Broker服务,详情请参见操作步骤

  9. 可选:迁回分区副本。

    使用步骤2保存的分区副本目录,通过reassign工具迁移分区副本broker内部目录的方式迁回到修复后的磁盘。