Skip to content

Commit

Permalink
Explicitly shut down logging in tasks so concurrent.futures can be used (
Browse files Browse the repository at this point in the history
#13057)

This fixes three problems:

1. That remote logs weren't being uploaded due to the fork change.
2. That the S3 hook attempted to fetch credentials from the DB, but the
   ORM had already been disposed.
3. That even if forking was disabled, S3 logs would still fail due to use
   of concurrent.futures. See https://bugs.python.org/issue33097
  • Loading branch information
ashb authored Dec 14, 2020
1 parent 6bf9acb commit ab5f770
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 3 deletions.
16 changes: 14 additions & 2 deletions airflow/cli/cli_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def error(self, message):
class Arg:
"""Class to keep information about command line argument"""

# pylint: disable=redefined-builtin,unused-argument
# pylint: disable=redefined-builtin,unused-argument,too-many-arguments
def __init__(
self,
flags=_UNSET,
Expand All @@ -101,6 +101,7 @@ def __init__(
choices=_UNSET,
required=_UNSET,
metavar=_UNSET,
dest=_UNSET,
):
self.flags = flags
self.kwargs = {}
Expand All @@ -112,7 +113,7 @@ def __init__(

self.kwargs[k] = v

# pylint: enable=redefined-builtin,unused-argument
# pylint: enable=redefined-builtin,unused-argument,too-many-arguments

def add_to_parser(self, parser: argparse.ArgumentParser):
"""Add this argument to an ArgumentParser"""
Expand Down Expand Up @@ -308,6 +309,16 @@ def positive_int(value):
# list_tasks
ARG_TREE = Arg(("-t", "--tree"), help="Tree view", action="store_true")

# tasks_run
# This is a hidden option -- not meant for users to set or know about
ARG_SHUT_DOWN_LOGGING = Arg(
("--no-shut-down-logging",),
help=argparse.SUPPRESS,
dest="shut_down_logging",
action="store_false",
default=True,
)

# clear
ARG_UPSTREAM = Arg(("-u", "--upstream"), help="Include upstream tasks", action="store_true")
ARG_ONLY_FAILED = Arg(("-f", "--only-failed"), help="Only failed jobs", action="store_true")
Expand Down Expand Up @@ -943,6 +954,7 @@ class GroupCommand(NamedTuple):
ARG_PICKLE,
ARG_JOB_ID,
ARG_INTERACTIVE,
ARG_SHUT_DOWN_LOGGING,
),
),
ActionCommand(
Expand Down
7 changes: 6 additions & 1 deletion airflow/cli/commands/task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,12 @@ def _run_task_by_local_task_job(args, ti):
ignore_ti_state=args.force,
pool=args.pool,
)
run_job.run()
try:
run_job.run()

finally:
if args.shut_down_logging:
logging.shutdown()


RAW_TASK_UNSUPPORTED_OPTION = [
Expand Down
2 changes: 2 additions & 0 deletions airflow/executors/celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def _execute_in_fork(command_to_exec: CommandType) -> None:
parser = get_parser()
# [1:] - remove "airflow" from the start of the command
args = parser.parse_args(command_to_exec[1:])
args.shut_down_logging = False

setproctitle(f"airflow task supervisor: {command_to_exec}")

Expand All @@ -116,6 +117,7 @@ def _execute_in_fork(command_to_exec: CommandType) -> None:
ret = 1
finally:
Sentry.flush()
logging.shutdown()
os._exit(ret) # pylint: disable=protected-access


Expand Down
3 changes: 3 additions & 0 deletions airflow/executors/local_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
For more information on how the LocalExecutor works, take a look at the guide:
:ref:`executor:LocalExecutor`
"""
import logging
import os
import subprocess
from abc import abstractmethod
Expand Down Expand Up @@ -115,6 +116,7 @@ def _execute_work_in_fork(self, command: CommandType) -> str:
parser = get_parser()
# [1:] - remove "airflow" from the start of the command
args = parser.parse_args(command[1:])
args.shut_down_logging = False

setproctitle(f"airflow task supervisor: {command}")

Expand All @@ -125,6 +127,7 @@ def _execute_work_in_fork(self, command: CommandType) -> str:
self.log.error("Failed to execute task %s.", str(e))
finally:
Sentry.flush()
logging.shutdown()
os._exit(ret) # pylint: disable=protected-access
raise RuntimeError('unreachable -- keep mypy happy')

Expand Down
2 changes: 2 additions & 0 deletions airflow/task/task_runner/standard_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# specific language governing permissions and limitations
# under the License.
"""Standard task runner"""
import logging
import os

import psutil
Expand Down Expand Up @@ -87,6 +88,7 @@ def _start_by_fork(self): # pylint: disable=inconsistent-return-statements
finally:
# Explicitly flush any pending exception to Sentry if enabled
Sentry.flush()
logging.shutdown()
os._exit(return_code) # pylint: disable=protected-access

def return_code(self, timeout=0):
Expand Down

0 comments on commit ab5f770

Please sign in to comment.