Skip to content

Commit

Permalink
fix: adapt code so it works with different versions of networkx
Browse files Browse the repository at this point in the history
  • Loading branch information
celprov committed Sep 11, 2023
1 parent 5f95281 commit cf681a2
Showing 1 changed file with 15 additions and 20 deletions.
35 changes: 15 additions & 20 deletions mriqc/engine/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,7 @@ def run(self, graph, config, updatehash=False):
else:
if result:
if result["traceback"]:
notrun.append(
self._clean_queue(jobid, graph, result=result)
)
notrun.append(self._clean_queue(jobid, graph, result=result))
errors.append("".join(result["traceback"]))
else:
self._task_finished_cb(jobid)
Expand Down Expand Up @@ -289,12 +287,8 @@ def _submit_mapnode(self, jobid):
"lil",
)
self.depidx[-numnodes:, jobid] = 1
self.proc_done = np.concatenate(
(self.proc_done, np.zeros(numnodes, dtype=bool))
)
self.proc_pending = np.concatenate(
(self.proc_pending, np.zeros(numnodes, dtype=bool))
)
self.proc_done = np.concatenate((self.proc_done, np.zeros(numnodes, dtype=bool)))
self.proc_pending = np.concatenate((self.proc_pending, np.zeros(numnodes, dtype=bool)))
return False

def _local_hash_check(self, jobid, graph):
Expand All @@ -311,11 +305,7 @@ def _local_hash_check(self, jobid, graph):
overwrite = self.procs[jobid].overwrite
always_run = self.procs[jobid].interface.always_run

if (
cached
and updated
and (overwrite is False or overwrite is None and not always_run)
):
if cached and updated and (overwrite is False or overwrite is None and not always_run):
try:
self._task_finished_cb(jobid, cached=True)
self._remove_node_dirs()
Expand All @@ -339,7 +329,10 @@ def _task_finished_cb(self, jobid, cached=False):
rowview = self.depidx.getrowview(jobid)
rowview[rowview.nonzero()] = 0
if jobid not in self.mapnodesubids:
self.refidx[self.refidx[:, jobid].nonzero()[0], jobid] = 0
try:
self.refidx[self.refidx[:, jobid].nonzero()[0], jobid] = 0
except NotImplementedError:
self.refidx[self.refidx[:, [jobid]].nonzero()[0], jobid] = 0

def _generate_dependency_list(self, graph):
"""Generate a dependency list for a list of graphs."""
Expand All @@ -348,9 +341,13 @@ def _generate_dependency_list(self, graph):
from nipype.pipeline.engine.utils import topological_sort

self.procs, _ = topological_sort(graph)
self.depidx = nx.to_scipy_sparse_matrix(
graph, nodelist=self.procs, format="lil"

# networkx renamed ``to_scipy_sparse_matrix`` to ``to_scipy_sparse_array`` (new name added
# in networkx 2.7; the old name was removed in networkx 3.0), so resolve whichever exists.
to_sparse = getattr(nx, 'to_scipy_sparse_array', None) or getattr(
nx, 'to_scipy_sparse_matrix'
)
self.depidx = to_sparse(graph, nodelist=self.procs, format="lil")

self.refidx = self.depidx.astype(int)
self.proc_done = np.zeros(len(self.procs), dtype=bool)
self.proc_pending = np.zeros(len(self.procs), dtype=bool)
Expand Down Expand Up @@ -505,9 +502,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
# Check to see if a job is available (jobs with all dependencies run)
# See https://github.com/nipy/nipype/pull/2200#discussion_r141605722
# See also https://github.com/nipy/nipype/issues/2372
jobids = np.flatnonzero(
~self.proc_done & (self.depidx.sum(axis=0) == 0).__array__()
)
jobids = np.flatnonzero(~self.proc_done & (self.depidx.sum(axis=0) == 0).__array__())

# Check available resources by summing all threads and memory used
free_memory_gb, free_processors = self._check_resources(self.pending_tasks)
Expand Down

0 comments on commit cf681a2

Please sign in to comment.