I practiced the "Multi-worker training with Keras" sample in Colab:
import json
import os
import sys
# Put the current directory on the import path so that modules written there
# by the notebook (e.g. mnist_setup.py) can be imported.
if '.' not in sys.path:
    sys.path.insert(0, '.')
import tensorflow as tf
%%writefile mnist_setup.py
import os
import tensorflow as tf
import numpy as np
def mnist_dataset(batch_size):
    """Build the shuffled, repeating, batched MNIST training dataset.

    Args:
        batch_size: Number of examples per emitted batch.

    Returns:
        A `tf.data.Dataset` yielding `(images, labels)` batches built from the
        training split returned by `tf.keras.datasets.mnist.load_data()`.
    """
    # Only the training split is used; the test split is discarded.
    (images, labels), _ = tf.keras.datasets.mnist.load_data()
    # Divide the pixel values by a float32 255 and store labels as int64.
    images = images / np.float32(255)
    labels = labels.astype(np.int64)
    dataset = tf.data.Dataset.from_tensor_slices((images, labels))
    dataset = dataset.shuffle(60000)
    # repeat() with no count makes the dataset endless, so callers must bound
    # training themselves (e.g. with steps_per_epoch).
    dataset = dataset.repeat()
    return dataset.batch(batch_size)
def build_and_compile_cnn_model():
    """Create and compile a small convolutional classifier for 28x28 inputs.

    Returns:
        A compiled `tf.keras.Sequential` model whose last layer emits 10
        un-activated outputs (logits) per example.
    """
    model = tf.keras.Sequential([
        tf.keras.layers.InputLayer(input_shape=(28, 28)),
        # Add a trailing channel axis so Conv2D receives (28, 28, 1) inputs.
        tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
        tf.keras.layers.Conv2D(32, 3, activation='relu'),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(10),
    ])
    model.compile(
        # The final Dense layer has no activation, so the loss is told to
        # expect logits.
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
        metrics=['accuracy'],
    )
    return model
# Cluster description: two workers on localhost; this configuration selects
# the worker at index 0.
tf_config = {
    'cluster': {'worker': ['localhost:12345', 'localhost:23456']},
    'task': {'type': 'worker', 'index': 0},
}
# Serialize the configuration to a JSON string (the result is not stored here).
json.dumps(tf_config)
os.environ[‘GREETINGS’] = ‘Hello Tensorflow!’
!echo ${GREETINGS}
strategy = tf.distribute.MultiWorkerMirroredStrategy()
import mnist_setup
with strategy.scope():
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
%%writefile main.py
import os
import json
import tensorflow as tf
import mnist_setup
per_work_batch_size = 64

# TF_CONFIG holds the JSON cluster description exported by the launcher.
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

# The dataset is batched with the per-worker size times the worker count.
global_batch_size = per_work_batch_size * num_workers
multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)

# Build and compile the model inside the strategy's scope.
with strategy.scope():
    multi_worker_model = mnist_setup.build_and_compile_cnn_model()

# Fixed: this previously called the undefined name `mutil_worker_model`.
multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
!ls *.py
os.environ[‘TF_CONFIG’] = json.dumps(tf_config)
%killbgscripts
!python main.py &> job_0.log
It kept running for about 1 hour. Then I ran:
import time
# Pause for 10 seconds, then print the log file that the main.py run
# redirected its output to.
time.sleep(10)
!cat job_0.log
The log showed:
2023-11-23 10:37:48.050630: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Could not find TensorRT
2023-11-23 11:37:50.937150: E external/local_tsl/tsl/distributed_runtime/coordination/coordination_service_agent.cc:767] Coordination agent is set to ERROR: DEADLINE_EXCEEDED: Barrier timed out. Barrier_id: WaitForAllTasks::16776239125982028645. Timed out task names:
/job:worker/replica:0/task:1
Additional GRPC error information from remote target /job:worker/replica:0/task:0 while calling /tensorflow.CoordinationService/WaitForAllTasks:
:{“created”:“@1700739470.936549355”,“description”:“Error received from peer ipv4:127.0.0.1:12345”,“file”:“external/com_github_grpc_grpc/src/core/lib/surface/call.cc”,“file_line”:1056,“grpc_message”:“Barrier timed out. Barrier_id: WaitForAllTasks::16776239125982028645. Timed out task names:\n/job:worker/replica:0/task:1\n”,“grpc_status”:4} [type.googleapis.com/tensorflow.CoordinationServiceError=‘’]
2023-11-23 11:37:50.937687: E tensorflow/core/common_runtime/base_collective_executor.cc:249] BaseCollectiveExecutor::StartAbort DEADLINE_EXCEEDED: Barrier timed out. Barrier_id: WaitForAllTasks::16776239125982028645. Timed out task names:
/job:worker/replica:0/task:1
Additional GRPC error information from remote target /job:worker/replica:0/task:0 while calling /tensorflow.CoordinationService/WaitForAllTasks:
:{“created”:“@1700739470.936549355”,“description”:“Error received from peer ipv4:127.0.0.1:12345”,“file”:“external/com_github_grpc_grpc/src/core/lib/surface/call.cc”,“file_line”:1056,“grpc_message”:“Barrier timed out. Barrier_id: WaitForAllTasks::16776239125982028645. Timed out task names:\n/job:worker/replica:0/task:1\n”,“grpc_status”:4} [type.googleapis.com/tensorflow.CoordinationServiceError=‘’]
2023-11-23 11:37:50.937844: E tensorflow/core/common_runtime/eager/context_distributed_manager.cc:846] Barrier timed out. Barrier_id: WaitForAllTasks::16776239125982028645. Timed out task names:
/job:worker/replica:0/task:1
Additional GRPC error information from remote target /job:worker/replica:0/task:0 while calling /tensorflow.CoordinationService/WaitForAllTasks:
:{“created”:“@1700739470.936549355”,“description”:“Error received from peer ipv4:127.0.0.1:12345”,“file”:“external/com_github_grpc_grpc/src/core/lib/surface/call.cc”,“file_line”:1056,“grpc_message”:“Barrier timed out. Barrier_id: WaitForAllTasks::16776239125982028645. Timed out task names:\n/job:worker/replica:0/task:1\n”,“grpc_status”:4}
Traceback (most recent call last):
File “/content/main.py”, line 11, in
strategy = tf.distribute.MultiWorkerMirroredStrategy()
File “/usr/local/lib/python3.10/dist-packages/tensorflow/python/distribute/collective_all_reduce_strategy.py”, line 186, in init
CollectiveAllReduceExtended(
File “/usr/local/lib/python3.10/dist-packages/tensorflow/python/distribute/collective_all_reduce_strategy.py”, line 339, in init
self._initialize_strategy(self._cluster_resolver, devices=devices)
File “/usr/local/lib/python3.10/dist-packages/tensorflow/python/distribute/collective_all_reduce_strategy.py”, line 358, in _initialize_strategy
self._initialize_multi_worker(cluster_resolver)
File “/usr/local/lib/python3.10/dist-packages/tensorflow/python/distribute/collective_all_reduce_strategy.py”, line 530, in _initialize_multi_worker
context.context().ensure_initialized()
File “/usr/local/lib/python3.10/dist-packages/tensorflow/python/eager/context.py”, line 620, in ensure_initialized
pywrap_tfe.TFE_EnableCollectiveOps(context_handle, server_def_str)
tensorflow.python.framework.errors_impl.DeadlineExceededError: Barrier timed out. Barrier_id: WaitForAllTasks::16776239125982028645. Timed out task names:
/job:worker/replica:0/task:1
Additional GRPC error information from remote target /job:worker/replica:0/task:0 while calling /tensorflow.CoordinationService/WaitForAllTasks:
:{“created”:“@1700739470.936549355”,“description”:“Error received from peer ipv4:127.0.0.1:12345”,“file”:“external/com_github_grpc_grpc/src/core/lib/surface/call.cc”,“file_line”:1056,“grpc_message”:“Barrier timed out. Barrier_id: WaitForAllTasks::16776239125982028645. Timed out task names:\n/job:worker/replica:0/task:1\n”,“grpc_status”:4}
2023-11-23 11:37:51.511448: W tensorflow/core/common_runtime/eager/context.cc:628] Unable to destroy server_ object, so releasing instead. Servers don’t support clean shutdown.
2023-11-23 11:37:51.511816: E external/local_tsl/tsl/distributed_runtime/coordination/coordination_service_agent.cc:517] Shutdown() was called while coordination agent is in error state, implying that distributed execution failed. Note: agent will still shutdown anyway. Agent status: DEADLINE_EXCEEDED: Barrier timed out. Barrier_id: WaitForAllTasks::16776239125982028645. Timed out task names:
/job:worker/replica:0/task:1
Additional GRPC error information from remote target /job:worker/replica:0/task:0 while calling /tensorflow.CoordinationService/WaitForAllTasks:
:{“created”:“@1700739470.936549355”,“description”:“Error received from peer ipv4:127.0.0.1:12345”,“file”:“external/com_github_grpc_grpc/src/core/lib/surface/call.cc”,“file_line”:1056,“grpc_message”:“Barrier timed out. Barrier_id: WaitForAllTasks::16776239125982028645. Timed out task names:\n/job:worker/replica:0/task:1\n”,“grpc_status”:4} [type.googleapis.com/tensorflow.CoordinationServiceError=‘’]
This is usually caused by an earlier error during execution. Check the logs (this task or the leader) for an earlier error to debug further.