From c60c8373f43f3bfdefdf068b9a9658d6dc321479 Mon Sep 17 00:00:00 2001 From: Bas van Beek Date: Thu, 19 May 2022 17:00:06 +0200 Subject: [PATCH] ENH: Replace `print` calls with `Logger.log` --- noodles/display/dumb_term.py | 25 +++++++++++++++---------- noodles/display/pretty.py | 9 +++++++-- noodles/display/simple.py | 29 +++++++++++++++++------------ noodles/display/simple_nc.py | 27 +++++++++++++++------------ noodles/pilot_job.py | 25 +++++++++++++++---------- noodles/prov/sqlite.py | 7 +++++-- noodles/run/config.py | 5 ++++- noodles/run/job_keeper.py | 10 ++++++---- noodles/run/logging.py | 22 ++++++++++++++++++++++ noodles/run/process.py | 5 ++++- noodles/run/scheduler.py | 29 +++++++++++++++-------------- noodles/run/worker.py | 6 ++++-- noodles/run/xenon/dynamic_pool.py | 9 ++++++--- noodles/tutorial.py | 5 ++++- 14 files changed, 139 insertions(+), 74 deletions(-) diff --git a/noodles/display/dumb_term.py b/noodles/display/dumb_term.py index c055079..6d9e02b 100644 --- a/noodles/display/dumb_term.py +++ b/noodles/display/dumb_term.py @@ -1,8 +1,12 @@ +import logging + from .pretty_term import OutStream from ..workflow import FunctionNode from inspect import Parameter import sys +logger = logging.getLogger("noodles") + def _format_arg_list(a, v): if len(a) == 0: @@ -32,9 +36,8 @@ def __init__(self, error_filter=None): def print_message(self, key, msg): if key in self.jobs: - print("{1:12} | {2}".format( - key, '['+msg.upper()+']', self.jobs[key]['name']), - file=sys.stderr) + logger.info("{1:12} | {2}".format( + key, '['+msg.upper()+']', self.jobs[key]['name'])) def add_job(self, key, name): self.jobs[key] = {'name': name} @@ -50,21 +53,20 @@ def report(self): self.out << "[ERROR!]\n\n" for job, e in self.errors: - msg = 'ERROR ' if 'display' in job.hints: - msg += job.hints['display'].format( + msg = job.hints['display'].format( **job.bound_args.arguments) else: - msg += 'calling {} with {}'.format( + msg = 'calling {} with {}'.format( job.foo.__name__, dict(job.bound_args.arguments) ) - print(msg) + logger.error(msg) err_msg = self.error_filter(e) if err_msg: - print(err_msg) + logger.error(err_msg) else: - print(e) + logger.error(e) def __call__(self, msg): key, status, data, err = msg @@ -96,7 +98,10 @@ def __exit__(self, exc_type, exc_val, exc_tb): self.out << "\n" << "User interrupt detected, abnormal exit.\n" return True - print("Internal error encountered. Contact the developers.") + logger.critical( + "Internal error encountered. Contact the developers.", + exc_info=exc_val, + ) return False self.report() diff --git a/noodles/display/pretty.py b/noodles/display/pretty.py index 3b4fd2a..9f063f7 100644 --- a/noodles/display/pretty.py +++ b/noodles/display/pretty.py @@ -1,4 +1,7 @@ # from .termapp import TermApp +import logging + +logger = logging.getLogger("noodles") class Display: @@ -26,8 +29,10 @@ def __exit__(self, exc_type, exc_val, exc_tb): if exc_type is SystemExit: return False - print("Internal error encountered. Contact the developers: \n", - exc_type, exc_val) + logger.critical( + "Internal error encountered. Contact the developers", + exc_info=exc_val, + ) return False self.report() diff --git a/noodles/display/simple.py b/noodles/display/simple.py index d36d2a8..ade5a53 100644 --- a/noodles/display/simple.py +++ b/noodles/display/simple.py @@ -1,6 +1,10 @@ from .pretty_term import OutStream import sys +import logging + +logger = logging.getLogger("noodles") + class Display: """A modest display to track jobs being run. The message being printed @@ -62,37 +66,35 @@ def report(self): self.out << "There were warnings: \n\n" for job, w in self.messages: - msg = 'WARNING ' if job.hints and 'display' in job.hints: - msg += job.hints['display'].format( + msg = job.hints['display'].format( **job.bound_args.arguments) else: - msg += 'calling {} with {}'.format( + msg = 'calling {} with {}'.format( job.foo.__name__, dict(job.bound_args.arguments) ) - print(msg) - print(w) + logger.warning(msg) + logger.warning(w) else: self.out << "╰─(" << ['fg', 240, 100, 60] << "ERROR!" \ << ['reset'] << ")\n\n" for job, e in self.errors: - msg = 'ERROR ' if job.hints and 'display' in job.hints: - msg += job.hints['display'].format( + msg = job.hints['display'].format( **job.bound_args.arguments) else: - msg += 'calling {} with {}'.format( + msg = 'calling {} with {}'.format( job.foo.__name__, dict(job.bound_args.arguments) ) - print(msg) + logger.error(msg) err_msg = self.error_filter(e) if err_msg: - print(err_msg) + logger.error(err_msg) else: - print(e) + logger.error(e) def __call__(self, q): self.q = q @@ -112,7 +114,10 @@ def __exit__(self, exc_type, exc_val, exc_tb): << ['reset'] return True - print("Internal error encountered. Contact the developers.") + logger.critical( + "Internal error encountered. Contact the developers.", + exc_info=exc_val, + ) return False self.report() diff --git a/noodles/display/simple_nc.py b/noodles/display/simple_nc.py index f5f798b..1ce48ad 100644 --- a/noodles/display/simple_nc.py +++ b/noodles/display/simple_nc.py @@ -1,5 +1,8 @@ from .pretty_term import OutStream import sys +import logging + +logger = logging.getLogger("noodles") class Display: @@ -115,16 +118,15 @@ def report(self): self.out << "There were warnings: \n\n" for job, w in self.messages: - msg = 'WARNING ' if job.hints and 'display' in job.hints: - msg += job.hints['display'].format( + msg = job.hints['display'].format( **job.bound_args.arguments) else: - msg += 'calling {} with {}'.format( + msg = 'calling {} with {}'.format( job.foo.__name__, dict(job.bound_args.arguments) ) - print(msg) - print(w) + logger.warning(msg) + logger.warning(w) else: self.out << self.chars['line-bl'] << self.chars['line-hor'] \ @@ -132,17 +134,16 @@ def report(self): << ['reset'] << ")\n\n" for job, e in self.errors: - msg = 'ERROR ' if job.hints and 'display' in job.hints: - msg += job.hints['display'].format( + msg = job.hints['display'].format( **job.bound_args.arguments) else: - msg += 'calling {} with {}'.format( + msg = 'calling {} with {}'.format( job.foo.__name__, dict(job.bound_args.arguments) ) - print(msg) - print(e) + logger.error(msg) + logger.error(e) def __call__(self, msg): key, status, data, err_msg = msg @@ -164,8 +165,10 @@ def __exit__(self, exc_type, exc_val, exc_tb): if exc_type is SystemExit: return False - print("Internal error encountered. Contact the developers: \n", - exc_type, exc_val) + logger.critical( + f"Internal error encountered. Contact the developers.", + exc_info=exc_val, + ) return False self.report() diff --git a/noodles/pilot_job.py b/noodles/pilot_job.py index 62a2577..3b7a18a 100644 --- a/noodles/pilot_job.py +++ b/noodles/pilot_job.py @@ -14,6 +14,7 @@ > python3 -m noodles.worker -online [-use ] """ +import logging import argparse import sys import uuid @@ -34,6 +35,8 @@ from .run.remote.io import (JSONObjectReader, JSONObjectWriter) +logger = logging.getLogger("noodles") + def run_online_mode(args): """Run jobs. @@ -46,9 +49,9 @@ def run_online_mode(args): Messages can be encoded as either JSON or MessagePack. """ - print("\033[47;30m Netherlands\033[48;2;0;174;239;37m▌" - "\033[38;2;255;255;255me\u20d2Science\u20d2\033[37m▐" - "\033[47;30mcenter \033[m Noodles worker", file=sys.stderr) + logger.info("\033[47;30m Netherlands\033[48;2;0;174;239;37m▌" + "\033[38;2;255;255;255me\u20d2Science\u20d2\033[37m▐" + "\033[47;30mcenter \033[m Noodles worker") if args.n == 1: registry = look_up(args.registry)() @@ -72,7 +75,7 @@ def run_online_mode(args): if isinstance(msg, JobMessage): key, job = msg elif msg is EndOfWork: - print("received EndOfWork, bye", file=sys.stderr) + logger.info("received EndOfWork, bye") sys.exit(0) elif isinstance(msg, tuple): key, job = msg @@ -88,17 +91,19 @@ def run_online_mode(args): os.chdir("noodles-{0}".format(key.hex)) if args.verbose: - print("worker: ", - job.foo.__name__, - job.bound_args.args, - job.bound_args.kwargs, - file=sys.stderr, flush=True) + logger.info( + f"worker: {job.foo.__name__} {job.bound_args.args} {job.bound_args.kwarg}" + ) + for handler in logger.handlers: + handler.flush() with redirect_stdout(sys.stderr): result = run_job(key, job) if args.verbose: - print("result: ", result.value, file=sys.stderr, flush=True) + logger.info(f"result: {result.value}") + for handler in logger.handlers: + handler.flush() if args.jobdirs: # parent directory diff --git a/noodles/prov/sqlite.py b/noodles/prov/sqlite.py index cc679c7..44eaf46 100644 --- a/noodles/prov/sqlite.py +++ b/noodles/prov/sqlite.py @@ -21,6 +21,7 @@ once a non-workflow result is known. """ +import logging import sqlite3 from pathlib import Path from threading import Lock @@ -40,6 +41,8 @@ except ImportError: import json +logger = logging.getLogger("noodles") + schema = ''' create table if not exists "jobs" ( @@ -193,8 +196,8 @@ def store_result(self, key, status, value, _): return if key not in self.jobs: - print("WARNING: store_result called but job not in registry:\n" - " race condition? Not doing anything.\n", file=sys.stderr) + logger.warnings("store_result called but job not in registry:\n" + " race condition? Not doing anything.\n") return with self.lock: diff --git a/noodles/run/config.py b/noodles/run/config.py index d735c1b..2555bf5 100644 --- a/noodles/run/config.py +++ b/noodles/run/config.py @@ -1,7 +1,10 @@ +import logging import configparser from typing import (List) from ..lib import (look_up) +logger = logging.getLogger("noodles") + runners = [ # ======================================================================= # @@ -145,7 +148,7 @@ def run_with_config(config_file, workflow, machine=None): machine = config.get('default', 'machine', fallback=machine) if machine is None: - print("No machine given, running local in single thread.") + logger.info("No machine given, running local in single thread.") runner = find_runner(name='single', features=[]) settings = {} diff --git a/noodles/run/job_keeper.py b/noodles/run/job_keeper.py index 057144b..bee48ff 100644 --- a/noodles/run/job_keeper.py +++ b/noodles/run/job_keeper.py @@ -1,3 +1,4 @@ +import logging import uuid import time import json @@ -7,6 +8,8 @@ from ..lib import (coroutine, EndOfQueue) from .messages import (JobMessage, EndOfWork) +logger = logging.getLogger("noodles") + class JobKeeper(dict): def __init__(self, keep=False): @@ -34,8 +37,8 @@ def store_result(self, key, status, value, err): return if key not in self: - print("WARNING: store_result called but job not in registry:\n" - " race condition? Not doing anything.\n", file=sys.stderr) + logger.warning("store_result called but job not in registry:\n" + " race condition? Not doing anything.\n") return with self.lock: @@ -50,8 +53,7 @@ def message(self): if msg is EndOfQueue: return if msg is None: - print("Warning: `None` received where not expected.", - file=sys.stderr) + logger.warning("`None` received where not expected.") return key, status, value, err = msg diff --git a/noodles/run/logging.py b/noodles/run/logging.py index 53b1215..2d3cf8d 100644 --- a/noodles/run/logging.py +++ b/noodles/run/logging.py @@ -3,12 +3,34 @@ of streams. """ +import sys import logging from ..lib import (EndOfQueue) from ..workflow import (is_workflow) from .messages import (JobMessage, ResultMessage) +__all__ = ["make_logger", "logger"] + + +def _initialize_logger() -> logging.Logger: + """Initialize the ``noodles`` logger and set its stream handlers.""" + logger = logging.getLogger("noodles") + logger.setLevel(logging.INFO) + + stream_handler = logging.StreamHandler() + stream_handler.setLevel(logging.INFO) + stream_handler.setFormatter(logging.Formatter( + fmt="[%(asctime)s] %(levelname)s: %(message)s", + datefmt="%H:%M:%S", + )) + + logger.addHandler(stream_handler) + return logger + + +logger = _initialize_logger() + def _sugar(s): """Shorten strings that are too long for decency.""" diff --git a/noodles/run/process.py b/noodles/run/process.py index 47d5498..1f60b88 100644 --- a/noodles/run/process.py +++ b/noodles/run/process.py @@ -5,6 +5,7 @@ Run jobs using a process backend. """ +import logging import sys import uuid from subprocess import Popen, PIPE @@ -23,6 +24,8 @@ from .remote.io import (JSONObjectReader, JSONObjectWriter) +logger = logging.getLogger("noodles") + def process_worker(registry, verbose=False, jobdirs=False, init=None, finish=None, status=True): @@ -49,7 +52,7 @@ def process_worker(registry, verbose=False, jobdirs=False, def read_stderr(): """Read stderr of remote process and sends lines to logger.""" for line in remote.stderr: - print(name + ": " + line.rstrip()) + logger.info(name + ": " + line.rstrip()) stderr_reader_thread = threading.Thread(target=read_stderr, daemon=True) stderr_reader_thread.start() diff --git a/noodles/run/scheduler.py b/noodles/run/scheduler.py index f8874dd..9db5c18 100644 --- a/noodles/run/scheduler.py +++ b/noodles/run/scheduler.py @@ -1,3 +1,4 @@ +import logging from ..lib import (Connection, FlushQueue, EndOfQueue) from .job_keeper import (JobKeeper) @@ -6,6 +7,8 @@ Workflow, is_node_ready) import sys +logger = logging.getLogger("noodles") + class Job: def __init__(self, workflow, node_id): @@ -99,16 +102,17 @@ def run(self, connection: Connection, master: Workflow): except StopIteration: pass - print("Uncaught error running job: {}, {}".format(n, err_msg), - file=sys.stderr) - print("Flushing queue and waiting for threads to close.", - file=sys.stderr, flush=True) + logger.error(f"Uncaught error running job {n}", exc_info=err_msg) + logger.info("Flushing queue and waiting for threads to close.") + for handler in logger.handlers: + handler.flush() if status == 'aborted': - print("Job {} got aborted: {}".format(n, err_msg), - file=sys.stderr) - print("Flushing queue and waiting for threads to close.", - file=sys.stderr, flush=True) + logger.info(f"Job {n} got aborted", exc_info=err_msg) + logger.info("Flushing queue and waiting for threads to close.") + for handler in logger.handlers: + handler.flush() + graceful_exit = True errors.append(err_msg) try: @@ -117,15 +121,12 @@ def run(self, connection: Connection, master: Workflow): pass if self.verbose: - print("sched result [{0}]: ".format(self.key_map[job_key]), - result, - file=sys.stderr, flush=True) + logger.info("sched result [{0}]: {1}".format(self.key_map[job_key], result)) + for handler in logger.handlers: + handler.flush() del self.jobs[job_key] if len(self.jobs) == 0 and graceful_exit: - for error in errors: - print("Exception of type", type(error), ":") - print(error) raise errors[0] # if this result is the root of a workflow, pop to parent diff --git a/noodles/run/worker.py b/noodles/run/worker.py index 32e0d64..87f025a 100644 --- a/noodles/run/worker.py +++ b/noodles/run/worker.py @@ -1,7 +1,10 @@ +import logging from ..lib import (pull_map, EndOfQueue) from .messages import (ResultMessage, JobMessage) import sys +logger = logging.getLogger("noodles") + @pull_map def worker(job): @@ -15,8 +18,7 @@ def worker(job): return if not isinstance(job, JobMessage): - print("Warning: Job should be communicated using `JobMessage`.", - file=sys.stderr) + logger.warning("Job should be communicated using `JobMessage`.") key, node = job return run_job(key, node) diff --git a/noodles/run/xenon/dynamic_pool.py b/noodles/run/xenon/dynamic_pool.py index a47558d..67430bc 100644 --- a/noodles/run/xenon/dynamic_pool.py +++ b/noodles/run/xenon/dynamic_pool.py @@ -5,6 +5,7 @@ from ..messages import ( JobMessage, ResultMessage, EndOfWork) +import logging import threading import sys from collections import namedtuple @@ -12,6 +13,8 @@ from .xenon import (Machine) import grpc +logger = logging.getLogger("noodles") + def xenon_interactive_worker( machine, worker_config, input_queue=None, stderr_sink=None): @@ -36,12 +39,12 @@ def xenon_interactive_worker( def serialise(obj): """Serialise incoming objects, yielding strings.""" if isinstance(obj, JobMessage): - print('serializing:', str(obj.node), file=sys.stderr) + logger.info(f'serializing: {obj.node}') return (registry.to_json(obj, host='scheduler') + '\n').encode() @pull_map def echo(line): - print('{} input: {}'.format(worker_config.name, line), file=sys.stderr) + logger.info('{} input: {}'.format(worker_config.name, line)) return line def do_iterate(source): @@ -59,7 +62,7 @@ def do_iterate(source): def echo_stderr(text): """Print lines.""" for line in text.split('\n'): - print("{}: {}".format(worker_config.name, line), file=sys.stderr) + logger.info("{}: {}".format(worker_config.name, line)) if stderr_sink is None: stderr_sink = echo_stderr() diff --git a/noodles/tutorial.py b/noodles/tutorial.py index bce46b8..29626dc 100644 --- a/noodles/tutorial.py +++ b/noodles/tutorial.py @@ -2,6 +2,7 @@ Functions useful for tutorial work and unit testing. """ +import logging from inspect import Parameter from graphviz import Digraph import html @@ -9,10 +10,12 @@ from . import schedule, schedule_hint, get_workflow +logger = logging.getLogger("noodles") + @schedule def echo_add(x, y): - print('adding', x, 'and', y, file=sys.stderr) + logger.info(f'adding {x} and {y}') return x + y