Skip to content

Commit

Permalink
JP-2980 Documentation improvement for multiprocessing: a script which…
Browse files Browse the repository at this point in the history
… spawns processes on import will cause system failure (spacetelescope#8408)

Co-authored-by: Ned Molter <[email protected]>
Co-authored-by: Howard Bushouse <[email protected]>
Co-authored-by: Brett Graham <[email protected]>
  • Loading branch information
4 people authored Apr 22, 2024
1 parent d57f633 commit b4c490b
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ documentation

- Added docs for the NIRSpec MSA metadata file to the data products area of RTD.
[#8399]

- Added documentation for multiprocessing. [#8408]

extract_1d
----------
Expand Down
160 changes: 160 additions & 0 deletions docs/jwst/user_documentation/running_pipeline_python.rst
Original file line number Diff line number Diff line change
Expand Up @@ -529,3 +529,163 @@ individual step parameter must be set when using this method, or else the coded
defaults will be used, which may be inappropriate for the dataset being processed.

See :ref:`call_examples` for more information.


.. _multiprocessing:

Multiprocessing
===============

Multiprocessing is supported to speed up certain computationally-intensive steps
in the pipeline, including the :ref:`jump detection <jump_step>`,
:ref:`ramp fitting <ramp_fitting_step>`, and
:ref:`WFSS contamination correction <wfss_contam_step>` steps. The examples below show how
multiprocessing can be enabled for these steps, as well as how to set up
multiprocessing to simultaneously run the entire pipeline on multiple observations.

Since the pipeline uses multiprocessing it is critical that any code using the pipeline adhere
to the guidelines described in the
`python multiprocessing documentation <https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming>`_.
The pipeline uses the `forkserver` start method internally and it is recommended that any
multiprocessing scripts that use the pipline use the same start. As detailed in the
`python documentation <https://docs.python.org/3/library/multiprocessing.html#the-spawn-and-forkserver-start-methods>`_
this will require that code be "protected" with a ``if __name__ == '__main__':`` check as follows

::

if __name__ = '__main__':
[code used in multiprocessing]


There are a couple of scenarios to use multiprocessing with the pipeline:

1. Multiprocessing within a pipeline step. At the moment, the steps that
support this are the :ref:`jump <jump_step>`,
:ref:`ramp_fitting <ramp_fitting_step>`,
and :ref:`wfss_contam <wfss_contam_step>` steps. To enable multiprocessing, the
optional parameter is `maximum_cores` for the ``jump``, ``ramp_fitting``, and
``wfss_contam`` steps. This parameter can be set to a numerical value given
as a string or it can be set to the words `quarter`, `half`, `all`,
or `none`, which is the default value.

The following example turns on a step's multiprocessing option. Notice only
one of the steps has multiprocessing turned on. We do not recommend
simultaneously enabling both steps to do multiprocessing, as this may likely
lead to running out of system memory.



::

# SampleScript1

import os, sys
from jwst.pipeline import Detector1Pipeline

uncal_file = 'jw0000_0000_uncal.fits'
output_dir = '/my_project'

def main():
det1 = Detector1Pipeline()
parameter_dict = {"ramp_fit": {"maximum_cores": 'all'}}
det1.call(uncal_file, save_results=True, steps=parameter_dict, output_dir=output_dir)

if __name__ = '__main__':
sys.exit(main())


2. Calling the pipeline using multiprocessing. The following example uses this
option setting up a log file for each run of the pipeline and a text file with
the full traceback in case there is a crash. Notice that the ``import`` statement
of the pipeline is within the multiprocessing block that gets called by every
worker. This is to avoid a known memory leak.


::

# SampleScript2

import os, sys
import traceback
import configparser
import multiprocessing
from glob import glob

def mk_stpipe_log_cfg(output_dir, log_name):
"""
Create a configuration file with the name log_name, where
the pipeline will write all output.
Args:
outpur_dir: str, path of the output directory
log_name: str, name of the log to record screen output
Returns:
nothing
"""
config = configparser.ConfigParser()
config.add_section("*")
config.set("*", "handler", "file:" + log_name)
config.set("*", "level", "INFO")
pipe_log_config = os.path.join(output_dir, "pipeline-log.cfg")
config.write(open(pipe_log_config, "w"))

def run_det1(uncal_file, output_dir):
"""
Run the Detector1 pipeline on the given file.
Args:
uncal_file: str, name of uncalibrated file to run
outpur_dir: str, path of the output directory
Returns:
nothing
"""
log_name = os.path.basename(uncal_file).replace('.fits', '')
mk_stpipe_log_cfg(output_dir, log_name+'.log')
from jwst.pipeline.calwebb_detector1 import Detector1Pipeline
pipe_success = False
try:
det1 = Detector1Pipeline()
det1.call(uncal_file, output_dir=output_dir, logcfg="pipeline-log.cfg", save_results=True)
pipe_success = True
print('\n * Pipeline finished for file: ', uncal_file, ' \n')
except Exception:
print('\n *** OH NO! The detector1 pipeline crashed! *** \n')
pipe_crash_msg = traceback.print_exc()
if not pipe_success:
crashfile = open(log_name+'_pipecrash.txt', 'w')
print('Printing file with full traceback')
print(pipe_crash_msg, file=crashfile)

def main():
input_data_dir = '/my_project_dir'
output_dir = input_data_dir

# get the files to run
files_to_run = glob(os.path.join(input_data_dir, '*_uncal.fits'))
print('Will run the pipeline on {} files'.format(len(files_to_run)))

# the output list should be the same length as the files to run
outptd = [output_dir for _ in range(len(files_to_run))]

# get the cores to use
cores2use = int(os.cpu_count()/2) # half of all available cores
print('* Using ', cores2use, ' cores for multiprocessing.')

# set the pool and run multiprocess
with multiprocessing.Pool(cores2use) as pool:
pool.starmap(run_det1, zip(files_to_run, outptd))

print('\n * Finished multiprocessing! \n')

if __name__ == '__main__':
sys.exit(main())


.. warning::
Although it is technically possible to call the pipeline with
multiprocessing while also enabling this option in a step, we
strongly recommend not to do this. This scenario would be the same as
`SampleScript2` except with adding and calling the parameter dictionary
`parameter_dict` in `SampleScript1`. However, Python will crash
if both multiprocessing options are set to use all the cores or even
less, because it is not permitted that a worker has children processes.
We recommend not enabling step multiprocessing for parallel pipeline
runs to avoid potentially running out of memory.

0 comments on commit b4c490b

Please sign in to comment.