【2.4.1.7】TensorFlow Multi-GPU Implementation - 7
Multi-GPU Assessment
In this assessment, you will modify the training script to train on a new dataset (CIFAR-10). First make a copy to keep for reference, then take a look at the assessment.py file:
!cp assessment.py assessment_original.py
You can run the training process serially using the code cell below.
!python assessment_original.py
Objective
The goal of the assessment is for you to modify the assessment.py file so that it reaches a training accuracy (acc) of 0.75 within four minutes (240 seconds). To make sure the model is beginning to converge on the validation metric, you must also reach a validation accuracy (val_acc) of at least 0.25. Both requirements must be met for at least two consecutive epochs.
Guidance
You will need to modify the assessment.py file so that it uses multiple GPUs with Horovod. The assessment harness will run your script with as many GPUs as are available on this server. (So, although it may be possible to meet the accuracy requirement with fewer GPUs, your training script must work correctly when all GPUs are launched.) You may consult any of the files you have access to in this exercise as a reference. Some factors to consider are the learning rate, the batch size, and the optimizer.
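For reference, a minimal sketch of the typical Horovod changes (initialization, GPU pinning, optimizer wrapping, and the broadcast/metric-averaging callbacks, following the same pattern as the full_solution.py listing below) might look like the following; the learning-rate scaling shown is only a common starting point, not a required setting:

import tensorflow as tf
import horovod.tensorflow.keras as hvd

# Initialize Horovod and pin this process to a single GPU.
hvd.init()
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    tf.config.experimental.set_memory_growth(gpus[hvd.local_rank()], True)
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

# A common starting point: scale the single-GPU learning rate by the number
# of workers, then wrap the optimizer so gradients are averaged across GPUs.
base_lr = 0.01  # in assessment.py this value would come from args.base_lr
opt = tf.keras.optimizers.SGD(learning_rate=base_lr * hvd.size(), momentum=0.9)
opt = hvd.DistributedOptimizer(opt)

# Broadcast initial weights from rank 0 and average metrics across workers.
horovod_callbacks = [
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),
    hvd.callbacks.MetricAverageCallback(),
]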
The assessment.py training script includes the PrintTotalTime callback we used in the earlier exercises, so that you can keep track of the elapsed time, and the StopAtAccuracy callback, which automatically enforces the assessment requirements. You must not edit or remove these two callbacks; otherwise your assessment will fail. You may add or remove other callbacks as you see fit. Note that the script also contains other code that helps make this problem tractable, such as setting the NumPy random seed so that results are more deterministic; remove that code at your own risk.
While experimenting, you can pass arguments such as --batch-size and --base-lr when launching the training. However, once you have found parameters that pass the assessment, save them as the defaults in assessment.py, because the grading harness will run your assessment.py script without any arguments. For example, if you want the grader to test your code with a batch size of 16, change the file as follows:
parser.add_argument('--batch-size', type=int, default=16,
                    help='input batch size for training')
Once you have modified assessment.py and met the criteria, return to the course page and click the assessment button.
Your code will then be run; whether or not it passes, you will be notified within 3 minutes.
You can test how your code performs here before submitting. We encourage you to do so, so that you can be confident it will pass.
!mpirun -np FIXME python assessment.py
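For example, assuming the server exposes 4 GPUs, the test run would look like:
!mpirun -np 4 python assessment.py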
Scripts
assessment.py
import argparse
import tensorflow as tf
import random as rn
import numpy as np
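# Fix the random seeds so training runs are more reproducible
# (see the note above; remove at your own risk).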
np.random.seed(965)
rn.seed(965)
from tensorflow.keras import backend as K
from tensorflow.keras.preprocessing import image
from tensorflow.keras.datasets import cifar10
from tensorflow.keras.layers import Input, Conv2D, BatchNormalization, Dense, \
Add, Activation, Dropout, MaxPooling2D, GlobalAveragePooling2D
import os
from time import time
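# Run TensorFlow ops single-threaded; this trades some speed for more
# deterministic, reproducible results.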
tf.config.threading.set_inter_op_parallelism_threads(1)
tf.config.threading.set_intra_op_parallelism_threads(1)
parser = argparse.ArgumentParser(description='CIFAR-10 Example',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--batch-size', type=int, default=128,
help='input batch size for training')
parser.add_argument('--epochs', type=int, default=50,
help='number of epochs to train')
parser.add_argument('--base-lr', type=float, default=0.01,
help='learning rate for a single GPU')
parser.add_argument('--warmup-epochs', type=float, default=5,
help='number of warmup epochs')
parser.add_argument('--momentum', type=float, default=0.9,
help='SGD momentum')
args = parser.parse_args()
# Define the function that creates the model
def cbr(x, conv_size):
channel_axis = 1 if K.image_data_format() == 'channels_first' else -1
x = Conv2D(conv_size, (3,3), padding='same')(x)
x = BatchNormalization(axis=channel_axis)(x)
x = Activation('relu')(x)
return x
def conv_block(x, conv_size, scale_input = False):
x_0 = x
if scale_input:
x_0 = Conv2D(conv_size, (1, 1), activation='linear', padding='same')(x_0)
x = cbr(x, conv_size)
x = Dropout(0.01)(x)
x = cbr(x, conv_size)
x = Add()([x_0, x])
return x
def create_model():
# Implementation of WideResNet (depth = 16, width = 10) based on keras_contrib
# https://github.com/keras-team/keras-contrib/blob/master/keras_contrib/applications/wide_resnet.py
inputs = Input(shape=(32, 32, 3))
x = cbr(inputs, 16)
x = conv_block(x, 160, True)
x = conv_block(x, 160)
x = MaxPooling2D((2, 2))(x)
x = conv_block(x, 320, True)
x = conv_block(x, 320)
x = MaxPooling2D((2, 2))(x)
x = conv_block(x, 640, True)
x = conv_block(x, 640)
x = GlobalAveragePooling2D()(x)
outputs = Dense(num_classes, activation='softmax')(x)
model = tf.keras.models.Model(inputs, outputs)
opt = tf.keras.optimizers.SGD(lr=args.base_lr, momentum=args.momentum)
model.compile(loss=tf.keras.losses.categorical_crossentropy,
optimizer=opt,
metrics=['accuracy'])
return model
verbose = 1
num_classes = 10
data_augmentation = True
# The data, split between train and test sets:
(x_train, y_train), (x_test, y_test) = cifar10.load_data()
# Input image dimensions
img_rows, img_cols = 32, 32
num_classes = 10
if K.image_data_format() == 'channels_first':
x_train = x_train.reshape(x_train.shape[0], 3, img_rows, img_cols)
x_test = x_test.reshape(x_test.shape[0], 3, img_rows, img_cols)
input_shape = (3, img_rows, img_cols)
else:
x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 3)
x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 3)
input_shape = (img_rows, img_cols, 3)
# Convert class vectors to binary class matrices.
y_train = tf.keras.utils.to_categorical(y_train, num_classes)
y_test = tf.keras.utils.to_categorical(y_test, num_classes)
model = create_model()
x_train = x_train.astype('float32')
x_test = x_test.astype('float32')
x_train /= 255
x_test /= 255
# Callbacks. Do NOT edit these, the assessment
# code depends on these appearing exactly as
# below, and you will fail the assessment if
# these are changed.
class PrintTotalTime(tf.keras.callbacks.Callback):
def on_train_begin(self, logs=None):
self.start_time = time()
def on_epoch_end(self, epoch, logs=None):
elapsed_time = round(time() - self.start_time, 2)
print("Elapsed training time through epoch {}: {}".format(epoch+1, elapsed_time))
def on_train_end(self, logs=None):
total_time = round(time() - self.start_time, 2)
print("Total training time: {}".format(total_time))
class StopAtAccuracy(tf.keras.callbacks.Callback):
def __init__(self, train_target=0.75, val_target=0.25, patience=2, verbose=0):
self.train_target = train_target
self.val_target = val_target
self.patience = patience
self.verbose = verbose
self.stopped_epoch = 0
self.met_train_target = 0
self.met_val_target = 0
def on_epoch_end(self, epoch, logs=None):
if logs.get('accuracy') > self.train_target:
self.met_train_target += 1
else:
self.met_train_target = 0
if logs.get('val_accuracy') > self.val_target:
self.met_val_target += 1
else:
self.met_val_target = 0
if self.met_train_target >= self.patience and self.met_val_target >= self.patience:
self.stopped_epoch = epoch
self.model.stop_training = True
def on_train_end(self, logs=None):
if self.stopped_epoch > 0 and verbose == 1:
print('Early stopping after epoch {}. Training accuracy target ({}) and validation accuracy target ({}) met.'.format(self.stopped_epoch + 1, self.train_target, self.val_target))
def lr_schedule(epoch):
if epoch < 15:
return args.base_lr
if epoch < 25:
return 1e-1 * args.base_lr
if epoch < 35:
return 1e-2 * args.base_lr
return 1e-3 * args.base_lr
callbacks = [
StopAtAccuracy(verbose=verbose),
tf.keras.callbacks.LearningRateScheduler(lr_schedule)
]
if verbose:
callbacks.append(PrintTotalTime())
# This will do preprocessing and realtime data augmentation:
datagen = image.ImageDataGenerator(
featurewise_center=False, # set input mean to 0 over the dataset
samplewise_center=False, # set each sample mean to 0
featurewise_std_normalization=False, # divide inputs by std of the dataset
samplewise_std_normalization=False, # divide each input by its std
zca_whitening=False, # apply ZCA whitening
zca_epsilon=1e-06, # epsilon for ZCA whitening
rotation_range=0, # randomly rotate images in the range (degrees, 0 to 180)
# randomly shift images horizontally (fraction of total width)
width_shift_range=0.1,
# randomly shift images vertically (fraction of total height)
height_shift_range=0.1,
shear_range=0., # set range for random shear
zoom_range=0., # set range for random zoom
channel_shift_range=0., # set range for random channel shifts
# set mode for filling points outside the input boundaries
fill_mode='nearest',
cval=0., # value used for fill_mode = "constant"
horizontal_flip=True, # randomly flip images
vertical_flip=False, # randomly flip images
# set rescaling factor (applied before any other transformation)
rescale=None,
# set function that will be applied on each input
preprocessing_function=None,
# image data format, either "channels_first" or "channels_last"
data_format=None,
# fraction of images reserved for validation (strictly between 0 and 1)
validation_split=0.0)
# Compute quantities required for feature-wise normalization
# (std, mean, and principal components if ZCA whitening is applied).
datagen.fit(x_train)
# Fit the model on the batches generated by datagen.flow().
model.fit(datagen.flow(x_train, y_train, batch_size=args.batch_size),
callbacks=callbacks,
epochs=args.epochs,
verbose=verbose,
# avoid shuffling for reproducible training
shuffle=False,
steps_per_epoch=int(len(y_train)/(args.batch_size)),
validation_data=(x_test, y_test),
workers=4)
# Score trained model.
scores = model.evaluate(x_test, y_test, verbose=verbose)
if verbose:
print('Test loss:', scores[0])
print('Test accuracy:', scores[1])
full_solution.py
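(full_solution.py is the fully distributed Fashion MNIST training script from the earlier exercises, included here as a reference for the Horovod pattern described above.)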
import argparse
import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras.preprocessing import image
from tensorflow.keras.datasets import fashion_mnist
from tensorflow.keras.layers import Input, Conv2D, BatchNormalization, Dense, \
Add, Activation, Dropout, MaxPooling2D, GlobalAveragePooling2D
import numpy as np
import os
from time import time
import horovod.tensorflow.keras as hvd
import csv
from tensorflow_addons.optimizers import NovoGrad
# Initialize Horovod
hvd.init()
# Pin to a GPU
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
tf.config.experimental.set_memory_growth(gpus[hvd.local_rank()], True)
tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
# Parse input arguments
parser = argparse.ArgumentParser(description='Fashion MNIST Example',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--log-dir', default='./logs',
help='tensorboard log directory')
parser.add_argument('--batch-size', type=int, default=32,
help='input batch size for training')
parser.add_argument('--val-batch-size', type=int, default=32,
help='input batch size for validation')
parser.add_argument('--epochs', type=int, default=40,
help='number of epochs to train')
parser.add_argument('--base-lr', type=float, default=0.01,
help='learning rate for a single GPU')
parser.add_argument('--momentum', type=float, default=0.9,
help='SGD momentum')
parser.add_argument('--wd', type=float, default=0.000005,
help='weight decay')
parser.add_argument('--warmup-epochs', type=float, default=5,
help='number of warmup epochs')
parser.add_argument('--target-accuracy', type=float, default=.85,
help='Target accuracy to stop training')
parser.add_argument('--patience', type=float, default=2,
help='Number of epochs that meet target before stopping')
args = parser.parse_args()
# Define a function for a simple learning rate decay over time
def lr_schedule(epoch):
if epoch < 15:
return args.base_lr
if epoch < 25:
return 1e-1 * args.base_lr
if epoch < 35:
return 1e-2 * args.base_lr
return 1e-3 * args.base_lr
# Define the function that creates the model
def cbr(x, conv_size):
channel_axis = 1 if K.image_data_format() == 'channels_first' else -1
x = Conv2D(conv_size, (3,3), padding='same')(x)
x = BatchNormalization(axis=channel_axis)(x)
x = Activation('relu')(x)
return x
def conv_block(x, conv_size, scale_input = False):
x_0 = x
if scale_input:
x_0 = Conv2D(conv_size, (1, 1), activation='linear', padding='same')(x_0)
x = cbr(x, conv_size)
x = Dropout(0.01)(x)
x = cbr(x, conv_size)
x = Add()([x_0, x])
return x
def create_model():
# Implementation of WideResNet (depth = 16, width = 10) based on keras_contrib
# https://github.com/keras-team/keras-contrib/blob/master/keras_contrib/applications/wide_resnet.py
inputs = Input(shape=(28, 28, 1))
x = cbr(inputs, 16)
x = conv_block(x, 160, True)
x = conv_block(x, 160)
x = MaxPooling2D((2, 2))(x)
x = conv_block(x, 320, True)
x = conv_block(x, 320)
x = MaxPooling2D((2, 2))(x)
x = conv_block(x, 640, True)
x = conv_block(x, 640)
x = GlobalAveragePooling2D()(x)
outputs = Dense(num_classes, activation='softmax')(x)
model = tf.keras.models.Model(inputs, outputs)
opt = NovoGrad(lr=args.base_lr, grad_averaging=True)
# Wrap the optimizer in a Horovod distributed optimizer
opt = hvd.DistributedOptimizer(opt)
model.compile(loss=tf.keras.losses.categorical_crossentropy,
optimizer=opt,
metrics=['accuracy'])
return model
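# Only the root worker (rank 0) prints verbose output.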
if hvd.rank() == 0:
verbose = 1
else:
verbose = 0
# Input image dimensions
img_rows, img_cols = 28, 28
num_classes = 10
# Load Fashion MNIST data.
(x_train, y_train), (x_test, y_test) = fashion_mnist.load_data()
if K.image_data_format() == 'channels_first':
x_train = x_train.reshape(x_train.shape[0], 1, img_rows, img_cols)
x_test = x_test.reshape(x_test.shape[0], 1, img_rows, img_cols)
input_shape = (1, img_rows, img_cols)
else:
x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1)
x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1)
input_shape = (img_rows, img_cols, 1)
# Convert class vectors to binary class matrices
y_train = tf.keras.utils.to_categorical(y_train, num_classes)
y_test = tf.keras.utils.to_categorical(y_test, num_classes)
# Training data iterator.
train_gen = image.ImageDataGenerator(featurewise_center=True, featurewise_std_normalization=True,
horizontal_flip=True, width_shift_range=0.2, height_shift_range=0.2)
train_gen.fit(x_train)
train_iter = train_gen.flow(x_train, y_train, batch_size=args.batch_size)
# Validation data iterator.
test_gen = image.ImageDataGenerator(featurewise_center=True, featurewise_std_normalization=True)
test_gen.mean = train_gen.mean
test_gen.std = train_gen.std
test_iter = test_gen.flow(x_test, y_test, batch_size=args.val_batch_size)
callbacks = []
callbacks.append(tf.keras.callbacks.LearningRateScheduler(lr_schedule))
# Add learning rate warmup callback.
callbacks.append(hvd.callbacks.LearningRateWarmupCallback(initial_lr=args.base_lr, warmup_epochs=args.warmup_epochs, verbose=verbose))
# Broadcast initial variable states from the first worker to all others.
callbacks.append(hvd.callbacks.BroadcastGlobalVariablesCallback(0))
# Average the metrics among workers at the end of every epoch.
callbacks.append(hvd.callbacks.MetricAverageCallback())
class PrintThroughput(tf.keras.callbacks.Callback):
def __init__(self, total_images=0):
self.total_images = total_images
def on_epoch_begin(self, epoch, logs=None):
self.epoch_start_time = time()
def on_epoch_end(self, epoch, logs={}):
epoch_time = time() - self.epoch_start_time
images_per_sec = round(self.total_images / epoch_time, 2)
print('Images/sec: {}'.format(images_per_sec))
if verbose:
callbacks.append(PrintThroughput(total_images=len(y_train)))
class StopAtAccuracy(tf.keras.callbacks.Callback):
def __init__(self, target=0.85, patience=2, verbose=0):
self.target = target
self.patience = patience
self.verbose = verbose
self.stopped_epoch = 0
self.met_target = 0
def on_epoch_end(self, epoch, logs=None):
if logs.get('val_accuracy') > self.target:
self.met_target += 1
else:
self.met_target = 0
if self.met_target >= self.patience:
self.stopped_epoch = epoch
self.model.stop_training = True
def on_train_end(self, logs=None):
if self.stopped_epoch > 0 and self.verbose == 1:
print('Early stopping after epoch {}'.format(self.stopped_epoch + 1))
callbacks.append(StopAtAccuracy(target=args.target_accuracy, patience=args.patience, verbose=verbose))
class PrintTotalTime(tf.keras.callbacks.Callback):
def on_train_begin(self, logs=None):
self.start_time = time()
def on_epoch_end(self, epoch, logs=None):
total_time = round(time() - self.start_time, 2)
print("Cumulative training time after epoch {}: {}".format(epoch + 1, total_time))
def on_train_end(self, logs=None):
total_time = round(time() - self.start_time, 2)
print("Cumulative training time: {}".format(total_time))
if verbose:
callbacks.append(PrintTotalTime())
class SaveTrainingData(tf.keras.callbacks.Callback):
def __init__(self, data_filepath=''):
self.data_filepath = data_filepath
def on_train_begin(self, logs=None):
file = open(self.data_filepath, 'w', newline='')
writer = csv.writer(file)
writer.writerow(['time', 'val_accuracy'])
writer.writerow([0.0, 0.0])
file.close()
self.train_start_time = time()
def on_epoch_end(self, epoch, logs={}):
total_time = time() - self.train_start_time
file = open(self.data_filepath, 'a')
writer = csv.writer(file)
writer.writerow([round(total_time,1), round(logs['val_accuracy'], 4)])
file.close()
# Save the training data.
if hvd.rank() == 0:
data_filepath = "training_data/{}ranks-{}bs-{}lr-{}m-{}w.csv".format(hvd.size(), args.batch_size,
args.base_lr, args.momentum, args.warmup_epochs)
callbacks.append(SaveTrainingData(data_filepath=data_filepath))
# Create the model.
model = create_model()
# Train the model.
model.fit(train_iter,
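# Each worker processes its share of the per-epoch batches.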
steps_per_epoch=len(train_iter) // hvd.size(),
callbacks=callbacks,
epochs=args.epochs,
verbose=verbose,
workers=4,
initial_epoch=0,
validation_data=test_iter,
validation_steps=3 * len(test_iter) // hvd.size())
# Evaluate the model on the full data set.
score = model.evaluate(test_iter, steps=len(test_iter), workers=4, verbose=verbose)
if verbose:
print('Test loss:', score[0])
print('Test accuracy:', score[1])
References
- NVIDIA course materials: 《用多 GPU 训练神经网络》 (Training Neural Networks with Multiple GPUs)