diff --git a/Docs/build/html/_modules/index.html b/Docs/build/html/_modules/index.html index 767990934..eb719722c 100644 --- a/Docs/build/html/_modules/index.html +++ b/Docs/build/html/_modules/index.html @@ -37,12 +37,26 @@

All modules for which code is available

  • pyDKB.common.exceptions
  • pyDKB.common.hdfs
  • pyDKB.common.json_utils
  • -
  • pyDKB.dataflow.dkbID
  • +
  • pyDKB.dataflow.communication.consumer
  • +
  • pyDKB.dataflow.communication.messages
  • +
  • pyDKB.dataflow.communication.producer
  • +
  • pyDKB.dataflow.communication.stream
  • +
  • pyDKB.dataflow.dkbID
  • pyDKB.dataflow.exceptions
  • -
  • pyDKB.dataflow.messages
  • -
  • pyDKB.dataflow.stage.AbstractProcessorStage
  • pyDKB.dataflow.stage.AbstractStage
  • -
  • pyDKB.dataflow.stage.processors
  • +
  • pyDKB.dataflow.stage.ProcessorStage
  • diff --git a/Docs/build/html/_modules/pyDKB/common/hdfs.html b/Docs/build/html/_modules/pyDKB/common/hdfs.html index 20fcf2070..ca01c2323 100644 --- a/Docs/build/html/_modules/pyDKB/common/hdfs.html +++ b/Docs/build/html/_modules/pyDKB/common/hdfs.html @@ -40,6 +40,8 @@

    Source code for pyDKB.common.hdfs

     import subprocess
     import select
     import os
    +import posixpath as path
    +import tempfile
     
     from . import HDFSException
     
    @@ -103,13 +105,24 @@ 

    Source code for pyDKB.common.hdfs

                                 "Error message: %s\n" % (fname, err))
    +
    [docs]def movefile(fname, dest): + """ Move local file to HDFS. """ + if os.path.exists(fname): + putfile(fname, dest) + try: + os.remove(fname) + except OSError, err: + sys.stderr.write("(WARN) Failed to remove local copy of HDFS file" + " (%s): %s" % (fname, err))
    + +
    [docs]def getfile(fname): """ Download file from HDFS. Return value: file name (without directory) """ cmd = ["hadoop", "fs", "-get", fname] - name = os.path.basename(fname) + name = basename(fname) try: proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, @@ -124,6 +137,32 @@

    Source code for pyDKB.common.hdfs

         return name
    +
    [docs]def File(fname): + """ Get and open temporary local copy of HDFS file + + Return value: open file object (TemporaryFile). + """ + cmd = ["hadoop", "fs", "-cat", fname] + tmp_file = tempfile.TemporaryFile() + try: + proc = subprocess.Popen(cmd, + stdin=subprocess.PIPE, + stderr=subprocess.PIPE, + stdout=tmp_file) + check_stderr(proc) + tmp_file.seek(0) + except (subprocess.CalledProcessError, OSError), err: + if isinstance(err, subprocess.CalledProcessError): + err.cmd = ' '.join(cmd) + tmp_file.close() + raise HDFSException("Failed to get file from HDFS: %s\n" + "Error message: %s\n" % (fname, err)) + if tmp_file.closed: + return None + + return tmp_file
    + +
    [docs]def listdir(dirname, mode='a'): """ List files and/or subdirectories of HDFS directory. @@ -172,7 +211,7 @@

    Source code for pyDKB.common.hdfs

     
             # We need to return only the name of the file or subdir
             filename = line[7]
    -        filename = os.path.basename(filename)
    +        filename = basename(filename)
             if line[0][0] == 'd':
                 subdirs.append(filename)
             elif line[0][0] == '-':
    @@ -186,6 +225,29 @@ 

    Source code for pyDKB.common.hdfs

             result = subdirs
     
         return result
    + + +
    [docs]def basename(path): + """ Return file name without path. """ + if path is None: + path = '' + return path.basename(path).strip()
    + + +
    [docs]def dirname(path): + """ Return dirname without filename. """ + if path is None: + path = '' + return path.dirname(path).strip()
    + + +
    [docs]def join(path, filename): + """ Join path and filename. """ + if path is None: + path = '' + if filename is None: + filename = '' + return path.join(path, filename).strip()
    diff --git a/Docs/build/html/_modules/pyDKB/dataflow/communication/consumer.html b/Docs/build/html/_modules/pyDKB/dataflow/communication/consumer.html new file mode 100644 index 000000000..e9bbcc6b0 --- /dev/null +++ b/Docs/build/html/_modules/pyDKB/dataflow/communication/consumer.html @@ -0,0 +1,161 @@ + + + + + + + + pyDKB.dataflow.communication.consumer — Data Knowledge Base documentation + + + + + + + + + + + + + + + + + +
    +
    +
    + + +
    + +

    Source code for pyDKB.dataflow.communication.consumer

    +"""
    +Consumer submodule init file.
    +"""
    +
    +from .. import messageType
    +from .. import logLevel
    +from .. import DataflowException
    +
    +from Consumer import Consumer
    +from FileConsumer import FileConsumer
    +from HDFSConsumer import HDFSConsumer
    +from StreamConsumer import StreamConsumer
    +
    +__all__ = ['ConsumerBuilder']
    +
    +
    +
    [docs]class ConsumerBuilder(object): + """ Constructor for Consumer instance. """ + + consumerClass = None + + def __init__(self, config={}): + """ Constructor initialization. """ + if not isinstance(config, dict): + raise TypeError("ConsumerBuilder expects argument of type 'dict'" + " (got '%s')" % config.__class__.__name__) + self.config = config + + if config.get('hdfs'): + self.setSource('h') + elif config.get('mode') in ('s', 'm'): + self.setSource('s') + else: + self.setSource(config.get('source')) + +
    [docs] def setSource(self, source): + """ Set data source for the consumer. """ + sources = { + 'h': HDFSConsumer, + 's': StreamConsumer, + 'f': FileConsumer + } + if source not in sources: + raise ValueError("ConsumerBuilder.setSource() expects one of the" + " following values: %s (got '%s')" + % (sources.keys(), source)) + + self.consumerClass = sources[source] + return self
    + +
    [docs] def setType(self, Type): + """ Set message type for the consumer. """ + if Type is not None and not messageType.hasMember(Type): + raise ValueError("Unknown message type: %s" % Type) + self.message_type = Type + return self
    + +
    [docs] def build(self, config={}): + """ Return constructed consumer. """ + if not config: + config = self.config + instance = self.consumerClass(config) + if self.message_type: + instance.set_message_type(self.message_type) + return instance
    +
    + +
    + +
    +
    + +
    +
    + + + + + + + \ No newline at end of file diff --git a/Docs/build/html/_modules/pyDKB/dataflow/communication/consumer/Consumer.html b/Docs/build/html/_modules/pyDKB/dataflow/communication/consumer/Consumer.html new file mode 100644 index 000000000..12bc0ba15 --- /dev/null +++ b/Docs/build/html/_modules/pyDKB/dataflow/communication/consumer/Consumer.html @@ -0,0 +1,234 @@ + + + + + + + + pyDKB.dataflow.communication.consumer.Consumer — Data Knowledge Base documentation + + + + + + + + + + + + + + + + + +
    +
    +
    + + +
    + +

    Source code for pyDKB.dataflow.communication.consumer.Consumer

    +"""
    +pyDKB.dataflow.communication.consumer.Consumer
    +"""
    +
    +import sys
    +from collections import defaultdict
    +
    +from . import messageType
    +from . import logLevel
    +from . import DataflowException
    +
    +from .. import Message
    +from ..stream import StreamBuilder
    +
    +
    +
    [docs]class ConsumerException(DataflowException): + """ Dataflow Consumer exception. """ + pass
    + + +
    [docs]class Consumer(object): + """ Data consumer implementation. """ + + config = None + + message_type = None + + _stream = None + + def __init__(self, config={}): + """ Initialize Consumer instance. """ + self.config = config + self.reconfigure() + +
    [docs] def log(self, message, level=logLevel.INFO): + """ Output log message with given log level. """ + if not logLevel.hasMember(level): + self.log("Unknown log level: %s" % level, logLevel.WARN) + level = logLevel.INFO + if type(message) == list: + lines = message + else: + lines = message.splitlines() + if lines: + out_message = "(%s) (%s) %s" % (logLevel.memberName(level), + self.__class__.__name__, + lines[0]) + for l in lines[1:]: + out_message += "\n(==) %s" % l + out_message += "\n" + sys.stderr.write(out_message)
    + + def __iter__(self): + """ Initialize iteration. """ + return self + +
    [docs] def reconfigure(self, config={}): + """ (Re)initialize consumer with stage config arguments. """ + if config: + self.config = config
    + +
    [docs] def init_stream(self): + """ Init input stream. """ + src = self.get_source() + if src: + self._stream = \ + StreamBuilder(src, self.config) \ + .setStream('input') \ + .setType(self.message_type) \ + .build()
    + +
    [docs] def get_stream(self): + """ Get input stream linked to the current source. + + Return value: + InputStream + None (no sources left to read from) + """ + if self.reset_stream(): + result = self._stream + else: + result = None + return result
    + +
    [docs] def reset_stream(self): + """ Reset input stream to the current source. """ + src = self.get_source() + if src: + if not self._stream: + self.init_stream() + else: + self._stream.reset(src) + return src
    + +
    [docs] def set_message_type(self, Type): + """ Set input message type. """ + self.message_type = Type + stream = self.get_stream() + if stream: + stream.set_message_type(Type)
    + +
    [docs] def message_class(self): + """ Return message class. """ + return Message(self.message_type)
    + +
    [docs] def get_source_info(self): + """ Return current source info. """ + raise NotImplementedError
    + +
    [docs] def get_message(self): + """ Get new message from current source. + + Return values: + Message object + False (failed to parse message) + None (all input sources are empty) + """ + s = self.get_stream() + if not s: + msg = None + else: + msg = s.get_message() + return msg
    + +
    [docs] def next(self): + """ Return new Message, read from input stream. """ + msg = self.get_message() + if msg is None: + raise StopIteration + return msg
    + +
    [docs] def close(self): + """ Close opened data stream and data source. """ + for s in (self.get_stream(), self.get_source()): + if s and not getattr(s, 'closed', True): + s.close()
    +
    + +
    + +
    +
    + +
    +
    + + + + + + + \ No newline at end of file diff --git a/Docs/build/html/_modules/pyDKB/dataflow/communication/consumer/FileConsumer.html b/Docs/build/html/_modules/pyDKB/dataflow/communication/consumer/FileConsumer.html new file mode 100644 index 000000000..a01cb0c80 --- /dev/null +++ b/Docs/build/html/_modules/pyDKB/dataflow/communication/consumer/FileConsumer.html @@ -0,0 +1,241 @@ + + + + + + + + pyDKB.dataflow.communication.consumer.FileConsumer — Data Knowledge Base documentation + + + + + + + + + + + + + + + + + +
    +
    +
    + + +
    + +

    Source code for pyDKB.dataflow.communication.consumer.FileConsumer

    +"""
    +pyDKB.dataflow.communication.consumer.FileConsumer
    +
    +Data consumer implementation for common (static) files.
    +
    +TODO: think about:
    +      * updatable files
    +      * pipes (better, from the point of StreamConsumer)
    +      * round-robin (for updatable sources)
    +      * ...
    +"""
    +
    +import sys
    +import os
    +
    +import Consumer
    +from . import DataflowException
    +from . import logLevel
    +from .. import Message
    +
    +
    +
    [docs]class FileConsumer(Consumer.Consumer): + """ Data consumer implementation for HDFS data source. """ + + # Current file + current_file = None + + # Override +
    [docs] def reconfigure(self, config={}): + """ (Re)initialize consumer with Stage configuration. """ + if not config: + config = self.config + + if not self.config.get('input_dir'): + self.config['input_dir'] = os.path.curdir + self.input_files = None + + super(FileConsumer, self).reconfigure(config)
    + +
    [docs] def source_is_empty(self): + """ Check if current source is empty. + + Return value: + True (empty) + False (not empty) + None (no source) + """ + f = self.current_file + if not f: + return None + fd = f['fd'] + if not f.get('size'): + stat = os.fstat(fd.fileno()) + f['size'] = stat.st_size + return fd.tell() == f['size']
    + +
    [docs] def get_source_info(self): + """ Return current source info. """ + return self.current_file
    + +
    [docs] def init_sources(self): + """ Initialize sources iterator if not initialized yet. """ + if not self.input_files: + self.input_files = self._input_files()
    + +
    [docs] def get_source(self): + """ Get nearest non-empty source (current or next). """ + if self.source_is_empty() is not False: + result = self.next_source() + else: + result = self.current_file['fd'] + return result
    + +
    [docs] def next_source(self): + """ Reset $current_file to the next non-empty file. + + Return value: + File descriptor of the new $current_file + None (no files left) + """ + if not self.input_files: + self.init_sources() + try: + self.current_file = self.input_files.next() + result = self.get_source() + except StopIteration: + self.current_file = None + result = None + return result
    + + def _filenames(self): + """ Return iterable object with filenames, taken from input. """ + if self.config.get('input_files'): + files = self.config['input_files'] + elif self.config.get('input_dir'): + files = self._filenames_from_dir(self.config['input_dir']) + else: + self.log("No input files configured; reading filenames from" + " STDIN.", logLevel.WARN) + files = self._filenames_from_stdin() + return files + + def _filenames_from_stdin(self): + """ Return iterable object, yielding filenames read from STDIN. """ + return iter(sys.stdin.readline, "") + + def _filenames_from_dir(self, dirname): + """ Return list of files in given local directory. """ + files = [] + ext = Message(self.message_type).extension() + try: + dir_content = os.listdir(dirname) + for f in dir_content: + if os.path.isfile(os.path.join(dirname, f)) \ + and f.lower().endswith(ext): + files.append(f) + yield f + except OSError, err: + raise Consumer.ConsumerException(err) + + def _adjusted_filenames(self): + """ Return iterable object, yielding filename and path to file. """ + for f in self._filenames(): + d = {} + d['name'] = os.path.basename(f).strip() + d['dir'] = os.path.join(self.config.get('input_dir', '').strip(), + os.path.dirname(f).strip()) + d['full_path'] = os.path.join(d['dir'], d['name']) + yield d + + def _input_files(self): + """ Return iterable object, yielding dict with open file metadata. + + Metadata include: + * fd -- open file descriptor + * name -- file name + * dir -- directory name + * full_path -- full path to the file + """ + input_files = self._adjusted_filenames() + for f in input_files: + with open(f['full_path'], 'r') as f['fd']: + yield f
    +
    + +
    + +
    +
    + +
    +
    + + + + + + + \ No newline at end of file diff --git a/Docs/build/html/_modules/pyDKB/dataflow/communication/consumer/HDFSConsumer.html b/Docs/build/html/_modules/pyDKB/dataflow/communication/consumer/HDFSConsumer.html new file mode 100644 index 000000000..2088ff515 --- /dev/null +++ b/Docs/build/html/_modules/pyDKB/dataflow/communication/consumer/HDFSConsumer.html @@ -0,0 +1,175 @@ + + + + + + + + pyDKB.dataflow.communication.consumer.HDFSConsumer — Data Knowledge Base documentation + + + + + + + + + + + + + + + + + +
    +
    +
    + + +
    + +

    Source code for pyDKB.dataflow.communication.consumer.HDFSConsumer

    +"""
    +pyDKB.dataflow.communication.consumer.HDFSConsumer
    +"""
    +
    +import sys
    +
    +import FileConsumer
    +import Consumer
    +from . import DataflowException
    +from pyDKB.common import hdfs
    +from pyDKB.common import HDFSException
    +from . import logLevel
    +
    +
    +
    [docs]class HDFSConsumer(FileConsumer.FileConsumer): + """ Data consumer implementation for HDFS data source. """ + + # Override +
    [docs] def reconfigure(self, config={}): + """ Configure HDFS Consumer according to the config parameters. """ + if not config: + config = self.config + + if not config.get('input_dir'): + config['input_dir'] = hdfs.DKB_HOME + + super(HDFSConsumer, self).reconfigure(config)
    + + # Override + def _filenames(self): + """ Return iterable object with filenames, taken from input. """ + if self.config.get('mode') == 'm': + if self.config.get('input_files'): + self.log("Input file names are ignored in MapReduce mode.") + del self.config['input_files'] + files = self._filenames_from_stdin() + else: + files = super(HDFSConsumer, self)._filenames() + return files + + # Override + def _filenames_from_dir(self, dirname): + """ Return list of files in given HDFS directory. + + Raises pyDKB.common.HDFSException + """ + try: + files = hdfs.listdir(dirname, "f") + except HDFSException, err: + raise Consumer.ConsumerException(err) + return files + + # Override + def _adjusted_filenames(self): + """ Return iterable object, yielding filename and path to file. """ + for f in self._filenames(): + d = {} + d['name'] = hdfs.basename(f) + d['dir'] = hdfs.join(self.config.get('input_dir', '').strip(), + hdfs.dirname(f)) + d['full_path'] = hdfs.join(d['dir'], d['name']) + yield d + + # Override + def _input_files(self): + """ Return iterable object, yielding dict with open file metadata. + + Metadata include: + * fd -- open file descriptor + * name -- file name + * dir -- HDFS directory name + * full_path -- full path to the file in HDFS + """ + input_files = self._adjusted_filenames() + for f in input_files: + with hdfs.File(f['full_path']) as f['fd']: + yield f
    +
    + +
    + +
    +
    + +
    +
    + + + + + + + \ No newline at end of file diff --git a/Docs/build/html/_modules/pyDKB/dataflow/communication/consumer/StreamConsumer.html b/Docs/build/html/_modules/pyDKB/dataflow/communication/consumer/StreamConsumer.html new file mode 100644 index 000000000..d17cff674 --- /dev/null +++ b/Docs/build/html/_modules/pyDKB/dataflow/communication/consumer/StreamConsumer.html @@ -0,0 +1,140 @@ + + + + + + + + pyDKB.dataflow.communication.consumer.StreamConsumer — Data Knowledge Base documentation + + + + + + + + + + + + + + + + + +
    +
    +
    + + +
    + +

    Source code for pyDKB.dataflow.communication.consumer.StreamConsumer

    +"""
    +pyDKB.dataflow.communication.consumer.StreamConsumer
    +
    +Data consumer implementation for a single stream.
    +
    +TODO: think about multiple streams (like a number of named
    +      pipes, etc). Prehaps, even merge this class with FileConsumer.
    +"""
    +
    +import sys
    +import os
    +
    +import Consumer
    +from . import DataflowException
    +from . import logLevel
    +
    +
    +
    [docs]class StreamConsumer(Consumer.Consumer): + """ Data consumer implementation for Stream data source. """ + + fd = None + + # Override +
    [docs] def reconfigure(self, config={}): + """ (Re)configure Stream consumer. """ + self.fd = sys.stdin + super(StreamConsumer, self).reconfigure(config)
    + +
    [docs] def get_source_info(self): + """ Return current source info. """ + return {'fd': self.fd}
    + +
    [docs] def get_source(self): + """ Get Stream file descriptor. """ + return self.fd
    + +
    [docs] def next_source(self): + """ Return None. + + As currenty we believe that there is only one input stream + """ + return None
    +
    + +
    + +
    +
    + +
    +
    + + + + + + + \ No newline at end of file diff --git a/Docs/build/html/_modules/pyDKB/dataflow/messages.html b/Docs/build/html/_modules/pyDKB/dataflow/communication/messages.html similarity index 74% rename from Docs/build/html/_modules/pyDKB/dataflow/messages.html rename to Docs/build/html/_modules/pyDKB/dataflow/communication/messages.html index d1d02e31e..6f6aa449d 100644 --- a/Docs/build/html/_modules/pyDKB/dataflow/messages.html +++ b/Docs/build/html/_modules/pyDKB/dataflow/communication/messages.html @@ -6,17 +6,17 @@ - pyDKB.dataflow.messages — Data Knowledge Base documentation - - - - - - - - + pyDKB.dataflow.communication.messages — Data Knowledge Base documentation + + + + + + + + - + @@ -31,13 +31,15 @@
    -

    Source code for pyDKB.dataflow.messages

    +  

    Source code for pyDKB.dataflow.communication.messages

     """
    +pyDKB.dataflow.communication.messages
    +
     Definition of abstract message class and specific message classes
     """
     
     from . import messageType
    -from pyDKB.dataflow.types import codeType
    +from . import codeType
     
     import json
     import sys
    @@ -45,7 +47,7 @@ 

    Source code for pyDKB.dataflow.messages

     __message_class = {}
     
     
    -
    [docs]class DecodeUnknownType(NotImplementedError): +
    [docs]class DecodeUnknownType(NotImplementedError): """ Exception to be thrown when message type is not decodable. """ def __init__(self, code, cls): message = "%s can`t be decoded from %s" \ @@ -53,7 +55,7 @@

    Source code for pyDKB.dataflow.messages

             super(DecodeUnknownType, self).__init__(message)
    -
    [docs]class EncodeUnknownType(NotImplementedError): +
    [docs]class EncodeUnknownType(NotImplementedError): """ Exception to be thrown when message type is not encodable. """ def __init__(self, code, cls): message = "%s can`t be encoded into %s" \ @@ -61,7 +63,7 @@

    Source code for pyDKB.dataflow.messages

             super(EncodeUnknownType, self).__init__(message)
    -
    [docs]def Message(msg_type): +
    [docs]def Message(msg_type): """ Return class XXXMessage, where XXX is the passed type. """ if not messageType.hasMember(msg_type): raise ValueError("Message type must be a member of messageType") @@ -75,7 +77,7 @@

    Source code for pyDKB.dataflow.messages

         return cls
    -
    [docs]class AbstractMessage(object): +
    [docs]class AbstractMessage(object): """ Abstract message """ msg_type = None @@ -92,47 +94,47 @@

    Source code for pyDKB.dataflow.messages

             if type(message) in self.native_types:
                 self.decoded = message
     
    -
    [docs] def getOriginal(self): +
    [docs] def getOriginal(self): """ Return original message. """ return self.__orig
    -
    [docs] def decode(self, code): +
    [docs] def decode(self, code): """ Decode original from CODE to TYPE-specific format. Raises ValueError """ raise DecodeUnknownType(code, self.__class__)
    -
    [docs] def encode(self, code): +
    [docs] def encode(self, code): """ Encode original message from TYPE-specific format to CODE. Raises ValueError """ raise EncodeUnknownType(code, self.__class__)
    -
    [docs] @classmethod +
    [docs] @classmethod def typeName(cls): """ Return message type name as string. """ return messageType.memberName(cls.msg_type)
    -
    [docs] def content(self): +
    [docs] def content(self): """ Return message content. """ return self.decode()
    -
    [docs] @classmethod +
    [docs] @classmethod def extension(cls): """ Return file extension corresponding this message type. """ return cls._ext
    -
    [docs]class JSONMessage(AbstractMessage): +
    [docs]class JSONMessage(AbstractMessage): """ Message in JSON format. """ msg_type = messageType.JSON native_types = [dict] _ext = ".json" -
    [docs] def decode(self, code=codeType.STRING): +
    [docs] def decode(self, code=codeType.STRING): """ Decode original data as JSON. """ if not self.decoded: orig = self.getOriginal() @@ -143,7 +145,7 @@

    Source code for pyDKB.dataflow.messages

                 self.encoded = orig
             return self.decoded
    -
    [docs] def encode(self, code=codeType.STRING): +
    [docs] def encode(self, code=codeType.STRING): """ Encode JSON as CODE. """ if not self.encoded: orig = self.getOriginal() @@ -158,7 +160,7 @@

    Source code for pyDKB.dataflow.messages

     __message_class[messageType.JSON] = JSONMessage
     
     
    -
    [docs]class TTLMessage(AbstractMessage): +
    [docs]class TTLMessage(AbstractMessage): """ Messages in TTL format Single message = single TTL statement @@ -172,7 +174,7 @@

    Source code for pyDKB.dataflow.messages

     
         _ext = ".ttl"
     
    -
    [docs] def decode(self, code=codeType.STRING): +
    [docs] def decode(self, code=codeType.STRING): """ Decode original data as TTL. Currently takes text as it is. @@ -187,8 +189,8 @@

    Source code for pyDKB.dataflow.messages

                 self.encoded = orig
             return self.decoded
    -
    [docs] def encode(self, code=codeType.STRING): - """ Encode JSON as CODE. """ +
    [docs] def encode(self, code=codeType.STRING): + """ Encode TTL as CODE. """ if not self.encoded: orig = self.getOriginal() if code == codeType.STRING: @@ -208,7 +210,7 @@

    Source code for pyDKB.dataflow.messages