From 0f264f567f65ce1f6d18dd1729a85738d41ed341 Mon Sep 17 00:00:00 2001
From: Hilary James Oliver
Date: Tue, 23 Apr 2024 11:41:53 +1200
Subject: [PATCH] Reduce logging verbosity, tweak tests.

---
 cylc/flow/task_events_mgr.py              |  3 ++-
 cylc/flow/task_pool.py                    |  7 ++++---
 tests/integration/test_task_events_mgr.py |  3 ++-
 tests/integration/test_task_pool.py       | 23 ++++++++++++++---------
 4 files changed, 22 insertions(+), 14 deletions(-)

diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py
index 14f376dfb74..e3275d76aed 100644
--- a/cylc/flow/task_events_mgr.py
+++ b/cylc/flow/task_events_mgr.py
@@ -1788,6 +1788,7 @@ def _reset_job_timers(self, itask):
             itask.timeout = None
             itask.poll_timer = None
             return
+
         ctx = (itask.submit_num, itask.state.status)
         if itask.poll_timer and itask.poll_timer.ctx == ctx:
             return
@@ -1844,7 +1845,7 @@ def _reset_job_timers(self, itask):
                 message += '%d*' % (num + 1)
             message += '%s,' % intvl_as_str(item)
         message += '...'
-        LOG.info(f"[{itask}] {message}")
+        LOG.debug(f"[{itask}] {message}")
         # Set next poll time
         self.check_poll_time(itask)
 
diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py
index b7f8c0601cb..1e501d5e987 100644
--- a/cylc/flow/task_pool.py
+++ b/cylc/flow/task_pool.py
@@ -221,7 +221,7 @@ def add_to_pool(self, itask) -> None:
         self.active_tasks.setdefault(itask.point, {})
         self.active_tasks[itask.point][itask.identity] = itask
         self.active_tasks_changed = True
-        LOG.info(f"[{itask}] added to active task pool")
+        LOG.debug(f"[{itask}] added to active task pool")
 
         self.create_data_store_elements(itask)
 
@@ -839,7 +839,7 @@ def remove(self, itask, reason=None):
         # TODO: same for datastore (still updated by scheduler loop)
         self.workflow_db_mgr.put_update_task_state(itask)
 
-        level = logging.INFO
+        level = logging.DEBUG
         if itask.state(
             TASK_STATUS_PREPARING,
             TASK_STATUS_SUBMITTED,
@@ -1654,7 +1654,8 @@ def spawn_task(
             submit_num == 0
         ):
             # Previous instance removed before completing any outputs.
-            LOG.info(f"Flow stopping at {point}/{name} - task previously removed")
+            LOG.info(
+                f"Flow blocked at {point}/{name} - task previously removed")
             return None
 
         itask = self._get_task_proxy_db_outputs(
diff --git a/tests/integration/test_task_events_mgr.py b/tests/integration/test_task_events_mgr.py
index 7f3fa488162..62994487624 100644
--- a/tests/integration/test_task_events_mgr.py
+++ b/tests/integration/test_task_events_mgr.py
@@ -17,6 +17,7 @@
 from cylc.flow.task_events_mgr import TaskJobLogsRetrieveContext
 from cylc.flow.scheduler import Scheduler
 
+import logging
 from typing import Any as Fixture
 
 
@@ -51,7 +52,7 @@ async def test__reset_job_timers(
     process_execution_polling_intervals.
     """
     schd = scheduler(flow(one_conf))
-    async with start(schd):
+    async with start(schd, level=logging.DEBUG):
         itask = schd.pool.get_tasks()[0]
         itask.state.status = 'running'
         itask.platform['execution polling intervals'] = [25]
diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py
index 2ad2e26a068..913616e7d2e 100644
--- a/tests/integration/test_task_pool.py
+++ b/tests/integration/test_task_pool.py
@@ -148,7 +148,7 @@ async def mod_example_flow(
     """
     id_ = mod_flow(EXAMPLE_FLOW_CFG)
     schd: 'Scheduler' = mod_scheduler(id_, paused_start=True)
-    async with mod_run(schd):
+    async with mod_run(schd, level=logging.DEBUG):
         yield schd
 
 
@@ -1198,7 +1198,7 @@ async def test_detect_incomplete_tasks(
         }
     })
     schd = scheduler(id_)
-    async with start(schd) as log:
+    async with start(schd, level=logging.DEBUG) as log:
         itasks = schd.pool.get_tasks()
         for itask in itasks:
             itask.state_reset(is_queued=False)
@@ -1279,7 +1279,7 @@ async def test_set_failed_complete(
     """Test manual completion of an incomplete failed task."""
     id_ = flow(one_conf)
     schd = scheduler(id_)
-    async with start(schd) as log:
+    async with start(schd, level=logging.DEBUG) as log:
         one = schd.pool.get_tasks()[0]
         one.state_reset(is_queued=False)
 
@@ -1899,12 +1899,13 @@ async def test_fast_respawn(
     # attempt to spawn it again
     itask = task_pool.spawn_task("foo", IntegerPoint("1"), {1})
     assert itask is None
-    assert "Not spawning 1/foo: already used in this flow" in caplog.text
+    assert "Flow blocked at 1/foo - task previously removed" in caplog.text
 
 
 async def test_remove_active_task(
     example_flow: 'Scheduler',
     caplog: pytest.LogCaptureFixture,
+    log_filter: Callable,
 ) -> None:
     """Test warning on removing an active task."""
 
@@ -1917,9 +1918,13 @@ async def test_remove_active_task(
     task_pool.remove(foo, "request")
     assert foo not in task_pool.get_tasks()
 
-    assert (
-        "removed from active task pool: request - active job orphaned"
-        in caplog.text
+    assert log_filter(
+        caplog,
+        regex=(
+            "1/foo.*removed from active task pool:"
+            " request - active job orphaned"
+        ),
+        level=logging.WARNING
     )
 
 
@@ -1947,7 +1952,7 @@ async def test_remove_by_suicide(
         }
     })
     schd: 'Scheduler' = scheduler(id_)
-    async with start(schd) as log:
+    async with start(schd, level=logging.DEBUG) as log:
         # it should start up with 1/a and 1/b
         assert pool_get_task_ids(schd.pool) == ["1/a", "1/b"]
         a = schd.pool.get_task(IntegerPoint("1"), "a")
@@ -2020,7 +2025,7 @@ async def test_remove_no_respawn(flow, scheduler, start, log_filter):
     # respawned as a result
     schd.pool.spawn_on_output(b1, TASK_OUTPUT_SUCCEEDED)
     assert log_filter(
        log, contains='Flow blocked at 1/z - task previously removed'
    )
    z1 = schd.pool.get_task(IntegerPoint("1"), "z")
    assert (