I am trying to train a model using data parallelism on multiple GPUs on a single machine. As I think, in data parallelism, we divide the data into batches, and then batches are deployed parallel. Afterward, the average gradient is calculated based on the current batch errors (for example, if there are 2 GPUs: errors will be 2 batches) and updated based on the average gradient.
Now, when I implemented horovod, I observed some other things. For example, I observed that the number of epochs trained is divided according to the number of GPUs. For example, if I train the model on 300 epochs, then, on 1 GPU, the number of epochs is 300, but on 2 GPUs, it is divided into 150 epochs (150 epochs process GPU1 and remains 150 epochs process 2nd GPU), and similarly, on 3 GPUs, it is 100 epochs. Is this correct? If it is correct, then how does it achieve data parallelism?
Here is my code:
import math
import sys
import time
import scipy.io
import numpy as np
import pandas as pd
from sklearn import metrics
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()
from tensorflow.compat.v1.keras import backend as K
import horovod.tensorflow.keras as hvd
from tensorflow.keras.models import Sequential
# Horovod: initialize Horovod.
hvd.init()
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
config.gpu_options.visible_device_list = str(hvd.local_rank())
physical_gpus = tf.config.list_physical_devices('GPU')
tf.config.set_visible_devices([physical_gpus[hvd.local_rank()]], "GPU")
def main():
input_shape = (seg_train_x.shape[1], seg_train_x.shape[2], seg_train_x.shape[3])
print(f'input shape {input_shape}')
epochs = int(math.ceil(300.0 / hvd.size()))
batch_size = 100
model = Sequential()
model.add(Conv2D(16, kernel_size=(3, 3),
activation='relu',
input_shape=input_shape))
model.add(Conv2D(32, (3, 3), activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.01),
bias_regularizer=tf.keras.regularizers.l1(0.01)))
model.add(BatchNormalization())
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))
model.add(Flatten())
model.add(Dense(64, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(num_classes, activation='softmax'))
# Horovod: adjust learning rate based on number of GPUs.
scaled_lr = 0.00001 * hvd.size()
opt = tf.keras.optimizers.Adam(scaled_lr)
# Horovod: add Horovod Distributed Optimizer.
opt = hvd.DistributedOptimizer(opt, backward_passes_per_step=1)
model.compile(loss=tf.keras.losses.categorical_crossentropy,
optimizer=opt,
metrics=['accuracy'])
callbacks = [
# Horovod: broadcast initial variable states from rank 0 to all other processes.
# This is necessary to ensure consistent initialization of all workers when
# training is started with random weights or restored from a checkpoint.
hvd.callbacks.BroadcastGlobalVariablesCallback(0),
]
print(f'input shape {seg_train_x.shape}')
# Horovod: save checkpoints only on worker 0 to prevent other workers from corrupting them.
if hvd.rank() == 0:
callbacks.append(tf.keras.callbacks.ModelCheckpoint('./checkpoint-{epoch}.h5'))
csv_logger = tf.keras.callbacks.CSVLogger('training.log')
start = time.time()
model.fit(
seg_train_x,
seg_train_y,
batch_size=batch_size,
callbacks=[callbacks, csv_logger],
epochs=epochs,
validation_data= (seg_val_x, seg_val_y),
verbose=1 if hvd.rank() == 0 else 0,
)
end = time.time()
if hvd.rank() == 0:
print('Total Training Time:', round((end - start), 2), '(s)')
score = model.evaluate(seg_test_x, seg_test_y, verbose=0)
y_pred_test = model.predict(seg_test_x)
# Take the class with the highest probability from the test predictions
max_y_pred_test = np.argmax(y_pred_test, axis=1)
max_y_test = np.argmax(seg_test_y, axis=1) # actual test labels
fScore = metrics.f1_score(max_y_test, max_y_pred_test, average='macro')
print('Test loss:', score[0])
print('Test accuracy:', score[1])
print('F1-Score:', fScore)
if __name__ == '__main__':
main()
Environment:
- Framework: (TensorFlow)
- Framework version: 2.2.0
- Horovod version: v0.21.3
- MPI version: (Open MPI) 2.1.1
- CUDA version: 10.1, V10.1.243
- NCCL version: 2.11.4
- Python version: 3.6.9
- CMake version: 3.10.2