diff --git a/mriqc/engine/plugin.py b/mriqc/engine/plugin.py index f65be584b..268148808 100644 --- a/mriqc/engine/plugin.py +++ b/mriqc/engine/plugin.py @@ -180,9 +180,7 @@ def run(self, graph, config, updatehash=False): else: if result: if result["traceback"]: - notrun.append( - self._clean_queue(jobid, graph, result=result) - ) + notrun.append(self._clean_queue(jobid, graph, result=result)) errors.append("".join(result["traceback"])) else: self._task_finished_cb(jobid) @@ -289,12 +287,8 @@ def _submit_mapnode(self, jobid): "lil", ) self.depidx[-numnodes:, jobid] = 1 - self.proc_done = np.concatenate( - (self.proc_done, np.zeros(numnodes, dtype=bool)) - ) - self.proc_pending = np.concatenate( - (self.proc_pending, np.zeros(numnodes, dtype=bool)) - ) + self.proc_done = np.concatenate((self.proc_done, np.zeros(numnodes, dtype=bool))) + self.proc_pending = np.concatenate((self.proc_pending, np.zeros(numnodes, dtype=bool))) return False def _local_hash_check(self, jobid, graph): @@ -311,11 +305,7 @@ def _local_hash_check(self, jobid, graph): overwrite = self.procs[jobid].overwrite always_run = self.procs[jobid].interface.always_run - if ( - cached - and updated - and (overwrite is False or overwrite is None and not always_run) - ): + if cached and updated and (overwrite is False or overwrite is None and not always_run): try: self._task_finished_cb(jobid, cached=True) self._remove_node_dirs() @@ -339,18 +329,23 @@ def _task_finished_cb(self, jobid, cached=False): rowview = self.depidx.getrowview(jobid) rowview[rowview.nonzero()] = 0 if jobid not in self.mapnodesubids: - self.refidx[self.refidx[:, jobid].nonzero()[0], jobid] = 0 + try: + self.refidx[self.refidx[:, jobid].nonzero()[0], jobid] = 0 + except NotImplementedError: + self.refidx[self.refidx[:, [jobid]].nonzero()[0], jobid] = 0 def _generate_dependency_list(self, graph): """Generate a dependency list for a list of graphs.""" import numpy as np - import networkx as nx from nipype.pipeline.engine.utils import topological_sort + try: + from networkx import to_scipy_sparse_array + except ImportError: # NetworkX < 2.7 + from networkx import to_scipy_sparse_matrix as to_scipy_sparse_array + self.procs, _ = topological_sort(graph) - self.depidx = nx.to_scipy_sparse_matrix( - graph, nodelist=self.procs, format="lil" - ) + self.depidx = to_scipy_sparse_array(graph, nodelist=self.procs, format="lil") self.refidx = self.depidx.astype(int) self.proc_done = np.zeros(len(self.procs), dtype=bool) self.proc_pending = np.zeros(len(self.procs), dtype=bool) @@ -505,9 +500,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): # Check to see if a job is available (jobs with all dependencies run) # See https://github.com/nipy/nipype/pull/2200#discussion_r141605722 # See also https://github.com/nipy/nipype/issues/2372 - jobids = np.flatnonzero( - ~self.proc_done & (self.depidx.sum(axis=0) == 0).__array__() - ) + jobids = np.flatnonzero(~self.proc_done & (self.depidx.sum(axis=0) == 0).__array__()) # Check available resources by summing all threads and memory used free_memory_gb, free_processors = self._check_resources(self.pending_tasks)