From cf681a205306b4deb2840b631ae64ad9cb65d78b Mon Sep 17 00:00:00 2001 From: Celine Provins Date: Fri, 8 Sep 2023 09:23:27 -0700 Subject: [PATCH 1/2] fix: adapt code so it works with different versions of networkx --- mriqc/engine/plugin.py | 35 +++++++++++++++-------------------- 1 file changed, 15 insertions(+), 20 deletions(-) diff --git a/mriqc/engine/plugin.py b/mriqc/engine/plugin.py index c8d9d50bf..151145778 100644 --- a/mriqc/engine/plugin.py +++ b/mriqc/engine/plugin.py @@ -180,9 +180,7 @@ def run(self, graph, config, updatehash=False): else: if result: if result["traceback"]: - notrun.append( - self._clean_queue(jobid, graph, result=result) - ) + notrun.append(self._clean_queue(jobid, graph, result=result)) errors.append("".join(result["traceback"])) else: self._task_finished_cb(jobid) @@ -289,12 +287,8 @@ def _submit_mapnode(self, jobid): "lil", ) self.depidx[-numnodes:, jobid] = 1 - self.proc_done = np.concatenate( - (self.proc_done, np.zeros(numnodes, dtype=bool)) - ) - self.proc_pending = np.concatenate( - (self.proc_pending, np.zeros(numnodes, dtype=bool)) - ) + self.proc_done = np.concatenate((self.proc_done, np.zeros(numnodes, dtype=bool))) + self.proc_pending = np.concatenate((self.proc_pending, np.zeros(numnodes, dtype=bool))) return False def _local_hash_check(self, jobid, graph): @@ -311,11 +305,7 @@ def _local_hash_check(self, jobid, graph): overwrite = self.procs[jobid].overwrite always_run = self.procs[jobid].interface.always_run - if ( - cached - and updated - and (overwrite is False or overwrite is None and not always_run) - ): + if cached and updated and (overwrite is False or overwrite is None and not always_run): try: self._task_finished_cb(jobid, cached=True) self._remove_node_dirs() @@ -339,7 +329,10 @@ def _task_finished_cb(self, jobid, cached=False): rowview = self.depidx.getrowview(jobid) rowview[rowview.nonzero()] = 0 if jobid not in self.mapnodesubids: - self.refidx[self.refidx[:, jobid].nonzero()[0], jobid] = 0 + try: + self.refidx[self.refidx[:, jobid].nonzero()[0], jobid] = 0 + except NotImplementedError: + self.refidx[self.refidx[:, [jobid]].nonzero()[0], jobid] = 0 def _generate_dependency_list(self, graph): """Generate a dependency list for a list of graphs.""" @@ -348,9 +341,13 @@ def _generate_dependency_list(self, graph): from nipype.pipeline.engine.utils import topological_sort self.procs, _ = topological_sort(graph) - self.depidx = nx.to_scipy_sparse_matrix( - graph, nodelist=self.procs, format="lil" + + # In different versions of networkx, the function is called differently + to_sparse = getattr(nx, 'to_scipy_sparse_array', None) or getattr( + nx, 'to_scipy_sparse_matrix' ) + self.depidx = to_sparse(graph, nodelist=self.procs, format="lil") + self.refidx = self.depidx.astype(int) self.proc_done = np.zeros(len(self.procs), dtype=bool) self.proc_pending = np.zeros(len(self.procs), dtype=bool) @@ -505,9 +502,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): # Check to see if a job is available (jobs with all dependencies run) # See https://github.com/nipy/nipype/pull/2200#discussion_r141605722 # See also https://github.com/nipy/nipype/issues/2372 - jobids = np.flatnonzero( - ~self.proc_done & (self.depidx.sum(axis=0) == 0).__array__() - ) + jobids = np.flatnonzero(~self.proc_done & (self.depidx.sum(axis=0) == 0).__array__()) # Check available resources by summing all threads and memory used free_memory_gb, free_processors = self._check_resources(self.pending_tasks) From ce40b9758b36331ebd298381a1c5cd8654e8fd97 Mon Sep 17 00:00:00 2001 From: Celine Provins Date: Tue, 19 Sep 2023 10:55:40 -0700 Subject: [PATCH 2/2] sty: copy Nipype solution to networkx import issue --- mriqc/engine/plugin.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/mriqc/engine/plugin.py b/mriqc/engine/plugin.py index a530d0ca8..268148808 100644 --- a/mriqc/engine/plugin.py +++ b/mriqc/engine/plugin.py @@ -337,17 +337,15 @@ def _task_finished_cb(self, jobid, cached=False): def _generate_dependency_list(self, graph): """Generate a dependency list for a list of graphs.""" import numpy as np - import networkx as nx from nipype.pipeline.engine.utils import topological_sort - self.procs, _ = topological_sort(graph) - - # In different versions of networkx, the function is called differently - to_sparse = getattr(nx, 'to_scipy_sparse_array', None) or getattr( - nx, 'to_scipy_sparse_matrix' - ) - self.depidx = to_sparse(graph, nodelist=self.procs, format="lil") + try: + from networkx import to_scipy_sparse_array + except ImportError: # NetworkX < 2.7 + from networkx import to_scipy_sparse_matrix as to_scipy_sparse_array + self.procs, _ = topological_sort(graph) + self.depidx = to_scipy_sparse_array(graph, nodelist=self.procs, format="lil") self.refidx = self.depidx.astype(int) self.proc_done = np.zeros(len(self.procs), dtype=bool) self.proc_pending = np.zeros(len(self.procs), dtype=bool)