Skip to content

Commit

Permalink
made the new task failure into a single method
Browse files Browse the repository at this point in the history
  • Loading branch information
evalott100 committed Sep 9, 2024
1 parent d4628cd commit f01f90b
Showing 1 changed file with 33 additions and 45 deletions.
78 changes: 33 additions & 45 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,46 +61,46 @@ def configure_epics_environment():


_ALLOWED_CORO_TASKS = {"async_finalizer", "async_setup", "async_teardown"}


@pytest.fixture(autouse=True, scope="function")
def assert_no_pending_tasks(request: FixtureRequest):
"""
if "RE" in request.fixturenames:
# The RE fixture will do the same, as well as some additional loop cleanup
return
"""

# There should be no tasks pending after a test has finished
fail_count = request.session.testsfailed

def error_and_kill_pending_tasks():
loop = asyncio.get_event_loop()
unfinished_tasks = {
task
for task in asyncio.all_tasks(loop)
if task.get_coro().__name__ not in _ALLOWED_CORO_TASKS and not task.done()
}
for task in unfinished_tasks:
task.cancel()

# If the tasks are still pending because the test failed
# for other reasons then we don't have to error here
if unfinished_tasks and request.session.testsfailed == fail_count:
def _error_and_kill_pending_tasks(
loop: asyncio.AbstractEventLoop, request: FixtureRequest, fail_count: int
):
unfinished_tasks = {
task
for task in asyncio.all_tasks(loop)
if task.get_coro().__name__ not in _ALLOWED_CORO_TASKS and not task.done()
}
for task in unfinished_tasks:
task.cancel()

if unfinished_tasks:
# We only raise an exception here if the test failed.
if request.session.testsfailed == fail_count:
raise RuntimeError(
f"Not all tasks closed during test {request.node.name}:\n"
f"{pprint.pformat(unfinished_tasks)}"
)

request.addfinalizer(error_and_kill_pending_tasks)
return unfinished_tasks



@pytest.fixture(autouse=True, scope="function")
def fail_test_on_unclosed_tasks(request: FixtureRequest):

fail_count = request.session.testsfailed
loop = asyncio.get_event_loop()
loop.set_debug(True)

request.addfinalizer(
lambda: _error_and_kill_pending_tasks(loop, request, fail_count)
)


@pytest.fixture(scope="function")
def RE(request: FixtureRequest):
loop = asyncio.new_event_loop()
loop.set_debug(True)
RE = RunEngine({}, call_returns_result=True, loop=loop)

fail_count = request.session.testsfailed

def clean_event_loop():
Expand All @@ -110,27 +110,15 @@ def clean_event_loop():
except TransitionError:
pass

unfinished_tasks = {
task
for task in asyncio.all_tasks(loop)
if task.get_coro().__name__ not in _ALLOWED_CORO_TASKS and not task.done()
}

# Cancelling the tasks is thread unsafe, but we get a new event loop
# with each RE test so we don't need to do this
loop.call_soon_threadsafe(loop.stop)
RE._th.join()
for task in unfinished_tasks:
task.cancel()
loop.close()

# If the tasks are still pending because the test failed
# for other reasons then we don't have to error here
if unfinished_tasks and request.session.testsfailed == fail_count:
raise RuntimeError(
f"Not all tasks closed during test {request.node.name}:\n"
f"{pprint.pformat(unfinished_tasks)}"
)
try:
_error_and_kill_pending_tasks(loop, request, fail_count)
finally:
loop.close()


request.addfinalizer(clean_event_loop)
return RE
Expand Down

0 comments on commit f01f90b

Please sign in to comment.