Skip to content

Commit

Permalink
Add pathos support
Browse files Browse the repository at this point in the history
This enables wrapping of non serializable wrapper with standard pickle.
It uses pathos which relies on dill.

Plus, it changes the maximum number of cpus to use to cpu_count() - 1.
This leaves one for the scheduler.

Also, this correct some pep8.
  • Loading branch information
Pamphile ROY authored and adumasphi committed Nov 2, 2016
1 parent cfa04fb commit 384fd49
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 32 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
.eggs/
build/
dist/
doc/_generated/
*.egg-info/
*__pycache__/
*.log
5 changes: 3 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ audience. Documentation is available
`ot.NumericalMathFunction <http://doc.openturns.org/openturns-latest/sphinx/user_manual/_generated/openturns.NumericalMathFunction.html#openturns.NumericalMathFunction>`_
into a parallel wrapper using either
`multiprocessing <https://docs.python.org/2/library/multiprocessing.html>`_,
`ipyparallel <http://ipyparallel.readthedocs.io/en/latest/>`_ or
`joblib <https://pythonhosted.org/joblib/>`_.
`ipyparallel <http://ipyparallel.readthedocs.io/en/latest/>`_,
`joblib <https://pythonhosted.org/joblib/>`_ or
`pathos <https://pypi.python.org/pypi/pathos>`_.
- A set of usefull tools that simply recurrent tasks when constructing code
wrappers:

Expand Down
1 change: 1 addition & 0 deletions doc/getting_started.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Dependencies
- openturns
- ipyparallel (optional)
- joblib (optional). scikit-learn comes with joblib installed (sklearn.externals.joblib)
- pathos (optional)
- pytest (optional for testing)
- sphinx and numpydoc for building the doc

Expand Down
122 changes: 92 additions & 30 deletions otwrapy/_otwrapy.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ def load_array(filename, compressed=False):
with open(filename, 'rb') as fh:
return pickle.load(fh)


def dump_array(array, filename, compress=False):
"""Dump an array to a (possibly compressed) file
"""Dump an array to a (possibly compressed) file.
Parameters
----------
Expand All @@ -66,6 +67,7 @@ def dump_array(array, filename, compress=False):
with open(filename, 'wb') as fh:
pickle.dump(array, fh, protocol=2)


def safemakedirs(folder):
"""Make a directory without raising an error if it exists.
Expand All @@ -77,8 +79,9 @@ def safemakedirs(folder):
if not os.path.exists(folder):
os.makedirs(folder)


def create_logger(logfile, loglevel=None):
"""Create a logger with a FileHandler at the given loglevel
"""Create a logger with a FileHandler at the given loglevel.
Parameters
----------
Expand Down Expand Up @@ -112,7 +115,9 @@ def create_logger(logfile, loglevel=None):

return logger


class Debug(object):

"""Decorator that catches exceptions inside a function and logs them.
A decorator used to protect functions so that exceptions are logged to a
Expand Down Expand Up @@ -171,8 +176,10 @@ def func_debugged(*args, **kwargs):

return func_debugged


class NumericalMathFunctionDecorator(object):
"""Convert an OpenTURNSPythonFunction into a NumericalMathFunction

"""Convert an OpenTURNSPythonFunction into a NumericalMathFunction.
This class is intended to be used as a decorator.
Expand Down Expand Up @@ -232,7 +239,8 @@ def numericalmathfunction(*args, **kwargs):
else:
func.__doc__ = self.doc

# Add the kwargs as attributes of the function for reference purposes.
# Add the kwargs as attributes of the function for reference
# purposes.
func.__dict__.update(kwargs)
return func
# Keep the wrapper class as reference
Expand All @@ -241,6 +249,7 @@ def numericalmathfunction(*args, **kwargs):


class TempWorkDir(object):

"""Implement a context manager that creates a temporary working directory.
Create a temporary working directory on `base_temp_work_dir` preceeded by
Expand Down Expand Up @@ -296,25 +305,28 @@ class TempWorkDir(object):
I'm back to my project directory :
/home/aguirre/otwrapy
"""

def __init__(self, base_temp_work_dir=None, prefix='run-', cleanup=False,
transfer=None):
transfer=None):
self.dirname = mkdtemp(dir=base_temp_work_dir, prefix=prefix)
self.cleanup = cleanup
self.transfer = transfer

def __enter__(self):
self.curdir = os.getcwd()
os.chdir(self.dirname)
if self.transfer is not None:
for file in self.transfer:
shutil.copy(file, self.dirname)

def __exit__(self, type, value, traceback):
os.chdir(self.curdir)
if self.cleanup:
shutil.rmtree(self.dirname)


def _exec_sample_joblib(func, n_cpus, verbosity):
"""Return a function that executes a sample in parallel using joblib
"""Return a function that executes a sample in parallel using joblib.
Parameters
----------
Expand All @@ -334,15 +346,17 @@ def _exec_sample_joblib(func, n_cpus, verbosity):
from joblib import Parallel, delayed
except ImportError:
from sklearn.externals.joblib import Parallel, delayed

def _exec_sample(X):
Y = Parallel(n_jobs=n_cpus, verbose=verbosity)(delayed(func)(x) for x in X)
Y = Parallel(n_jobs=n_cpus, verbose=verbosity)(
delayed(func)(x) for x in X)
return ot.NumericalSample(Y)

return _exec_sample


def _exec_sample_multiprocessing(func, n_cpus):
"""Return a function that executes a sample in parallel using multiprocessing
"""Return a function that executes a sample in parallel using multiprocessing.
Parameters
----------
Expand All @@ -358,8 +372,6 @@ def _exec_sample_multiprocessing(func, n_cpus):
_exec_sample : Function or callable
The parallelized funtion.
"""

import time
def _exec_sample(X):
from multiprocessing import Pool
p = Pool(processes=n_cpus)
Expand All @@ -369,8 +381,48 @@ def _exec_sample(X):
return _exec_sample


def _exec_sample_pathos(func, n_cpus):
"""Return a function that executes a sample in parallel using pathos.
Parameters
----------
func : Function or calable
A callable python object, usually a function. The function should take
an input vector as argument and return an output vector.
n_cpus : int
Number of CPUs on which to distribute the function calls.
Returns
-------
_exec_sample : Function or callable
The parallelized funtion.
"""
def _exec_sample(X):
from pathos.multiprocessing import ProcessingPool
try:
p = ProcessingPool(n_cpus)
X = np.array(X)
x = np.array_split(X, n_cpus)
pipe = []
for i in range(n_cpus):
pipe.append(p.apipe(func, x[i]))

rs = []
for i in range(n_cpus):
rs.append(pipe[i].get())

rs = [item for sublist in rs for item in sublist]

return ot.NumericalSample(rs)
except ValueError:
# Get there if the chuck size left some single evaluations left
return func(X)
return _exec_sample


def _exec_sample_ipyparallel(func, n, p):
"""Return a function that executes a sample in parallel using ipyparallel
"""Return a function that executes a sample in parallel using ipyparallel.
Parameters
----------
Expand All @@ -391,13 +443,15 @@ def _exec_sample_ipyparallel(func, n, p):
rc = ipp.Client()

return ot.PythonFunction(func_sample=lambda X:
rc[:].map_sync(func, X),
n=func.getInputDimension(),
p=func.getOutputDimension())
rc[:].map_sync(func, X),
n=func.getInputDimension(),
p=func.getOutputDimension())


@NumericalMathFunctionDecorator(enableCache=True)
class Parallelizer(ot.OpenTURNSPythonFunction):
"""Parallelize a Wrapper using 'ipyparallel', 'joblib' or 'multiprocessing'.

"""Parallelize a Wrapper using 'ipyparallel', 'joblib', pathos or 'multiprocessing'.
Parameters
----------
Expand All @@ -406,12 +460,12 @@ class Parallelizer(ot.OpenTURNSPythonFunction):
openturns wrapper to be distributed
backend : string (Optional)
Whether to parallelize using 'ipyparallel', 'joblib' or
Whether to parallelize using 'ipyparallel', 'joblib', pathos, or
'multiprocessing'.
n_cpus : int (Optional)
Number of CPUs on which the simulations will be distributed. Needed Only
if using 'joblib' or 'multiprocessing' as backend.
if using 'joblib', pathos or 'multiprocessing' as backend.
verbosity : int (Optional)
verbose parameter when using 'joblib'. Default is 10.
Expand All @@ -426,33 +480,35 @@ class Parallelizer(ot.OpenTURNSPythonFunction):
>>> model = otw.Parallelizer(Wrapper(), n_cpus=-1)
:code:`model` will distribute calls to Wrapper() using multiprocessing and
as many CPUs as you have.
as many CPUs as you have minus one for the scheduler.
Because Parallelize is decorated with :class:`NumericalMathFunctionDecorator`,
:code:`model` is already an :class:`ot.NumericalMathFunction`.
"""

def __init__(self, wrapper, backend='multiprocessing', n_cpus=-1, verbosity=10):

# -1 cpus means all available cpus
# -1 cpus means all available cpus - 1 for the scheduler
if n_cpus == -1:
import multiprocessing
n_cpus = multiprocessing.cpu_count()
n_cpus = multiprocessing.cpu_count() - 1

self.n_cpus = n_cpus
self.wrapper = wrapper
self.verbosity = verbosity
# This configures how to run single point simulations on the model :
# This configures how to run single point simulations on the model:
self._exec = self.wrapper

ot.OpenTURNSPythonFunction.__init__(self,
self.wrapper.getInputDimension(),
self.wrapper.getOutputDimension())
self.wrapper.getInputDimension(),
self.wrapper.getOutputDimension())

self.setInputDescription(self.wrapper.getInputDescription())
self.setOutputDescription(self.wrapper.getOutputDescription())

assert backend in ['ipython', 'ipyparallel', 'multiprocessing',
'joblib'], "Unknown backend"
assert backend in ['ipython', 'ipyparallel',
'multiprocessing', 'pathos',
'joblib'], "Unknown backend"

# This configures how to run samples on the model :
if self.n_cpus == 1:
Expand All @@ -477,11 +533,12 @@ def __init__(self, wrapper, backend='multiprocessing', n_cpus=-1, verbosity=10):

if ipy_backend:
self._exec_sample = _exec_sample_ipyparallel(self.wrapper,
self.getInputDimension(), self.getOutputDimension())
self.getInputDimension(),
self.getOutputDimension())
else:
logging.warn('Using multiprocessing backend instead')
self._exec_sample = _exec_sample_multiprocessing(self.wrapper,
self.n_cpus)
self.n_cpus)

elif backend == 'joblib':
# Check that joblib is installed
Expand All @@ -498,12 +555,17 @@ def __init__(self, wrapper, backend='multiprocessing', n_cpus=-1, verbosity=10):
logging.warn('joblib package missing.')

if joblib_backend:
self._exec_sample = _exec_sample_joblib(self.wrapper, self.n_cpus,
self._exec_sample = _exec_sample_joblib(self.wrapper,
self.n_cpus,
self.verbosity)
else:
logging.warn('Using multiprocessing backend instead')
self._exec_sample = _exec_sample_multiprocessing(self.wrapper,
self.n_cpus)
self.n_cpus)

elif backend == 'multiprocessing':
self._exec_sample = _exec_sample_multiprocessing(self.wrapper, self.n_cpus)
self._exec_sample = _exec_sample_multiprocessing(
self.wrapper, self.n_cpus)

elif backend == 'pathos':
self._exec_sample = _exec_sample_pathos(self.wrapper, self.n_cpus)
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
extras_require = {
'joblib': ["joblib>=0.9.3"],
'ipyparallel': ["ipyparallel>=5.0.1"],
'pathos': ["pathos>=0.2.0"]
},
author="Felipe Aguirre Martinez",
author_email="[email protected]",
Expand Down

0 comments on commit 384fd49

Please sign in to comment.