Skip to content

Commit

Permalink
pyDKB: fix issue with source-is-empty detection (custom EOM).
Browse files Browse the repository at this point in the history
`FileConsumer.source_is_empty()`, used to check if current (file) data
source is fully read, checked current cursor position against file size;
but when custom EOM is used, `custom_readline()` reads data into local
buffer and thus file can be fully read, while not all the values from
the buffer are processed.

To fix this issue new method `InputStream.is_empty()` is added with
different implementations: for ordinar file reading (used to work with
`'\n'` or empty EOM) and for `custom_readline()` generator.
  • Loading branch information
mgolosova committed Jul 17, 2018
1 parent 0c58cd0 commit 7031cfb
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 13 deletions.
15 changes: 13 additions & 2 deletions Utils/Dataflow/pyDKB/common/custom_readline.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ def custom_readline(f, newline):
The last line can be incomplete, if the input data flow is interrupted
in the middle of data writing.
To check if iteration is over without reading next value, one may
`send(True)` to the generator.
Keyword arguments:
f -- file/stream to read
newline -- custom delimiter
Expand All @@ -24,10 +27,18 @@ def custom_readline(f, newline):
if poller.poll(500):
chunk = f.read()
if not chunk:
yield buf
if (yield buf):
# `send(True)` was called
is_empty = True
while (yield is_empty):
pass
break
buf += chunk
while newline in buf:
pos = buf.index(newline)
yield buf[:pos]
if (yield buf[:pos]):
# `send(True)` was called
is_empty = (pos + len(newline) == len(buf))
while (yield is_empty):
pass
buf = buf[pos + len(newline):]
16 changes: 15 additions & 1 deletion Utils/Dataflow/pyDKB/dataflow/communication/consumer/Consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,28 @@ def init_stream(self):
.setType(self.message_type) \
.build()

def stream_is_empty(self):
""" Check if current stream is empty.
Return value:
True (empty)
False (not empty)
None (_stream is not defined or not initialized)
"""
if not self._stream:
return None
return self._stream.is_empty()

def get_stream(self):
""" Get input stream linked to the current source.
Return value:
InputStream
None (no sources left to read from)
"""
if self.reset_stream():
if not self.stream_is_empty():
result = self._stream
elif self.reset_stream():
result = self._stream
else:
result = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,12 @@ def source_is_empty(self):
Return value:
True (empty)
False (not empty)
False (not empty or stream for given source is not defined)
None (no source)
"""
f = self.current_file
if not f:
if not self.current_file:
return None
fd = f['fd']
if not fd or getattr(fd, 'closed', True):
return None
if not f.get('size'):
stat = os.fstat(fd.fileno())
f['size'] = stat.st_size
return fd.tell() == f['size']
return bool(self.stream_is_empty())

def get_source_info(self):
""" Return current source info. """
Expand Down
26 changes: 26 additions & 0 deletions Utils/Dataflow/pyDKB/dataflow/communication/stream/InputStream.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
from . import Message
from pyDKB.common import custom_readline

import os
import sys


class InputStream(Stream):
""" Implementation of the input stream. """
Expand All @@ -24,10 +27,13 @@ def _reset_iterator(self):
fd = self.get_fd()
if self.EOM == '\n':
self.__iterator = iter(fd.readline, "")
self.is_empty = self._is_fd_empty
elif self.EOM == '':
self.__iterator = iter(fd.read, "")
self.is_empty = self._is_fd_empty
else:
self.__iterator = custom_readline(fd, self.EOM)
self.is_empty = self._is_generator_empty

def reset(self, fd, close=True, force=False):
""" Reset current stream with new file descriptor.
Expand All @@ -40,6 +46,26 @@ def reset(self, fd, close=True, force=False):
if force or fd != self.get_fd():
self._reset_iterator()

def _is_unknown_empty(self):
""" Implementation of `is_empty()` for not initialized iterator. """
return None

is_empty = _is_unknown_empty

def _is_fd_empty(self):
""" Implement `is_empty()` method for read/readline iterator. """
fd = self._fd
if not fd or getattr(fd, 'closed', True):
return False
if fd.fileno() == sys.stdin.fileno():
return False
stat = os.fstat(fd.fileno())
return fd.tell() == stat.st_size

def _is_generator_empty(self):
""" Implement `is_empty()` method for generator. """
return self.__iterator.send(True)

def parse_message(self, message):
""" Verify and parse input message.
Expand Down

0 comments on commit 7031cfb

Please sign in to comment.