diff --git a/Utils/Dataflow/pyDKB/VERSION b/Utils/Dataflow/pyDKB/VERSION index 3a2e926bf..f255823f4 100644 --- a/Utils/Dataflow/pyDKB/VERSION +++ b/Utils/Dataflow/pyDKB/VERSION @@ -1 +1 @@ -0.3.20190703 +0.3.20190916 diff --git a/Utils/Dataflow/pyDKB/common/LoggableObject.py b/Utils/Dataflow/pyDKB/common/LoggableObject.py new file mode 100644 index 000000000..64e4ab92f --- /dev/null +++ b/Utils/Dataflow/pyDKB/common/LoggableObject.py @@ -0,0 +1,21 @@ +""" +pyDKB.common.LoggableObject +""" + +from types import logLevel +from misc import log + + +class LoggableObject(object): + """ Common ancestor for all classes that need 'log' method. """ + + @classmethod + def log(cls, message, level=logLevel.INFO): + """ Output log message with given log level. + + :param message: message to output + :type message: str + :param level: log level of the message + :type level: ``pyDKB.common.types.logLevel`` member + """ + log(message, level, cls.__name__) diff --git a/Utils/Dataflow/pyDKB/common/__init__.py b/Utils/Dataflow/pyDKB/common/__init__.py index 024e2871f..7b8a0c215 100644 --- a/Utils/Dataflow/pyDKB/common/__init__.py +++ b/Utils/Dataflow/pyDKB/common/__init__.py @@ -7,3 +7,4 @@ import json_utils as json from custom_readline import custom_readline from Type import Type +from LoggableObject import LoggableObject diff --git a/Utils/Dataflow/pyDKB/common/hdfs.py b/Utils/Dataflow/pyDKB/common/hdfs.py index 2093a8583..3be69580c 100644 --- a/Utils/Dataflow/pyDKB/common/hdfs.py +++ b/Utils/Dataflow/pyDKB/common/hdfs.py @@ -10,6 +10,7 @@ import tempfile from . import HDFSException +from misc import (log, logLevel) DEVNULL = open(os.path.devnull, "w") DKB_HOME = "/user/DKB/" @@ -33,7 +34,7 @@ def check_stderr(proc, timeout=None, max_lines=1): if err: n_lines += 1 if max_lines is None or n_lines <= max_lines: - sys.stderr.write("(INFO) (proc) %s\n" % err) + log("%s" % err, logLevel.INFO, __name__, 'proc') if proc.poll(): raise subprocess.CalledProcessError(proc.returncode, None) return proc.poll() @@ -78,8 +79,8 @@ def movefile(fname, dest): try: os.remove(fname) except OSError, err: - sys.stderr.write("(WARN) Failed to remove local copy of HDFS file" - " (%s): %s" % (fname, err)) + log("Failed to remove local copy of HDFS file" + " (%s): %s" % (fname, err), logLevel.WARN) def getfile(fname): diff --git a/Utils/Dataflow/pyDKB/common/misc.py b/Utils/Dataflow/pyDKB/common/misc.py new file mode 100644 index 000000000..08a7e8963 --- /dev/null +++ b/Utils/Dataflow/pyDKB/common/misc.py @@ -0,0 +1,58 @@ +""" +pyDKB.common.misc + +Miscellanious utility functions. +""" + +import sys +import inspect +from datetime import datetime + +from types import logLevel + +# Datetime format for log messages +DTFORMAT = '%Y-%m-%d %H:%M:%S' + + +def log(message, level=logLevel.INFO, *args): + """ Output log message with given log level. + + In case of multiline messages or list of messages only first line (message) + is prepended with provided prefixes and timestamp; in all the next lines + (messages) they are replaced with special prefix '(==)', representing that + these lines belong to the same log record. + + Empty lines and lines containing only whitespace symbols are ignored. + + :param message: message to output (string, list of strings or + any other object) + :type message: object + :param level: log level of the message + :type level: ``pyDKB.common.types.logLevel`` member + :param *args: additional prefixes (will be output between log + level prefix and message body) + :type *args: str + """ + if not logLevel.hasMember(level): + log("Unknown log level: %s" % level, logLevel.WARN) + level = logLevel.INFO + if type(message) != list: + message = [message] + lines = [] + for m in message: + lines += [line for line in str(m).splitlines() if line.strip()] + if args: + prefix = ' ' + ' '.join(['(%s)' % p for p in args]) + else: + frm = inspect.stack()[1] + mod = inspect.getmodule(frm[0]) + modname = getattr(mod, '__name__', 'main') + prefix = ' (%s)' % modname + if lines: + dtime = datetime.now().strftime(DTFORMAT) + out_message = "%s (%s)%s %s" % (dtime, logLevel.memberName(level), + prefix, lines[0]) + for l in lines[1:]: + out_message += "\n(==) %s" % l + out_message += "\n" + sys.stderr.write(out_message) diff --git a/Utils/Dataflow/pyDKB/common/types.py b/Utils/Dataflow/pyDKB/common/types.py new file mode 100644 index 000000000..59111a5c4 --- /dev/null +++ b/Utils/Dataflow/pyDKB/common/types.py @@ -0,0 +1,9 @@ +""" +pyDKB.common.types + +Definitions of types used across all the library modules. +""" + +from Type import Type + +logLevel = Type("TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL") diff --git a/Utils/Dataflow/pyDKB/dataflow/cds.py b/Utils/Dataflow/pyDKB/dataflow/cds.py index 342eae719..8e090db2f 100644 --- a/Utils/Dataflow/pyDKB/dataflow/cds.py +++ b/Utils/Dataflow/pyDKB/dataflow/cds.py @@ -6,6 +6,7 @@ import signal import os +from pyDKB.common.misc import (log, logLevel) __all__ = ["CDSInvenioConnector", "KerberizedCDSInvenioConnector"] @@ -19,7 +20,7 @@ from invenio_client.contrib import cds import splinter except ImportError, e: - sys.stderr.write("(WARN) %s failed (%s)\n" % (__name__, e)) + log("Submodule failed (%s)" % e, logLevel.WARN) __all__ = [] else: @@ -80,9 +81,9 @@ def __init__(self, login="user", password="password"): try: kerberos except NameError: - sys.stderr.write("(ERROR) Kerberos Python package is not" - " installed. Can't proceed with Kerberos" - " authorization.\n") + log("Kerberos Python package is not" + " installed. Can't proceed with Kerberos" + " authorization.", logLevel.ERROR) sys.exit(4) super(KerberizedCDSInvenioConnector, self).__init__("user", @@ -105,5 +106,5 @@ def _init_browser(self): self.browser.find_link_by_partial_text("Sign in").click() except kerberos.GSSError, e: - sys.stderr.write("(ERROR) %s\n" % str(e)) + log("%s" % str(e), logLevel.ERROR) sys.exit(3) diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/__init__.py b/Utils/Dataflow/pyDKB/dataflow/communication/__init__.py index 014963cdc..ef2908394 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/__init__.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/__init__.py @@ -4,7 +4,6 @@ from .. import messageType from .. import codeType -from .. import logLevel from .. import DataflowException from messages import Message diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/consumer/Consumer.py b/Utils/Dataflow/pyDKB/dataflow/communication/consumer/Consumer.py index 1bc235f94..20ef4bdd7 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/consumer/Consumer.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/consumer/Consumer.py @@ -4,7 +4,7 @@ import sys -from . import logLevel +from pyDKB.common import LoggableObject from . import DataflowException from .. import Message @@ -16,7 +16,7 @@ class ConsumerException(DataflowException): pass -class Consumer(object): +class Consumer(LoggableObject): """ Data consumer implementation. """ config = None @@ -30,24 +30,6 @@ def __init__(self, config={}): self.config = config self.reconfigure() - def log(self, message, level=logLevel.INFO): - """ Output log message with given log level. """ - if not logLevel.hasMember(level): - self.log("Unknown log level: %s" % level, logLevel.WARN) - level = logLevel.INFO - if type(message) == list: - lines = message - else: - lines = message.splitlines() - if lines: - out_message = "(%s) (%s) %s" % (logLevel.memberName(level), - self.__class__.__name__, - lines[0]) - for l in lines[1:]: - out_message += "\n(==) %s" % l - out_message += "\n" - sys.stderr.write(out_message) - def __iter__(self): """ Initialize iteration. """ return self diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/consumer/FileConsumer.py b/Utils/Dataflow/pyDKB/dataflow/communication/consumer/FileConsumer.py index b0da3d579..52fb0776c 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/consumer/FileConsumer.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/consumer/FileConsumer.py @@ -14,7 +14,7 @@ import os import Consumer -from . import logLevel +from pyDKB.common.types import logLevel from .. import Message diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/consumer/__init__.py b/Utils/Dataflow/pyDKB/dataflow/communication/consumer/__init__.py index 382ebaf50..331aa1cee 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/consumer/__init__.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/consumer/__init__.py @@ -3,7 +3,6 @@ """ from .. import messageType -from .. import logLevel from .. import DataflowException from Consumer import Consumer diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/messages.py b/Utils/Dataflow/pyDKB/dataflow/communication/messages.py index 4754af2f5..bc98bdd33 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/messages.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/messages.py @@ -6,6 +6,7 @@ from . import messageType from . import codeType +from pyDKB.common.misc import (log, logLevel) import json import sys @@ -35,9 +36,8 @@ def Message(msg_type): raise ValueError("Message type must be a member of messageType") cls = __message_class.get(msg_type) if not cls: - sys.stderr.write( - "(WARN) Message class for type %s is not implemented. " - "Using AbstractMessage instead.") + log("Message class for type %s is not implemented. " + "Using AbstractMessage instead.", logLevel.WARN) cls = AbstractMessage return cls diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/producer/FileProducer.py b/Utils/Dataflow/pyDKB/dataflow/communication/producer/FileProducer.py index c82d049c1..fd8232b24 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/producer/FileProducer.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/producer/FileProducer.py @@ -13,7 +13,7 @@ import time from Producer import Producer, ProducerException -from . import logLevel +from pyDKB.common.types import logLevel class FileProducer(Producer): diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/producer/Producer.py b/Utils/Dataflow/pyDKB/dataflow/communication/producer/Producer.py index cde3f870a..d953cf070 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/producer/Producer.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/producer/Producer.py @@ -4,7 +4,7 @@ import sys -from . import logLevel +from pyDKB.common import LoggableObject from . import DataflowException from .. import Message @@ -16,7 +16,7 @@ class ProducerException(DataflowException): pass -class Producer(object): +class Producer(LoggableObject): """ Data producer implementation. """ config = None @@ -30,23 +30,6 @@ def __init__(self, config={}): self.config = config self.reconfigure() - def log(self, message, level=logLevel.INFO): - """ Output log message with given log level. """ - if not logLevel.hasMember(level): - self.log("Unknown log level: %s" % level, logLevel.WARN) - level = logLevel.INFO - if type(message) == list: - lines = message - else: - lines = message.splitlines() - if lines: - out_message = "(%s) (%s) %s" % (logLevel.memberName(level), - self.__class__.__name__, lines[0]) - for l in lines[1:]: - out_message += "\n(==) %s" % l - out_message += "\n" - sys.stderr.write(out_message) - def reconfigure(self, config={}): """ (Re)initialize producer with stage config arguments. """ if config: diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/producer/__init__.py b/Utils/Dataflow/pyDKB/dataflow/communication/producer/__init__.py index 0e9a1cef5..a8cef8be5 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/producer/__init__.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/producer/__init__.py @@ -3,7 +3,6 @@ """ from .. import messageType -from .. import logLevel from .. import DataflowException from Producer import Producer diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/stream/InputStream.py b/Utils/Dataflow/pyDKB/dataflow/communication/stream/InputStream.py index fcbc11e2d..121bcb79d 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/stream/InputStream.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/stream/InputStream.py @@ -3,7 +3,7 @@ """ from Stream import Stream -from . import logLevel +from pyDKB.common.types import logLevel from . import Message from pyDKB.common import custom_readline diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/stream/Stream.py b/Utils/Dataflow/pyDKB/dataflow/communication/stream/Stream.py index 75c0b87b1..52d55096d 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/stream/Stream.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/stream/Stream.py @@ -4,12 +4,12 @@ import sys +from pyDKB.common import LoggableObject from . import messageType -from . import logLevel from exceptions import StreamException -class Stream(object): +class Stream(LoggableObject): """ Abstract class for input/output streams. """ message_type = None @@ -21,24 +21,6 @@ def __init__(self, fd=None, config={}): self.reset(fd) self.configure(config) - def log(self, message, level=logLevel.INFO): - """ Output log message with given log level. """ - if not logLevel.hasMember(level): - self.log("Unknown log level: %s" % level, logLevel.WARN) - level = logLevel.INFO - if type(message) == list: - lines = message - else: - lines = message.splitlines() - if lines: - out_message = "(%s) (%s) %s" % (logLevel.memberName(level), - self.__class__.__name__, - lines[0]) - for l in lines[1:]: - out_message += "\n(==) %s" % l - out_message += "\n" - sys.stderr.write(out_message) - def configure(self, config): """ Stream configuration. """ if not isinstance(config, dict): diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/stream/__init__.py b/Utils/Dataflow/pyDKB/dataflow/communication/stream/__init__.py index 396a71be0..645ccdeff 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/stream/__init__.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/stream/__init__.py @@ -4,7 +4,6 @@ from .. import messageType from .. import codeType -from .. import logLevel from .. import DataflowException from .. import Message from InputStream import InputStream diff --git a/Utils/Dataflow/pyDKB/dataflow/stage/AbstractStage.py b/Utils/Dataflow/pyDKB/dataflow/stage/AbstractStage.py index d3148afa1..1c477301f 100644 --- a/Utils/Dataflow/pyDKB/dataflow/stage/AbstractStage.py +++ b/Utils/Dataflow/pyDKB/dataflow/stage/AbstractStage.py @@ -8,16 +8,18 @@ from collections import defaultdict import textwrap -from . import logLevel +from pyDKB.common import LoggableObject +from pyDKB.common.types import logLevel +from pyDKB.common.misc import log try: import argparse except ImportError, e: - sys.stderr.write("(ERROR) argparse package is not installed.\n") + log("argparse package is not installed.", logLevel.ERROR) raise e -class AbstractStage(object): +class AbstractStage(LoggableObject): """ Class/instance variable description: * Argument parser (argparse.ArgumentParser) @@ -51,24 +53,6 @@ def __init__(self, description="DKB Dataflow stage"): self._error = None - def log(self, message, level=logLevel.INFO): - """ Output log message with given log level. """ - if not logLevel.hasMember(level): - self.log("Unknown log level: %s" % level, logLevel.WARN) - level = logLevel.INFO - if type(message) == list: - lines = message - else: - lines = message.splitlines() - if lines: - out_message = "(%s) (%s) %s" % (logLevel.memberName(level), - self.__class__.__name__, - lines[0]) - for l in lines[1:]: - out_message += "\n(==) %s" % l - out_message += "\n" - sys.stderr.write(out_message) - def log_configuration(self): """ Log stage configuration. """ self.log("Configuration parameters:") @@ -178,8 +162,8 @@ def parse_args(self, args): try: self.ARGS.eom = self.ARGS.eom.decode('string_escape') except (ValueError), err: - sys.stderr.write("(ERROR) Failed to read arguments.\n" - "(ERROR) Case: %s\n" % (err)) + self.log("Failed to read arguments.\n" + "Case: %s" % (err), logLevel.ERROR) sys.exit(1) if self.ARGS.eop is None: @@ -193,8 +177,8 @@ def parse_args(self, args): try: self.ARGS.eop = self.ARGS.eop.decode('string_escape') except (ValueError), err: - sys.stderr.write("(ERROR) Failed to read arguments.\n" - "(ERROR) Case: %s\n" % (err)) + self.log("Failed to read arguments.\n" + "Case: %s" % (err), logLevel.ERROR) sys.exit(1) if self.ARGS.mode == 'm': diff --git a/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py b/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py index 8373c2235..fceb5d826 100644 --- a/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py +++ b/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py @@ -43,7 +43,7 @@ from . import AbstractStage from . import messageType -from . import logLevel +from pyDKB.common.types import logLevel from pyDKB.dataflow import DataflowException from pyDKB.common import hdfs from pyDKB.dataflow import communication diff --git a/Utils/Dataflow/pyDKB/dataflow/stage/__init__.py b/Utils/Dataflow/pyDKB/dataflow/stage/__init__.py index 611ef0cd5..5c5c799f6 100644 --- a/Utils/Dataflow/pyDKB/dataflow/stage/__init__.py +++ b/Utils/Dataflow/pyDKB/dataflow/stage/__init__.py @@ -3,7 +3,6 @@ """ from .. import messageType -from .. import logLevel from AbstractStage import AbstractStage from ProcessorStage import ProcessorStage diff --git a/Utils/Dataflow/pyDKB/dataflow/types.py b/Utils/Dataflow/pyDKB/dataflow/types.py index ecc1f790a..b21e0db9a 100644 --- a/Utils/Dataflow/pyDKB/dataflow/types.py +++ b/Utils/Dataflow/pyDKB/dataflow/types.py @@ -4,9 +4,8 @@ from ..common import Type -__all__ = ["dataType", "messageType", "codeType", "logLevel"] +__all__ = ["dataType", "messageType", "codeType"] dataType = Type("DOCUMENT", "AUTHOR", "DATASET") messageType = Type("STRING", "JSON", "TTL") codeType = Type("STRING") -logLevel = Type("TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL") diff --git a/Utils/Dataflow/test/pyDKB/case/20/err b/Utils/Dataflow/test/pyDKB/case/20/err index 794acb20a..8a893f676 100644 --- a/Utils/Dataflow/test/pyDKB/case/20/err +++ b/Utils/Dataflow/test/pyDKB/case/20/err @@ -18,11 +18,11 @@ (==) self.flush_buffer() (==) File "./../../pyDKB/dataflow/stage/ProcessorStage.py", line 341, in flush_buffer (==) self.__output.flush() -(==) File "./../../pyDKB/dataflow/communication/producer/Producer.py", line 124, in flush +(==) File "./../../pyDKB/dataflow/communication/producer/Producer.py", line 107, in flush (==) self.get_stream().flush() -(==) File "./../../pyDKB/dataflow/communication/producer/Producer.py", line 71, in get_stream +(==) File "./../../pyDKB/dataflow/communication/producer/Producer.py", line 54, in get_stream (==) self.reset_stream() -(==) File "./../../pyDKB/dataflow/communication/producer/Producer.py", line 81, in reset_stream +(==) File "./../../pyDKB/dataflow/communication/producer/Producer.py", line 64, in reset_stream (==) dest = self.get_dest() (==) File "./../../pyDKB/dataflow/communication/producer/FileProducer.py", line 60, in get_dest (==) self.reset_file() diff --git a/Utils/Dataflow/test/pyDKB/case/21/err b/Utils/Dataflow/test/pyDKB/case/21/err index 3c55e6dc6..315397776 100644 --- a/Utils/Dataflow/test/pyDKB/case/21/err +++ b/Utils/Dataflow/test/pyDKB/case/21/err @@ -19,11 +19,11 @@ (==) self.flush_buffer() (==) File "./../../pyDKB/dataflow/stage/ProcessorStage.py", line 341, in flush_buffer (==) self.__output.flush() -(==) File "./../../pyDKB/dataflow/communication/producer/Producer.py", line 124, in flush +(==) File "./../../pyDKB/dataflow/communication/producer/Producer.py", line 107, in flush (==) self.get_stream().flush() -(==) File "./../../pyDKB/dataflow/communication/producer/Producer.py", line 71, in get_stream +(==) File "./../../pyDKB/dataflow/communication/producer/Producer.py", line 54, in get_stream (==) self.reset_stream() -(==) File "./../../pyDKB/dataflow/communication/producer/Producer.py", line 81, in reset_stream +(==) File "./../../pyDKB/dataflow/communication/producer/Producer.py", line 64, in reset_stream (==) dest = self.get_dest() (==) File "./../../pyDKB/dataflow/communication/producer/FileProducer.py", line 60, in get_dest (==) self.reset_file() diff --git a/Utils/Dataflow/test/pyDKB/test.sh b/Utils/Dataflow/test/pyDKB/test.sh index ece2f74db..2ecbc6e10 100755 --- a/Utils/Dataflow/test/pyDKB/test.sh +++ b/Utils/Dataflow/test/pyDKB/test.sh @@ -52,8 +52,9 @@ test_case() { after=`cat $case/after 2>/dev/null` eval "$before $cmd; $after" 2>&1 1> out.tmp | \ - grep -a -v '(WARN) pyDKB.dataflow.cds failed (No module named invenio_client.contrib)' | \ - sed -e"s#$base_dir#\$base_dir#" > err.tmp + grep -a -v '(WARN) (pyDKB.dataflow.cds) Submodule failed (No module named invenio_client.contrib)' | \ + sed -E -e"s#$base_dir#\$base_dir#" \ + -e"s#^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2} ##" > err.tmp err_correct=0 out_correct=0