def movefile(fname, dest):
    """ Move local file to HDFS.

    The file is handed to ``putfile(fname, dest)`` (defined elsewhere in
    this module; presumably the actual upload) and the local copy is
    removed afterwards.  If ``fname`` does not exist locally, nothing is
    done.

    Failing to remove the local copy is not fatal: a warning is written
    to stderr and the function returns normally.
    """
    if not os.path.exists(fname):
        return
    putfile(fname, dest)
    try:
        os.remove(fname)
    except OSError as err:
        # Best effort only: the file has already been passed to putfile(),
        # so a leftover local copy is reported rather than raised.
        sys.stderr.write("(WARN) Failed to remove local copy of HDFS file"
                         " (%s): %s" % (fname, err))
+
+
[docs]defgetfile(fname):""" Download file from HDFS. Return value: file name (without directory) """cmd=["hadoop","fs","-get",fname]
- name=os.path.basename(fname)
+ name=basename(fname)try:proc=subprocess.Popen(cmd,stdin=subprocess.PIPE,
@@ -124,6 +137,32 @@
Source code for pyDKB.common.hdfs
returnname
+
def File(fname):
    """ Get and open temporary local copy of HDFS file.

    Runs ``hadoop fs -cat <fname>`` with its stdout redirected into an
    anonymous temporary file, then rewinds that file so the caller can
    read it from the beginning.

    Return value: open file object (TemporaryFile).

    Raises HDFSException if the command cannot be started (OSError) or
    ``check_stderr`` reports a failure (subprocess.CalledProcessError);
    the temporary file is closed before the exception is raised.
    """
    cmd = ["hadoop", "fs", "-cat", fname]
    tmp_file = tempfile.TemporaryFile()
    try:
        proc = subprocess.Popen(cmd,
                                stdin=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdout=tmp_file)
        # NOTE(review): check_stderr() is defined elsewhere; presumably it
        # waits for the process and raises CalledProcessError on failure.
        check_stderr(proc)
        tmp_file.seek(0)
    except (subprocess.CalledProcessError, OSError) as err:
        if isinstance(err, subprocess.CalledProcessError):
            # Make the error message show the command as a readable string.
            err.cmd = ' '.join(cmd)
        tmp_file.close()
        raise HDFSException("Failed to get file from HDFS: %s\n"
                            "Error message: %s\n" % (fname, err))

    # Reaching this point means seek(0) succeeded, so the file is open;
    # the failure path above always raises.
    return tmp_file
+
+
[docs]deflistdir(dirname,mode='a'):""" List files and/or subdirectories of HDFS directory.
@@ -172,7 +211,7 @@
Source code for pyDKB.common.hdfs
# We need to return only the name of the file or subdirfilename=line[7]
- filename=os.path.basename(filename)
+ filename=basename(filename)ifline[0][0]=='d':subdirs.append(filename)elifline[0][0]=='-':
@@ -186,6 +225,29 @@
Source code for pyDKB.common.hdfs
result=subdirsreturnresult
+
+
+
def basename(path):
    """ Return file name without path.

    ``None`` is treated as an empty path.  Leading and trailing
    whitespace is stripped from the resulting name.
    """
    if path is None:
        path = ''
    # The parameter is itself named ``path`` (kept for keyword callers), so
    # a bare ``path.basename(...)`` would be looked up on the string
    # argument and fail.  Call ``os.path`` explicitly instead.
    return os.path.basename(path).strip()
def get_source_info(self):
    """ Return current source info.

    Placeholder: always raises NotImplementedError.
    """
    raise NotImplementedError
+
+
def get_message(self):
    """ Get new message from current source.

    Asks ``self.get_stream()`` for the current stream and, if it is
    truthy, returns whatever its ``get_message()`` returns.

    Return values:
        Message object
        False (failed to parse message)
        None (all input sources are empty)
    """
    s = self.get_stream()
    if not s:
        # No (or falsy) stream: nothing left to read.
        return None
    return s.get_message()
+
+
def next(self):
    """ Return new Message, read from input stream.

    Raises StopIteration when ``self.get_message()`` returns None.  Any
    other value (including False) is returned unchanged.
    """
    msg = self.get_message()
    if msg is None:
        raise StopIteration
    return msg
+
+
def close(self):
    """ Close opened data stream and data source.

    Both ``self.get_stream()`` and ``self.get_source()`` are examined, in
    that order.  An object is closed only if it is truthy and has a
    ``closed`` attribute that is false.
    """
    for s in (self.get_stream(), self.get_source()):
        # The default of True means an object without a ``closed``
        # attribute is treated as already closed and left alone.
        if s and not getattr(s, 'closed', True):
            s.close()
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/Docs/build/html/_modules/pyDKB/dataflow/communication/consumer/StreamConsumer.html b/Docs/build/html/_modules/pyDKB/dataflow/communication/consumer/StreamConsumer.html
new file mode 100644
index 000000000..d17cff674
--- /dev/null
+++ b/Docs/build/html/_modules/pyDKB/dataflow/communication/consumer/StreamConsumer.html
@@ -0,0 +1,140 @@
+
+
+
+
+
+
+
+ pyDKB.dataflow.communication.consumer.StreamConsumer — Data Knowledge Base documentation
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
Source code for pyDKB.dataflow.communication.consumer.StreamConsumer
+"""
+pyDKB.dataflow.communication.consumer.StreamConsumer
+
+Data consumer implementation for a single stream.
+
+TODO: think about multiple streams (like a number of named
+ pipes, etc). Perhaps, even merge this class with FileConsumer.
+"""
+
+importsys
+importos
+
+importConsumer
+from.importDataflowException
+from.importlogLevel
+
+
+
[docs]classStreamConsumer(Consumer.Consumer):
+ """ Data consumer implementation for Stream data source. """
+
+ fd=None
+
+ # Override
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/Docs/build/html/_modules/pyDKB/dataflow/messages.html b/Docs/build/html/_modules/pyDKB/dataflow/communication/messages.html
similarity index 74%
rename from Docs/build/html/_modules/pyDKB/dataflow/messages.html
rename to Docs/build/html/_modules/pyDKB/dataflow/communication/messages.html
index d1d02e31e..6f6aa449d 100644
--- a/Docs/build/html/_modules/pyDKB/dataflow/messages.html
+++ b/Docs/build/html/_modules/pyDKB/dataflow/communication/messages.html
@@ -6,17 +6,17 @@
- pyDKB.dataflow.messages — Data Knowledge Base documentation
-
-
-
-
-
-
-
-
+ pyDKB.dataflow.communication.messages — Data Knowledge Base documentation
+
+
+
+
+
+
+
+
-
+
@@ -31,13 +31,15 @@
-
Source code for pyDKB.dataflow.messages
+
Source code for pyDKB.dataflow.communication.messages
"""
+pyDKB.dataflow.communication.messages
+
Definition of abstract message class and specific message classes"""from.importmessageType
-frompyDKB.dataflow.typesimportcodeType
+from.importcodeTypeimportjsonimportsys
@@ -45,7 +47,7 @@
[docs]classDecodeUnknownType(NotImplementedError):""" Exception to be thrown when message type is not decodable. """def__init__(self,code,cls):message="%s can`t be decoded from %s" \
@@ -53,7 +55,7 @@
[docs]classEncodeUnknownType(NotImplementedError):""" Exception to be thrown when message type is not encodable. """def__init__(self,code,cls):message="%s can`t be encoded into %s" \
@@ -61,7 +63,7 @@
[docs]defMessage(msg_type):""" Return class XXXMessage, where XXX is the passed type. """ifnotmessageType.hasMember(msg_type):raiseValueError("Message type must be a member of messageType")
@@ -75,7 +77,7 @@
[docs]defencode(self,code):""" Encode original message from TYPE-specific format to CODE. Raises ValueError """raiseEncodeUnknownType(code,self.__class__)