Skip to content

Commit

Permalink
more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
lynnagara committed Dec 4, 2023
1 parent b528ac7 commit 6eda2d3
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,7 @@ def terminate(self) -> None:
self.__shared_memory_manager.shutdown()

logger.info("Terminating %r...", self.__next_step)
# self.__next_step.terminate()
self.__next_step.terminate()

def join(self, timeout: Optional[float] = None) -> None:
start_join = time.time()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,6 @@ def test_parallel_transform_step() -> None:

transform_step.close()

pool.close()

metrics.calls.clear()

with assert_changes(
Expand Down Expand Up @@ -275,6 +273,7 @@ def test_parallel_transform_step() -> None:
],
):
transform_step.join()
pool.close()

assert next_step.submit.call_count == len(messages)

Expand Down Expand Up @@ -308,8 +307,7 @@ def test_parallel_run_task_terminate_workers() -> None:
starting_processes,
), assert_changes(lambda: int(next_step.terminate.call_count), 0, 1):
transform_step.terminate()

pool.close()
pool.close()


_COUNT_CALLS = 0
Expand Down Expand Up @@ -654,6 +652,7 @@ def test_output_block_resizing_without_limits() -> None:

strategy.close()
strategy.join(timeout=3)
pool.close()

assert (
next_step.submit.call_args_list
Expand All @@ -672,8 +671,6 @@ def test_output_block_resizing_without_limits() -> None:
in TestingMetricsBackend.calls
)

pool.close()


def message_processor_raising_invalid_message(x: Message[KafkaPayload]) -> KafkaPayload:
raise InvalidMessage(
Expand Down

0 comments on commit 6eda2d3

Please sign in to comment.