From e025d21fc668b6c9908af7a27de753ab28c14ca7 Mon Sep 17 00:00:00 2001 From: Ma Jie Yue Date: Wed, 6 Nov 2024 20:54:36 +0800 Subject: [PATCH 1/2] In should_early_stop, add a check for whether all workers failed the node check and, if so, stop the job. This applies to the workers of AllReduce_Strategy jobs --- dlrover/python/common/constants.py | 2 +- .../python/master/node/dist_job_manager.py | 33 +++++++++++++++++++ dlrover/python/tests/test_job_manager.py | 16 +++++++++ 3 files changed, 50 insertions(+), 1 deletion(-) diff --git a/dlrover/python/common/constants.py b/dlrover/python/common/constants.py index e1c81e5ba..f71d8e8fd 100644 --- a/dlrover/python/common/constants.py +++ b/dlrover/python/common/constants.py @@ -118,7 +118,7 @@ class JobExitReason(object): RDZV_TIMEOUT_ERROR = "RdzvTimeout" PENDING_TIMEOUT = "PendingTimeout" UNCOMPLETED_TIMEOUT = "UncompletedTimeout" - + RDZV_ALL_FAILED = "RdzvAllFailed" class CustomMetricKeys: RDZV_ROUND = "rdzv_round" diff --git a/dlrover/python/master/node/dist_job_manager.py b/dlrover/python/master/node/dist_job_manager.py index 848a9706e..62cf6e341 100644 --- a/dlrover/python/master/node/dist_job_manager.py +++ b/dlrover/python/master/node/dist_job_manager.py @@ -243,7 +243,40 @@ def is_all_reduce_type_job(self): == DistributionStrategy.ALLREDUCE ) + def is_all_workers_node_check_failed(self): + return all([ + node.is_node_check_failed() + for _, node in self._job_nodes[NodeType.WORKER].items() + ]) + def should_early_stop(self): + # node-check all failed + if ( + self.is_all_reduce_type_job() + and self.is_all_workers_node_check_failed() + ): + msg = ( + "Stop the training early because all worker nodes has " + "failed the node check in rendezvous." 
+ ) + + self._process_error( + None, + 0, + msg, + level=TrainingExceptionLevel.RDZV_ERROR, + ) + + self._report_event( + ErrorMonitorConstants.TYPE_INFO, + "job", + ErrorMonitorConstants.ACTION_EARLY_STOP, + "All node check failed", + {"nodes": json.dumps(self._worker_manager.cur_nodes)}, + ) + + return True, JobExitReason.RDZV_ALL_FAILED, msg + # ps pending judgement: any ps pod pending timeout timeout_ps_nodes = ( self._ps_manager.get_pending_timeout_oom_recovered_node() diff --git a/dlrover/python/tests/test_job_manager.py b/dlrover/python/tests/test_job_manager.py index 50fed89c7..15ecdcf91 100644 --- a/dlrover/python/tests/test_job_manager.py +++ b/dlrover/python/tests/test_job_manager.py @@ -575,6 +575,10 @@ def test_check_worker_status(self): manager._job_nodes[NodeType.WORKER][0].status = NodeStatus.FINISHED self.assertTrue(manager.all_critical_node_completed()) + for worker in manager._job_nodes[NodeType.WORKER].values(): + worker.reported_status = 2 + self.assertTrue(manager.is_all_workers_node_check_failed()) + def test_tf_ps_node_handling(self): params = MockK8sPSJobArgs() params.initilize() @@ -730,6 +734,18 @@ def test_early_stop_part3(self): result, reason, msg = manager.should_early_stop() self.assertFalse(result) + def test_early_stop_part4(self): + params = MockK8sAllreduceJobArgs() + params.initilize() + manager = create_job_manager(params, SpeedMonitor()) + manager._init_nodes() + + manager.is_all_workers_node_check_failed = mock.MagicMock(return_value=True) + result, reason, msg = manager.should_early_stop() + self.assertTrue(result) + self.assertEqual(reason, JobExitReason.RDZV_ALL_FAILED) + + def test_when_node_not_init(self): params = MockK8sPSJobArgs() params.initilize() From 0ad2a3e529b53d88e32081ddabe9f93f402c86d2 Mon Sep 17 00:00:00 2001 From: Ma Jie Yue Date: Wed, 6 Nov 2024 21:12:23 +0800 Subject: [PATCH 2/2] pass precommit run -a --- dlrover/python/common/constants.py | 1 + dlrover/python/master/node/dist_job_manager.py | 10 
++++++---- dlrover/python/tests/test_job_manager.py | 5 +++-- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/dlrover/python/common/constants.py b/dlrover/python/common/constants.py index f71d8e8fd..dd9b752ba 100644 --- a/dlrover/python/common/constants.py +++ b/dlrover/python/common/constants.py @@ -120,6 +120,7 @@ class JobExitReason(object): UNCOMPLETED_TIMEOUT = "UncompletedTimeout" RDZV_ALL_FAILED = "RdzvAllFailed" + class CustomMetricKeys: RDZV_ROUND = "rdzv_round" TRAINING_ERROR_LEVEL = "error_level" diff --git a/dlrover/python/master/node/dist_job_manager.py b/dlrover/python/master/node/dist_job_manager.py index 62cf6e341..f7994cc89 100644 --- a/dlrover/python/master/node/dist_job_manager.py +++ b/dlrover/python/master/node/dist_job_manager.py @@ -244,10 +244,12 @@ def is_all_reduce_type_job(self): ) def is_all_workers_node_check_failed(self): - return all([ - node.is_node_check_failed() - for _, node in self._job_nodes[NodeType.WORKER].items() - ]) + return all( + [ + node.is_node_check_failed() + for _, node in self._job_nodes[NodeType.WORKER].items() + ] + ) def should_early_stop(self): # node-check all failed diff --git a/dlrover/python/tests/test_job_manager.py b/dlrover/python/tests/test_job_manager.py index 15ecdcf91..e9ea486f0 100644 --- a/dlrover/python/tests/test_job_manager.py +++ b/dlrover/python/tests/test_job_manager.py @@ -740,12 +740,13 @@ def test_early_stop_part4(self): manager = create_job_manager(params, SpeedMonitor()) manager._init_nodes() - manager.is_all_workers_node_check_failed = mock.MagicMock(return_value=True) + manager.is_all_workers_node_check_failed = mock.MagicMock( + return_value=True + ) result, reason, msg = manager.should_early_stop() self.assertTrue(result) self.assertEqual(reason, JobExitReason.RDZV_ALL_FAILED) - def test_when_node_not_init(self): params = MockK8sPSJobArgs() params.initilize()