Skip to content

Commit

Permalink
Configure worker logging via the dask.distributed config, rather than…
Browse files Browse the repository at this point in the history
… manually running worker configuration in a task
  • Loading branch information
stuarteberg committed Dec 20, 2020
1 parent 56bf760 commit 3f9ba94
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 68 deletions.
27 changes: 0 additions & 27 deletions flyemflows/util/cluster_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import dask
from distributed import Client, LocalCluster

from neuclease import configure_default_logging
from neuclease.util import Timer

from . import extract_ip_from_link, construct_ganglia_link
Expand Down Expand Up @@ -195,32 +194,6 @@ def _init_dask(self):
if self.wait_for_workers and self.cluster_type == "lsf":
self._write_worker_graph_urls('graph-links.txt')

if self.wait_for_workers:
self._initialize_worker_logging()
else:
logger.warning("Not configuring worker logging!")

def _initialize_worker_logging(self):
    """
    Set up the Python logging module on every worker node.

    FIXME:
        Ideally this would be driven by the distributed config,
        via the "new style" logging configuration option
        (see distributed/config.py).  That approach seemed to have
        some deficiency last time it was tried, so for now the
        configuration is pushed to each worker as a task.
    """
    def _configure_worker_logging():
        configure_default_logging()

        # Levels copied from defaults in distributed/config.py
        for logger_name, level in [('distributed.client', logging.WARNING),
                                   ('bokeh', logging.ERROR),
                                   ('tornado', logging.CRITICAL),
                                   ('tornado.application', logging.ERROR)]:
            logging.getLogger(logger_name).setLevel(level)

    run_on_each_worker(_configure_worker_logging, self.client, True, False)

def _write_driver_graph_urls(self):
"""
Expand Down
154 changes: 147 additions & 7 deletions flyemflows/util/dask_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
"increase this setting accordingly.\n"
"Specified as a string with a suffix for units, e.g. 4GB\n",
"type": "string",
"default": "15GB" # On the Janelia cluster, each slot gets 15 GB by default.
"default": "15GB" # On the Janelia cluster, each slot gets 15 GB by default.
},
"mem": {
"description": "How much memory to reserve from LSF for each 'job'.\n"
Expand Down Expand Up @@ -129,7 +129,7 @@
"increase this setting accordingly.\n"
"Specified as a string with a suffix for units, e.g. 4GB\n",
"type": "string",
"default": "15GB" # On the Janelia cluster, each slot gets 15 GB by default.
"default": "15GB" # On the Janelia cluster, each slot gets 15 GB by default.
},
"processes": {
"description": "How many processes ('workers') per 'job'.\n"
Expand Down Expand Up @@ -188,7 +188,7 @@
"increase this setting accordingly.\n"
"Specified as a string with a suffix for units, e.g. 4GB\n",
"type": "string",
"default": "15GB" # On the Janelia cluster, each slot gets 15 GB by default.
"default": "15GB" # On the Janelia cluster, each slot gets 15 GB by default.
},
"processes": {
"description": "How many processes ('workers') per 'job'.\n"
Expand Down Expand Up @@ -248,17 +248,158 @@
"additionalProperties": True,
"default": {},
"properties": {
"scheduler": {
"type": "object",
"default": {},
"properties": {
"preload": {
"description": "See https://docs.dask.org/en/latest/setup/custom-startup.html",
"type": "array",
"items": {"type": "string"},
"default": ["distributed.config"] # Make sure logging config is loaded.
},
"preload-argv": {
"type": "array",
"items": {"type": "string"},
"default": []
}
}
},
"worker": {
"type": "object",
"default": {},
"properties": {
"preload": {
"description": "See https://docs.dask.org/en/latest/setup/custom-startup.html",
"type": "array",
"items": {"type": "string"},
"default": ["distributed.config"] # Make sure logging config is loaded.
},
"preload-argv": {
"type": "array",
"items": {"type": "string"},
"default": []
}
}
},
"nanny": {
"type": "object",
"default": {},
"properties": {
"preload": {
"description": "See https://docs.dask.org/en/latest/setup/custom-startup.html",
"type": "array",
"items": {"type": "string"},
"default": ["distributed.config"] # Make sure logging config is loaded.
},
"preload-argv": {
"type": "array",
"items": {"type": "string"},
"default": []
}
}
},
"admin": {
"description": "dask.distributed 'admin' config section.",
"type": "object",
"additionalProperties": True,
"default": {},
"properties": {
'log-format': {
"description": "In the distributed.config code, this is referred to as part of the 'old-style'\n"
"logging configuration, but it seems to be used unconditionally within\n"
"the Worker (node.py), so I'm confused.",
'type': 'string',
'default': '[%(asctime)s] %(levelname)s %(message)s'
}
}
},
"logging": {
"description": "dask.distributed 'new-style' logging config just uses the standard Python configuration dictionary schema.\n"
"See distributed.config.initialize_logging(), and the Python docs:\n"
"https://docs.python.org/3/library/logging.config.html#configuration-dictionary-schema\n",
"type": "object",

# For readability, here's the default configuration we use all in one place.
# Each of these properties' schemas are also listed below, to enable default
# value injection in case the user supplies one or more custom entries.
"default": {
"version": 1,
"formatters": {
"timestamped": {
"format": "[%(asctime)s] %(levelname)s %(message)s"
}
},
"handlers": {
"console": {
"level": "DEBUG",
"class": "logging.StreamHandler",
"stream": "ext://sys.stdout",
"formatter": "timestamped",
}
},
"root": {
"handlers": ["console"],
"level": "INFO"
},
"loggers": {
"distributed.client": {"level": "WARNING"},
"bokeh": {"level": "ERROR"},
"tornado": {"level": "CRITICAL"},
"tornado.application": {"level": "ERROR"},
}
},

# List each property's schema independently to ensure their default values are
# injected into the config, even if the user has supplied some of their
# own logging options.
"properties": {
"version": {
"type": "integer",
"default": 1
},
"formatters": {
"type": "object",
"default": {},
"properties": {
"timestamped": {
"default": {
"format": "[%(asctime)s] %(levelname)s %(message)s"
}
}
}
}
},
"handlers": {
"type": "object",
"default": {},
"properties": {
"console": {
"type": "object",
"default": {
"level": "DEBUG",
"class": "logging.StreamHandler",
"stream": "ext://sys.stdout",
"formatter": "timestamped",
}
}
}
},
"root": {
"type": "object",
"default": {
"handlers": ['console'],
"level": "INFO"
}
},
"loggers": {
"type": "object",
"default": {
"distributed.client": {"level": "WARNING"},
"bokeh": {"level": "ERROR"},
"tornado": {"level": "CRITICAL"},
"tornado.application": {"level": "ERROR"},
}
}
}
}
}
Expand All @@ -271,8 +412,7 @@
"additionalProperties": True,
"default": {},
"properties": {
"distributed": DistributedSchema,
"jobqueue": JobQueueSchema
"jobqueue": JobQueueSchema,
"distributed": DistributedSchema
}
}

8 changes: 8 additions & 0 deletions flyemflows/util/dask_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import dask.config
import dask.dataframe
from dask.bag import Bag

import distributed.config
from distributed.utils import parse_bytes
from distributed.client import futures_of

Expand Down Expand Up @@ -121,6 +123,8 @@ def load_and_overwrite_dask_config(cluster_type, dask_config_path=None, overwrit
"""
Load dask config, inject defaults for (selected) missing entries,
and optionally overwrite in-place.
Note: Also re-initializes the distributed logging configuration.
"""
if dask_config_path is None and 'DASK_CONFIG' in os.environ:
dask_config_path = os.environ["DASK_CONFIG"]
Expand Down Expand Up @@ -158,6 +162,10 @@ def load_and_overwrite_dask_config(cluster_type, dask_config_path=None, overwrit
dask.config.paths.append(dask_config_path)
dask.config.refresh()

# Must be imported this way due to aliased name 'config' in distributed.__init__
from distributed.config import initialize_logging
initialize_logging(dask.config.config)


def run_on_each_worker(func, client=None, once_per_machine=False, return_hostnames=True):
"""
Expand Down
Loading

0 comments on commit 3f9ba94

Please sign in to comment.