Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(): Fix unbound variable issue in v1.8 #155

Merged
merged 10 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 86 additions & 21 deletions datahub-actions/src/datahub_actions/cli/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
import click
from click_default_group import DefaultGroup
from datahub.configuration.config_loader import load_config_file
from expandvars import UnboundVariable

import datahub_actions as datahub_actions_package
from datahub_actions.pipeline.pipeline import Pipeline
from datahub_actions.pipeline.pipeline_manager import PipelineManager

logger = logging.getLogger(__name__)


# Instantiate a singleton instance of the Pipeline Manager.
pipeline_manager = PipelineManager()

Expand All @@ -52,6 +52,44 @@ def actions() -> None:
pass


def load_raw_config_file(config_file: pathlib.Path) -> dict:
"""
Load a config file as raw YAML/JSON without variable expansion.

Args:
config_file: Path to the configuration file

Returns:
dict: Raw configuration dictionary

Raises:
Exception: If the file cannot be loaded or is invalid YAML/JSON
"""
try:
with open(config_file, "r") as f:
import yaml

return yaml.safe_load(f)
except Exception as e:
raise Exception(
f"Failed to load raw configuration file {config_file}: {e}"
) from e


def is_pipeline_enabled(config: dict) -> bool:
"""
Check if a pipeline configuration is enabled.

Args:
config: Raw configuration dictionary

Returns:
bool: True if pipeline is enabled, False otherwise
"""
enabled = config.get("enabled", True)
return not (enabled == "false" or enabled is False)


@actions.command(
name="run",
context_settings=dict(
Expand All @@ -70,43 +108,70 @@ def run(ctx: Any, config: List[str], debug: bool) -> None:
)

if debug:
# Set root logger settings to debug mode.
logging.getLogger().setLevel(logging.DEBUG)
else:
# Set root logger settings to info mode.
logging.getLogger().setLevel(logging.INFO)

# Statically configured to be registered with the pipeline Manager.
pipelines: List[Pipeline] = []

logger.debug("Creating Actions Pipelines...")

# If individual pipeline config was provided, create a pipeline from it.
if config is not None:
for pipeline_config in config:
pipeline_config_file = pathlib.Path(pipeline_config)
pipeline_config_dict = load_config_file(pipeline_config_file)
enabled = pipeline_config_dict.get("enabled", True)
if enabled == "false" or enabled is False:
# Phase 1: Initial validation of configs
valid_configs = []
for pipeline_config in config:
pipeline_config_file = pathlib.Path(pipeline_config)
try:
# First just load the raw config to check if it's enabled
raw_config = load_raw_config_file(pipeline_config_file)

if not is_pipeline_enabled(raw_config):
logger.warning(
f"Skipping pipeline {pipeline_config_dict.get('name')} as it is not enabled"
f"Skipping pipeline {raw_config.get('name') or pipeline_config} as it is not enabled"
)
continue

# now load the config with variable expansion
valid_configs.append(pipeline_config_file)

except Exception as e:
if len(config) == 1:
raise Exception(
f"Failed to load raw configuration file {pipeline_config_file}"
) from e
logger.warning(
f"Failed to load pipeline configuration! Skipping action config file {pipeline_config_file}...: {e}"
)

# Phase 2: Full config loading and pipeline creation
for pipeline_config_file in valid_configs:
try:
# Now load the full config with variable expansion
pipeline_config_dict = load_config_file(pipeline_config_file)
pipelines.append(
pipeline_config_to_pipeline(pipeline_config_dict)
) # Now, instantiate the pipeline.
pipelines.append(pipeline_config_to_pipeline(pipeline_config_dict))
except UnboundVariable as e:
if len(valid_configs) == 1:
raise Exception(
"Failed to load action configuration. Unbound variable(s) provided in config YAML."
) from e
logger.warning(
f"Failed to resolve variables in config file {pipeline_config_file}...: {e}"
)
continue

# Exit early if no valid pipelines were created
if not pipelines:
logger.error(
f"No valid pipelines were started from {len(config)} config(s). "
"Check that at least one pipeline is enabled and all required environment variables are set."
)
sys.exit(1)

logger.debug("Starting Actions Pipelines")

# Start each pipeline.
# Start each pipeline
for p in pipelines:
pipeline_manager.start_pipeline(p.name, p)
logger.info(f"Action Pipeline with name '{p.name}' is now running.")

# Now, simply run forever.
# Now, run forever only if we have valid pipelines
while True:
time.sleep(5)

Expand All @@ -121,10 +186,10 @@ def version() -> None:


# Handle shutdown signal. (ctrl-c)
def handle_shutdown(signum, frame):
def handle_shutdown(signum: int, frame: Any) -> None:
logger.info("Stopping all running Action Pipelines...")
pipeline_manager.stop_all()
exit(1)
sys.exit(1)


signal.signal(signal.SIGINT, handle_shutdown)
Empty file.
Loading
Loading