Skip to content

Commit

Permalink
change how futures are set up in the worker events test
Browse files Browse the repository at this point in the history
  • Loading branch information
stan-dot committed Oct 11, 2024
1 parent cd91f0f commit 98204c8
Showing 1 changed file with 22 additions and 5 deletions.
27 changes: 22 additions & 5 deletions tests/unit_tests/worker/test_task_worker.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import itertools
import threading
from collections.abc import Callable, Iterable
Expand Down Expand Up @@ -358,15 +359,31 @@ def assert_running_count_plan_produces_ordered_worker_and_data_events(
]

count = itertools.count()
events: Future[list[Any]] = take_events_from_streams(
event_streams,
lambda _: next(count) >= len(expected_events) - 1,
)
events = []

async def collect_events():
async for event in take_events_from_streams(
event_streams,
lambda _: next(count) >= len(expected_events) - 1,
):
events.append(event)
if len(events) >= len(expected_events):
break

task_id = worker.submit_task(task)
worker.begin_task(task_id)
results = events.result(timeout=timeout)
# Await for events to be collected with proper timeout
try:
asyncio.run(asyncio.wait_for(collect_events(), timeout=timeout))
except asyncio.TimeoutError:
pytest.fail(f"Test timed out after {timeout} seconds while waiting for events.")

_compare_events(expected_events, task_id, results)


def _compare_events(
expected_events: list[DataEvent | WorkerEvent], task_id: str, results: list[Any]
) -> None:
for actual, expected in itertools.zip_longest(results, expected_events):
if isinstance(expected, WorkerEvent):
if expected.task_status:
Expand Down

0 comments on commit 98204c8

Please sign in to comment.