diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index 7fc00b96c7c9c..61e8abfdaa055 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -89,7 +89,7 @@ def error(self, message):
 class Arg:
     """Class to keep information about command line argument"""
 
-    # pylint: disable=redefined-builtin,unused-argument
+    # pylint: disable=redefined-builtin,unused-argument,too-many-arguments
     def __init__(
         self,
         flags=_UNSET,
@@ -101,6 +101,7 @@ def __init__(
         choices=_UNSET,
         required=_UNSET,
         metavar=_UNSET,
+        dest=_UNSET,
     ):
         self.flags = flags
         self.kwargs = {}
@@ -112,7 +113,7 @@ def __init__(
 
             self.kwargs[k] = v
 
-    # pylint: enable=redefined-builtin,unused-argument
+    # pylint: enable=redefined-builtin,unused-argument,too-many-arguments
 
     def add_to_parser(self, parser: argparse.ArgumentParser):
         """Add this argument to an ArgumentParser"""
@@ -308,6 +309,16 @@ def positive_int(value):
 # list_tasks
 ARG_TREE = Arg(("-t", "--tree"), help="Tree view", action="store_true")
 
+# tasks_run
+# This is a hidden option -- not meant for users to set or know about
+ARG_SHUT_DOWN_LOGGING = Arg(
+    ("--no-shut-down-logging",),
+    help=argparse.SUPPRESS,
+    dest="shut_down_logging",
+    action="store_false",
+    default=True,
+)
+
 # clear
 ARG_UPSTREAM = Arg(("-u", "--upstream"), help="Include upstream tasks", action="store_true")
 ARG_ONLY_FAILED = Arg(("-f", "--only-failed"), help="Only failed jobs", action="store_true")
@@ -943,6 +954,7 @@ class GroupCommand(NamedTuple):
             ARG_PICKLE,
             ARG_JOB_ID,
             ARG_INTERACTIVE,
+            ARG_SHUT_DOWN_LOGGING,
         ),
     ),
     ActionCommand(
diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py
index 356f60cfe5950..b153d20b59dd9 100644
--- a/airflow/cli/commands/task_command.py
+++ b/airflow/cli/commands/task_command.py
@@ -116,7 +116,12 @@ def _run_task_by_local_task_job(args, ti):
         ignore_ti_state=args.force,
         pool=args.pool,
     )
-    run_job.run()
+    try:
+        run_job.run()
+
+    finally:
+        if args.shut_down_logging:
+            logging.shutdown()
 
 
 RAW_TASK_UNSUPPORTED_OPTION = [
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index f4f3412904c13..ad5c76e5bf538 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -106,6 +106,7 @@ def _execute_in_fork(command_to_exec: CommandType) -> None:
         parser = get_parser()
         # [1:] - remove "airflow" from the start of the command
         args = parser.parse_args(command_to_exec[1:])
+        args.shut_down_logging = False
 
         setproctitle(f"airflow task supervisor: {command_to_exec}")
 
@@ -116,6 +117,7 @@ def _execute_in_fork(command_to_exec: CommandType) -> None:
         ret = 1
     finally:
         Sentry.flush()
+        logging.shutdown()
         os._exit(ret)  # pylint: disable=protected-access
 
 
diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index 4057b1423dcdf..7fcba8a6af425 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -22,6 +22,7 @@
     For more information on how the LocalExecutor works, take a look at the guide:
     :ref:`executor:LocalExecutor`
 """
+import logging
 import os
 import subprocess
 from abc import abstractmethod
@@ -115,6 +116,7 @@ def _execute_work_in_fork(self, command: CommandType) -> str:
             parser = get_parser()
             # [1:] - remove "airflow" from the start of the command
             args = parser.parse_args(command[1:])
+            args.shut_down_logging = False
 
             setproctitle(f"airflow task supervisor: {command}")
 
@@ -125,6 +127,7 @@ def _execute_work_in_fork(self, command: CommandType) -> str:
             self.log.error("Failed to execute task %s.", str(e))
         finally:
             Sentry.flush()
+            logging.shutdown()
             os._exit(ret)  # pylint: disable=protected-access
         raise RuntimeError('unreachable -- keep mypy happy')
 
diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py
index 61ea43a0f73e6..fee9b0d10de64 100644
--- a/airflow/task/task_runner/standard_task_runner.py
+++ b/airflow/task/task_runner/standard_task_runner.py
@@ -16,6 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 """Standard task runner"""
+import logging
 import os
 
 import psutil
@@ -87,6 +88,7 @@ def _start_by_fork(self):  # pylint: disable=inconsistent-return-statements
             finally:
                 # Explicitly flush any pending exception to Sentry if enabled
                 Sentry.flush()
+                logging.shutdown()
                 os._exit(return_code)  # pylint: disable=protected-access
 
     def return_code(self, timeout=0):