针对Whale支持的几种并行化方式(数据并行、流水并行、Layer间拆分的模型并行、算子拆分的模型并行及各种混合并行策略),本文介绍其实现逻辑及在Whale中的实现方式,以指导您快速使用Whale进行分布式训练。

数据并行

  • 背景信息

    对于大规模应用数据,机器学习模型通常需要通过数据并行进行分布式训练加速。数据并行是分布式训练最常见的并行化方式,也是Whale支持的最基本的并行化方式之一。

  • 定义

    数据并行是指将训练数据分片放到不同运算节点中,并以相同的运算逻辑进行运算。

  • 实现逻辑
    以ResNet50模型为例,介绍数据并行的实现逻辑。该模型如下图所示。ResNet50模型当数据并行时,每张GPU卡都会有一个独立的模型副本,但是读取的训练数据不同,如下图所示。数据并行每轮迭代分为三步:
    1. 根据上一轮更新好的模型参数(权重),每个模型副本分别计算loss并得到梯度。
    2. 所有模型副本进行Reduce操作,完成梯度同步(所有Worker梯度ReduceSum在一起,同步后,每个模型副本得到同样的梯度)。
    3. 将梯度更新到模型参数(权重)。
  • Whale中实现数据并行
    在Whale中,您可以使用默认的Cluster Layout方式,并将模型代码放置在Replica Scope下,即可实现数据并行,代码示例如下(数据并行完整的可执行代码请参见simple_data_parallel.py)。
    import whale as wh
    wh.init()
    with wh.cluster():
        with wh.replica():
              ResNet50_Model_Defination()

模型并行

  • 背景信息
    受益于深度神经网络的发展,NLP及CV等领域的模型效果得到很大提升,但是同时大幅度增加了模型参数量。以Imagenet分类任务为例,优胜算法从2014年GoogleNet到2018年的Squeeze-and-Excitation Networks,参数量增长约36倍(从4百万增长至1.458亿参数)。Turing-NLG模型甚至达到170亿参数规模大小。模型参数飞速增长带来如下挑战:
    • 分布式训练梯度同步通信量增大。此外,受限于GPU显存,训练的Batch Size变小,导致训练的通信占比增加,分布式扩展效率降低。
    • 过小的Batch Size导致模型训练波动变大,影响收敛效果。
    模型并行适用于模型本身或部分规模较大,使用纯数据并行的训练加速效果不好或无法进行纯数据并行的场景。例如以下类似场景:
    • NLP领域的GPT2和T5等参数规模超大的场景。由于模型的显存消耗过大,单纯的数据并行无法训练。
    • NLP领域的BertLarge等参数规模较大的场景。模型中层与层间存在Feature Map(Activation),通信量远小于Weights 的通信量。
    • 以VGG-16模型为例,各层显存不平衡,单纯数据并行会造成较大的显存浪费。
  • 定义

    一种并行策略,不同计算设备(GPU和CPU等)负责网络模型不同层的计算,例如神经网络模型的不同网络层被分配到不同的设备。

  • 实现逻辑
    以BertLarge模型为例,通过Layer间的拆分策略,将BertLarge分配至不同的GPU卡中进行计算,以降低每张GPU卡的模型显存占用,从而提高BatchSize。同时,使用Activation通信代替梯度同步,大幅度降低卡间通信量。模型拆分如下图所示。模型并行数据并行时,每轮迭代需要通信一个完整的BertLarge模型梯度(约1.2G)。由于模型较大,导致在16G V100环境(还涉及Sequence length、Embedding Size等因素)中每次计算2条样本就需要进行梯度通信。因此,不仅通信占比过高导致性能差,而且由于Batch Size过小导致模型波动较大,收敛效果差。而模型并行时,每轮迭代只需要通信约27M的Activation数据,无需梯度通信过程。同时可以大幅度增加Batch Size,因此训练更加稳定,收敛效果更好。
  • Whale中实现模型并行
    您需要配置Cluster Layout,并将模型的不同部分放至Stage Scope下即可在Whale中实现模型并行。此处以BertLarge为例,单卡模型与模型并行的示例代码片段分别如下所示(BertLarge的模型并行可执行代码请参见train_bert_model.py):
    • 单卡模型
      embedding_output = embedding(inputs)
      encoder_output = encoder(embedding_output)
      pooler_output = pooler(encoder_output)
    • 模型并行
      import whale as wh
      cluster = wh.cluster()
      
      with cluster:
          with wh.stage():
              embedding_output = embedding(inputs)
              encoder_output = encoder_layer_1_8(embedding_output)
      
          with wh.stage():
              encoder_output = encoder_layer_9_16(embedding_output)
      
          with wh.stage():
              encoder_output = encoder_layer_17_24(embedding_output)
              pooler_output = pooler(encoder_output)
    说明 模型并行通常需要搭配流水并行,进行训练加速。

流水并行

  • 背景信息

    模型并行时,整个模型被划分为若干Stages,分配到不同的GPU卡中进行计算。由于不同Stage间存在依赖,因此GPU卡间存在明显的交替使用现象,无法充分利用GPU算力。此外,虽然模型划分的粒度有限,但是业务场景产生的海量数据对训练性能的要求越来越高,因此需要增加计算资源并行,以减少模型训练时间,从而提升模型收敛效果。

    针对模型并行存在的问题,Whale提供混合模型并行和流水并行,或混合模型并行、流水并行及数据并行的并行策略,以提高GPU的利用效率,从而解决分布式扩展问题。

  • 定义

    一种辅助性并行策略,通常与模型并行混合使用,是指不同设备(GPU和CPU等)接收不同批次数据(包括原始训练数据、Activation及Error)作为输入,同时计算设备对应的网络层部分。

  • 实现逻辑
    不同场景的模型可能适合不同的分布式并行策略。以BertLarge模型为例,不管在何种带宽和硬件拓扑下,每一层的Activation、显存及Flops计算都几乎一致,进行模型分片更易受Load Balance(包括显存均衡度和算力均衡度)影响。通过混合模型并行、流水并行及数据并行的混合策略,可以优化分布式BertLarge的性能,具体实现逻辑如下:
    1. 模型并行
      针对BertLarge模型过大的问题,可以使用Layer间的拆分策略将BertLarge分配至不同的GPU卡中进行计算,以降低每张GPU卡上的模型显存占用,从而提高Batch Size。同时使用Activation通信替代梯度同步,大幅度降低卡间通信量。模型拆分如下图所示。流水并行中的模型并行
    2. 流水并行
      如果仅使用Layer间拆分的模型并行,则当其中一张GPU卡计算时,其他GPU卡会闲置等待,即同一时间段只使用了一张GPU卡,如下图所示。模型并行时间轴因此,需要增加流水并行以提高其他GPU卡的利用率,其原理是每张GPU卡训练完一个Batch数据的当前部分后,立刻训练下一个Batch数据,执行逻辑如下图(该图为示例图,实际生产中会优化执行调度逻辑)所示。流水并行
    3. 数据并行
      由于模型并行不可能将模型拆分为无限份,拆分数量过大也会影响性能,因此如何利用更多的GPU卡进行训练加速就成为另外一个问题。您可以引入数据并行解决大数据规模问题,从而利用更大规模的GPU卡数进行训练。为了能够高效利用GPU间的NVLink(如果没有NVLink,则利用PCI-e)带宽,降低梯度同步开销,需要将模型并行的拆分放至不同服务器中进行,数据并行部分的梯度同步(通信量约为1.2 GB)放至服务器内进行,Layer间切分产生的Activation通信(约27 MB)通过TCP网络通信。假设每台服务器有4张GPU卡,将BertLarge进行4个副本的数据并行,模型拆分的逻辑如下图所示。流水并行中的模型拆分逻辑至此,通过Whale灵活易用的接口,便完成了模型并行、流水并行及数据并行的混合并行策略,以此对BertLarge进行训练加速。
  • Whale中实现流水并行
    您需要配置Cluster Layout,并将模型的不同部分放至Stage Scope,即可在Whale中实现模型并行。以BertLarge为例,单卡模型及流水并行的示例代码片段如下(BertLarge的流水并行可执行代码请参见train_bert_model.py):
    • 单卡模型
      embedding_output = embedding(inputs)
      encoder_output = encoder(embedding_output)
      pooler_output = pooler(encoder_output)
    • 流水并行
      import whale as wh
      # 假设申请1*3的GPU规模。
      cluster = wh.cluster()
      
      with cluster:
          with wh.pipeline(num_micro_batch=5):
              with wh.stage(): # stage 0
                  embedding_output = embedding(inputs)
                  encoder_output = encoder_layer_1_8(embedding_output)
      
              with wh.stage(): # stage 1
                  encoder_output = encoder_layer_9_16(embedding_output)
      
              with wh.stage(): # stage 2
                  encoder_output = encoder_layer_17_24(embedding_output)
                  pooler_output = pooler(encoder_output)
    • 流水并行搭配数据并行
      import whale as wh
      # 假设申请3*4的GPU规模。
      # 每个Stage进行4份副本的数据并行。
      cluster = wh.cluster(layout={"average": 4}) # 也可使用cluster = wh.cluster(layout="row")。
      
      with cluster:
          with wh.replica():
              with wh.pipeline(num_micro_batch=5):
                  with wh.stage(): # stage 0
                      embedding_output = embedding(inputs)
                      encoder_output = encoder_layer_1_8(embedding_output)
      
                  with wh.stage(): # stage 1
                      encoder_output = encoder_layer_9_16(embedding_output)
      
                  with wh.stage(): # stage 2
                      encoder_output = encoder_layer_17_24(embedding_output)
                      pooler_output = pooler(encoder_output)

算子拆分

  • 背景信息
    针对日益复杂的深度学习模型,通过数据并行进行任务训练仍然存在分布式性能差、单卡显存无法存储等问题。因此需要通过模型算子拆分进行分布式加速优化。算子拆分适用于模型本身或部分规模较大,无法使用数据并行进行训练的场景。例如以下类似场景:
    • 大规模分类任务场景
    • NLP领域的BertLarge、GPT2及T5等参数规模超大的场景。
    • 搜索推荐等Sparse模型,需要对稀疏特征进行分片的场景(Variable拆分在Whale中也视为一种算子拆分问题)。
  • 定义

    一种并行策略,将算子的存储部分和计算部分拆分到不同计算设备(GPU和CPU等)上进行存储和计算。

  • 实现逻辑
    不同算子拆分的实现逻辑各不相同,在此简单介绍ResNet50分类任务中的FC节点的拆分实现。ResNet50模型结构如下图所示。ResNet50模型结构分类任务中,当分类数较大时,Fully Connected(FC)部分权重数据并行的通信性能较差。分类数更多时,GPU显存无法存放过大的模型权重,此时需要通过算子拆分进行分布式优化。分类数通过算子拆之后的计算图如下图所示。算子拆分的分类模型图FC部分会将Weight权重按照列进行分割(如下图所示),ResNet50的输出会发送到每个FC分片卡上,每个分片只计算分配到自己的相应部分。得到输出的Logits之后,分别计算Softmax部分,但是需要跨卡通信以获得全局最大Logits信息和每行的Sum值。实现分类任务分布式计算的更多信息请参见“大规模分类分布式训练-算子拆分”。FC分割权重
  • Whale中实现算子拆分
    您需要配置Cluster Layout,并将需要进行算子拆分的部分代码放至Split Scope下,即可在Whale中实现算子拆分。以ResNet50为例,算子拆分的代码片段如下(算子拆分的大规模分类任务可执行代码请参见large_scale_classification.py)。
    import whale as wh
    cluster = wh.cluster()
    
    with cluster:
        with wh.replica():
            features = ResNet50(inputs)
    
        with wh.split():
            logits = FC(features)
            predictions = Softmax(logits)