Skip to content

Commit

Permalink
multiple test fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter committed Aug 9, 2023
1 parent 3cf1465 commit eb922ec
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 17 deletions.
14 changes: 3 additions & 11 deletions distributed/tests/test_cancelled_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -1111,12 +1111,11 @@ def test_workerstate_resumed_waiting_to_flight(ws):
assert ws.tasks["x"].state == "flight"


@pytest.mark.parametrize("critical_section", ["execute", "deserialize_task"])
@pytest.mark.parametrize("resume_inside_critical_section", [False, True])
@pytest.mark.parametrize("resumed_status", ["executing", "resumed"])
@gen_cluster(client=True, nthreads=[("", 1)])
async def test_execute_preamble_early_cancel(
c, s, b, critical_section, resume_inside_critical_section, resumed_status
c, s, b, resume_inside_critical_section, resumed_status
):
"""Test multiple race conditions in the preamble of Worker.execute(), which used to
cause a task to remain permanently in resumed state or to crash the worker through
Expand All @@ -1129,15 +1128,8 @@ async def test_execute_preamble_early_cancel(
test_worker.py::test_execute_preamble_abort_retirement
"""
async with BlockedExecute(s.address) as a:
if critical_section == "execute":
in_ev = a.in_execute
block_ev = a.block_execute
a.block_deserialize_task.set()
else:
assert critical_section == "deserialize_task"
in_ev = a.in_deserialize_task
block_ev = a.block_deserialize_task
a.block_execute.set()
in_ev = a.in_execute
block_ev = a.block_execute

async def resume():
if resumed_status == "executing":
Expand Down
3 changes: 0 additions & 3 deletions distributed/tests/test_spans.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,14 +518,12 @@ async def test_worker_metrics(c, s, a, b):

# metrics for foo include self and its child bar
assert list(foo_metrics) == [
("execute", "x", "deserialize", "seconds"),
("execute", "x", "thread-cpu", "seconds"),
("execute", "x", "thread-noncpu", "seconds"),
("execute", "x", "executor", "seconds"),
("execute", "x", "other", "seconds"),
("execute", "x", "memory-read", "count"),
("execute", "x", "memory-read", "bytes"),
("execute", "y", "deserialize", "seconds"),
("execute", "y", "thread-cpu", "seconds"),
("execute", "y", "thread-noncpu", "seconds"),
("execute", "y", "executor", "seconds"),
Expand All @@ -536,7 +534,6 @@ async def test_worker_metrics(c, s, a, b):
list(bar0_metrics)
== list(bar1_metrics)
== [
("execute", "y", "deserialize", "seconds"),
("execute", "y", "thread-cpu", "seconds"),
("execute", "y", "thread-noncpu", "seconds"),
("execute", "y", "executor", "seconds"),
Expand Down
3 changes: 2 additions & 1 deletion distributed/tests/test_worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ def f(arg):
annotations={},
span_id=None,
stimulus_id="test",
run_id=5,
)
assert ev.run_spec is not None
ev2 = ev.to_loggable(handled=11.22)
Expand All @@ -402,7 +403,7 @@ def f(arg):
"who_has": {"y": ["w1"]},
"nbytes": {"y": 123},
"priority": [0],
"run_spec": [None, None, None],
"run_spec": None,
"duration": 123.45,
"resource_restrictions": {},
"actor": False,
Expand Down
2 changes: 0 additions & 2 deletions distributed/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2287,8 +2287,6 @@ def f(in_task, block_task):
def __init__(self, *args, **kwargs):
self.in_execute = asyncio.Event()
self.block_execute = asyncio.Event()
self.in_deserialize_task = asyncio.Event()
self.block_deserialize_task = asyncio.Event()
self.in_execute_exit = asyncio.Event()
self.block_execute_exit = asyncio.Event()

Expand Down

0 comments on commit eb922ec

Please sign in to comment.