Merge pull request #314 from mila-udem/master
Merge master into stable to get a new stable version
rizar committed Feb 25, 2016
2 parents 2ef42d3 + 5cb59cb commit 2349d28
Showing 54 changed files with 3,317 additions and 258 deletions.
9 changes: 8 additions & 1 deletion .travis-data.sh
@@ -14,7 +14,14 @@ function download {
 cd $FUEL_DATA_PATH

 for dataset in "$@"; do
-    download $dataset
+    if [ "$dataset" == "ilsvrc2010" ]; then
+        wget "http://www.image-net.org/challenges/LSVRC/2010/download/ILSVRC2010_devkit-1.0.tar.gz"
+        wget "http://www.image-net.org/challenges/LSVRC/2010/ILSVRC2010_test_ground_truth.txt"
+    else
+        download $dataset
+    fi
 done


 cd -
4 changes: 2 additions & 2 deletions .travis.yml
@@ -17,7 +17,7 @@ before_install:
   # Setup Python environment with BLAS libraries
   - wget -q http://repo.continuum.io/miniconda/Miniconda-latest-Linux-x86_64.sh -O miniconda.sh
   - chmod +x miniconda.sh
-  - ./miniconda.sh -b
+  - ./miniconda.sh -b -p $HOME/miniconda
   - export PATH=$HOME/miniconda/bin:$PATH
   - conda update -q --yes conda
   - export FUEL_DATA_PATH=$TRAVIS_BUILD_DIR/data
@@ -38,7 +38,7 @@ install:
       python setup.py build_ext --inplace
     fi
 script:
-  - ./.travis-data.sh adult mnist binarized_mnist "caltech101_silhouettes 16" cifar10 cifar100 iris
+  - ./.travis-data.sh adult mnist binarized_mnist "caltech101_silhouettes 16" cifar10 cifar100 iris ilsvrc2010
   - function fail { export FAILED=1; }
   - |
     if [[ $TESTS == 'blocks' ]]; then
1 change: 1 addition & 0 deletions docs/index.rst
@@ -10,6 +10,7 @@ Welcome to Fuel's documentation!
    h5py_dataset
    new_dataset
    extending_fuel
+   server
    api/index

Fuel is a data pipeline framework which provides your machine learning models
252 changes: 252 additions & 0 deletions docs/server.rst
@@ -0,0 +1,252 @@
Parallelizing data processing
=============================

Here's a scenario that is commonly encountered in practice: a big model is
trained on a large dataset that doesn't fit in memory (e.g. a deep convolutional
neural network trained on ImageNet) using a GPU to accelerate training.

In that case, doing data processing and training in a single process is very
inefficient: the GPU is idle when data is read off disk and processed, and
nothing else is done while the GPU is at work.

An obvious solution is to do the preprocessing and training in parallel: if I/O
operations are executed while the GPU is busy, then less time is wasted waiting
for data to be available.

In this section, we'll cover how to spawn a data processing server in a
separate process, and how to connect to that server and consume its data in a
training script.

Toy example
-----------

Let's first create a dummy dataset:

>>> from fuel.datasets import IndexableDataset
>>> dataset = IndexableDataset({'features': [[0] * 128] * 1000})

In practice, the dataset can be whatever you want, but we'll settle for this
one for simplicity.

Since this is a pretty small dataset, we'll need to simulate slowdowns
associated with I/O operations and preprocessing. We'll create a transformer
whose sole purpose is to wait some period of time before returning the requested
data:

>>> import time
>>> from fuel.transformers import Transformer
>>> class Bottleneck(Transformer):
...     def __init__(self, *args, **kwargs):
...         self.slowdown = kwargs.pop('slowdown', 0)
...         super(Bottleneck, self).__init__(*args, **kwargs)
...
...     def get_data(self, request=None):
...         if request is not None:
...             raise ValueError
...         time.sleep(self.slowdown)
...         return next(self.child_epoch_iterator)

We'll also create a context manager to time a block of code and print the
result:

>>> from contextlib import contextmanager
>>> @contextmanager
... def timer(name):
...     start_time = time.time()
...     yield
...     stop_time = time.time()
...     print('{} took {} seconds'.format(name, stop_time - start_time))

Let's see how much of a slowdown we're incurring in our current setup:

>>> from fuel.schemes import ShuffledScheme
>>> from fuel.streams import DataStream
>>> iteration_scheme = ShuffledScheme(examples=1000, batch_size=100)
>>> data_stream = Bottleneck(
...     data_stream=DataStream.default_stream(
...         dataset=dataset, iteration_scheme=iteration_scheme),
...     slowdown=0.005)
>>> with timer('Iteration'): # doctest: +ELLIPSIS
...     for data in data_stream.get_epoch_iterator(): pass
Iteration took ... seconds

Next, we'll write a small piece of code that simulates some computation being
done on our data. Let's pretend that we're training for 5 epochs and that
training on a batch takes a fixed amount of time.

>>> with timer('Training'): # doctest: +ELLIPSIS
...     for i in range(5):
...         for data in data_stream.get_epoch_iterator(): time.sleep(0.01)
Training took ... seconds

Take note of the time it takes: we'll cut that down with the data processing
server.

Data processing server
----------------------

Fuel features a built-in :func:`~.server.start_server` function which takes a
data stream as input and sets up a data processing server that iterates over
this data stream. Its signature is as follows:

.. code-block:: python

    def start_server(data_stream, port=5557, hwm=10):

The ``data_stream`` argument is self-explanatory. The port the server listens
on defaults to 5557 but can be changed through the ``port`` argument. The
``hwm`` argument controls the high-water mark, which loosely translates to the
buffer size. The default value of 10 usually works well. Increasing it
enlarges the buffer, which can help when your data preprocessing times are
highly variable, but it also increases memory usage. Be sure to set the
corresponding high-water mark on the client as well.
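
For instance, here's a minimal sketch of spawning a server for an existing
data stream (``stream`` stands in for whichever data stream you've built; the
port and high-water mark values are arbitrary choices for illustration):

.. code-block:: python

    from fuel.server import start_server

    # Serve batches from `stream` on a non-default port, buffering up to
    # 20 batches to absorb variable preprocessing times.
    start_server(stream, port=5558, hwm=20)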

A client can then connect to that server and request data. The
:class:`~.streams.ServerDataStream` class is what we want to use. Its
``__init__`` method has the following signature:

.. code-block:: python

    def __init__(self, sources, host='localhost', port=5557, hwm=10):

The ``sources`` argument is how you communicate source names to the data stream.
It's expected to be a tuple of strings with as many elements as there are
sources that will be received. The ``host`` and ``port`` arguments are used to
specify where to connect to the data processing server. Note that this allows
you to run the server on a completely different machine! The ``hwm`` argument
should mirror what you passed to :func:`start_server`.
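
Mirroring the hypothetical server settings above, a minimal client-side
sketch would look like this:

.. code-block:: python

    from fuel.streams import ServerDataStream

    # Connect to a server on this machine; 'features' is the only source
    # being served, and port/hwm mirror what the server was given.
    data_stream = ServerDataStream(('features',), port=5558, hwm=20)

    for (features,) in data_stream.get_epoch_iterator():
        pass  # consume one batch per iteration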

Putting it together
-------------------

You'll need to separate your code into two files: one that spawns the data
processing server and one that runs the training loop.

Here are the two files:

.. code-block:: python

    """server.py"""
    import time

    from fuel.datasets import IndexableDataset
    from fuel.schemes import ShuffledScheme
    from fuel.server import start_server
    from fuel.streams import DataStream
    from fuel.transformers import Transformer


    class Bottleneck(Transformer):
        """Waits every time data is requested to simulate a bottleneck.

        Parameters
        ----------
        slowdown : float, optional
            Time (in seconds) to wait before returning data. Defaults to 0.

        """
        def __init__(self, *args, **kwargs):
            self.slowdown = kwargs.pop('slowdown', 0)
            super(Bottleneck, self).__init__(*args, **kwargs)

        def get_data(self, request=None):
            if request is not None:
                raise ValueError
            time.sleep(self.slowdown)
            return next(self.child_epoch_iterator)


    def create_data_stream(slowdown=0):
        """Creates a bottlenecked data stream of dummy data.

        Parameters
        ----------
        slowdown : float
            Time (in seconds) to wait each time data is requested.

        Returns
        -------
        data_stream : fuel.streams.AbstractDataStream
            Bottlenecked data stream.

        """
        dataset = IndexableDataset({'features': [[0] * 128] * 1000})
        iteration_scheme = ShuffledScheme(examples=1000, batch_size=100)
        data_stream = Bottleneck(
            data_stream=DataStream.default_stream(
                dataset=dataset, iteration_scheme=iteration_scheme),
            slowdown=slowdown)
        return data_stream


    if __name__ == "__main__":
        start_server(create_data_stream(0.005))

.. code-block:: python

    """train.py"""
    import argparse
    import time
    from contextlib import contextmanager

    from fuel.streams import ServerDataStream

    from server import create_data_stream


    @contextmanager
    def timer(name):
        """Times a block of code and prints the result.

        Parameters
        ----------
        name : str
            What this block of code represents.

        """
        start_time = time.time()
        yield
        stop_time = time.time()
        print('{} took {} seconds'.format(name, stop_time - start_time))


    if __name__ == "__main__":
        parser = argparse.ArgumentParser()
        parser.add_argument(
            '-p', '--parallel', action='store_true',
            help='run data preprocessing in a separate process')
        args = parser.parse_args()

        if args.parallel:
            data_stream = ServerDataStream(('features',))
        else:
            data_stream = create_data_stream(0.005)

        with timer('Training'):
            for i in range(5):
                for data in data_stream.get_epoch_iterator():
                    time.sleep(0.01)

We've modularized the code to make it a little more convenient to re-use. Save
the two files in the same directory and type

.. code-block:: bash

    $ python train.py

This will run the training and the data processing in the same process.

Now, type

.. code-block:: bash

    $ python server.py

in a separate terminal window and type

.. code-block:: bash

    $ python train.py -p

Compare the two running times: you should see a clear gain from using the
separate data processing server.
10 changes: 9 additions & 1 deletion fuel/converters/__init__.py
@@ -13,19 +13,27 @@
 from fuel.converters import adult
 from fuel.converters import binarized_mnist
 from fuel.converters import caltech101_silhouettes
+from fuel.converters import celeba
 from fuel.converters import cifar10
 from fuel.converters import cifar100
+from fuel.converters import dogs_vs_cats
 from fuel.converters import iris
 from fuel.converters import mnist
 from fuel.converters import svhn
+from fuel.converters import ilsvrc2010
+from fuel.converters import youtube_audio

 __version__ = '0.2'
 all_converters = (
     ('adult', adult.fill_subparser),
     ('binarized_mnist', binarized_mnist.fill_subparser),
     ('caltech101_silhouettes', caltech101_silhouettes.fill_subparser),
+    ('celeba', celeba.fill_subparser),
     ('cifar10', cifar10.fill_subparser),
     ('cifar100', cifar100.fill_subparser),
+    ('dogs_vs_cats', dogs_vs_cats.fill_subparser),
     ('iris', iris.fill_subparser),
     ('mnist', mnist.fill_subparser),
-    ('svhn', svhn.fill_subparser))
+    ('svhn', svhn.fill_subparser),
+    ('ilsvrc2010', ilsvrc2010.fill_subparser),
+    ('youtube_audio', youtube_audio.fill_subparser))
2 changes: 1 addition & 1 deletion fuel/converters/base.py
@@ -117,7 +117,7 @@ def progress_bar(name, maxval, prefix='Converting'):
"""
widgets = ['{} {}: '.format(prefix, name), Percentage(), ' ',
Bar(marker='=', left='[', right=']'), ' ', ETA()]
bar = ProgressBar(widgets=widgets, maxval=maxval, fd=sys.stdout).start()
bar = ProgressBar(widgets=widgets, max_value=maxval, fd=sys.stdout).start()
try:
yield bar
finally: