From 56f2095d0d6b23f802eced995021dd2c115d9843 Mon Sep 17 00:00:00 2001
From: Deepyaman Datta
Date: Wed, 22 May 2024 08:26:34 -0600
Subject: [PATCH] Add option to raise errors during `find_pipelines` (#3823)

* Add option to raise errors during `find_pipelines`

Signed-off-by: Deepyaman Datta

* Add coverage for non-simplified project structures

Signed-off-by: Deepyaman Datta

* Fix skipping for non-simplified project structures

Signed-off-by: Deepyaman Datta

* Add required type hint for `raise_errors` argument

Signed-off-by: Deepyaman Datta

* Add docstring and release notes for `raise_errors`

Signed-off-by: Deepyaman Datta

* Add documentation on the `raise_error` flag to web

Signed-off-by: Deepyaman Datta

---------

Signed-off-by: Deepyaman Datta
---
 RELEASE.md                                    |  3 +-
 .../nodes_and_pipelines/pipeline_registry.md  |  4 +-
 kedro/framework/project/__init__.py           | 26 +++++++++-
 kedro/pipeline/modular_pipeline.py            |  4 ++
 .../project/test_pipeline_discovery.py        | 48 ++++++++++++-------
 5 files changed, 63 insertions(+), 22 deletions(-)

diff --git a/RELEASE.md b/RELEASE.md
index b75cc57a22..a74ec12cd7 100644
--- a/RELEASE.md
+++ b/RELEASE.md
@@ -1,6 +1,7 @@
 # Upcoming Release 0.19.6

 ## Major features and improvements
+* Added `raise_errors` argument to `find_pipelines`. If `True`, autodiscovery raises an error as soon as a pipeline fails to load; the default behaviour is still to raise a warning for each failing pipeline.
 * It is now possible to use Kedro without having `rich` installed.
 * Added a `--telemetry` flag to `kedro run`, allowing consent to data usage to be granted or revoked at the same time the command is run.

@@ -45,7 +46,7 @@ Many thanks to the following Kedroids for contributing PRs to this release:
 * Updated `kedro pipeline create` and `kedro pipeline delete` to read the base environment from the project settings.
 * Updated CLI command `kedro catalog resolve` to read credentials properly.
 * Changed the location of pipeline tests generated with `kedro pipeline create` from `/src/tests/pipelines/` to `/tests/pipelines/`.
-* Updated ``.gitignore`` to prevent pushing Mlflow local runs folder to a remote forge when using mlflow and git.
+* Updated ``.gitignore`` to prevent pushing the MLflow local runs folder to a remote forge when using MLflow and Git.
 * Fixed the error message shown for malformed YAML/JSON files in `OmegaConfigLoader`.
 * Fixed a bug in node creation that allowed self-dependencies when using transcoding, that is, datasets named like `name@format`.
 * Improved the error message shown when passing a wrong value to a node.
diff --git a/docs/source/nodes_and_pipelines/pipeline_registry.md b/docs/source/nodes_and_pipelines/pipeline_registry.md
index 6b92b30512..6520c06e01 100644
--- a/docs/source/nodes_and_pipelines/pipeline_registry.md
+++ b/docs/source/nodes_and_pipelines/pipeline_registry.md
@@ -51,7 +51,9 @@ Under the hood, the `find_pipelines()` function traverses the `src/<package_name>/pipelines/` directory by:
 1. Importing the `<package_name>.pipelines.<pipeline_name>` module
 2. Calling the `create_pipeline()` function exposed by the `<package_name>.pipelines.<pipeline_name>` module
 3. Validating that the constructed object is a {py:class}`~kedro.pipeline.Pipeline`

-If any of these steps fail, `find_pipelines()` raises an appropriate warning and skips the current pipeline but continues traversal.
+By default, if any of these steps fail, `find_pipelines()` (or, equivalently, `find_pipelines(raise_errors=False)`) raises an appropriate warning and skips the current pipeline but continues traversal. During development, this enables you to run your project with some pipelines, even if other pipelines are broken or works in progress.
+
+If you specify `find_pipelines(raise_errors=True)`, autodiscovery fails upon the first error. In production, this ensures errors are caught up front, and pipelines are not excluded accidentally.
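+
+For example, a `pipeline_registry.py` based on the default project template might opt into strict autodiscovery as follows; apart from the `raise_errors` argument, this is the standard registry boilerplate:
+
+```python
+from kedro.framework.project import find_pipelines
+from kedro.pipeline import Pipeline
+
+
+def register_pipelines() -> dict[str, Pipeline]:
+    # Fail fast on the first broken pipeline module instead of skipping it
+    # with a warning, which is the default (raise_errors=False) behaviour.
+    pipelines = find_pipelines(raise_errors=True)
+    pipelines["__default__"] = sum(pipelines.values())
+    return pipelines
+```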

 The mapping returned by `find_pipelines()` can be modified, meaning you are not limited to the pipelines returned by each of the `create_pipeline()` functions found above. For example, to add a data engineering pipeline that isn't part of the default pipeline, add it to the dictionary *after* constructing the default pipeline:
diff --git a/kedro/framework/project/__init__.py b/kedro/framework/project/__init__.py
index 7cfd2a9957..4ced05c0dc 100644
--- a/kedro/framework/project/__init__.py
+++ b/kedro/framework/project/__init__.py
@@ -347,7 +347,7 @@ def _create_pipeline(pipeline_module: types.ModuleType) -> Pipeline | None:
     return obj


-def find_pipelines() -> dict[str, Pipeline]:  # noqa: PLR0912
+def find_pipelines(raise_errors: bool = False) -> dict[str, Pipeline]:  # noqa: PLR0912
     """Automatically find modular pipelines having a ``create_pipeline``
     function. By default, projects created using Kedro 0.18.3 and higher
     call this function to autoregister pipelines upon creation/addition.
@@ -359,13 +359,23 @@
     For more information on the pipeline registry and autodiscovery, see
     https://kedro.readthedocs.io/en/stable/nodes_and_pipelines/pipeline_registry.html

+    Args:
+        raise_errors: If ``True``, raise an error upon failed discovery.
+
     Returns:
         A generated mapping from pipeline names to ``Pipeline`` objects.

+    Raises:
+        ImportError: When a module does not expose a ``create_pipeline``
+            function, the ``create_pipeline`` function does not return a
+            ``Pipeline`` object, or the module import fails up front.
+            If ``raise_errors`` is ``False``, see the Warns section instead.
+
     Warns:
         UserWarning: When a module does not expose a ``create_pipeline``
             function, the ``create_pipeline`` function does not return a
             ``Pipeline`` object, or the module import fails up front.
+            If ``raise_errors`` is ``True``, see the Raises section instead.
     """
     pipeline_obj = None

@@ -375,6 +385,12 @@
         pipeline_module = importlib.import_module(pipeline_module_name)
     except Exception as exc:
         if str(exc) != f"No module named '{pipeline_module_name}'":
+            if raise_errors:
+                raise ImportError(
+                    f"An error occurred while importing the "
+                    f"'{pipeline_module_name}' module."
+                ) from exc
+
             warnings.warn(
                 IMPORT_ERROR_MESSAGE.format(
                     module=pipeline_module_name, tb_exc=traceback.format_exc()
@@ -406,7 +422,13 @@
         pipeline_module_name = f"{PACKAGE_NAME}.pipelines.{pipeline_name}"
         try:
             pipeline_module = importlib.import_module(pipeline_module_name)
-        except:  # noqa: E722
+        except Exception as exc:
+            if raise_errors:
+                raise ImportError(
+                    f"An error occurred while importing the "
+                    f"'{pipeline_module_name}' module."
+                ) from exc
+
             warnings.warn(
                 IMPORT_ERROR_MESSAGE.format(
                     module=pipeline_module_name, tb_exc=traceback.format_exc()
diff --git a/kedro/pipeline/modular_pipeline.py b/kedro/pipeline/modular_pipeline.py
index 2bf672a26c..9eb4caba16 100644
--- a/kedro/pipeline/modular_pipeline.py
+++ b/kedro/pipeline/modular_pipeline.py
@@ -99,8 +99,10 @@ def _get_dataset_names_mapping(
         the same as they are named in the provided pipeline. When dict[str, str]
         is provided, current names will be mapped to new names in the resultant
         pipeline.
+
     Returns:
         A dictionary that maps the old dataset names to the provided ones.
+
     Examples:
         >>> _get_dataset_names_mapping("dataset_name")
         {"dataset_name": "dataset_name"}  # a str name will stay the same
@@ -138,8 +140,10 @@ def _get_param_names_mapping(
         the same as they are named in the provided pipeline. When dict[str, str]
         is provided, current names will be mapped to new names in the resultant
         pipeline.
+
     Returns:
         A dictionary that maps the old parameter names to the provided ones.
+
     Examples:
         >>> _get_param_names_mapping("param_name")
         {"params:param_name": "params:param_name"}  # a str name will stay the same
diff --git a/tests/framework/project/test_pipeline_discovery.py b/tests/framework/project/test_pipeline_discovery.py
index 4525144fa5..afffbcfdeb 100644
--- a/tests/framework/project/test_pipeline_discovery.py
+++ b/tests/framework/project/test_pipeline_discovery.py
@@ -177,12 +177,16 @@ def test_find_pipelines_skips_regular_files_within_the_pipelines_folder(


 @pytest.mark.parametrize(
-    "mock_package_name_with_pipelines,pipeline_names",
-    [(x, x) for x in [set(), {"my_pipeline"}]],
-    indirect=True,
+    "mock_package_name_with_pipelines,pipeline_names,raise_errors",
+    [
+        (x, x, raise_errors)
+        for x in [set(), {"my_pipeline"}]
+        for raise_errors in [True, False]
+    ],
+    indirect=["mock_package_name_with_pipelines", "pipeline_names"],
 )
 def test_find_pipelines_skips_modules_that_cause_exceptions_upon_import(
-    mock_package_name_with_pipelines, pipeline_names
+    mock_package_name_with_pipelines, pipeline_names, raise_errors
 ):
     # Create a module that will result in errors when we try to load it.
     pipelines_dir = Path(sys.path[0]) / mock_package_name_with_pipelines / "pipelines"
@@ -191,12 +195,14 @@
     (pipeline_dir / "__init__.py").write_text("I walk a lonely road...")

     configure_project(mock_package_name_with_pipelines)
-    with pytest.warns(
-        UserWarning, match=r"An error occurred while importing the '\S+' module."
+    with getattr(pytest, "raises" if raise_errors else "warns")(
+        ImportError if raise_errors else UserWarning,
+        match=r"An error occurred while importing the '\S+' module.",
     ):
-        pipelines = find_pipelines()
-    assert set(pipelines) == pipeline_names | {"__default__"}
-    assert sum(pipelines.values()).outputs() == pipeline_names
+        pipelines = find_pipelines(raise_errors=raise_errors)
+    if not raise_errors:
+        assert set(pipelines) == pipeline_names | {"__default__"}
+        assert sum(pipelines.values()).outputs() == pipeline_names


 @pytest.mark.parametrize(
@@ -226,12 +232,16 @@ def create_pipeline(**kwargs) -> Pipeline:


 @pytest.mark.parametrize(
-    "mock_package_name_with_pipelines,pipeline_names",
-    [(x, x) for x in [set(), {"my_pipeline"}]],
-    indirect=True,
+    "mock_package_name_with_pipelines,pipeline_names,raise_errors",
+    [
+        (x, x, raise_errors)
+        for x in [set(), {"my_pipeline"}]
+        for raise_errors in [True, False]
+    ],
+    indirect=["mock_package_name_with_pipelines", "pipeline_names"],
 )
 def test_find_pipelines_skips_unimportable_pipeline_module(
-    mock_package_name_with_pipelines, pipeline_names
+    mock_package_name_with_pipelines, pipeline_names, raise_errors
 ):
     (Path(sys.path[0]) / mock_package_name_with_pipelines / "pipeline.py").write_text(
         textwrap.dedent(
@@ -248,12 +258,14 @@ def create_pipeline(**kwargs) -> Pipeline:
     )

     configure_project(mock_package_name_with_pipelines)
-    with pytest.warns(
-        UserWarning, match=r"An error occurred while importing the '\S+' module."
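+    # raise_errors=True should turn the discovery warning into an ImportError;
+    # the default (raise_errors=False) warns and skips the broken module instead.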
+    with getattr(pytest, "raises" if raise_errors else "warns")(
+        ImportError if raise_errors else UserWarning,
+        match=r"An error occurred while importing the '\S+' module.",
     ):
-        pipelines = find_pipelines()
-    assert set(pipelines) == pipeline_names | {"__default__"}
-    assert sum(pipelines.values()).outputs() == pipeline_names
+        pipelines = find_pipelines(raise_errors=raise_errors)
+    if not raise_errors:
+        assert set(pipelines) == pipeline_names | {"__default__"}
+        assert sum(pipelines.values()).outputs() == pipeline_names


 @pytest.mark.parametrize(