From 6e777b016944d604963ebd3577000ca066954b85 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Wed, 10 Jul 2024 12:13:44 +0200 Subject: [PATCH] Improve airflow operator (#10) * Improve airflow OpenDbtExecutorOperator * Improve airflow OpenDbtExecutorOperator --- opendbt/airflow/__init__.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/opendbt/airflow/__init__.py b/opendbt/airflow/__init__.py index 73b5082..be2cbf5 100644 --- a/opendbt/airflow/__init__.py +++ b/opendbt/airflow/__init__.py @@ -21,6 +21,8 @@ def __init__(self, profiles_dir: Path = None, select: str = None, args: list = None, + # without using subprocess airflow randomly gets deadlock + use_subprocess: bool = True, execution_timeout=timedelta(minutes=60), **kwargs) -> None: super().__init__(execution_timeout=execution_timeout, **kwargs) @@ -28,6 +30,7 @@ def __init__(self, self.command = command self.profiles_dir: Path = profiles_dir self.target = target + self.use_subprocess = use_subprocess self.args = args if args else [] if select: @@ -46,7 +49,7 @@ def execute(self, context): runner = opendbt.OpenDbtProject(project_dir=self.project_dir, profiles_dir=self.profiles_dir, target=self.target) - runner.run(command=self.command, args=self.args, use_subprocess=True) + runner.run(command=self.command, args=self.args, use_subprocess=self.use_subprocess) class OpenDbtAirflowProject(opendbt.OpenDbtProject):