diff --git a/nipype/pipeline/engine/nodes.py b/nipype/pipeline/engine/nodes.py index 5afea316c2..d9c066a795 100644 --- a/nipype/pipeline/engine/nodes.py +++ b/nipype/pipeline/engine/nodes.py @@ -822,8 +822,9 @@ def update(self, **opts): self.inputs.update(**opts) def is_gpu_node(self): - return ((hasattr(self.inputs, 'use_cuda') and self.inputs.use_cuda) - or (hasattr(self.inputs, 'use_gpu') and self.inputs.use_gpu)) + return (hasattr(self.inputs, 'use_cuda') and self.inputs.use_cuda) or ( + hasattr(self.inputs, 'use_gpu') and self.inputs.use_gpu + ) class JoinNode(Node): diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py index 8213c6c821..9aec6ae072 100644 --- a/nipype/pipeline/plugins/multiproc.py +++ b/nipype/pipeline/plugins/multiproc.py @@ -134,13 +134,14 @@ def __init__(self, plugin_args=None): # GPU found on system self.n_gpus_visible = MultiProcPlugin.gpu_count() # proc per GPU set by user - self.n_gpu_procs = plugin_args.get('n_gpu_procs', self.n_gpus_visible) + self.n_gpu_procs = self.plugin_args.get('n_gpu_procs', self.n_gpus_visible) # total no. of processes allowed on all gpus if self.n_gpu_procs > self.n_gpus_visible: logger.info( - 'Total number of GPUs proc requested (%d) exceeds the available number of GPUs (%d) on the system. Using requested GPU slots at your own risk!' % ( - self.n_gpu_procs, self.n_gpus_visible)) + 'Total number of GPUs proc requested (%d) exceeds the available number of GPUs (%d) on the system. Using requested GPU slots at your own risk!' + % (self.n_gpu_procs, self.n_gpus_visible) + ) # Instantiate different thread pools for non-daemon processes logger.debug( @@ -220,9 +221,7 @@ def _prerun_check(self, graph): if self.raise_insufficient: raise RuntimeError("Insufficient resources available for job") if np.any(np.array(tasks_gpu_th) > self.n_gpu_procs): - logger.warning( - 'Nodes demand more GPU than allowed (%d).', - self.n_gpu_procs) + logger.warning('Nodes demand more GPU than allowed (%d).', self.n_gpu_procs) if self.raise_insufficient: raise RuntimeError('Insufficient GPU resources available for job') @@ -257,7 +256,9 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): ) # Check available resources by summing all threads and memory used - free_memory_gb, free_processors, free_gpu_slots = self._check_resources(self.pending_tasks) + free_memory_gb, free_processors, free_gpu_slots = self._check_resources( + self.pending_tasks + ) stats = ( len(self.pending_tasks), @@ -267,7 +268,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): free_processors, self.processors, free_gpu_slots, - self.n_gpu_procs + self.n_gpu_procs, ) if self._stats != stats: tasks_list_msg = "" @@ -338,8 +339,11 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): is_gpu_node = self.procs[jobid].is_gpu_node() # If node does not fit, skip at this moment - if (next_job_th > free_processors or next_job_gb > free_memory_gb - or (is_gpu_node and next_job_gpu_th > free_gpu_slots)): + if ( + next_job_th > free_processors + or next_job_gb > free_memory_gb + or (is_gpu_node and next_job_gpu_th > free_gpu_slots) + ): logger.debug( "Cannot allocate job %d (%0.2fGB, %d threads, %d GPU slots).", jobid, @@ -424,6 +428,7 @@ def gpu_count(): n_gpus = 1 try: import GPUtil + return len(GPUtil.getGPUs()) except ImportError: return n_gpus diff --git a/nipype/pipeline/plugins/tests/test_multiproc.py b/nipype/pipeline/plugins/tests/test_multiproc.py index b954cb9517..484c0d07bc 100644 --- a/nipype/pipeline/plugins/tests/test_multiproc.py +++ b/nipype/pipeline/plugins/tests/test_multiproc.py @@ -56,7 +56,7 @@ def test_run_multiproc(tmpdir): class InputSpecSingleNode(nib.TraitedSpec): input1 = nib.traits.Int(desc="a random int") input2 = nib.traits.Int(desc="a random int") - use_gpu = nib.traits.Bool(False, mandatory = False, desc="boolean for GPU nodes") + use_gpu = nib.traits.Bool(False, mandatory=False, desc="boolean for GPU nodes") class OutputSpecSingleNode(nib.TraitedSpec): @@ -117,6 +117,7 @@ def test_no_more_threads_than_specified(tmpdir): with pytest.raises(RuntimeError): pipe.run(plugin="MultiProc", plugin_args={"n_procs": max_threads}) + def test_no_more_gpu_threads_than_specified(tmpdir): tmpdir.chdir() @@ -129,7 +130,10 @@ def test_no_more_gpu_threads_than_specified(tmpdir): max_threads = 2 max_gpu = 1 with pytest.raises(RuntimeError): - pipe.run(plugin="MultiProc", plugin_args={"n_procs": max_threads, 'n_gpu_procs': max_gpu}) + pipe.run( + plugin="MultiProc", + plugin_args={"n_procs": max_threads, 'n_gpu_procs': max_gpu}, + ) @pytest.mark.skipif(