From 3f9ba94084d6eda5a782609ff07591a78ae7fe1d Mon Sep 17 00:00:00 2001 From: Stuart Berg Date: Sun, 20 Dec 2020 12:34:42 -0500 Subject: [PATCH] Configure worker logging via the dask.distributed config, rather than manually running worker configuration in a task --- flyemflows/util/cluster_context.py | 27 ----- flyemflows/util/dask_schema.py | 154 +++++++++++++++++++++++++-- flyemflows/util/dask_util.py | 8 ++ flyemflows/workflow/base/contexts.py | 58 +++++----- 4 files changed, 179 insertions(+), 68 deletions(-) diff --git a/flyemflows/util/cluster_context.py b/flyemflows/util/cluster_context.py index 3ce88037..f292c300 100644 --- a/flyemflows/util/cluster_context.py +++ b/flyemflows/util/cluster_context.py @@ -9,7 +9,6 @@ import dask from distributed import Client, LocalCluster -from neuclease import configure_default_logging from neuclease.util import Timer from . import extract_ip_from_link, construct_ganglia_link @@ -195,32 +194,6 @@ def _init_dask(self): if self.wait_for_workers and self.cluster_type == "lsf": self._write_worker_graph_urls('graph-links.txt') - if self.wait_for_workers: - self._initialize_worker_logging() - else: - logger.warning("Not configuring worker logging!") - - def _initialize_worker_logging(self): - """ - Configure the logging module on the worker nodes. - - FIXME: - Ideally, this should be done via the distributed config, - using the "new style" logging configuration option. - (See distributed/config.py) - But I seem to recall some deficiency with that approach last - time I tried it, and I'm too lazy to try again right now. - """ - def _configure_worker_logging(): - configure_default_logging() - - # Levels copied from defaults in distributed/config.py - logging.getLogger('distributed.client').setLevel(logging.WARNING) - logging.getLogger('bokeh').setLevel(logging.ERROR) - logging.getLogger('tornado').setLevel(logging.CRITICAL) - logging.getLogger('tornado.application').setLevel(logging.ERROR) - - run_on_each_worker(_configure_worker_logging, self.client, True, False) def _write_driver_graph_urls(self): """ diff --git a/flyemflows/util/dask_schema.py b/flyemflows/util/dask_schema.py index 0167b7a5..32b8029c 100644 --- a/flyemflows/util/dask_schema.py +++ b/flyemflows/util/dask_schema.py @@ -38,7 +38,7 @@ "increase this setting accordingly.\n" "Specified as a string with a suffix for units, e.g. 4GB\n", "type": "string", - "default": "15GB" # On the Janelia cluster, each slot gets 15 GB by default. + "default": "15GB" # On the Janelia cluster, each slot gets 15 GB by default. }, "mem": { "description": "How much memory to reserve from LSF for each 'job'.\n" @@ -129,7 +129,7 @@ "increase this setting accordingly.\n" "Specified as a string with a suffix for units, e.g. 4GB\n", "type": "string", - "default": "15GB" # On the Janelia cluster, each slot gets 15 GB by default. + "default": "15GB" # On the Janelia cluster, each slot gets 15 GB by default. }, "processes": { "description": "How many processes ('workers') per 'job'.\n" @@ -188,7 +188,7 @@ "increase this setting accordingly.\n" "Specified as a string with a suffix for units, e.g. 4GB\n", "type": "string", - "default": "15GB" # On the Janelia cluster, each slot gets 15 GB by default. + "default": "15GB" # On the Janelia cluster, each slot gets 15 GB by default. }, "processes": { "description": "How many processes ('workers') per 'job'.\n" @@ -248,17 +248,158 @@ "additionalProperties": True, "default": {}, "properties": { + "scheduler": { + "type": "object", + "default": {}, + "properties": { + "preload": { + "description": "See https://docs.dask.org/en/latest/setup/custom-startup.html", + "type": "array", + "items": {"type": "string"}, + "default": ["distributed.config"] # Make sure logging config is loaded. + }, + "preload-argv": { + "type": "array", + "items": {"type": "string"}, + "default": [] + } + } + }, + "worker": { + "type": "object", + "default": {}, + "properties": { + "preload": { + "description": "See https://docs.dask.org/en/latest/setup/custom-startup.html", + "type": "array", + "items": {"type": "string"}, + "default": ["distributed.config"] # Make sure logging config is loaded. + }, + "preload-argv": { + "type": "array", + "items": {"type": "string"}, + "default": [] + } + } + }, + "nanny": { + "type": "object", + "default": {}, + "properties": { + "preload": { + "description": "See https://docs.dask.org/en/latest/setup/custom-startup.html", + "type": "array", + "items": {"type": "string"}, + "default": ["distributed.config"] # Make sure logging config is loaded. + }, + "preload-argv": { + "type": "array", + "items": {"type": "string"}, + "default": [] + } + } + }, "admin": { "description": "dask.distributed 'admin' config section.", - "type": "object", "additionalProperties": True, "default": {}, "properties": { 'log-format': { + "description": "In the distributed.config code, this is referred to as part of the 'old-style'\n" + "logging configuration, but it seems to be used unconditionally within\n" + "the Worker (node.py), so I'm confused.", 'type': 'string', 'default': '[%(asctime)s] %(levelname)s %(message)s' } } + }, + "logging": { + "description": "dask.distributed 'new-style' logging config just uses the standard Python configuration dictionary schema.\n" + "See distributed.config.initialize_logging(), and the Python docs:\n" + "https://docs.python.org/3/library/logging.config.html#configuration-dictionary-schema\n", + "type": "object", + + # For readability, here's the default configuration we use all in one place. + # Each of these properties' schemas are also listed below, to enable default + # value injection in case the user supplies one or more custom entries. + "default": { + "version": 1, + "formatters": { + "timestamped": { + "format": "[%(asctime)s] %(levelname)s %(message)s" + } + }, + "handlers": { + "console": { + "level": "DEBUG", + "class": "logging.StreamHandler", + "stream": "ext://sys.stdout", + "formatter": "timestamped", + } + }, + "root": { + "handlers": ["console"], + "level": "INFO" + }, + "loggers": { + "distributed.client": {"level": "WARNING"}, + "bokeh": {"level": "ERROR"}, + "tornado": {"level": "CRITICAL"}, + "tornado.application": {"level": "ERROR"}, + } + }, + + # List each property's schema independently to ensure their default values are + # injected into the config, even if the user has supplied some of their + # own logging options. + "properties": { + "version": { + "type": "integer", + "default": 1 + }, + "formatters": { + "type": "object", + "default": {}, + "properties": { + "timestamped": { + "default": { + "format": "[%(asctime)s] %(levelname)s %(message)s" + } + } + } + } + }, + "handlers": { + "type": "object", + "default": {}, + "properties": { + "console": { + "type": "object", + "default": { + "level": "DEBUG", + "class": "logging.StreamHandler", + "stream": "ext://sys.stdout", + "formatter": "timestamped", + } + } + } + }, + "root": { + "type": "object", + "default": { + "handlers": ['console'], + "level": "INFO" + } + }, + "loggers": { + "type": "object", + "default": { + "distributed.client": {"level": "WARNING"}, + "bokeh": {"level": "ERROR"}, + "tornado": {"level": "CRITICAL"}, + "tornado.application": {"level": "ERROR"}, + } + } } } } @@ -271,8 +412,7 @@ "additionalProperties": True, "default": {}, "properties": { - "distributed": DistributedSchema, - "jobqueue": JobQueueSchema + "jobqueue": JobQueueSchema, + "distributed": DistributedSchema } } - diff --git a/flyemflows/util/dask_util.py b/flyemflows/util/dask_util.py index 3e35f521..d8c1ef15 100644 --- a/flyemflows/util/dask_util.py +++ b/flyemflows/util/dask_util.py @@ -12,6 +12,8 @@ import dask.config import dask.dataframe from dask.bag import Bag + +import distributed.config from distributed.utils import parse_bytes from distributed.client import futures_of @@ -121,6 +123,8 @@ def load_and_overwrite_dask_config(cluster_type, dask_config_path=None, overwrit """ Load dask config, inject defaults for (selected) missing entries, and optionally overwrite in-place. + + Note: Also re-initializes the distributed logging configuration. """ if dask_config_path is None and 'DASK_CONFIG' in os.environ: dask_config_path = os.environ["DASK_CONFIG"] @@ -158,6 +162,10 @@ def load_and_overwrite_dask_config(cluster_type, dask_config_path=None, overwrit dask.config.paths.append(dask_config_path) dask.config.refresh() + # Must be imported this way due to aliased name 'config' in distributed.__init__ + from distributed.config import initialize_logging + initialize_logging(dask.config.config) + def run_on_each_worker(func, client=None, once_per_machine=False, return_hostnames=True): """ diff --git a/flyemflows/workflow/base/contexts.py b/flyemflows/workflow/base/contexts.py index c64d8233..b73ccdc6 100644 --- a/flyemflows/workflow/base/contexts.py +++ b/flyemflows/workflow/base/contexts.py @@ -24,18 +24,13 @@ from contextlib import contextmanager from os.path import splitext, basename -import dask -from distributed import Client, LocalCluster, get_worker +from distributed import get_worker from confiddler import validate -from neuclease import configure_default_logging -from neuclease.util import Timer import confiddler.json as json -from ...util import get_localhost_ip_address, kill_if_running, extract_ip_from_link, construct_ganglia_link -from ...util.lsf import construct_rtm_url, get_job_submit_time -from ...util.dask_util import update_jobqueue_config_with_defaults, dump_dask_config, DebugClient -from .base_schema import JOBQUEUE_CLUSTERS, ResourceManagerSchema +from ...util import get_localhost_ip_address, kill_if_running +from .base_schema import ResourceManagerSchema logger = logging.getLogger(__name__) USER = getpass.getuser() @@ -43,16 +38,17 @@ # driver_ip_addr = '127.0.0.1' driver_ip_addr = get_localhost_ip_address() + @contextmanager def environment_context(update_dict, workflow=None): """ Context manager. Update the environment variables specified in the given dict when the context is entered, and restore the old environment when the context exits. - + If workflow is given, the environment all of the workflow's cluster's workers will be updated, too, but their environment won't be cleaned up. - + Note: You can modify these or other environment variables while the context is active, those modifications will be lost when this context manager exits. @@ -70,27 +66,27 @@ def update_env(): os.environ.clear() os.environ.update(old_env) - + class LocalResourceManager: """ Context manager. - + Based on a workflow's 'resource-manager' config section, launch a dvid_resource_manager process on the local machine, and shut it down upon exit. - + If the 'server' section is not 'driver', then this context manager does nothing. - + Note: If a resource manager is started, the 'server' configuration setting will be overwritten with the local IP address. - + Usage: - + with LocalResourceManager(workflow.config['resource-manager']): # ...workflow executes here... """ - + def __init__(self, resource_manager_config): validate(resource_manager_config, ResourceManagerSchema, inject_defaults=True) self.resource_manager_config = resource_manager_config @@ -157,7 +153,6 @@ def __enter__(self): self.resource_server_process = subprocess.Popen(cmd, stderr=subprocess.STDOUT, shell=True) return self.resource_server_process - def __exit__(self, *args): if self.resource_server_process is None: return @@ -168,7 +163,7 @@ def __exit__(self, *args): self.resource_server_process.wait(10.0) except subprocess.TimeoutExpired: kill_if_running(self.resource_server_process.pid, 10.0) - + sys.stderr.flush() @@ -177,14 +172,13 @@ class WorkerDaemons: Context manager. Runs an 'initialization script' or background process on every worker (like a daemon, but we'll clean it up when the workflow exits). - + See the documentation in the 'worker-initialization' schema for details. """ def __init__(self, workflow_instance): self.workflow = workflow_instance self.worker_init_pids = {} self.driver_init_pid = None - def __enter__(self): """ @@ -203,7 +197,7 @@ def __enter__(self): os.makedirs(init_options["log-dir"], exist_ok=True) launch_delay = init_options["launch-delay"] - + def launch_init_script(): script_name = splitext(basename(init_options["script-path"]))[0] log_dir = init_options["log-dir"] @@ -231,7 +225,7 @@ def launch_init_script(): return None return p.pid - + worker_init_pids = self.workflow.run_on_each_worker( launch_init_script, init_options["only-once-per-machine"], return_hostnames=False) @@ -241,7 +235,7 @@ def launch_init_script(): if self.workflow.config["cluster-type"] not in ("lsf", "sge"): warnings.warn("Warning: You are using a local-cluster, yet your worker initialization specified 'also-run-on-driver'.") driver_init_pid = launch_init_script() - + if launch_delay > 0: logger.info(f"Pausing after launching worker initialization scripts ({launch_delay} seconds).") time.sleep(launch_delay) @@ -249,23 +243,23 @@ def launch_init_script(): self.worker_init_pids = worker_init_pids self.driver_init_pid = driver_init_pid - def __exit__(self, *args): """ Kill any initialization processes (as launched from _run_worker_initializations) that might still running on the workers and/or the driver. - + If they don't respond to SIGTERM, they'll be force-killed with SIGKILL after 10 seconds. """ launch_delay = self.workflow.config["worker-initialization"]["launch-delay"] once_per_machine = self.workflow.config["worker-initialization"]["only-once-per-machine"] - + if launch_delay == -1: # Nothing to do: # We already waited for the the init scripts to complete. return - + worker_init_pids = self.worker_init_pids + def kill_init_proc(): try: worker_addr = get_worker().address @@ -273,15 +267,14 @@ def kill_init_proc(): # Special case for synchronous cluster. # See run_on_each_worker worker_addr = 'tcp://127.0.0.1' - + try: pid_to_kill = worker_init_pids[worker_addr] except KeyError: return None else: return kill_if_running(pid_to_kill, 10.0) - - + if any(self.worker_init_pids.values()): worker_statuses = self.workflow.run_on_each_worker(kill_init_proc, once_per_machine, True) for k,_v in filter(lambda k_v: k_v[1] is None, worker_statuses.items()): @@ -290,7 +283,6 @@ def kill_init_proc(): logger.info(f"Worker {k}: initialization script had to be forcefully killed.") else: logger.info("No worker init processes to kill") - if self.driver_init_pid: kill_if_running(self.driver_init_pid, 10.0) @@ -298,5 +290,3 @@ def kill_init_proc(): logger.info("No driver init process to kill") sys.stderr.flush() - -