本文通过示例为您介绍,如何在DataScience集群进行分布式Mnist训练。您可以根据本文示例自行适配您的数据,修改提供的代码示例以进行定制化建模。

前提条件

  • 开发工具
    • 本地安装了Java JDK 8。
    • 本地安装了Maven 3.x。
    • 本地安装了用于Java或Scala开发的IDE,推荐IntelliJ IDEA,且已配置完成JDK和Maven环境。
  • 已创建DataScience集群,详情请参见创建集群

背景信息

DataScience集群已经内置了EasyRec算法库,您也可以使用EasyRec进行训练,此方式的优点是降低了算法的工作量,加快了模型迭代的速度,详情请参见EMR Tutorial

分布式Mnist训练支持以下两种模式:

准备工作

您可以按照以下方式获取代码示例:
  • 方式一:
    1. 使用SSH方式登录到集群,详细信息请参见使用SSH连接主节点
    2. 执行如下命令,下载mnist.npz
      wget https://pai-vision-data-hz.oss-cn-zhangjiakou.aliyuncs.com/easy-rec/mnist_demo/mnist.npz
    3. 执行如下命令,下载mnist_mirrored.py
      wget https://pai-vision-data-hz.oss-cn-zhangjiakou.aliyuncs.com/easy-rec/mnist_demo/mnist_mirrored.py -O mnist_mirrored.py
  • 方式二:
    1. 下载代码:data.tar.gz
    2. 使用SSH方式登录到集群,详细信息请参见使用SSH连接主节点
    3. 上传data.tar.gz至集群主节点。
    4. 执行如下命令,解压缩data.tar.gz
      tar zxvf data.tar.gz

      您可以获取到mnist.npzmnist_mirrored.py

代码示例

代码示例如下。
import tensorflow as tf
import logging
logging.basicConfig(
    level=logging.INFO, format='[%(asctime)s][%(levelname)s] %(message)s')

import os, json
curr_dir, _ = os.path.split(os.path.abspath(__file__))
remote_path = 'hdfs:///user/data/mnist.npz'
data_path = os.path.join(curr_dir, 'mnist.npz')
tf.io.gfile.copy(remote_path, data_path, overwrite=True)
logging.info('remote_path = %s' % remote_path)
logging.info('data_path = %s' % data_path)
logging.info('tensorflow version: %s' % tf.__version__)

BUFFER_SIZE = 10000
BATCH_SIZE_PER_REPLICA = 64
LEARNING_RATE = 1e-4

#strategy = tf.distribute.MirroredStrategy()
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
logging.info('Number of devices: {}'.format(strategy.num_replicas_in_sync))
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
logging.info('batch_size = %d batch_size_per_replica = %d' % (BATCH_SIZE,
             BATCH_SIZE_PER_REPLICA))

train, test = tf.keras.datasets.mnist.load_data(path=data_path)
num_train_samples = len(train[0])
num_test_samples = len(test[0])
logging.info('num_train_samples = %d num_test_samples = %d' % (num_train_samples,
             num_test_samples))

steps_per_epoch = int(num_train_samples / BATCH_SIZE)

mnist_train = tf.data.Dataset.from_tensor_slices(train)
mnist_test = tf.data.Dataset.from_tensor_slices(test)

def scale(image, label):
  image = tf.cast(image, tf.float32)
  image = tf.expand_dims(image, axis=2)
  image /= 255
  return image, label

mnist_train = mnist_train.repeat()
train_dataset = mnist_train.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
eval_dataset = mnist_test.map(scale).batch(BATCH_SIZE)

with strategy.scope():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
  ])

  model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                optimizer=tf.keras.optimizers.Adam(),
                metrics=['accuracy'])


def decay(epoch):
  return LEARNING_RATE

checkpoint_dir = 'hdfs:///user/exp/mnist_mirrored/'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")
callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir='hdfs:///user/exp/mnist_mirrored/logs'),
    tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix,
                                       save_weights_only=True),
    tf.keras.callbacks.LearningRateScheduler(decay)
]

model.fit(train_dataset, epochs=12, callbacks=callbacks, steps_per_epoch=steps_per_epoch)

model.load_weights(tf.train.latest_checkpoint(checkpoint_dir))

logging.info("Start evaluate: ")
eval_loss, eval_acc = model.evaluate(eval_dataset, steps=int(num_test_samples/BATCH_SIZE))
logging.info('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))

单机多卡模式

此模式下您可以使用MirroredStrategy方法,在集群主节点上执行如下命令。
hadoop fs -mkdir -p hdfs://emr-header-1:9000/user/data/
hadoop fs -put mnist.npz hdfs://emr-header-1:9000/user/data/
el_submit  -t standalone -a mnist_train -f mnist_mirrored.py  -m local -wn 1 -wg 2  -wc 6  -wm 20000 -c python mnist_mirrored.py
  • -wn:服务器数量,此模式下必须是1。
  • -wg:表示GPU的数量。
  • -wc:表示CPU的数量。
  • -wm:表示CPU内存大小(字节)。本示例代码中的20000表示20 GB。

多机多卡模式

此模式下您可以使用MultiWorkerMirroredStrategy方法,在集群主节点上执行如下命令。
说明 使用此模式时需要替换代码中的strategy = tf.distribute.MirroredStrategy()strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
hadoop fs -mkdir -p hdfs://emr-header-1:9000/user/data/
hadoop fs -put mnist.npz hdfs://emr-header-1:9000/user/data/
el_submit  -t tensorflow-worker -a mnist_train -f mnist_mirrored.py  -m local -wn 2 -wg 1  -wc 6  -wm 20000 -c python mnist_mirrored.py
  • -wn:服务器数量,本示例为2。
  • -wg:表示GPU的数量。
  • -wc:表示CPU的数量。
  • -wm:表示CPU内存大小(字节)。本示例代码中的20000表示20 GB。