From 7031cfbc862b5094d247ff97846962faab7556ba Mon Sep 17 00:00:00 2001 From: Marina Golosova Date: Tue, 17 Jul 2018 13:42:25 +0300 Subject: [PATCH] pyDKB: fix issue with source-is-empty detection (custom EOM). `FileConsumer.source_is_empty()`, used to check if current (file) data source is fully read, checked current cursor position against file size; but when custom EOM is used, `custom_readline()` reads data into local buffer and thus file can be fully read, while not all the values from the buffer are processed. To fix this issue new method `InputStream.is_empty()` is added with different implementations: for ordinar file reading (used to work with `'\n'` or empty EOM) and for `custom_readline()` generator. --- .../Dataflow/pyDKB/common/custom_readline.py | 15 +++++++++-- .../communication/consumer/Consumer.py | 16 +++++++++++- .../communication/consumer/FileConsumer.py | 13 +++------- .../communication/stream/InputStream.py | 26 +++++++++++++++++++ 4 files changed, 57 insertions(+), 13 deletions(-) diff --git a/Utils/Dataflow/pyDKB/common/custom_readline.py b/Utils/Dataflow/pyDKB/common/custom_readline.py index b354d3e0a..fc3f53c15 100644 --- a/Utils/Dataflow/pyDKB/common/custom_readline.py +++ b/Utils/Dataflow/pyDKB/common/custom_readline.py @@ -11,6 +11,9 @@ def custom_readline(f, newline): The last line can be incomplete, if the input data flow is interrupted in the middle of data writing. + To check if iteration is over without reading next value, one may + `send(True)` to the generator. + Keyword arguments: f -- file/stream to read newline -- custom delimiter @@ -24,10 +27,18 @@ def custom_readline(f, newline): if poller.poll(500): chunk = f.read() if not chunk: - yield buf + if (yield buf): + # `send(True)` was called + is_empty = True + while (yield is_empty): + pass break buf += chunk while newline in buf: pos = buf.index(newline) - yield buf[:pos] + if (yield buf[:pos]): + # `send(True)` was called + is_empty = (pos + len(newline) == len(buf)) + while (yield is_empty): + pass buf = buf[pos + len(newline):] diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/consumer/Consumer.py b/Utils/Dataflow/pyDKB/dataflow/communication/consumer/Consumer.py index 5abd3dcee..9c9af47cd 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/consumer/Consumer.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/consumer/Consumer.py @@ -69,6 +69,18 @@ def init_stream(self): .setType(self.message_type) \ .build() + def stream_is_empty(self): + """ Check if current stream is empty. + + Return value: + True (empty) + False (not empty) + None (_stream is not defined or not initialized) + """ + if not self._stream: + return None + return self._stream.is_empty() + def get_stream(self): """ Get input stream linked to the current source. @@ -76,7 +88,9 @@ def get_stream(self): InputStream None (no sources left to read from) """ - if self.reset_stream(): + if not self.stream_is_empty(): + result = self._stream + elif self.reset_stream(): result = self._stream else: result = None diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/consumer/FileConsumer.py b/Utils/Dataflow/pyDKB/dataflow/communication/consumer/FileConsumer.py index d014a9679..4df710dcb 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/consumer/FileConsumer.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/consumer/FileConsumer.py @@ -42,19 +42,12 @@ def source_is_empty(self): Return value: True (empty) - False (not empty) + False (not empty or stream for given source is not defined) None (no source) """ - f = self.current_file - if not f: + if not self.current_file: return None - fd = f['fd'] - if not fd or getattr(fd, 'closed', True): - return None - if not f.get('size'): - stat = os.fstat(fd.fileno()) - f['size'] = stat.st_size - return fd.tell() == f['size'] + return bool(self.stream_is_empty()) def get_source_info(self): """ Return current source info. """ diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/stream/InputStream.py b/Utils/Dataflow/pyDKB/dataflow/communication/stream/InputStream.py index 9f9739eb0..b3162eb10 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/stream/InputStream.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/stream/InputStream.py @@ -8,6 +8,9 @@ from . import Message from pyDKB.common import custom_readline +import os +import sys + class InputStream(Stream): """ Implementation of the input stream. """ @@ -24,10 +27,13 @@ def _reset_iterator(self): fd = self.get_fd() if self.EOM == '\n': self.__iterator = iter(fd.readline, "") + self.is_empty = self._is_fd_empty elif self.EOM == '': self.__iterator = iter(fd.read, "") + self.is_empty = self._is_fd_empty else: self.__iterator = custom_readline(fd, self.EOM) + self.is_empty = self._is_generator_empty def reset(self, fd, close=True, force=False): """ Reset current stream with new file descriptor. @@ -40,6 +46,26 @@ def reset(self, fd, close=True, force=False): if force or fd != self.get_fd(): self._reset_iterator() + def _is_unknown_empty(self): + """ Implementation of `is_empty()` for not initialized iterator. """ + return None + + is_empty = _is_unknown_empty + + def _is_fd_empty(self): + """ Implement `is_empty()` method for read/readline iterator. """ + fd = self._fd + if not fd or getattr(fd, 'closed', True): + return False + if fd.fileno() == sys.stdin.fileno(): + return False + stat = os.fstat(fd.fileno()) + return fd.tell() == stat.st_size + + def _is_generator_empty(self): + """ Implement `is_empty()` method for generator. """ + return self.__iterator.send(True) + def parse_message(self, message): """ Verify and parse input message.