Allow multiple clients to use the same Transformer stream #288
base: master
Conversation
```diff
@@ -407,6 +407,13 @@ dataset in the order defined by the iteration scheme.
     (4, 2, 2) (4, 1)
     (4, 2, 2) (4, 1)

+.. warning::
+
+   :class:`DataStream` assumes that there is only one consumer of its
```
One consumer per epoch iterator or one consumer per stream?
Per stream. I'll clarify that.
Just wondering if I'm missing anything, but data streams should behave like any other iterator in Python, right? Or does "undefined" behaviour mean something different here?
What I'm trying to convey here is that users should not use data streams for more than one purpose, e.g. training and monitoring with the same data stream. Even though data streams behave like any other iterator in Python, I don't think transformers behave as one would expect, as demonstrated by the following piece of code:

```python
import numpy

from fuel.datasets import IndexableDataset
from fuel.streams import DataStream
from fuel.schemes import SequentialScheme
from fuel.transformers import Mapping


def run(bug=True):
    constructor = DataStream.default_stream if bug else DataStream
    dataset = IndexableDataset(numpy.arange(100))
    dataset.default_transformers = (
        (Mapping, [lambda d: (2 * d[0],)], dict()),)
    data_stream = constructor(
        dataset, iteration_scheme=SequentialScheme(100, 10))
    iterator_1 = data_stream.get_epoch_iterator()
    iterator_2 = data_stream.get_epoch_iterator()
    for batch in iterator_2:
        pass
    next(iterator_1)
```

The problem is that I've seen many people get bit by this (myself included), and I think a proper warning in the documentation would be a good thing.
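The same failure mode can be reproduced without fuel at all. Here is a hedged, pure-Python sketch (the `SharedStateStream` class is an illustrative stand-in, not fuel's API) of what goes wrong when every "new" epoch iterator is a view onto one shared underlying iterator:

```python
class SharedStateStream:
    """Illustrative stand-in: a stream whose epoch iterators all pull
    from a single shared iterator, mimicking a stateful transformer."""
    def __init__(self, data):
        self._source = iter(data)  # single, shared iteration state

    def get_epoch_iterator(self):
        # Every "new" iterator is the same underlying object.
        return self._source


stream = SharedStateStream(range(10))
iterator_1 = stream.get_epoch_iterator()
iterator_2 = stream.get_epoch_iterator()
for batch in iterator_2:
    pass
try:
    next(iterator_1)  # already drained by the loop over iterator_2
except StopIteration:
    print("iterator_1 is unexpectedly exhausted")
```

Exhausting `iterator_2` silently exhausts `iterator_1` as well, which is exactly the surprise the proposed warning is about.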
It seems like the
force-pushed from db8e434 to 19a1d09
@dwf @bartvm I decided to go ahead and fix this once and for all. I implemented a new

This has the effect of moving ownership of the child epoch iterator from the stream to the iterator. Now that transformers are stateless, we can finally safely re-use data streams at multiple places in the code. This means no more weird
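To make the ownership change above concrete, here is a minimal sketch (class names `DoubleTransformer` and `ListStream` are illustrative assumptions, not fuel's implementation) of a stateless transformer: each call to `get_epoch_iterator` builds a fresh child iterator, which is then owned by the returned iterator rather than by the stream object:

```python
class ListStream:
    """Illustrative source stream over an in-memory list."""
    def __init__(self, data):
        self.data = data

    def get_epoch_iterator(self):
        return iter(self.data)


class DoubleTransformer:
    """Stateless transformer: holds no iteration state between epochs."""
    def __init__(self, stream):
        self.stream = stream

    def get_epoch_iterator(self):
        # A brand-new child iterator per call, owned by the returned
        # generator rather than by this transformer.
        child = self.stream.get_epoch_iterator()
        return (2 * x for x in child)


stream = DoubleTransformer(ListStream([1, 2, 3]))
iterator_1 = stream.get_epoch_iterator()
iterator_2 = stream.get_epoch_iterator()
print(list(iterator_2))  # [2, 4, 6]
print(next(iterator_1))  # 2 -- unaffected by exhausting iterator_2
```

Because no state lives on the transformer itself, the two iterators are fully independent, which is what makes re-using one stream for both training and monitoring safe.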
Hold on guys, what will happen if the user creates two iterators from a transformer that uses a stateful data stream, e.g. a data stream over a
@rizar I suppose it's a question of whether we want the stream object or the iterator object to be stateful, or both. Often, the situation is straightforward enough that the latter is clearly preferable. There are situations where it's not, though, see #310. It seems like there's value in being able to separate a (stateless) description of a pipeline from the act of sucking things through that pipeline.
There are other stateful transformers, such as the

Perhaps in the long term the solution is to take the same approach as we do with datasets, storing the state of a pipeline separately from the actual transformer instances. You could imagine a single object, owned by the iterator, holding the state of all the transformers. When you call

Did that make sense?
Yes, it does. It's unclear to me why you'd want a single state object rather than encapsulating it as one per transformer though, since you already have such a state object: the iterator. So the iterator, when querying the stream object, passes a state object (call it a "context", maybe, since that's what people call containers of stateful nonsense with no better descriptor) that consists of the child iterator that it's receiving data from and whatever state the stream object needs to do its thing. Anyway, this PR clearly doesn't quite fit the bill.
I think we're talking about the same thing, but just to clarify:

```python
from collections import defaultdict

import six


class DataIterator(six.Iterator):
    def __init__(self, data_stream):
        self.data_stream = data_stream
        self.context = defaultdict(lambda: defaultdict(dict))

    def __next__(self):
        data = self.data_stream.get_data(self.context)
        return data
```

And then stateful transformers and streams do something like:

```python
class StatefulTransformer(Transformer):
    def open(self, context):
        context['file'] = open('some_file.txt')

    def get_data(self, context, request=None):
        if not context[self]:
            self.open(context[self])
        return context[self]['file'].readline()
```
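A runnable end-to-end version of that context idea might look like the sketch below. It is only an illustration of the design being discussed, not fuel code: `LineStream` and the use of `io.StringIO` in place of a real file are assumptions made here so the example is self-contained.

```python
import io
from collections import defaultdict


class LineStream:
    """Stateless stream: all mutable state lives in the caller's context."""
    def __init__(self, text):
        self.text = text

    def get_data(self, context):
        # Lazily create this stream's state inside its own context slot.
        if 'file' not in context[self]:
            context[self]['file'] = io.StringIO(self.text)
        return context[self]['file'].readline()


class DataIterator:
    """The iterator owns the context, one sub-dict per pipeline stage."""
    def __init__(self, data_stream):
        self.data_stream = data_stream
        self.context = defaultdict(dict)

    def __iter__(self):
        return self

    def __next__(self):
        data = self.data_stream.get_data(self.context)
        if not data:  # readline() returns '' at end of data
            raise StopIteration
        return data


stream = LineStream("a\nb\n")
iterator_1 = DataIterator(stream)
iterator_2 = DataIterator(stream)
print(next(iterator_1).strip())  # a
print(next(iterator_2).strip())  # a -- its own context, no interference
```

Because each iterator carries its own context, two iterators over the same stateless stream no longer trample each other's state, which is the whole point of the proposal.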
I think I understand what you're getting at. I'll try to come up with a tentative implementation in the near future.
Fixes #106.