From ab5f770bfcd8c690cbe4d0825896325aca0beeca Mon Sep 17 00:00:00 2001 From: Ash Berlin-Taylor Date: Mon, 14 Dec 2020 16:28:01 +0000 Subject: [PATCH] Explicitly shutdown logging in tasks so concurrent.futures can be used (#13057) This fixes three problems: 1. That remote logs weren't being uploaded due to the fork change 2. That the S3 hook attempted to fetch credentials from the DB, but the ORM had already been disposed. 3. That even if forking was disabled, that S3 logs would fail due to use of concurrent.futures. See https://bugs.python.org/issue33097 --- airflow/cli/cli_parser.py | 16 ++++++++++++++-- airflow/cli/commands/task_command.py | 7 ++++++- airflow/executors/celery_executor.py | 2 ++ airflow/executors/local_executor.py | 3 +++ airflow/task/task_runner/standard_task_runner.py | 2 ++ 5 files changed, 27 insertions(+), 3 deletions(-) diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py index 7fc00b96c7c9c..61e8abfdaa055 100644 --- a/airflow/cli/cli_parser.py +++ b/airflow/cli/cli_parser.py @@ -89,7 +89,7 @@ def error(self, message): class Arg: """Class to keep information about command line argument""" - # pylint: disable=redefined-builtin,unused-argument + # pylint: disable=redefined-builtin,unused-argument,too-many-arguments def __init__( self, flags=_UNSET, @@ -101,6 +101,7 @@ def __init__( choices=_UNSET, required=_UNSET, metavar=_UNSET, + dest=_UNSET, ): self.flags = flags self.kwargs = {} @@ -112,7 +113,7 @@ def __init__( self.kwargs[k] = v - # pylint: enable=redefined-builtin,unused-argument + # pylint: enable=redefined-builtin,unused-argument,too-many-arguments def add_to_parser(self, parser: argparse.ArgumentParser): """Add this argument to an ArgumentParser""" @@ -308,6 +309,16 @@ def positive_int(value): # list_tasks ARG_TREE = Arg(("-t", "--tree"), help="Tree view", action="store_true") +# tasks_run +# This is a hidden option -- not meant for users to set or know about +ARG_SHUT_DOWN_LOGGING = Arg( + ("--no-shut-down-logging",), + help=argparse.SUPPRESS, + dest="shut_down_logging", + action="store_false", + default=True, +) + # clear ARG_UPSTREAM = Arg(("-u", "--upstream"), help="Include upstream tasks", action="store_true") ARG_ONLY_FAILED = Arg(("-f", "--only-failed"), help="Only failed jobs", action="store_true") @@ -943,6 +954,7 @@ class GroupCommand(NamedTuple): ARG_PICKLE, ARG_JOB_ID, ARG_INTERACTIVE, + ARG_SHUT_DOWN_LOGGING, ), ), ActionCommand( diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py index 356f60cfe5950..b153d20b59dd9 100644 --- a/airflow/cli/commands/task_command.py +++ b/airflow/cli/commands/task_command.py @@ -116,7 +116,12 @@ def _run_task_by_local_task_job(args, ti): ignore_ti_state=args.force, pool=args.pool, ) - run_job.run() + try: + run_job.run() + + finally: + if args.shut_down_logging: + logging.shutdown() RAW_TASK_UNSUPPORTED_OPTION = [ diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index f4f3412904c13..ad5c76e5bf538 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -106,6 +106,7 @@ def _execute_in_fork(command_to_exec: CommandType) -> None: parser = get_parser() # [1:] - remove "airflow" from the start of the command args = parser.parse_args(command_to_exec[1:]) + args.shut_down_logging = False setproctitle(f"airflow task supervisor: {command_to_exec}") @@ -116,6 +117,7 @@ def _execute_in_fork(command_to_exec: CommandType) -> None: ret = 1 finally: Sentry.flush() + logging.shutdown() os._exit(ret) # pylint: disable=protected-access diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py index 4057b1423dcdf..7fcba8a6af425 100644 --- a/airflow/executors/local_executor.py +++ b/airflow/executors/local_executor.py @@ -22,6 +22,7 @@ For more information on how the LocalExecutor works, take a look at the guide: :ref:`executor:LocalExecutor` """ +import logging import os import subprocess from abc import abstractmethod @@ -115,6 +116,7 @@ def _execute_work_in_fork(self, command: CommandType) -> str: parser = get_parser() # [1:] - remove "airflow" from the start of the command args = parser.parse_args(command[1:]) + args.shut_down_logging = False setproctitle(f"airflow task supervisor: {command}") @@ -125,6 +127,7 @@ def _execute_work_in_fork(self, command: CommandType) -> str: self.log.error("Failed to execute task %s.", str(e)) finally: Sentry.flush() + logging.shutdown() os._exit(ret) # pylint: disable=protected-access raise RuntimeError('unreachable -- keep mypy happy') diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py index 61ea43a0f73e6..fee9b0d10de64 100644 --- a/airflow/task/task_runner/standard_task_runner.py +++ b/airflow/task/task_runner/standard_task_runner.py @@ -16,6 +16,7 @@ # specific language governing permissions and limitations # under the License. """Standard task runner""" +import logging import os import psutil @@ -87,6 +88,7 @@ def _start_by_fork(self): # pylint: disable=inconsistent-return-statements finally: # Explicitly flush any pending exception to Sentry if enabled Sentry.flush() + logging.shutdown() os._exit(return_code) # pylint: disable=protected-access def return_code(self, timeout=0):