From 754785f18ea57275e21c197529068e72852d7647 Mon Sep 17 00:00:00 2001 From: Oscar Esteban Date: Wed, 31 Jul 2024 11:38:07 +0200 Subject: [PATCH 1/2] enh: prepare code for easy parallelization with a process pool executor Resolves: #214. --- nitransforms/resampling.py | 41 +++++++++++++++++++++----------------- 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/nitransforms/resampling.py b/nitransforms/resampling.py index abfe2b71..bb1bb309 100644 --- a/nitransforms/resampling.py +++ b/nitransforms/resampling.py @@ -8,6 +8,7 @@ ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ## """Resampling utilities.""" +from functools import partial from pathlib import Path import numpy as np from nibabel.loadsave import load as _nbload @@ -135,33 +136,37 @@ def apply( else None ) - # Order F ensures individual volumes are contiguous in memory - # Also matches NIfTI, making final save more efficient - resampled = np.zeros( - (len(ref_ndcoords), len(transform)), dtype=input_dtype, order="F" + map_coordinates = partial( + ndi.map_coordinates, + order=order, + mode=mode, + cval=cval, + prefilter=prefilter, ) - for t in range(n_resamplings): - xfm_t = transform if n_resamplings == 1 else transform[t] + def _apply_volume(index, data, transform, targets=None): + xfm_t = transform if n_resamplings == 1 else transform[index] if targets is None: targets = ImageGrid(spatialimage).index( # data should be an image _as_homogeneous(xfm_t.map(ref_ndcoords), dim=_ref.ndim) ) - # Interpolate - resampled[..., t] = ndi.map_coordinates( - ( - data - if data is not None - else spatialimage.dataobj[..., t].astype(input_dtype, copy=False) - ), - targets, - order=order, - mode=mode, - cval=cval, - prefilter=prefilter, + data_t = ( + data + if data is not None + else spatialimage.dataobj[..., index].astype(input_dtype, copy=False) ) + return map_coordinates(data_t, targets) + + # Order F ensures individual volumes are contiguous in memory + # Also matches NIfTI, making final save more efficient + resampled = np.zeros( + (len(ref_ndcoords), len(transform)), dtype=input_dtype, order="F" + ) + for t in range(n_resamplings): + # Interpolate + resampled[..., t] = _apply_volume(t, data, transform, targets=targets) else: data = np.asanyarray(spatialimage.dataobj, dtype=input_dtype) From 38bb388374fcb900cde1ff966e58cad66658ff0d Mon Sep 17 00:00:00 2001 From: Oscar Esteban Date: Wed, 31 Jul 2024 11:44:17 +0200 Subject: [PATCH 2/2] enh: create process pool --- nitransforms/resampling.py | 83 +++++++++++++++++++++++++------------- 1 file changed, 56 insertions(+), 27 deletions(-) diff --git a/nitransforms/resampling.py b/nitransforms/resampling.py index bb1bb309..1b76dba1 100644 --- a/nitransforms/resampling.py +++ b/nitransforms/resampling.py @@ -8,7 +8,8 @@ ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ## """Resampling utilities.""" -from functools import partial +from os import cpu_count +from concurrent.futures import ProcessPoolExecutor, as_completed from pathlib import Path import numpy as np from nibabel.loadsave import load as _nbload @@ -26,6 +27,25 @@ """Minimum number of volumes to automatically serialize 4D transforms.""" +def _apply_volume( + index, + data, + targets, + order=3, + mode="constant", + cval=0.0, + prefilter=True, +): + return index, ndi.map_coordinates( + data, + targets, + order=order, + mode=mode, + cval=cval, + prefilter=prefilter, + ) + + def apply( transform, spatialimage, @@ -136,38 +156,47 @@ def apply( else None ) - map_coordinates = partial( - ndi.map_coordinates, - order=order, - mode=mode, - cval=cval, - prefilter=prefilter, - ) + if njobs is None: + njobs = cpu_count() + + with ProcessPoolExecutor(max_workers=min(njobs, n_resamplings)) as executor: + results = [] + for t in range(n_resamplings): + xfm_t = transform if n_resamplings == 1 else transform[t] - def _apply_volume(index, data, transform, targets=None): - xfm_t = transform if n_resamplings == 1 else transform[index] + if targets is None: + targets = ImageGrid(spatialimage).index( # data should be an image + _as_homogeneous(xfm_t.map(ref_ndcoords), dim=_ref.ndim) + ) - if targets is None: - targets = ImageGrid(spatialimage).index( # data should be an image - _as_homogeneous(xfm_t.map(ref_ndcoords), dim=_ref.ndim) + data_t = ( + data + if data is not None + else spatialimage.dataobj[..., t].astype(input_dtype, copy=False) ) - data_t = ( - data - if data is not None - else spatialimage.dataobj[..., index].astype(input_dtype, copy=False) - ) - return map_coordinates(data_t, targets) + results.append( + executor.submit( + _apply_volume, + t, + data_t, + targets, + order=order, + mode=mode, + cval=cval, + prefilter=prefilter, + ) + ) - # Order F ensures individual volumes are contiguous in memory - # Also matches NIfTI, making final save more efficient - resampled = np.zeros( - (len(ref_ndcoords), len(transform)), dtype=input_dtype, order="F" - ) - for t in range(n_resamplings): - # Interpolate - resampled[..., t] = _apply_volume(t, data, transform, targets=targets) + # Order F ensures individual volumes are contiguous in memory + # Also matches NIfTI, making final save more efficient + resampled = np.zeros( + (len(ref_ndcoords), len(transform)), dtype=input_dtype, order="F" + ) + for future in as_completed(results): + t, resampled_t = future.result() + resampled[..., t] = resampled_t else: data = np.asanyarray(spatialimage.dataobj, dtype=input_dtype)