diff --git a/mara_pipelines/incremental_processing/processed_files.py b/mara_pipelines/incremental_processing/processed_files.py index fd04915..69a983e 100644 --- a/mara_pipelines/incremental_processing/processed_files.py +++ b/mara_pipelines/incremental_processing/processed_files.py @@ -74,4 +74,5 @@ def already_processed_file(node_path: str, file_name: str) -> datetime: FROM data_integration_processed_file WHERE node_path = {'%s'} AND file_name = {'%s'} """, (node_path, file_name,)) - return cursor.fetchone()[0] \ No newline at end of file + row = cursor.fetchone() + return row[0] if row else None \ No newline at end of file diff --git a/mara_pipelines/parallel_tasks/files.py b/mara_pipelines/parallel_tasks/files.py index be43fd6..8ae6d13 100644 --- a/mara_pipelines/parallel_tasks/files.py +++ b/mara_pipelines/parallel_tasks/files.py @@ -55,7 +55,14 @@ def __init__(self, id: str, description: str, file_pattern: str, read_mode: Read self._db_alias = db_alias self.timezone = timezone - self.use_workers = self.read_mode == [ReadMode.ALL, ReadMode.ONLY_NEW, ReadMode.ONLY_CHANGED] + self.use_workers = self.read_mode in [ReadMode.ALL] # NOTE: It should be possible to implement here ReadMode.ONLY_NEW + # and ReadMode.ONLY_CHANGED as well. I tried it but run into + # issues with the lambda function used in `process_commands`: + # + # The commands passed via `feed_workers` are processed through a + # multiprocessing.Queue. Probably all objects/functions passed over + # there need to be declared on root level. See as well: + # https://stackoverflow.com/a/8805244 @property def db_alias(self): diff --git a/mara_pipelines/pipelines.py b/mara_pipelines/pipelines.py index 24d01f8..39259d1 100644 --- a/mara_pipelines/pipelines.py +++ b/mara_pipelines/pipelines.py @@ -137,6 +137,7 @@ def run(self): raise ValueError(f"Unexecpted type passed to command_queue: {value}") for command in commands: + command.parent = self if not command.run(): return False diff --git a/tests/test_parallel_read.py b/tests/test_parallel_read.py index 696be9b..bc8d3f7 100644 --- a/tests/test_parallel_read.py +++ b/tests/test_parallel_read.py @@ -25,7 +25,7 @@ def empty_files(): root_path = EMPTY_FILES_BASE_PATH root_path.mkdir(parents=True, exist_ok=True) - file_list = [str((root_path / str(file)).absolute()) for file in range(1, 8*3)] + file_list = [str((root_path / str(file)).absolute()) for file in range(25)] # create empty files for file in file_list: @@ -64,6 +64,7 @@ def test_read_mode_all(empty_files): description="Runs a test pipeline which checks if a file exist", file_pattern='*', read_mode=ReadMode.ALL, - target_table=None)) + target_table=None, + max_number_of_parallel_tasks=4)) run_pipeline(pipeline) \ No newline at end of file