From ea3b64827925f5df25e0d5acda115b5f08780302 Mon Sep 17 00:00:00 2001 From: fjetter Date: Thu, 27 Jul 2023 14:28:08 +0200 Subject: [PATCH 1/4] Log worker close reason in events --- distributed/tests/test_scheduler.py | 19 +++++++++++++++++++ distributed/worker.py | 1 + 2 files changed, 20 insertions(+) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index d0703361d3..c24bb72e90 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -1755,6 +1755,12 @@ async def test_close_worker(s, a, b): await asyncio.sleep(0.2) assert len(s.workers) == 1 + events = s.get_events(a.address) + assert any( + "reason" in msg + for _, msg in events + if "closing-worker" in msg.get("action", "") + ) @pytest.mark.slow @@ -4477,3 +4483,16 @@ async def test_scatter_creates_ts(c, s, a, b): await a.close() assert await x2 == 2 assert s.tasks["x"].run_spec is not None + + +@gen_cluster( + client=True, + nthreads=[], + config={"distributed.scheduler.default-task-durations": {"slowinc": 1000}}, +) +async def test_scale_up_large_tasks(c, s): + futures = c.map(slowinc, range(10)) + while not s.tasks: + await asyncio.sleep(0.001) + + assert s.adaptive_target() == 10 diff --git a/distributed/worker.py b/distributed/worker.py index 391002cd5a..c0172597d6 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1521,6 +1521,7 @@ async def close( # type: ignore disable_gc_diagnosis() + self.log_event(self.address, {"action": "closing-worker", "reason": reason}) try: logger.info("Stopping worker at %s. Reason: %s", self.address, reason) except ValueError: # address not available if already closed From a30dd7a35b32d2d23c027fbd88b46b8d09251083 Mon Sep 17 00:00:00 2001 From: fjetter Date: Fri, 28 Jul 2023 12:02:28 +0200 Subject: [PATCH 2/4] Remove wrong test --- distributed/tests/test_scheduler.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index c24bb72e90..65426fb4e8 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -4483,16 +4483,3 @@ async def test_scatter_creates_ts(c, s, a, b): await a.close() assert await x2 == 2 assert s.tasks["x"].run_spec is not None - - -@gen_cluster( - client=True, - nthreads=[], - config={"distributed.scheduler.default-task-durations": {"slowinc": 1000}}, -) -async def test_scale_up_large_tasks(c, s): - futures = c.map(slowinc, range(10)) - while not s.tasks: - await asyncio.sleep(0.001) - - assert s.adaptive_target() == 10 From 437668dab00b06eb7363e0b685ccf613db93b644 Mon Sep 17 00:00:00 2001 From: fjetter Date: Mon, 31 Jul 2023 12:39:45 +0200 Subject: [PATCH 3/4] Log exception when sending an event if the server is not up --- distributed/worker.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/distributed/worker.py b/distributed/worker.py index c0172597d6..9e6cb988cb 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1521,7 +1521,12 @@ async def close( # type: ignore disable_gc_diagnosis() - self.log_event(self.address, {"action": "closing-worker", "reason": reason}) + try: + self.log_event(self.address, {"action": "closing-worker", "reason": reason}) + except Exception: + # This can happen when the Server is not up yet + logger.exception("Failed to log closing event") + try: logger.info("Stopping worker at %s. Reason: %s", self.address, reason) except ValueError: # address not available if already closed From c0887a265470ca2c0e935cd2ce602c3a46c306e6 Mon Sep 17 00:00:00 2001 From: fjetter Date: Mon, 31 Jul 2023 18:17:49 +0200 Subject: [PATCH 4/4] Increase events-cleanup-delay in test_clear_events --- distributed/tests/test_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 65426fb4e8..275148f605 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -751,7 +751,7 @@ async def test_remove_worker_by_name_from_scheduler(s, a, b): ) -@gen_cluster(config={"distributed.scheduler.events-cleanup-delay": "10 ms"}) +@gen_cluster(config={"distributed.scheduler.events-cleanup-delay": "500 ms"}) async def test_clear_events_worker_removal(s, a, b): assert a.address in s.events assert a.address in s.workers