Skip to content

Commit

Permalink
fix(): Fix unbound variable issue in v1.8 (#155)
Browse files Browse the repository at this point in the history
* Fix unbound variable issue

* add back licenses

* reset config

* adding tests

* Address comments

* Attempting to address comments further

* Attempting to address

* Updates

* Adding

* fix all tests

---------

Co-authored-by: John Joyce <[email protected]>
Co-authored-by: John Joyce <[email protected]>
  • Loading branch information
3 people authored Jan 24, 2025
1 parent 7ac12f4 commit ec6e41f
Show file tree
Hide file tree
Showing 3 changed files with 416 additions and 21 deletions.
107 changes: 86 additions & 21 deletions datahub-actions/src/datahub_actions/cli/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
import click
from click_default_group import DefaultGroup
from datahub.configuration.config_loader import load_config_file
from expandvars import UnboundVariable

import datahub_actions as datahub_actions_package
from datahub_actions.pipeline.pipeline import Pipeline
from datahub_actions.pipeline.pipeline_manager import PipelineManager

logger = logging.getLogger(__name__)


# Instantiate a singleton instance of the Pipeline Manager.
pipeline_manager = PipelineManager()

Expand All @@ -52,6 +52,44 @@ def actions() -> None:
pass


def load_raw_config_file(config_file: pathlib.Path) -> dict:
    """
    Load a config file as raw YAML/JSON without variable expansion.

    The file is parsed with ``yaml.safe_load`` only; no environment-variable
    substitution is applied to the result.

    Args:
        config_file: Path to the configuration file

    Returns:
        dict: Raw configuration dictionary

    Raises:
        Exception: If the file cannot be read or parsed, or if its top-level
            value is not a mapping (for example, an empty file).
    """
    try:
        with open(config_file, "r") as f:
            # Imported lazily so the module itself does not require PyYAML
            # until a config file is actually read.
            import yaml

            raw_config = yaml.safe_load(f)
    except Exception as e:
        raise Exception(
            f"Failed to load raw configuration file {config_file}: {e}"
        ) from e

    # safe_load yields None for an empty document and a list/scalar for
    # non-mapping YAML. Reject those here so callers relying on the declared
    # ``dict`` return type do not fail later with an AttributeError on ``.get``.
    if not isinstance(raw_config, dict):
        raise Exception(
            f"Failed to load raw configuration file {config_file}: "
            f"expected a top-level mapping but found {type(raw_config).__name__}"
        )
    return raw_config


def is_pipeline_enabled(config: dict) -> bool:
    """
    Report whether a raw pipeline configuration should be run.

    A pipeline counts as disabled only when its ``enabled`` key is the
    boolean ``False`` or the exact string ``"false"``. A missing key, or any
    other value, counts as enabled.

    Args:
        config: Raw configuration dictionary

    Returns:
        bool: True if pipeline is enabled, False otherwise
    """
    flag = config.get("enabled", True)
    # The string form is matched exactly: only lowercase "false" disables.
    if flag == "false":
        return False
    # Identity check, so falsy values such as 0 or None do not disable.
    if flag is False:
        return False
    return True


@actions.command(
name="run",
context_settings=dict(
Expand All @@ -70,43 +108,70 @@ def run(ctx: Any, config: List[str], debug: bool) -> None:
)

if debug:
# Set root logger settings to debug mode.
logging.getLogger().setLevel(logging.DEBUG)
else:
# Set root logger settings to info mode.
logging.getLogger().setLevel(logging.INFO)

# Statically configured to be registered with the pipeline Manager.
pipelines: List[Pipeline] = []

logger.debug("Creating Actions Pipelines...")

# If individual pipeline config was provided, create a pipeline from it.
if config is not None:
for pipeline_config in config:
pipeline_config_file = pathlib.Path(pipeline_config)
pipeline_config_dict = load_config_file(pipeline_config_file)
enabled = pipeline_config_dict.get("enabled", True)
if enabled == "false" or enabled is False:
# Phase 1: Initial validation of configs
valid_configs = []
for pipeline_config in config:
pipeline_config_file = pathlib.Path(pipeline_config)
try:
# First just load the raw config to check if it's enabled
raw_config = load_raw_config_file(pipeline_config_file)

if not is_pipeline_enabled(raw_config):
logger.warning(
f"Skipping pipeline {pipeline_config_dict.get('name')} as it is not enabled"
f"Skipping pipeline {raw_config.get('name') or pipeline_config} as it is not enabled"
)
continue

# now load the config with variable expansion
valid_configs.append(pipeline_config_file)

except Exception as e:
if len(config) == 1:
raise Exception(
f"Failed to load raw configuration file {pipeline_config_file}"
) from e
logger.warning(
f"Failed to load pipeline configuration! Skipping action config file {pipeline_config_file}...: {e}"
)

# Phase 2: Full config loading and pipeline creation
for pipeline_config_file in valid_configs:
try:
# Now load the full config with variable expansion
pipeline_config_dict = load_config_file(pipeline_config_file)
pipelines.append(
pipeline_config_to_pipeline(pipeline_config_dict)
) # Now, instantiate the pipeline.
pipelines.append(pipeline_config_to_pipeline(pipeline_config_dict))
except UnboundVariable as e:
if len(valid_configs) == 1:
raise Exception(
"Failed to load action configuration. Unbound variable(s) provided in config YAML."
) from e
logger.warning(
f"Failed to resolve variables in config file {pipeline_config_file}...: {e}"
)
continue

# Exit early if no valid pipelines were created
if not pipelines:
logger.error(
f"No valid pipelines were started from {len(config)} config(s). "
"Check that at least one pipeline is enabled and all required environment variables are set."
)
sys.exit(1)

logger.debug("Starting Actions Pipelines")

# Start each pipeline.
# Start each pipeline
for p in pipelines:
pipeline_manager.start_pipeline(p.name, p)
logger.info(f"Action Pipeline with name '{p.name}' is now running.")

# Now, simply run forever.
# Now, run forever only if we have valid pipelines
while True:
time.sleep(5)

Expand All @@ -121,10 +186,10 @@ def version() -> None:


# Handle shutdown signal. (ctrl-c)
def handle_shutdown(signum: int, frame: Any) -> None:
    """
    Signal handler: stop every running Action Pipeline, then exit.

    Args:
        signum: Number of the signal that was received (unused).
        frame: Stack frame that was interrupted by the signal (unused).
    """
    logger.info("Stopping all running Action Pipelines...")
    pipeline_manager.stop_all()
    # Use sys.exit rather than the builtin exit(): the latter is injected by
    # the ``site`` module and is not guaranteed to exist in every runtime.
    sys.exit(1)


signal.signal(signal.SIGINT, handle_shutdown)
Empty file.
Loading

0 comments on commit ec6e41f

Please sign in to comment.