Error again: Multi-worker training with Keras

I worked through the "Multi-worker training with Keras" sample in Colab:

import json
import os
import sys

if '.' not in sys.path:
  sys.path.insert(0, '.')
import tensorflow as tf

%%writefile mnist_setup.py
import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)

  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
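
As a side note, the module can be smoke-tested in a single process before any distribution is involved; this quick check is my own addition, not part of the tutorial:

import mnist_setup

# Single-process sanity check: run a few training steps on one worker.
dataset = mnist_setup.mnist_dataset(batch_size=64)
model = mnist_setup.build_and_compile_cnn_model()
model.fit(dataset, epochs=1, steps_per_epoch=5)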

tf_config = {
    'cluster': {'worker': ['localhost:12345', 'localhost:23456']},
    'task': {'type': 'worker', 'index': 0}}

json.dumps(tf_config)
os.environ['GREETINGS'] = 'Hello Tensorflow!'
!echo ${GREETINGS}
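
As I understand the tutorial, every worker process must see the same 'cluster' mapping but its own 'task' entry, so a second worker's TF_CONFIG would differ only in the index. A minimal sketch, assuming the same two-worker cluster (the variable name is mine):

# TF_CONFIG for the second worker; only 'index' changes.
tf_config_worker1 = {
    'cluster': {'worker': ['localhost:12345', 'localhost:23456']},
    'task': {'type': 'worker', 'index': 1}}
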
strategy = tf.distribute.MultiWorkerMirroredStrategy()

import mnist_setup
with strategy.scope():
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()
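
Since TF_CONFIG has not been exported to the environment at this point, I assume this strategy instance only sees the local devices; that can be checked like this (I expect it to print 1 on a single-CPU/GPU Colab runtime):

# Should report 1 replica while TF_CONFIG is absent from the environment.
print('Replicas in sync:', strategy.num_replicas_in_sync)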

%%writefile main.py
import os
import json

import tensorflow as tf
import mnist_setup

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)

with strategy.scope():
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()

multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
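
With the two workers declared in the cluster above, num_workers is 2, so global_batch_size works out to 64 * 2 = 128 examples consumed per training step across the cluster.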

!ls *.py
os.environ['TF_CONFIG'] = json.dumps(tf_config)
%killbgscripts
!python main.py &> job_0.log
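
For reference, the tutorial's corresponding next step launches a second worker with the same script but task index 1; as far as I can tell, both workers must be running for the startup barrier to complete:

# Start the second worker (task index 1), as the tutorial does after
# launching worker 0 in the background.
tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)
!python main.py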

It kept running for about an hour without finishing. Then I ran:

import time
time.sleep(10)
!cat job_0.log

It showed:

2023-11-23 10:37:48.050630: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Could not find TensorRT
2023-11-23 11:37:50.937150: E external/local_tsl/tsl/distributed_runtime/coordination/coordination_service_agent.cc:767] Coordination agent is set to ERROR: DEADLINE_EXCEEDED: Barrier timed out. Barrier_id: WaitForAllTasks::16776239125982028645. Timed out task names:
/job:worker/replica:0/task:1

Additional GRPC error information from remote target /job:worker/replica:0/task:0 while calling /tensorflow.CoordinationService/WaitForAllTasks:
:{"created":"@1700739470.936549355","description":"Error received from peer ipv4:127.0.0.1:12345","file":"external/com_github_grpc_grpc/src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Barrier timed out. Barrier_id: WaitForAllTasks::16776239125982028645. Timed out task names:\n/job:worker/replica:0/task:1\n","grpc_status":4} [type.googleapis.com/tensorflow.CoordinationServiceError='']
2023-11-23 11:37:50.937687: E tensorflow/core/common_runtime/base_collective_executor.cc:249] BaseCollectiveExecutor::StartAbort DEADLINE_EXCEEDED: Barrier timed out. Barrier_id: WaitForAllTasks::16776239125982028645. Timed out task names:
/job:worker/replica:0/task:1

Additional GRPC error information from remote target /job:worker/replica:0/task:0 while calling /tensorflow.CoordinationService/WaitForAllTasks:
:{"created":"@1700739470.936549355","description":"Error received from peer ipv4:127.0.0.1:12345","file":"external/com_github_grpc_grpc/src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Barrier timed out. Barrier_id: WaitForAllTasks::16776239125982028645. Timed out task names:\n/job:worker/replica:0/task:1\n","grpc_status":4} [type.googleapis.com/tensorflow.CoordinationServiceError='']
2023-11-23 11:37:50.937844: E tensorflow/core/common_runtime/eager/context_distributed_manager.cc:846] Barrier timed out. Barrier_id: WaitForAllTasks::16776239125982028645. Timed out task names:
/job:worker/replica:0/task:1

Additional GRPC error information from remote target /job:worker/replica:0/task:0 while calling /tensorflow.CoordinationService/WaitForAllTasks:
:{"created":"@1700739470.936549355","description":"Error received from peer ipv4:127.0.0.1:12345","file":"external/com_github_grpc_grpc/src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Barrier timed out. Barrier_id: WaitForAllTasks::16776239125982028645. Timed out task names:\n/job:worker/replica:0/task:1\n","grpc_status":4}
Traceback (most recent call last):
  File "/content/main.py", line 11, in <module>
    strategy = tf.distribute.MultiWorkerMirroredStrategy()
  File "/usr/local/lib/python3.10/dist-packages/tensorflow/python/distribute/collective_all_reduce_strategy.py", line 186, in __init__
    CollectiveAllReduceExtended(
  File "/usr/local/lib/python3.10/dist-packages/tensorflow/python/distribute/collective_all_reduce_strategy.py", line 339, in __init__
    self._initialize_strategy(self._cluster_resolver, devices=devices)
  File "/usr/local/lib/python3.10/dist-packages/tensorflow/python/distribute/collective_all_reduce_strategy.py", line 358, in _initialize_strategy
    self._initialize_multi_worker(cluster_resolver)
  File "/usr/local/lib/python3.10/dist-packages/tensorflow/python/distribute/collective_all_reduce_strategy.py", line 530, in _initialize_multi_worker
    context.context().ensure_initialized()
  File "/usr/local/lib/python3.10/dist-packages/tensorflow/python/eager/context.py", line 620, in ensure_initialized
    pywrap_tfe.TFE_EnableCollectiveOps(context_handle, server_def_str)
tensorflow.python.framework.errors_impl.DeadlineExceededError: Barrier timed out. Barrier_id: WaitForAllTasks::16776239125982028645. Timed out task names:
/job:worker/replica:0/task:1

Additional GRPC error information from remote target /job:worker/replica:0/task:0 while calling /tensorflow.CoordinationService/WaitForAllTasks:
:{"created":"@1700739470.936549355","description":"Error received from peer ipv4:127.0.0.1:12345","file":"external/com_github_grpc_grpc/src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Barrier timed out. Barrier_id: WaitForAllTasks::16776239125982028645. Timed out task names:\n/job:worker/replica:0/task:1\n","grpc_status":4}
2023-11-23 11:37:51.511448: W tensorflow/core/common_runtime/eager/context.cc:628] Unable to destroy server_ object, so releasing instead. Servers don't support clean shutdown.
2023-11-23 11:37:51.511816: E external/local_tsl/tsl/distributed_runtime/coordination/coordination_service_agent.cc:517] Shutdown() was called while coordination agent is in error state, implying that distributed execution failed. Note: agent will still shutdown anyway. Agent status: DEADLINE_EXCEEDED: Barrier timed out. Barrier_id: WaitForAllTasks::16776239125982028645. Timed out task names:
/job:worker/replica:0/task:1

Additional GRPC error information from remote target /job:worker/replica:0/task:0 while calling /tensorflow.CoordinationService/WaitForAllTasks:
:{"created":"@1700739470.936549355","description":"Error received from peer ipv4:127.0.0.1:12345","file":"external/com_github_grpc_grpc/src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Barrier timed out. Barrier_id: WaitForAllTasks::16776239125982028645. Timed out task names:\n/job:worker/replica:0/task:1\n","grpc_status":4} [type.googleapis.com/tensorflow.CoordinationServiceError='']
This is usually caused by an earlier error during execution. Check the logs (this task or the leader) for an earlier error to debug further.

Please help me resolve this problem. Thank you very much!