[dagster-airlift] airflow-side retries test behavior #26711

Open · wants to merge 1 commit into base: dpeng817/run_retries
@@ -13,7 +13,7 @@

 from dagster_airlift.core.serialization.serialized_data import DagInfo, TaskInfo

-TERMINAL_STATES = {"success", "failed", "skipped", "up_for_retry", "up_for_reschedule"}
+TERMINAL_STATES = {"success", "failed", "canceled"}
 # This limits the number of task ids that we attempt to query from airflow's task instance rest API at a given time.
 # Airflow's batch task instance retrieval rest API doesn't have a limit parameter, but we query a single run at a time, meaning we should be getting
 # a single task instance per task id.
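The comment above explains the chunking rationale: Airflow's batch task-instance endpoint has no limit parameter, so the client caps how many task ids it sends per request. Below is a minimal sketch of that pattern against the REST API, assuming Airflow 2.7+ (where the batch form accepts dag_run_ids and task_ids); BASE_URL, SESSION, and CHUNK_SIZE are illustrative placeholders, not dagster-airlift names.

from typing import Any, Dict, List

import requests

BASE_URL = "http://localhost:8080/api/v1"  # hypothetical webserver URL
SESSION = requests.Session()  # assumed to carry auth, e.g. SESSION.auth = ("admin", "admin")
CHUNK_SIZE = 100  # illustrative cap on task ids per request


def get_task_instances_chunked(dag_id: str, run_id: str, task_ids: List[str]) -> List[Dict[str, Any]]:
    """Query the batch task-instance endpoint one chunk of task ids at a time.

    The endpoint has no limit parameter, but because each request is scoped to a
    single dag run, each task id should yield at most one task instance.
    """
    instances: List[Dict[str, Any]] = []
    for i in range(0, len(task_ids), CHUNK_SIZE):
        resp = SESSION.post(
            f"{BASE_URL}/dags/~/dagRuns/~/taskInstances/list",
            json={
                "dag_ids": [dag_id],
                "dag_run_ids": [run_id],
                "task_ids": task_ids[i : i + CHUNK_SIZE],
            },
        )
        resp.raise_for_status()
        instances.extend(resp.json()["task_instances"])
    return instances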
@@ -8,6 +8,7 @@
 from tempfile import TemporaryDirectory
 from typing import Any, Callable, Generator, List, Optional

+import psutil
 import pytest
 import requests
 from dagster._core.test_utils import environ
@@ -100,8 +101,9 @@ def stand_up_airflow(
         assert airflow_ready, "Airflow did not start within 30 seconds..."
         yield process
     finally:
-        # Kill process group, since process.kill and process.terminate do not work.
-        os.killpg(process.pid, signal.SIGKILL)
+        if psutil.Process(pid=process.pid).is_running():
+            # Kill process group, since process.kill and process.terminate do not work.
+            os.killpg(process.pid, signal.SIGKILL)


 @pytest.fixture(name="airflow_instance")
@@ -179,7 +181,9 @@ def stand_up_dagster(
         assert dagster_ready, "Dagster did not start within 30 seconds..."
         yield process
     finally:
-        os.killpg(process.pid, signal.SIGKILL)
+        if psutil.Process(pid=process.pid).is_running():
+            # Kill process group, since process.kill and process.terminate do not work.
+            os.killpg(process.pid, signal.SIGKILL)


 ####################################################################################################
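One caveat with the guard introduced above: psutil.Process raises psutil.NoSuchProcess when the pid has already been reaped, so the lookup itself can throw during teardown. A more defensive sketch (not the PR's code), assuming the child was started in its own process group (e.g. via start_new_session=True):

import os
import signal
import subprocess

import psutil


def kill_process_group(process: subprocess.Popen) -> None:
    try:
        still_running = psutil.Process(pid=process.pid).is_running()
    except psutil.NoSuchProcess:
        return  # already gone; nothing to kill
    if still_running:
        # Kill the whole group; process.kill/process.terminate only hit the group leader.
        os.killpg(process.pid, signal.SIGKILL)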
@@ -0,0 +1,30 @@
+from datetime import datetime, timedelta
+
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+
+
+def succeeds_on_retry(**context) -> None:
+    if context["task_instance"].try_number == 1:
+        raise Exception("Failing on first try")
+    else:
+        print("Succeeding on retry")  # noqa: T201
+
+
+default_args = {
+    "owner": "airflow",
+    "depends_on_past": False,
+    "start_date": datetime(2023, 1, 1),
+    "retries": 2,
+}
+
+
+with DAG(
+    "unmapped__dag_with_retries",
+    default_args=default_args,
+    schedule_interval=None,
+    is_paused_upon_creation=False,
+) as dag:
+    PythonOperator(
+        task_id="print_task", python_callable=succeeds_on_retry, retry_delay=timedelta(seconds=1)
+    )
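Since succeeds_on_retry branches purely on try_number, it can be exercised without a running Airflow by substituting a stub for the real task instance. A quick sketch, assuming the callable is importable from the DAG module above:

from types import SimpleNamespace

import pytest

# Assumes: from <dag module above> import succeeds_on_retry


def test_succeeds_on_retry_branches_on_try_number() -> None:
    # The first try (try_number == 1) raises, which is what forces Airflow to retry.
    with pytest.raises(Exception, match="Failing on first try"):
        succeeds_on_retry(task_instance=SimpleNamespace(try_number=1))
    # Any later try succeeds.
    succeeds_on_retry(task_instance=SimpleNamespace(try_number=2))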
@@ -1,15 +1,17 @@
 from datetime import timedelta

-from dagster import Definitions, asset, define_asset_job
+from dagster import Definitions, asset, define_asset_job, multi_asset
 from dagster._core.definitions.asset_spec import AssetSpec
 from dagster._core.definitions.assets import AssetsDefinition
 from dagster._core.definitions.time_window_partitions import DailyPartitionsDefinition
+from dagster._core.execution.context.asset_execution_context import AssetExecutionContext
 from dagster._time import get_current_datetime_midnight
 from dagster_airlift.core import (
     assets_with_dag_mappings,
     assets_with_task_mappings,
     build_defs_from_airflow_instance,
     dag_defs,
+    load_airflow_dag_asset_specs,
     task_defs,
 )
 from dagster_airlift.core.multiple_tasks import targeted_by_multiple_tasks
@@ -76,6 +78,7 @@ def migrated_daily_interval_dag__partitioned() -> None:
 def build_mapped_defs() -> Definitions:
     return build_defs_from_airflow_instance(
         airflow_instance=local_airflow_instance(),
+        dag_selector_fn=lambda dag: not dag.dag_id.startswith("unmapped"),
         defs=Definitions.merge(
             dag_defs(
                 "print_dag",
@@ -164,4 +167,22 @@
     )


-defs = build_mapped_defs()
+unmapped_specs = load_airflow_dag_asset_specs(
+    airflow_instance=local_airflow_instance(),
+    dag_selector_fn=lambda dag: dag.dag_id.startswith("unmapped"),
+)
+
+
+@multi_asset(specs=unmapped_specs)
+def materialize_dags(context: AssetExecutionContext):
+    for spec in unmapped_specs:
+        af_instance = local_airflow_instance()
+        dag_id = spec.metadata["Dag ID"]
+        dag_run_id = af_instance.trigger_dag(dag_id=dag_id)
+        af_instance.wait_for_run_completion(dag_id=dag_id, run_id=dag_run_id)
+        state = af_instance.get_run_state(dag_id=dag_id, run_id=dag_run_id)
+        if state != "success":
+            raise Exception(f"Failed to materialize {dag_id} with state {state}")
+
+
+defs = Definitions.merge(build_mapped_defs(), Definitions([materialize_dags]))
@@ -4,6 +4,8 @@

 import pytest
 from dagster import AssetKey, DagsterInstance, DagsterRunStatus
+from dagster._core.definitions import materialize
+from dagster._core.definitions.asset_selection import AssetSelection
 from dagster._core.definitions.metadata.metadata_value import JsonMetadataValue
 from dagster._core.events.log import EventLogEntry
 from dagster._core.storage.dagster_run import RunsFilter
@@ -276,3 +278,22 @@ def test_failure_when_asset_failures_tag_set(
     runs = dg_instance.get_runs(filters=RunsFilter(tags={DAG_RUN_ID_TAG_KEY: run_id}))
     assert len(runs) == 1
     assert next(iter(runs)).status == DagsterRunStatus.FAILURE
+
+
+def test_respect_airflow_retries(
+    airflow_instance: None,
+    dagster_dev: None,
+    dagster_home: str,
+) -> None:
+    """Airflow has task-level retries but no dag-level retries; this test ensures that we handle task-level retries gracefully."""
+    from kitchen_sink.dagster_defs.mapped_defs import materialize_dags
+
+    af_instance = local_airflow_instance()
+
+    dagster_instance = DagsterInstance.get()
+    result = materialize(
+        assets=[materialize_dags],
+        instance=dagster_instance,
+        selection=AssetSelection.keys([af_instance.name, "dag", "unmapped__dag_with_retries"]),
+    )
+    assert result.success
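The test asserts only that the Dagster-side materialization succeeds; it does not check that Airflow actually retried the task. If that stronger assertion were wanted, try_number could be read back from Airflow's task-instance endpoint. A sketch, with an illustrative webserver URL and credentials rather than the test harness's real configuration:

import requests


def assert_task_retried(dag_id: str, run_id: str, task_id: str) -> None:
    resp = requests.get(
        f"http://localhost:8080/api/v1/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}",
        auth=("admin", "admin"),  # placeholder credentials
    )
    resp.raise_for_status()
    task_instance = resp.json()
    # try_number > 1 means at least one retry happened before the final state.
    assert task_instance["state"] == "success"
    assert task_instance["try_number"] > 1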