Skip to content

Commit

Permalink
fix code smell warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
vbusson-pass committed Feb 11, 2025
1 parent 219d873 commit 4baa0b3
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 41 deletions.
20 changes: 13 additions & 7 deletions orchestration/dags/common/access_gcp_secrets.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
import json
import secrets
import string
from typing import Optional, Union

from google.api_core.exceptions import AlreadyExists, NotFound, PermissionDenied
from google.auth.exceptions import DefaultCredentialsError
from google.cloud import secretmanager


def access_secret_data(project_id, secret_id, default=None, as_dict=False):
def access_secret_data(
project_id: str,
secret_id: str,
default: Optional[Union[str, dict]] = None,
as_dict: bool = False,
) -> Optional[Union[str, dict]]:
"""
Access secret data from Google Cloud Secret Manager, with optional JSON deserialization.
Expand Down Expand Up @@ -43,8 +49,8 @@ def access_secret_data(project_id, secret_id, default=None, as_dict=False):


def write_secret_key(
project_id: str, secret_id: str, secret_string: str, labels: dict = None
) -> str:
project_id: str, secret_id: str, secret_string: str, labels: Optional[dict] = None
) -> Optional[str]:
"""
Write a string as secret data to Google Cloud Secret Manager.
Expand Down Expand Up @@ -107,9 +113,9 @@ def create_key_if_not_exists(
project_id: str,
secret_id: str,
key_length: int = 32,
secret_string: str = None,
labels: dict = None,
) -> tuple[bool, str | None]:
secret_string: Optional[str] = None,
labels: Optional[dict] = None,
) -> Optional[str]:
"""
Write a string as secret data to Google Cloud Secret Manager only if it doesn't exist.
Expand All @@ -127,7 +133,7 @@ def create_key_if_not_exists(
if secret_string is None:
assert key_length >= 16, "Key length must be >= 16 for security reason"
secret_string = "".join(
secrets.choice(string.digits) for i in range(key_length)
secrets.choice(string.digits) for _ in range(key_length)
)
try:
# Try to access the existing secret
Expand Down
117 changes: 83 additions & 34 deletions orchestration/dags/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,55 +136,104 @@ def waiting_operator(

def delayed_waiting_operator(
    dag,
    external_dag_id: str,
    external_task_id: str = "end",
    allowed_states: Optional[list] = None,
    failed_states: Optional[list] = None,
    lower_date_limit=None,
    skip_manually_triggered: bool = False,
    **kwargs,
):
    """
    Creates an ExternalTaskSensor that waits for a task in another DAG to finish,
    with two key features:
    1. Uses execution_date_fn to dynamically determine the last execution date of the external DAG.
    2. Skips waiting for manually triggered DAG runs.

    Args:
        dag: The DAG where this task will be added.
        external_dag_id (str): The ID of the external DAG.
        external_task_id (str): Task ID within the external DAG to wait for.
        allowed_states (list, optional): States considered successful.
            Defaults to ["success"].
        failed_states (list, optional): States considered failed.
            Defaults to ["failed", "upstream_failed", "skipped"].
        lower_date_limit (datetime, optional): Lower bound for execution date filtering.
        skip_manually_triggered (bool, optional): If True, skips waiting when the DAG
            run was triggered manually.
        **kwargs: Additional arguments forwarded to the ExternalTaskSensor.

    Returns:
        ExternalTaskSensor or PythonOperator: The sensor for scheduled DAG runs, or a
        PythonOperator that skips waiting for manual DAG runs.
    """
    # Avoid mutable default arguments: resolve the defaults per call so the
    # lists are never shared (and never mutated) across invocations.
    if allowed_states is None:
        allowed_states = ["success"]
    if failed_states is None:
        failed_states = ["failed", "upstream_failed", "skipped"]

    task_id = f"wait_for_{external_dag_id}_{external_task_id}"

    def compute_execution_date_fn(logical_date, **context):
        """
        Compute the execution date for the ExternalTaskSensor using Airflow's context.
        """
        if logical_date is None:
            raise ValueError("The 'logical_date' is missing in the context.")

        # Lower date limit defaults to the start of the logical date's day.
        lower_limit = lower_date_limit or logical_date.replace(
            hour=0, minute=0, second=0, microsecond=0
        )

        # Fetch the last execution date of the external DAG that falls within
        # [lower_limit, logical_date].
        return get_last_execution_date(
            external_dag_id,
            upper_date_limit=logical_date,
            lower_date_limit=lower_limit,
        )

    # If manual triggers should be skipped, wrap the wait in a PythonOperator
    # that decides at runtime whether to poke the external task.
    if skip_manually_triggered:

        def handle_manual_trigger(**context):
            """
            Skips waiting for manually triggered DAG runs, but waits for scheduled DAGs.
            """
            dag_run = context.get("dag_run")
            if dag_run and dag_run.run_id.startswith("manual__"):
                logging.info(f"Skipping wait for manual trigger: {dag_run.run_id}")
                return None  # Skip waiting

            logging.info(
                f"Waiting for external task {external_dag_id}.{external_task_id}"
            )

            # Create a sensor dynamically at runtime.
            sensor = ExternalTaskSensor(
                task_id=task_id,
                external_dag_id=external_dag_id,
                external_task_id=external_task_id,
                execution_date_fn=compute_execution_date_fn,
                allowed_states=allowed_states,
                failed_states=failed_states,
                dag=None,  # Important: don't attach to DAG since this is runtime execution
            )
            # Check external task status dynamically.
            return sensor.poke(context=context)

        return PythonOperator(
            task_id=task_id,
            python_callable=handle_manual_trigger,
            dag=dag,
        )

    # If manual triggers are NOT skipped, return the normal ExternalTaskSensor.
    return ExternalTaskSensor(
        task_id=task_id,
        external_dag_id=external_dag_id,
        external_task_id=external_task_id,
        execution_date_fn=compute_execution_date_fn,
        check_existence=True,
        mode="reschedule",
        allowed_states=allowed_states,
        failed_states=failed_states,
        dag=dag,
        **kwargs,
    )


Expand Down

0 comments on commit 4baa0b3

Please sign in to comment.