From 6eda2d3328199927ad704fabea20906dba66cfef Mon Sep 17 00:00:00 2001 From: Lyn Date: Mon, 4 Dec 2023 12:05:23 -0800 Subject: [PATCH] more tests --- .../strategies/run_task_with_multiprocessing.py | 2 +- .../strategies/test_run_task_with_multiprocessing.py | 9 +++------ 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/arroyo/processing/strategies/run_task_with_multiprocessing.py b/arroyo/processing/strategies/run_task_with_multiprocessing.py index 94c7e86e..b2b4e9c0 100644 --- a/arroyo/processing/strategies/run_task_with_multiprocessing.py +++ b/arroyo/processing/strategies/run_task_with_multiprocessing.py @@ -811,7 +811,7 @@ def terminate(self) -> None: self.__shared_memory_manager.shutdown() logger.info("Terminating %r...", self.__next_step) - # self.__next_step.terminate() + self.__next_step.terminate() def join(self, timeout: Optional[float] = None) -> None: start_join = time.time() diff --git a/tests/processing/strategies/test_run_task_with_multiprocessing.py b/tests/processing/strategies/test_run_task_with_multiprocessing.py index b672f3d7..b3f2e39d 100644 --- a/tests/processing/strategies/test_run_task_with_multiprocessing.py +++ b/tests/processing/strategies/test_run_task_with_multiprocessing.py @@ -215,8 +215,6 @@ def test_parallel_transform_step() -> None: transform_step.close() - pool.close() - metrics.calls.clear() with assert_changes( @@ -275,6 +273,7 @@ def test_parallel_transform_step() -> None: ], ): transform_step.join() + pool.close() assert next_step.submit.call_count == len(messages) @@ -308,8 +307,7 @@ def test_parallel_run_task_terminate_workers() -> None: starting_processes, ), assert_changes(lambda: int(next_step.terminate.call_count), 0, 1): transform_step.terminate() - - pool.close() + pool.close() _COUNT_CALLS = 0 @@ -654,6 +652,7 @@ def test_output_block_resizing_without_limits() -> None: strategy.close() strategy.join(timeout=3) + pool.close() assert ( next_step.submit.call_args_list @@ -672,8 +671,6 @@ def test_output_block_resizing_without_limits() -> None: in TestingMetricsBackend.calls ) - pool.close() - def message_processor_raising_invalid_message(x: Message[KafkaPayload]) -> KafkaPayload: raise InvalidMessage(