From dfefb3f8d41b62e94290dd769d8513238afcbba2 Mon Sep 17 00:00:00 2001 From: John Joyce Date: Thu, 23 Jan 2025 16:42:57 -0800 Subject: [PATCH 01/10] Fix unbound variable issue --- .../src/datahub_actions/cli/actions.py | 50 ++++++++++++------- examples/hello_world_datahub_cloud.yaml | 14 +++--- 2 files changed, 37 insertions(+), 27 deletions(-) diff --git a/datahub-actions/src/datahub_actions/cli/actions.py b/datahub-actions/src/datahub_actions/cli/actions.py index d695ae2..338bf96 100644 --- a/datahub-actions/src/datahub_actions/cli/actions.py +++ b/datahub-actions/src/datahub_actions/cli/actions.py @@ -1,17 +1,3 @@ -# Copyright 2021 Acryl Data, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - import logging import pathlib import signal @@ -22,6 +8,7 @@ import click from click_default_group import DefaultGroup from datahub.configuration.config_loader import load_config_file +from expandvars import UnboundVariable import datahub_actions as datahub_actions_package from datahub_actions.pipeline.pipeline import Pipeline @@ -29,7 +16,6 @@ logger = logging.getLogger(__name__) - # Instantiate a singleton instance of the Pipeline Manager. pipeline_manager = PipelineManager() @@ -78,14 +64,32 @@ def run(ctx: Any, config: List[str], debug: bool) -> None: # Statically configured to be registered with the pipeline Manager. pipelines: List[Pipeline] = [] + attempted_configs = 0 logger.debug("Creating Actions Pipelines...") # If individual pipeline config was provided, create a pipeline from it. if config is not None: for pipeline_config in config: + attempted_configs += 1 pipeline_config_file = pathlib.Path(pipeline_config) - pipeline_config_dict = load_config_file(pipeline_config_file) + try: + # Attempt to load the configuration file + pipeline_config_dict = load_config_file(pipeline_config_file) + logger.info("Pipeline configuration loaded successfully.") + except UnboundVariable as e: + if len(config) == 1: + raise Exception( + "Failed to load action configuration. Unbound variable(s) provided in config YAML." + ) from e + else: + # Multiple configs, simply log and continue. + # Log the unbound variable error + logger.error( + f"Failed to load pipeline configuration! Skipping action...: {e}" + ) + continue + enabled = pipeline_config_dict.get("enabled", True) if enabled == "false" or enabled is False: logger.warning( @@ -99,6 +103,14 @@ def run(ctx: Any, config: List[str], debug: bool) -> None: pipeline_config_to_pipeline(pipeline_config_dict) ) # Now, instantiate the pipeline. + # Exit early if no valid pipelines were created + if not pipelines: + logger.error( + f"No valid pipelines were started from {attempted_configs} config(s). " + "Check that at least one pipeline is enabled and all required variables are bound." + ) + sys.exit(1) + logger.debug("Starting Actions Pipelines") # Start each pipeline. 
@@ -106,7 +118,7 @@ def run(ctx: Any, config: List[str], debug: bool) -> None: pipeline_manager.start_pipeline(p.name, p) logger.info(f"Action Pipeline with name '{p.name}' is now running.") - # Now, simply run forever. + # Now, run forever only if we have valid pipelines while True: time.sleep(5) @@ -121,10 +133,10 @@ def version() -> None: # Handle shutdown signal. (ctrl-c) -def handle_shutdown(signum, frame): +def handle_shutdown(signum: int, frame: Any) -> None: logger.info("Stopping all running Action Pipelines...") pipeline_manager.stop_all() - exit(1) + sys.exit(1) signal.signal(signal.SIGINT, handle_shutdown) diff --git a/examples/hello_world_datahub_cloud.yaml b/examples/hello_world_datahub_cloud.yaml index fb35cb6..d8802cf 100644 --- a/examples/hello_world_datahub_cloud.yaml +++ b/examples/hello_world_datahub_cloud.yaml @@ -1,13 +1,11 @@ # hello_world.yaml -name: "hello_world_datahub_cloud" -# 1. DataHub Cloud Connection: Configure how to talk to DataHub Cloud +name: "hello_world_datahub_cloud_2" +# 1. Event Source: Where to source event from. datahub: - server: "https://.acryl.io" - token: "" -# 2. Event Source: Where to source event from. + server: "https://longtailcompanions.acryl.io" + token: "eyJhbGciOiJIUzI1NiJ9.eyJhY3RvclR5cGUiOiJVU0VSIiwiYWN0b3JJZCI6ImpvaG4uam95Y2VAYWNyeWwuaW8iLCJ0eXBlIjoiUEVSU09OQUwiLCJ2ZXJzaW9uIjoiMiIsImp0aSI6ImMyYTAwYzAxLWNjZWMtNDRiNC05NGIxLTNmZWY3ZTk3ODkzNiIsInN1YiI6ImpvaG4uam95Y2VAYWNyeWwuaW8iLCJleHAiOjE3NDAwMDk3MjQsImlzcyI6ImRhdGFodWItbWV0YWRhdGEtc2VydmljZSJ9.WpIRnVb4qktlcVC_EJS-7i4E_rLJ3CTe3XHT6cL18oE" source: type: "datahub-cloud" -# 3. Action: What action to take on events. -# To learn how to develop a custom Action, see https://datahubproject.io/docs/actions/guides/developing-an-action +# 2. Action: What action to take on events. action: - type: "hello_world" + type: "hello_world" \ No newline at end of file From 54d4b9debfad611b1292ae38dec4587652242514 Mon Sep 17 00:00:00 2001 From: John Joyce Date: Thu, 23 Jan 2025 16:45:49 -0800 Subject: [PATCH 02/10] add back liences --- datahub-actions/src/datahub_actions/cli/actions.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/datahub-actions/src/datahub_actions/cli/actions.py b/datahub-actions/src/datahub_actions/cli/actions.py index 338bf96..0dc966f 100644 --- a/datahub-actions/src/datahub_actions/cli/actions.py +++ b/datahub-actions/src/datahub_actions/cli/actions.py @@ -1,3 +1,17 @@ +# Copyright 2021 Acryl Data, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ import logging import pathlib import signal From 3020fe1100ba1d5ed601251ec2b828ab7379f4b9 Mon Sep 17 00:00:00 2001 From: John Joyce Date: Thu, 23 Jan 2025 16:46:33 -0800 Subject: [PATCH 03/10] reset condig --- examples/hello_world_datahub_cloud.yaml | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/examples/hello_world_datahub_cloud.yaml b/examples/hello_world_datahub_cloud.yaml index d8802cf..9efd019 100644 --- a/examples/hello_world_datahub_cloud.yaml +++ b/examples/hello_world_datahub_cloud.yaml @@ -1,11 +1,13 @@ # hello_world.yaml -name: "hello_world_datahub_cloud_2" -# 1. Event Source: Where to source event from. +name: "hello_world_datahub_cloud" +# 1. DataHub Cloud Connection: Configure how to talk to DataHub Cloud datahub: - server: "https://longtailcompanions.acryl.io" - token: "eyJhbGciOiJIUzI1NiJ9.eyJhY3RvclR5cGUiOiJVU0VSIiwiYWN0b3JJZCI6ImpvaG4uam95Y2VAYWNyeWwuaW8iLCJ0eXBlIjoiUEVSU09OQUwiLCJ2ZXJzaW9uIjoiMiIsImp0aSI6ImMyYTAwYzAxLWNjZWMtNDRiNC05NGIxLTNmZWY3ZTk3ODkzNiIsInN1YiI6ImpvaG4uam95Y2VAYWNyeWwuaW8iLCJleHAiOjE3NDAwMDk3MjQsImlzcyI6ImRhdGFodWItbWV0YWRhdGEtc2VydmljZSJ9.WpIRnVb4qktlcVC_EJS-7i4E_rLJ3CTe3XHT6cL18oE" + server: "https://.acryl.io" + token: "" +# 2. Event Source: Where to source event from. source: type: "datahub-cloud" -# 2. Action: What action to take on events. +# 3. Action: What action to take on events. +# To learn how to develop a custom Action, see https://datahubproject.io/docs/actions/guides/developing-an-action action: type: "hello_world" \ No newline at end of file From 6d2f5a5d15affdeee7e49b498e7b6fe55dd11d59 Mon Sep 17 00:00:00 2001 From: John Joyce Date: Thu, 23 Jan 2025 16:47:03 -0800 Subject: [PATCH 04/10] adding tests --- datahub-actions/tests/unit/cli/__init__.py | 0 .../tests/unit/cli/test_actions.py | 305 ++++++++++++++++++ 2 files changed, 305 insertions(+) create mode 100644 datahub-actions/tests/unit/cli/__init__.py create mode 100644 datahub-actions/tests/unit/cli/test_actions.py diff --git a/datahub-actions/tests/unit/cli/__init__.py b/datahub-actions/tests/unit/cli/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/datahub-actions/tests/unit/cli/test_actions.py b/datahub-actions/tests/unit/cli/test_actions.py new file mode 100644 index 0000000..26e6306 --- /dev/null +++ b/datahub-actions/tests/unit/cli/test_actions.py @@ -0,0 +1,305 @@ +import logging +import os +import tempfile +from typing import Generator +from unittest.mock import Mock + +import pytest +from click.testing import CliRunner + +from datahub_actions.cli.actions import actions, pipeline_config_to_pipeline +from datahub_actions.pipeline.pipeline import Pipeline +from datahub_actions.pipeline.pipeline_manager import PipelineManager + + +@pytest.fixture +def capture_logger( + caplog: pytest.LogCaptureFixture, +) -> Generator[pytest.LogCaptureFixture, None, None]: + """Fixture to capture logger output with the proper level.""" + caplog.set_level(logging.INFO) + yield caplog + + +@pytest.fixture +def mock_pipeline() -> Mock: + """Create a mock pipeline object.""" + mock = Mock(spec=Pipeline) + mock.name = "test_pipeline" + return mock + + +@pytest.fixture +def mock_pipeline_manager() -> Mock: + """Create a mock pipeline manager.""" + mock = Mock(spec=PipelineManager) + mock.start_pipeline = Mock() + mock.stop_all = Mock() + return mock + + +@pytest.fixture +def temp_config_file() -> Generator[str, None, None]: + """Creates a temporary YAML config file for testing.""" + with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", 
delete=False) as f: + f.write( + """ +name: "test_pipeline" +datahub: + server: "https://test.datahub.io" + token: "test-token" +source: + type: "datahub-cloud" +action: + type: "hello_world" +""" + ) + config_path = f.name + + yield config_path + # Cleanup + os.unlink(config_path) + + +@pytest.fixture +def disabled_config_file() -> Generator[str, None, None]: + """Creates a temporary YAML config file with disabled pipeline.""" + with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f: + f.write( + """ +name: "disabled_pipeline" +enabled: false +datahub: + server: "https://test.datahub.io" + token: "test-token" +source: + type: "datahub-cloud" +action: + type: "hello_world" +""" + ) + config_path = f.name + + yield config_path + # Cleanup + os.unlink(config_path) + + +@pytest.fixture +def invalid_config_file() -> Generator[str, None, None]: + """Creates a temporary YAML config file with unbound variables.""" + with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f: + f.write( + """ +name: "invalid_pipeline" +datahub: + server: "https://test.datahub.io" + token: ${UNDEFINED_VAR} +source: + type: "datahub-cloud" +action: + type: "hello_world" +""" + ) + config_path = f.name + + yield config_path + # Cleanup + os.unlink(config_path) + + +def test_version_command() -> None: + """Test the version command outputs version information.""" + runner = CliRunner() + result = runner.invoke(actions, ["version"]) + assert result.exit_code == 0 + assert "DataHub Actions version:" in result.output + assert "Python version:" in result.output + + +def test_disabled_pipeline_exits( + disabled_config_file: str, capture_logger: pytest.LogCaptureFixture +) -> None: + """Test that disabled pipelines cause program exit.""" + runner = CliRunner() + result = runner.invoke(actions, ["run", "-c", disabled_config_file]) + + assert result.exit_code == 1 + assert any( + "Skipping pipeline disabled_pipeline as it is not enabled" in record.message + for record in capture_logger.records + ) + + +def test_invalid_config_single(invalid_config_file: str) -> None: + """Test handling of single config with unbound variables.""" + runner = CliRunner() + result = runner.invoke(actions, ["run", "-c", invalid_config_file]) + assert result.exit_code != 0 + assert "Unbound variable(s) provided in config YAML" in str(result.exception) + + +def test_all_configs_invalid_or_disabled( + invalid_config_file: str, + disabled_config_file: str, + capture_logger: pytest.LogCaptureFixture, +) -> None: + """Test that program exits when all configs are invalid or disabled.""" + runner = CliRunner() + result = runner.invoke( + actions, ["run", "-c", invalid_config_file, "-c", disabled_config_file] + ) + assert result.exit_code == 1 + assert any( + "No valid pipelines were started from 2 config(s). Check that at least one pipeline is enabled and all required variables are bound." 
+ in record.message + for record in capture_logger.records + ) + + +def test_mixed_valid_and_invalid_configs( + temp_config_file: str, + disabled_config_file: str, + capture_logger: pytest.LogCaptureFixture, + monkeypatch: pytest.MonkeyPatch, + mock_pipeline: Mock, +) -> None: + """Test handling mix of valid and invalid configs.""" + + def mock_create_pipeline(config: dict) -> Pipeline: + """Mock implementation of pipeline creation.""" + return mock_pipeline + + sleep_count = 0 + + def mock_sleep(seconds: int) -> None: + """Mock sleep that raises KeyboardInterrupt after first call.""" + nonlocal sleep_count + sleep_count += 1 + if sleep_count > 1: # Allow one sleep to ensure logs are captured + raise KeyboardInterrupt() + + # Mock the pipeline creation function and sleep + monkeypatch.setattr( + "datahub_actions.pipeline.pipeline.Pipeline.create", mock_create_pipeline + ) + monkeypatch.setattr("time.sleep", mock_sleep) + + runner = CliRunner() + result = runner.invoke( + actions, ["run", "-c", temp_config_file, "-c", disabled_config_file] + ) + + assert result.exit_code == 1 + print(capture_logger.records) + assert any( + "Pipeline configuration loaded successfully." in record.message + for record in capture_logger.records + ) + assert any( + "Skipping pipeline disabled_pipeline as it is not enable" in record.message + for record in capture_logger.records + ) + assert any( + "Action Pipeline with name 'test_pipeline' is now running." in record.message + for record in capture_logger.records + ) + + +def test_debug_mode_with_valid_config( + temp_config_file: str, + capture_logger: pytest.LogCaptureFixture, + monkeypatch: pytest.MonkeyPatch, + mock_pipeline: Mock, + mock_pipeline_manager: Mock, +) -> None: + """Test debug mode with valid pipeline config.""" + + def mock_create_pipeline(config: dict) -> Pipeline: + """Mock implementation of pipeline creation.""" + return mock_pipeline + + sleep_count = 0 + + def mock_sleep(seconds: int) -> None: + """Mock sleep that raises KeyboardInterrupt after first call.""" + nonlocal sleep_count + sleep_count += 1 + if sleep_count > 1: # Allow one sleep to ensure logs are captured + raise KeyboardInterrupt() + + # Mock the pipeline creation function and sleep + monkeypatch.setattr( + "datahub_actions.pipeline.pipeline.Pipeline.create", mock_create_pipeline + ) + monkeypatch.setattr("time.sleep", mock_sleep) + monkeypatch.setattr( + "datahub_actions.cli.actions.pipeline_manager", mock_pipeline_manager + ) # Fixed path! + + runner = CliRunner() + result = runner.invoke( + actions, + ["run", "-c", temp_config_file, "--debug"], + ) + + assert result.exit_code == 1 + print(capture_logger.records) + assert any( + "Pipeline configuration loaded successfully" in record.message + for record in capture_logger.records + ) + assert any( + "Action Pipeline with name 'test_pipeline' is now running." 
in record.message + for record in capture_logger.records + ) + + +def test_pipeline_config_to_pipeline_error() -> None: + """Test error handling in pipeline creation.""" + invalid_config = { + "name": "invalid_pipeline", + # Missing required fields + } + + with pytest.raises(Exception) as exc_info: + pipeline_config_to_pipeline(invalid_config) + assert "Failed to instantiate Actions Pipeline" in str(exc_info.value) + + +def test_multiple_disabled_configs( + disabled_config_file: str, + capture_logger: pytest.LogCaptureFixture, +) -> None: + """Test handling of multiple disabled configs.""" + runner = CliRunner() + result = runner.invoke( + actions, ["run", "-c", disabled_config_file, "-c", disabled_config_file] + ) + assert result.exit_code == 1 + assert any( + "No valid pipelines were started from 2 config(s). Check that at least one pipeline is enabled and all required variables are bound." + in record.message + for record in capture_logger.records + ) + assert any( + "Skipping pipeline disabled_pipeline as it is not enabled" in record.message + for record in capture_logger.records + ) + + +# Type checking with mypy annotations +def test_type_annotations() -> None: + """Verify type hints are correct (this is a compile-time check).""" + # These assignments will be checked by mypy + runner: CliRunner = CliRunner() + config_dict: dict = { + "name": "test_pipeline", + "datahub": {"server": "https://test.datahub.io", "token": "test-token"}, + "source": {"type": "datahub-cloud"}, + "action": {"type": "hello_world"}, + } + + # These assertions serve as runtime checks + assert isinstance(runner, CliRunner) + assert isinstance(config_dict, dict) From 26833b25c9b4cb3a990bfb83e63ccd99d1d1baf1 Mon Sep 17 00:00:00 2001 From: John Joyce Date: Thu, 23 Jan 2025 17:46:36 -0800 Subject: [PATCH 05/10] Address comments --- datahub-actions/src/datahub_actions/cli/actions.py | 10 ++++------ datahub-actions/tests/unit/cli/test_actions.py | 4 ++-- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/datahub-actions/src/datahub_actions/cli/actions.py b/datahub-actions/src/datahub_actions/cli/actions.py index 0dc966f..d464214 100644 --- a/datahub-actions/src/datahub_actions/cli/actions.py +++ b/datahub-actions/src/datahub_actions/cli/actions.py @@ -78,14 +78,12 @@ def run(ctx: Any, config: List[str], debug: bool) -> None: # Statically configured to be registered with the pipeline Manager. pipelines: List[Pipeline] = [] - attempted_configs = 0 logger.debug("Creating Actions Pipelines...") # If individual pipeline config was provided, create a pipeline from it. if config is not None: for pipeline_config in config: - attempted_configs += 1 pipeline_config_file = pathlib.Path(pipeline_config) try: # Attempt to load the configuration file @@ -99,8 +97,8 @@ def run(ctx: Any, config: List[str], debug: bool) -> None: else: # Multiple configs, simply log and continue. # Log the unbound variable error - logger.error( - f"Failed to load pipeline configuration! Skipping action...: {e}" + logger.warning( + f"Failed to load pipeline configuration! Skipping action config file {pipeline_config_file}...: {e}" ) continue @@ -120,8 +118,8 @@ def run(ctx: Any, config: List[str], debug: bool) -> None: # Exit early if no valid pipelines were created if not pipelines: logger.error( - f"No valid pipelines were started from {attempted_configs} config(s). " - "Check that at least one pipeline is enabled and all required variables are bound." + f"No valid pipelines were started from {len(config)} config(s). 
" + "Check that at least one pipeline is enabled and all required environment variables are set." ) sys.exit(1) diff --git a/datahub-actions/tests/unit/cli/test_actions.py b/datahub-actions/tests/unit/cli/test_actions.py index 26e6306..7d9a888 100644 --- a/datahub-actions/tests/unit/cli/test_actions.py +++ b/datahub-actions/tests/unit/cli/test_actions.py @@ -151,7 +151,7 @@ def test_all_configs_invalid_or_disabled( ) assert result.exit_code == 1 assert any( - "No valid pipelines were started from 2 config(s). Check that at least one pipeline is enabled and all required variables are bound." + "No valid pipelines were started from 2 config(s). Check that at least one pipeline is enabled and all required environment variables are set." in record.message for record in capture_logger.records ) @@ -278,7 +278,7 @@ def test_multiple_disabled_configs( ) assert result.exit_code == 1 assert any( - "No valid pipelines were started from 2 config(s). Check that at least one pipeline is enabled and all required variables are bound." + "No valid pipelines were started from 2 config(s). Check that at least one pipeline is enabled and all required environment variables are set." in record.message for record in capture_logger.records ) From 97eb106023f73d3e42d073cee4763873ec5a5c3b Mon Sep 17 00:00:00 2001 From: John Joyce Date: Thu, 23 Jan 2025 18:21:12 -0800 Subject: [PATCH 06/10] Attempting to address comments further --- .../src/datahub_actions/cli/actions.py | 107 ++++++++++++------ .../tests/unit/cli/test_actions.py | 64 +++++------ 2 files changed, 105 insertions(+), 66 deletions(-) diff --git a/datahub-actions/src/datahub_actions/cli/actions.py b/datahub-actions/src/datahub_actions/cli/actions.py index d464214..e278a1b 100644 --- a/datahub-actions/src/datahub_actions/cli/actions.py +++ b/datahub-actions/src/datahub_actions/cli/actions.py @@ -52,6 +52,44 @@ def actions() -> None: pass +def load_raw_config_file(config_file: pathlib.Path) -> dict: + """ + Load a config file as raw YAML/JSON without variable expansion. + + Args: + config_file: Path to the configuration file + + Returns: + dict: Raw configuration dictionary + + Raises: + Exception: If the file cannot be loaded or is invalid YAML/JSON + """ + try: + with open(config_file, "r") as f: + import yaml + + return yaml.safe_load(f) + except Exception as e: + raise Exception( + f"Failed to load raw configuration file {config_file}: {e}" + ) from e + + +def is_pipeline_enabled(config: dict) -> bool: + """ + Check if a pipeline configuration is enabled. + + Args: + config: Raw configuration dictionary + + Returns: + bool: True if pipeline is enabled, False otherwise + """ + enabled = config.get("enabled", True) + return not (enabled == "false" or enabled is False) + + @actions.command( name="run", context_settings=dict( @@ -70,50 +108,53 @@ def run(ctx: Any, config: List[str], debug: bool) -> None: ) if debug: - # Set root logger settings to debug mode. logging.getLogger().setLevel(logging.DEBUG) else: - # Set root logger settings to info mode. logging.getLogger().setLevel(logging.INFO) - # Statically configured to be registered with the pipeline Manager. pipelines: List[Pipeline] = [] - logger.debug("Creating Actions Pipelines...") - # If individual pipeline config was provided, create a pipeline from it. 
- if config is not None: - for pipeline_config in config: - pipeline_config_file = pathlib.Path(pipeline_config) - try: - # Attempt to load the configuration file - pipeline_config_dict = load_config_file(pipeline_config_file) - logger.info("Pipeline configuration loaded successfully.") - except UnboundVariable as e: - if len(config) == 1: - raise Exception( - "Failed to load action configuration. Unbound variable(s) provided in config YAML." - ) from e - else: - # Multiple configs, simply log and continue. - # Log the unbound variable error - logger.warning( - f"Failed to load pipeline configuration! Skipping action config file {pipeline_config_file}...: {e}" - ) - continue - - enabled = pipeline_config_dict.get("enabled", True) - if enabled == "false" or enabled is False: + # Phase 1: Initial validation of configs + valid_configs = [] + for pipeline_config in config: + pipeline_config_file = pathlib.Path(pipeline_config) + try: + # First just load the raw config to check if it's enabled + raw_config = load_raw_config_file(pipeline_config_file) + + if not is_pipeline_enabled(raw_config): logger.warning( - f"Skipping pipeline {pipeline_config_dict.get('name')} as it is not enabled" + f"Skipping pipeline {raw_config.get('name') or pipeline_config} as it is not enabled" ) continue - # now load the config with variable expansion + valid_configs.append(pipeline_config_file) + + except Exception as e: + if len(config) == 1: + raise Exception( + f"Failed to load raw configuration file {pipeline_config_file}" + ) from e + logger.warning( + f"Failed to load pipeline configuration! Skipping action config file {pipeline_config_file}...: {e}" + ) + + # Phase 2: Full config loading and pipeline creation + for pipeline_config_file in valid_configs: + try: + # Now load the full config with variable expansion pipeline_config_dict = load_config_file(pipeline_config_file) - pipelines.append( - pipeline_config_to_pipeline(pipeline_config_dict) - ) # Now, instantiate the pipeline. + pipelines.append(pipeline_config_to_pipeline(pipeline_config_dict)) + except UnboundVariable as e: + if len(config) == 1: + raise Exception( + "Failed to load action configuration. Unbound variable(s) provided in config YAML." + ) from e + logger.warning( + f"Failed to resolve variables in config file {pipeline_config_file}...: {e}" + ) + continue # Exit early if no valid pipelines were created if not pipelines: @@ -125,7 +166,7 @@ def run(ctx: Any, config: List[str], debug: bool) -> None: logger.debug("Starting Actions Pipelines") - # Start each pipeline. 
+ # Start each pipeline for p in pipelines: pipeline_manager.start_pipeline(p.name, p) logger.info(f"Action Pipeline with name '{p.name}' is now running.") diff --git a/datahub-actions/tests/unit/cli/test_actions.py b/datahub-actions/tests/unit/cli/test_actions.py index 7d9a888..dd5b456 100644 --- a/datahub-actions/tests/unit/cli/test_actions.py +++ b/datahub-actions/tests/unit/cli/test_actions.py @@ -10,7 +10,16 @@ from datahub_actions.cli.actions import actions, pipeline_config_to_pipeline from datahub_actions.pipeline.pipeline import Pipeline from datahub_actions.pipeline.pipeline_manager import PipelineManager +from contextlib import contextmanager +@contextmanager +def local_monkeypatch(monkeypatch, target, replacement): + """Apply monkeypatch temporarily within a context.""" + monkeypatch.setattr(target, replacement) + try: + yield + finally: + monkeypatch.undo() @pytest.fixture def capture_logger( @@ -179,25 +188,21 @@ def mock_sleep(seconds: int) -> None: if sleep_count > 1: # Allow one sleep to ensure logs are captured raise KeyboardInterrupt() - # Mock the pipeline creation function and sleep - monkeypatch.setattr( - "datahub_actions.pipeline.pipeline.Pipeline.create", mock_create_pipeline - ) - monkeypatch.setattr("time.sleep", mock_sleep) - runner = CliRunner() - result = runner.invoke( - actions, ["run", "-c", temp_config_file, "-c", disabled_config_file] - ) + + # Use local_monkeypatch for tighter control + + with local_monkeypatch( + monkeypatch, "datahub_actions.pipeline.pipeline.Pipeline.create", mock_create_pipeline + ), local_monkeypatch(monkeypatch, "time.sleep", mock_sleep): + result = runner.invoke( + actions, + ["run", "-c", temp_config_file, "-c", disabled_config_file] + ) assert result.exit_code == 1 - print(capture_logger.records) assert any( - "Pipeline configuration loaded successfully." in record.message - for record in capture_logger.records - ) - assert any( - "Skipping pipeline disabled_pipeline as it is not enable" in record.message + "Skipping pipeline disabled_pipeline as it is not enabled" in record.message for record in capture_logger.records ) assert any( @@ -228,27 +233,20 @@ def mock_sleep(seconds: int) -> None: if sleep_count > 1: # Allow one sleep to ensure logs are captured raise KeyboardInterrupt() - # Mock the pipeline creation function and sleep - monkeypatch.setattr( - "datahub_actions.pipeline.pipeline.Pipeline.create", mock_create_pipeline - ) - monkeypatch.setattr("time.sleep", mock_sleep) - monkeypatch.setattr( - "datahub_actions.cli.actions.pipeline_manager", mock_pipeline_manager - ) # Fixed path! - runner = CliRunner() - result = runner.invoke( - actions, - ["run", "-c", temp_config_file, "--debug"], - ) + + # Use local_monkeypatch for tighter control + with local_monkeypatch( + monkeypatch, "datahub_actions.pipeline.pipeline.Pipeline.create", mock_create_pipeline + ), local_monkeypatch(monkeypatch, "time.sleep", mock_sleep), local_monkeypatch( + monkeypatch, "datahub_actions.cli.actions.pipeline_manager", mock_pipeline_manager + ): + result = runner.invoke( + actions, + ["run", "-c", temp_config_file, "--debug"], + ) assert result.exit_code == 1 - print(capture_logger.records) - assert any( - "Pipeline configuration loaded successfully" in record.message - for record in capture_logger.records - ) assert any( "Action Pipeline with name 'test_pipeline' is now running." 
in record.message for record in capture_logger.records From b8804f529db50cad02ee040e7c4603d73e850ac1 Mon Sep 17 00:00:00 2001 From: John Joyce Date: Thu, 23 Jan 2025 18:24:49 -0800 Subject: [PATCH 07/10] Attempting to address --- .../src/datahub_actions/cli/actions.py | 2 +- .../tests/unit/cli/test_actions.py | 37 +++++++++++++++---- 2 files changed, 31 insertions(+), 8 deletions(-) diff --git a/datahub-actions/src/datahub_actions/cli/actions.py b/datahub-actions/src/datahub_actions/cli/actions.py index e278a1b..3d4a798 100644 --- a/datahub-actions/src/datahub_actions/cli/actions.py +++ b/datahub-actions/src/datahub_actions/cli/actions.py @@ -147,7 +147,7 @@ def run(ctx: Any, config: List[str], debug: bool) -> None: pipeline_config_dict = load_config_file(pipeline_config_file) pipelines.append(pipeline_config_to_pipeline(pipeline_config_dict)) except UnboundVariable as e: - if len(config) == 1: + if len(valid_configs) == 1: raise Exception( "Failed to load action configuration. Unbound variable(s) provided in config YAML." ) from e diff --git a/datahub-actions/tests/unit/cli/test_actions.py b/datahub-actions/tests/unit/cli/test_actions.py index dd5b456..0db581a 100644 --- a/datahub-actions/tests/unit/cli/test_actions.py +++ b/datahub-actions/tests/unit/cli/test_actions.py @@ -1,6 +1,7 @@ import logging import os import tempfile +from contextlib import contextmanager from typing import Generator from unittest.mock import Mock @@ -10,7 +11,7 @@ from datahub_actions.cli.actions import actions, pipeline_config_to_pipeline from datahub_actions.pipeline.pipeline import Pipeline from datahub_actions.pipeline.pipeline_manager import PipelineManager -from contextlib import contextmanager + @contextmanager def local_monkeypatch(monkeypatch, target, replacement): @@ -21,6 +22,7 @@ def local_monkeypatch(monkeypatch, target, replacement): finally: monkeypatch.undo() + @pytest.fixture def capture_logger( caplog: pytest.LogCaptureFixture, @@ -159,6 +161,22 @@ def test_all_configs_invalid_or_disabled( actions, ["run", "-c", invalid_config_file, "-c", disabled_config_file] ) assert result.exit_code == 1 + assert ( + "Failed to load action configuration. Unbound variable(s) provided in config YAML." + in str(result.exception) + ) + + +def test_all_configs_multiple_disabled( + disabled_config_file: str, + capture_logger: pytest.LogCaptureFixture, +) -> None: + """Test that program exits when all configs are invalid or disabled.""" + runner = CliRunner() + result = runner.invoke( + actions, ["run", "-c", disabled_config_file, "-c", disabled_config_file] + ) + assert result.exit_code == 1 assert any( "No valid pipelines were started from 2 config(s). Check that at least one pipeline is enabled and all required environment variables are set." 
in record.message @@ -191,13 +209,14 @@ def mock_sleep(seconds: int) -> None: runner = CliRunner() # Use local_monkeypatch for tighter control - + with local_monkeypatch( - monkeypatch, "datahub_actions.pipeline.pipeline.Pipeline.create", mock_create_pipeline + monkeypatch, + "datahub_actions.pipeline.pipeline.Pipeline.create", + mock_create_pipeline, ), local_monkeypatch(monkeypatch, "time.sleep", mock_sleep): result = runner.invoke( - actions, - ["run", "-c", temp_config_file, "-c", disabled_config_file] + actions, ["run", "-c", temp_config_file, "-c", disabled_config_file] ) assert result.exit_code == 1 @@ -237,9 +256,13 @@ def mock_sleep(seconds: int) -> None: # Use local_monkeypatch for tighter control with local_monkeypatch( - monkeypatch, "datahub_actions.pipeline.pipeline.Pipeline.create", mock_create_pipeline + monkeypatch, + "datahub_actions.pipeline.pipeline.Pipeline.create", + mock_create_pipeline, ), local_monkeypatch(monkeypatch, "time.sleep", mock_sleep), local_monkeypatch( - monkeypatch, "datahub_actions.cli.actions.pipeline_manager", mock_pipeline_manager + monkeypatch, + "datahub_actions.cli.actions.pipeline_manager", + mock_pipeline_manager, ): result = runner.invoke( actions, From 19e0b30a227451a8abd669dcc543a67b7357d2ca Mon Sep 17 00:00:00 2001 From: John Joyce Date: Thu, 23 Jan 2025 18:37:35 -0800 Subject: [PATCH 08/10] Updates --- datahub-actions/tests/unit/pipeline/test_pipeline_manager.py | 2 +- examples/hello_world_datahub_cloud.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/datahub-actions/tests/unit/pipeline/test_pipeline_manager.py b/datahub-actions/tests/unit/pipeline/test_pipeline_manager.py index 89e330d..38dccb0 100644 --- a/datahub-actions/tests/unit/pipeline/test_pipeline_manager.py +++ b/datahub-actions/tests/unit/pipeline/test_pipeline_manager.py @@ -13,7 +13,7 @@ # limitations under the License. import pytest - +import tests.unit.test_helpers from datahub_actions.pipeline.pipeline import Pipeline from datahub_actions.pipeline.pipeline_manager import PipelineManager diff --git a/examples/hello_world_datahub_cloud.yaml b/examples/hello_world_datahub_cloud.yaml index 9efd019..fb35cb6 100644 --- a/examples/hello_world_datahub_cloud.yaml +++ b/examples/hello_world_datahub_cloud.yaml @@ -10,4 +10,4 @@ source: # 3. Action: What action to take on events. # To learn how to develop a custom Action, see https://datahubproject.io/docs/actions/guides/developing-an-action action: - type: "hello_world" \ No newline at end of file + type: "hello_world" From 583507b4ef0bb68c7d2d7608eb95dc58eea4a5ab Mon Sep 17 00:00:00 2001 From: John Joyce Date: Thu, 23 Jan 2025 18:40:32 -0800 Subject: [PATCH 09/10] Adding --- datahub-actions/tests/unit/pipeline/test_pipeline_manager.py | 1 - 1 file changed, 1 deletion(-) diff --git a/datahub-actions/tests/unit/pipeline/test_pipeline_manager.py b/datahub-actions/tests/unit/pipeline/test_pipeline_manager.py index 38dccb0..197d628 100644 --- a/datahub-actions/tests/unit/pipeline/test_pipeline_manager.py +++ b/datahub-actions/tests/unit/pipeline/test_pipeline_manager.py @@ -13,7 +13,6 @@ # limitations under the License. 
import pytest -import tests.unit.test_helpers from datahub_actions.pipeline.pipeline import Pipeline from datahub_actions.pipeline.pipeline_manager import PipelineManager From 0f9ffd0f1c5575b4be7067c88276f10e9eecbaf4 Mon Sep 17 00:00:00 2001 From: John Joyce Date: Thu, 23 Jan 2025 18:50:59 -0800 Subject: [PATCH 10/10] fix all tests --- datahub-actions/tests/unit/cli/test_actions.py | 12 ++++++++---- .../tests/unit/pipeline/test_pipeline_manager.py | 1 + 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/datahub-actions/tests/unit/cli/test_actions.py b/datahub-actions/tests/unit/cli/test_actions.py index 0db581a..493419c 100644 --- a/datahub-actions/tests/unit/cli/test_actions.py +++ b/datahub-actions/tests/unit/cli/test_actions.py @@ -44,8 +44,8 @@ def mock_pipeline() -> Mock: def mock_pipeline_manager() -> Mock: """Create a mock pipeline manager.""" mock = Mock(spec=PipelineManager) - mock.start_pipeline = Mock() - mock.stop_all = Mock() + mock.start_pipeline = Mock(return_value=None) + mock.stop_all = Mock(return_value=None) return mock @@ -153,7 +153,6 @@ def test_invalid_config_single(invalid_config_file: str) -> None: def test_all_configs_invalid_or_disabled( invalid_config_file: str, disabled_config_file: str, - capture_logger: pytest.LogCaptureFixture, ) -> None: """Test that program exits when all configs are invalid or disabled.""" runner = CliRunner() @@ -190,6 +189,7 @@ def test_mixed_valid_and_invalid_configs( capture_logger: pytest.LogCaptureFixture, monkeypatch: pytest.MonkeyPatch, mock_pipeline: Mock, + mock_pipeline_manager: Mock, ) -> None: """Test handling mix of valid and invalid configs.""" @@ -214,7 +214,11 @@ def mock_sleep(seconds: int) -> None: monkeypatch, "datahub_actions.pipeline.pipeline.Pipeline.create", mock_create_pipeline, - ), local_monkeypatch(monkeypatch, "time.sleep", mock_sleep): + ), local_monkeypatch(monkeypatch, "time.sleep", mock_sleep), local_monkeypatch( + monkeypatch, + "datahub_actions.cli.actions.pipeline_manager", + mock_pipeline_manager, + ): result = runner.invoke( actions, ["run", "-c", temp_config_file, "-c", disabled_config_file] ) diff --git a/datahub-actions/tests/unit/pipeline/test_pipeline_manager.py b/datahub-actions/tests/unit/pipeline/test_pipeline_manager.py index 197d628..89e330d 100644 --- a/datahub-actions/tests/unit/pipeline/test_pipeline_manager.py +++ b/datahub-actions/tests/unit/pipeline/test_pipeline_manager.py @@ -13,6 +13,7 @@ # limitations under the License. import pytest + from datahub_actions.pipeline.pipeline import Pipeline from datahub_actions.pipeline.pipeline_manager import PipelineManager
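
For readers skimming the series, the net effect of patches 01-07 on config handling can be summarized in one standalone sketch. This is an illustration only, not code from the patches: PyYAML plus a regex check stand in for DataHub's load_config_file and its ${VAR} expansion, and the helper names below (load_enabled_configs) are hypothetical.

import os
import pathlib
import re
import sys
from typing import List

import yaml


def is_pipeline_enabled(config: dict) -> bool:
    # Mirrors the helper added in PATCH 06: the string "false" or the boolean
    # False both disable a pipeline; anything else leaves it enabled.
    enabled = config.get("enabled", True)
    return not (enabled == "false" or enabled is False)


def load_enabled_configs(config_paths: List[str]) -> List[dict]:
    """Two-phase loading: check `enabled` on the raw YAML, then expand variables."""
    loaded: List[dict] = []
    for path in config_paths:
        raw_text = pathlib.Path(path).read_text()
        raw_config = yaml.safe_load(raw_text)

        # Phase 1: skip disabled pipelines before any variable expansion.
        if not is_pipeline_enabled(raw_config):
            print(f"Skipping pipeline {raw_config.get('name') or path} as it is not enabled")
            continue

        # Phase 2: a crude stand-in for load_config_file's ${VAR} expansion.
        unbound = [v for v in re.findall(r"\$\{(\w+)\}", raw_text) if v not in os.environ]
        if unbound:
            if len(config_paths) == 1:
                # A single bad config is a hard failure...
                raise Exception(
                    f"Unbound variable(s) provided in config YAML: {unbound}"
                )
            # ...while one of several is skipped with a warning.
            print(f"Skipping {path}: unbound variable(s) {unbound}")
            continue

        loaded.append(yaml.safe_load(os.path.expandvars(raw_text)))

    # Exit early if nothing valid remains, as the final actions.py does.
    if not loaded:
        print(f"No valid pipelines were started from {len(config_paths)} config(s).")
        sys.exit(1)
    return loaded

The behavior shown matches the final shape of run(): a disabled pipeline is skipped based on the raw (unexpanded) config, a single config with unbound variables raises, one of several is skipped with a warning, and the process exits non-zero when no valid pipeline remains.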