From aae74e72897912f7fcb12be11f90872a06ea0ed8 Mon Sep 17 00:00:00 2001
From: Derek Alexander
Date: Fri, 27 Sep 2024 13:07:49 -0400
Subject: [PATCH 01/19] wip: add execute_parallel meta-function to abstract out
 joblib components, demo on betweenness centrality algorithm only

---
 .../algorithms/centrality/betweenness.py |  68 ++++++-----
 nx_parallel/utils/chunk.py               | 107 ++++++++++++------
 2 files changed, 111 insertions(+), 64 deletions(-)

diff --git a/nx_parallel/algorithms/centrality/betweenness.py b/nx_parallel/algorithms/centrality/betweenness.py
index 3f396b1..44e3760 100644
--- a/nx_parallel/algorithms/centrality/betweenness.py
+++ b/nx_parallel/algorithms/centrality/betweenness.py
@@ -1,4 +1,3 @@
-from joblib import Parallel, delayed
 from networkx.algorithms.centrality.betweenness import (
     _accumulate_basic,
     _accumulate_endpoints,
@@ -46,23 +45,28 @@ def betweenness_centrality(
     else:
         nodes = seed.sample(list(G.nodes), k)
 
-    n_jobs = nxp.get_n_jobs()
+    def process_func(G, chunk, weight, endpoints):
+        return _betweenness_centrality_node_subset(
+            G, chunk, weight=weight, endpoints=endpoints
+        )
 
-    if get_chunks == "chunks":
-        node_chunks = nxp.create_iterables(G, "node", n_jobs, nodes)
-    else:
-        node_chunks = get_chunks(nodes)
+    def iterator_func(G):
+        return G.nodes
 
-    bt_cs = Parallel()(
-        delayed(_betweenness_centrality_node_subset)(G, chunk, weight, endpoints)
-        for chunk in node_chunks
+    results = nxp.utils.chunks.execute_parallel(
+        G,
+        process_func=process_func,
+        iterator_func=iterator_func,
+        get_chunks=get_chunks,
+        weight=weight,
+        endpoints=endpoints,
     )
 
-    # Reducing partial solution
-    bt_c = bt_cs[0]
-    for bt in bt_cs[1:]:
-        for n in bt:
-            bt_c[n] += bt[n]
+    # Aggregation of partial results
+    bt_c = {}
+    for bt in results:
+        for n, value in bt.items():
+            bt_c[n] = bt_c.get(n, 0.0) + value
 
     betweenness = _rescale(
         bt_c,
@@ -116,29 +120,35 @@ def edge_betweenness_centrality(
     else:
         nodes = seed.sample(list(G.nodes), k)
 
-    n_jobs = nxp.get_n_jobs()
+    def process_func(G, chunk, weight) -> dict:
+        return _edge_betweenness_centrality_node_subset(G, chunk, weight=weight)
 
-    if get_chunks == "chunks":
-        node_chunks = nxp.create_iterables(G, "node", n_jobs, nodes)
-    else:
-        node_chunks = get_chunks(nodes)
+    def iterator_func(G):
+        return nodes
 
-    bt_cs = Parallel()(
-        delayed(_edge_betweenness_centrality_node_subset)(G, chunk, weight)
-        for chunk in node_chunks
+    # Execute the parallel processing
+    results = nxp.utils.chunk.execute_parallel(
+        G,
+        process_func=process_func,
+        iterator_func=iterator_func,
+        get_chunks=get_chunks,
+        weight=weight,
     )
 
-    # Reducing partial solution
-    bt_c = bt_cs[0]
-    for bt in bt_cs[1:]:
-        for e in bt:
-            bt_c[e] += bt[e]
+    # Aggregation of partial results
+    bt_c = {}
+    for partial_bt in results:
+        for edge, value in partial_bt.items():
+            bt_c[edge] = bt_c.get(edge, 0.0) + value
 
-    for n in G:  # remove nodes to only return edges
-        del bt_c[n]
+    # Remove node entries to retain only edges (in case any nodes were mistakenly included)
+    for node in G:
+        bt_c.pop(node, None)
 
+    # Rescale the betweenness centrality values
     betweenness = _rescale_e(bt_c, len(G), normalized=normalized, k=k)
 
+    # Handle MultiGraphs by adding edge keys
     if G.is_multigraph():
         betweenness = _add_edge_keys(G, betweenness, weight=weight)
 
diff --git a/nx_parallel/utils/chunk.py b/nx_parallel/utils/chunk.py
index 5a9052d..c0fc593 100644
--- a/nx_parallel/utils/chunk.py
+++ b/nx_parallel/utils/chunk.py
@@ -1,9 +1,34 @@
 import itertools
 import os
+import threading
+from contextlib import contextmanager
 import networkx as nx
+import nx_parallel as nxp
+from joblib import Parallel, delayed
 
+__all__ = ["chunks", "get_n_jobs", "execute_parallel"]
 
-__all__ = ["chunks", "get_n_jobs", "create_iterables"]
+_joblib_config = (
+    threading.local()
+)  # thread-local storage ensures that parallel configs are thread-safe and do not interfere with each other during concurrent executions.
+
+
+@contextmanager
+def parallel_config(**kwargs):
+    """
+    Context manager to set Joblib's Parallel configurations in thread-local storage.
+
+    Parameters
+    ----------
+    **kwargs : dict
+        Keyword arguments corresponding to Joblib's Parallel parameters (e.g., backend, verbose).
+    """
+    original_kwargs = getattr(_joblib_config, "parallel_kwargs", {})
+    _joblib_config.parallel_kwargs = kwargs
+    try:
+        yield
+    finally:
+        _joblib_config.parallel_kwargs = original_kwargs
 
 
 def chunks(iterable, n_chunks):
@@ -52,44 +77,56 @@ def get_n_jobs(n_jobs=None):
     return int(n_jobs)
 
 
-def create_iterables(G, iterator, n_cores, list_of_iterator=None):
-    """Create an iterable of function inputs for parallel computation
-    based on the provided iterator type.
+def execute_parallel(
+    G: nx.Graph,
+    process_func,
+    iterator_func,
+    get_chunks="chunks",
+    **kwargs,
+):
+    """
+    Helper function to execute a processing function in parallel over chunks of data.
 
     Parameters
     ----------
-    G : NetworkX graph
-        The NetworkX graph.
-    iterator : str
-        Type of iterator. Valid values are 'node', 'edge', 'isolate'
-    n_cores : int
-        The number of cores to use.
-    list_of_iterator : list, optional
-        A precomputed list of items to iterate over. If None, it will
-        be generated based on the iterator type.
+    G : networkx.Graph
+        The graph on which the algorithm operates.
+    process_func : callable
+        The function to process each chunk. Should accept (G, chunk, **kwargs).
+    iterator_func : callable, optional
+        A function that takes G and returns an iterable of data to process.
+    get_chunks : str or callable, optional (default="chunks")
+        Determines how to chunk the data.
+        - If "chunks", chunks are created automatically based on the number of jobs.
+        - If callable, it should take the data iterable and return an iterable of chunks.
+    **kwargs : dict
+        Additional keyword arguments to pass to `process_func`.
 
     Returns
     -------
-    iterable : Iterable
-        An iterable of function inputs.
-
-    Raises
-    ------
-    ValueError
-        If the iterator type is not one of "node", "edge" or "isolate".
+    list
+        A list of results from each parallel execution.
     """
-
-    if not list_of_iterator:
-        if iterator == "node":
-            list_of_iterator = list(G.nodes)
-        elif iterator == "edge":
-            list_of_iterator = list(G.edges)
-        elif iterator == "isolate":
-            list_of_iterator = list(nx.isolates(G))
-        else:
-            raise ValueError(f"Invalid iterator type: {iterator}")
-
-    if not list_of_iterator:
-        return iter([])
-
-    return chunks(list_of_iterator, n_cores)
+    n_jobs = nxp.get_n_jobs()
+
+    # generate data using the iterator function
+    data = iterator_func(G)
+
+    # handle chunking
+    if get_chunks == "chunks":
+        # convert data to a list if it's a generator or other iterable
+        data = list(data)
+        data_chunks = nxp.chunks(data, max(len(data) // n_jobs, 1))
+    elif callable(get_chunks):
+        data_chunks = get_chunks(data)
+    else:
+        raise ValueError(
+            "get_chunks must be 'chunks' or a callable that returns an iterable of chunks."
+        )
+
+    # read parallel_kwargs from thread-local storage
+    parallel_kwargs = getattr(_joblib_config, "parallel_kwargs", {})
+
+    return Parallel(n_jobs=n_jobs, **(parallel_kwargs or {}))(
+        delayed(process_func)(G, chunk, **kwargs) for chunk in data_chunks
+    )

From c2028e7d6df640379cb1233fbd43339d97e54ffc Mon Sep 17 00:00:00 2001
From: Derek Alexander
Date: Fri, 27 Sep 2024 13:22:51 -0400
Subject: [PATCH 02/19] feat: revert old code comments

---
 nx_parallel/algorithms/centrality/betweenness.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/nx_parallel/algorithms/centrality/betweenness.py b/nx_parallel/algorithms/centrality/betweenness.py
index 44e3760..223d84a 100644
--- a/nx_parallel/algorithms/centrality/betweenness.py
+++ b/nx_parallel/algorithms/centrality/betweenness.py
@@ -62,7 +62,7 @@ def iterator_func(G):
         endpoints=endpoints,
     )
 
-    # Aggregation of partial results
+    # Reducing the partial solution
     bt_c = {}
     for bt in results:
         for n, value in bt.items():
@@ -126,7 +126,6 @@ def process_func(G, chunk, weight) -> dict:
     def iterator_func(G):
         return nodes
 
-    # Execute the parallel processing
     results = nxp.utils.chunk.execute_parallel(
         G,
         process_func=process_func,
@@ -135,7 +135,7 @@ def iterator_func(G):
         weight=weight,
     )
 
-    # Aggregation of partial results
+    # Reducing the partial solution
     bt_c = {}
     for partial_bt in results:
         for edge, value in partial_bt.items():

From 47616541693bcb033384f0fbb80baf40c3127c56 Mon Sep 17 00:00:00 2001
From: Derek Alexander
Date: Fri, 27 Sep 2024 13:24:20 -0400
Subject: [PATCH 03/19] feat: revert old code comments

---
 nx_parallel/algorithms/centrality/betweenness.py | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/nx_parallel/algorithms/centrality/betweenness.py b/nx_parallel/algorithms/centrality/betweenness.py
index 223d84a..27fe69a 100644
--- a/nx_parallel/algorithms/centrality/betweenness.py
+++ b/nx_parallel/algorithms/centrality/betweenness.py
@@ -62,7 +62,7 @@ def iterator_func(G):
         endpoints=endpoints,
     )
 
-    # Reducing the partial solution
+    # Reducing partial solution
     bt_c = {}
     for bt in results:
         for n, value in bt.items():
@@ -134,20 +134,17 @@ def iterator_func(G):
         weight=weight,
     )
 
-    # Reducing the partial solution
+    # Reducing partial solution
     bt_c = {}
     for partial_bt in results:
         for edge, value in partial_bt.items():
             bt_c[edge] = bt_c.get(edge, 0.0) + value
 
-    # Remove node entries to retain only edges (in case any nodes were mistakenly included)
-    for node in G:
+    for node in G:  # remove nodes to only return edges
         bt_c.pop(node, None)
 
-    # Rescale the betweenness centrality values
     betweenness = _rescale_e(bt_c, len(G), normalized=normalized, k=k)
 
-    # Handle MultiGraphs by adding edge keys
     if G.is_multigraph():
         betweenness = _add_edge_keys(G, betweenness, weight=weight)
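
Taken together, patches 01-03 reduce a parallel algorithm to three pieces: a per-chunk `process_func`, an `iterator_func` that yields the work items, and a `get_chunks` policy, with the reduction left to the caller. A minimal sketch of the calling pattern at this point in the series (the graph and the degree computation are illustrative only, and the module path assumes the `nxp.utils.chunk` spelling that patch 06 settles on):

    import networkx as nx
    import nx_parallel as nxp

    G = nx.fast_gnp_random_graph(100, 0.1, seed=42)

    def process_func(G, chunk, **kwargs):
        # any per-chunk computation works here; node degrees keep the sketch small
        return {node: G.degree(node) for node in chunk}

    def iterator_func(G):
        return G.nodes

    partials = nxp.utils.chunk.execute_parallel(
        G, process_func=process_func, iterator_func=iterator_func, get_chunks="chunks"
    )

    # reduce the per-chunk dicts the same way the betweenness patches do
    merged = {}
    for part in partials:
        for key, value in part.items():
            merged[key] = merged.get(key, 0.0) + value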
From e02a009f295ccf42b30d14f111132c9f6876ad12 Mon Sep 17 00:00:00 2001
From: Derek Alexander
Date: Fri, 27 Sep 2024 13:33:40 -0400
Subject: [PATCH 04/19] fix: missing k handling in iterator_func

---
 .../algorithms/centrality/betweenness.py | 34 ++++++++-----------
 1 file changed, 15 insertions(+), 19 deletions(-)

diff --git a/nx_parallel/algorithms/centrality/betweenness.py b/nx_parallel/algorithms/centrality/betweenness.py
index 27fe69a..2d7907a 100644
--- a/nx_parallel/algorithms/centrality/betweenness.py
+++ b/nx_parallel/algorithms/centrality/betweenness.py
@@ -40,20 +40,18 @@ def betweenness_centrality(
     if hasattr(G, "graph_object"):
         G = G.graph_object
 
-    if k is None:
-        nodes = G.nodes
-    else:
-        nodes = seed.sample(list(G.nodes), k)
-
     def process_func(G, chunk, weight, endpoints):
         return _betweenness_centrality_node_subset(
             G, chunk, weight=weight, endpoints=endpoints
         )
 
     def iterator_func(G):
-        return G.nodes
+        if k is None:
+            return G.nodes
+        else:
+            return seed.sample(list(G.nodes), k)
 
-    results = nxp.utils.chunks.execute_parallel(
+    bt_cs = nxp.utils.chunks.execute_parallel(
         G,
         process_func=process_func,
         iterator_func=iterator_func,
@@ -64,7 +62,7 @@ def iterator_func(G):
 
     # Reducing partial solution
     bt_c = {}
-    for bt in results:
+    for bt in bt_cs:
         for n, value in bt.items():
             bt_c[n] = bt_c.get(n, 0.0) + value
 
@@ -115,18 +113,16 @@ def edge_betweenness_centrality(
     if hasattr(G, "graph_object"):
         G = G.graph_object
 
-    if k is None:
-        nodes = G.nodes
-    else:
-        nodes = seed.sample(list(G.nodes), k)
-
-    def process_func(G, chunk, weight) -> dict:
+    def process_func(G, chunk, weight):
         return _edge_betweenness_centrality_node_subset(G, chunk, weight=weight)
 
     def iterator_func(G):
-        return nodes
+        if k is None:
+            return G.nodes
+        else:
+            return seed.sample(list(G.nodes), k)
 
-    results = nxp.utils.chunk.execute_parallel(
+    bt_cs = nxp.utils.chunk.execute_parallel(
         G,
         process_func=process_func,
         iterator_func=iterator_func,
@@ -136,12 +132,12 @@ def iterator_func(G):
 
     # Reducing partial solution
     bt_c = {}
-    for partial_bt in results:
+    for partial_bt in bt_cs:
         for edge, value in partial_bt.items():
             bt_c[edge] = bt_c.get(edge, 0.0) + value
 
-    for node in G:  # remove nodes to only return edges
-        bt_c.pop(node, None)
+    for n in G:  # remove nodes to only return edges
+        del bt_c[n]
 
     betweenness = _rescale_e(bt_c, len(G), normalized=normalized, k=k)

From 597804abe7965abf37794490a700ea98a4a0a53e Mon Sep 17 00:00:00 2001
From: Derek Alexander <16432683+dPys@users.noreply.github.com>
Date: Thu, 17 Oct 2024 12:16:51 -0400
Subject: [PATCH 05/19] Update nx_parallel/utils/chunk.py

Co-authored-by: Aditi Juneja <91629733+Schefflera-Arboricola@users.noreply.github.com>
---
 nx_parallel/utils/chunk.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/nx_parallel/utils/chunk.py b/nx_parallel/utils/chunk.py
index c0fc593..7906048 100644
--- a/nx_parallel/utils/chunk.py
+++ b/nx_parallel/utils/chunk.py
@@ -127,6 +127,6 @@ def execute_parallel(
     # read parallel_kwargs from thread-local storage
     parallel_kwargs = getattr(_joblib_config, "parallel_kwargs", {})
 
-    return Parallel(n_jobs=n_jobs, **(parallel_kwargs or {}))(
+    return Parallel()(
         delayed(process_func)(G, chunk, **kwargs) for chunk in data_chunks
     )

From 11196ffe5c6e598d13c11f256978cb63eae00d27 Mon Sep 17 00:00:00 2001
From: Derek Alexander <16432683+dPys@users.noreply.github.com>
Date: Thu, 17 Oct 2024 12:16:57 -0400
Subject: [PATCH 06/19] Update nx_parallel/algorithms/centrality/betweenness.py

Co-authored-by: Aditi Juneja <91629733+Schefflera-Arboricola@users.noreply.github.com>
---
 nx_parallel/algorithms/centrality/betweenness.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/nx_parallel/algorithms/centrality/betweenness.py b/nx_parallel/algorithms/centrality/betweenness.py
index 2d7907a..4436942 100644
--- a/nx_parallel/algorithms/centrality/betweenness.py
+++ b/nx_parallel/algorithms/centrality/betweenness.py
@@ -51,7 +51,7 @@ def iterator_func(G):
         else:
             return seed.sample(list(G.nodes), k)
 
-    bt_cs = nxp.utils.chunks.execute_parallel(
+    bt_cs = nxp.utils.chunk.execute_parallel(
        G,
        process_func=process_func,
        iterator_func=iterator_func,
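
Before the larger patch that follows, it is worth noting what `parallel_config` is for: it stashes joblib keyword arguments in thread-local state, so an override applies only inside the `with` block and only on the calling thread. At this point in the series the stored kwargs are not yet forwarded to `Parallel` (patch 05 reverted that); patch 07 below wires them into both `get_n_jobs` and the `Parallel(...)` call. A sketch of the intended usage once that lands (graph and override values are illustrative):

    import networkx as nx
    import nx_parallel as nxp
    from nx_parallel.utils.chunk import parallel_config

    H = nxp.ParallelGraph(nx.fast_gnp_random_graph(50, 0.2, seed=1))

    # overrides live in thread-local storage for the duration of the block;
    # on exit the previous kwargs are restored
    with parallel_config(backend="threading", n_jobs=4, verbose=5):
        bc = nxp.betweenness_centrality(H)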
From 66a680f477fa9ed8970b45e440e1502b23b0ea88 Mon Sep 17 00:00:00 2001
From: dpys
Date: Thu, 17 Oct 2024 14:38:20 -0400
Subject: [PATCH 07/19] fix: failing unit tests, redundant / incomplete joblib
 param propagation from ParallelConfig

---
 .../algorithms/centrality/betweenness.py |  11 +-
 .../tests/test_betweenness_centrality.py |  59 ++--
 nx_parallel/interface.py                 |  18 +-
 nx_parallel/tests/test_get_chunks.py     |  34 ++-
 nx_parallel/utils/chunk.py               |  61 +++--
 nx_parallel/utils/tests/test_chunk.py    | 253 ++++++++++++++++--
 6 files changed, 362 insertions(+), 74 deletions(-)

diff --git a/nx_parallel/algorithms/centrality/betweenness.py b/nx_parallel/algorithms/centrality/betweenness.py
index 4436942..233049a 100644
--- a/nx_parallel/algorithms/centrality/betweenness.py
+++ b/nx_parallel/algorithms/centrality/betweenness.py
@@ -24,6 +24,7 @@ def betweenness_centrality(
     endpoints=False,
     seed=None,
     get_chunks="chunks",
+    **kwargs,
 ):
     """The parallel computation is implemented by dividing the nodes into chunks
     and computing betweenness centrality for each chunk concurrently.
@@ -58,6 +59,7 @@ def iterator_func(G):
         get_chunks=get_chunks,
         weight=weight,
         endpoints=endpoints,
+        **kwargs,
     )
 
     # Reducing partial solution
@@ -96,7 +98,13 @@ def _betweenness_centrality_node_subset(G, nodes, weight=None, endpoints=False):
 @nxp._configure_if_nx_active()
 @py_random_state(4)
 def edge_betweenness_centrality(
-    G, k=None, normalized=True, weight=None, seed=None, get_chunks="chunks"
+    G,
+    k=None,
+    normalized=True,
+    weight=None,
+    seed=None,
+    get_chunks="edges",
+    **kwargs,
 ):
     """The parallel computation is implemented by dividing the nodes into chunks
     and computing edge betweenness centrality for each chunk concurrently.
@@ -128,6 +136,7 @@ def iterator_func(G):
         iterator_func=iterator_func,
         get_chunks=get_chunks,
         weight=weight,
+        **kwargs,
     )
 
     # Reducing partial solution
diff --git a/nx_parallel/algorithms/centrality/tests/test_betweenness_centrality.py b/nx_parallel/algorithms/centrality/tests/test_betweenness_centrality.py
index 408ba05..c3e3556 100644
--- a/nx_parallel/algorithms/centrality/tests/test_betweenness_centrality.py
+++ b/nx_parallel/algorithms/centrality/tests/test_betweenness_centrality.py
@@ -3,33 +3,48 @@
 import math
 
 
-def test_betweenness_centrality_get_chunks():
-    def get_chunk(nodes):
+def test_edge_betweenness_centrality_get_chunks():
+    def get_chunk(edges):
         num_chunks = nxp.get_n_jobs()
-        nodes_ebc = {i: 0 for i in nodes}
-        for i in ebc:
-            nodes_ebc[i[0]] += ebc[i]
-            nodes_ebc[i[1]] += ebc[i]
 
-        sorted_nodes = sorted(nodes_ebc.items(), key=lambda x: x[1], reverse=True)
+        # Convert edges to a list to support slicing
+        edges = list(edges)
 
-        chunks = [[] for _ in range(num_chunks)]
-        chunk_sums = [0] * num_chunks
+        # Split edges into chunks without relying on precomputed centrality
+        chunk_size = max(1, len(edges) // num_chunks)
+        chunks = [edges[i : i + chunk_size] for i in range(0, len(edges), chunk_size)]
 
-        for node, value in sorted_nodes:
-            min_chunk_index = chunk_sums.index(min(chunk_sums))
-            chunks[min_chunk_index].append(node)
-            chunk_sums[min_chunk_index] += value
+        # Debugging: Print how edges are distributed among chunks
+        print(f"Chunks distribution: {chunks}")
 
         return chunks
 
+    # Create a random graph
     G = nx.fast_gnp_random_graph(100, 0.1, directed=False)
-    H = nxp.ParallelGraph(G)
-    ebc = nx.edge_betweenness_centrality(G)
-    par_bc_chunk = nxp.betweenness_centrality(H, get_chunks=get_chunk)  # smoke test
-    par_bc = nxp.betweenness_centrality(H)
-
-    for i in range(len(G.nodes)):
-        assert math.isclose(par_bc[i], par_bc_chunk[i], abs_tol=1e-16)
-    # get_chunk is faster than default(for big graphs)
-    # G = nx.bipartite.random_graph(400, 700, 0.8, seed=5, directed=False)
+    H = nxp.ParallelGraph(G)  # Wrap the graph in ParallelGraph
+
+    # Compute edge betweenness centrality using NetworkX
+    ebc = nx.edge_betweenness_centrality(G, normalized=True)
+
+    # Debugging: Print the edge betweenness centrality values from NetworkX
+    print(f"NetworkX Edge Betweenness Centrality: {ebc}")
+
+    # Instantiate the BackendInterface
+    backend = nxp.BackendInterface()
+
+    # Smoke test for edge_betweenness_centrality with custom get_chunks
+    par_bc_chunk = backend.edge_betweenness_centrality(
+        H.graph_object,  # Pass the underlying NetworkX graph
+        get_chunks=get_chunk,  # Pass the custom get_chunks function
+    )
+
+    # Debugging: Print the results from parallel computation
+    print(f"Parallel Computed Edge Betweenness Centrality: {par_bc_chunk}")
+
+    # Compare with standard NetworkX edge betweenness centrality
+    standard_bc = nx.edge_betweenness_centrality(G, normalized=True)
+
+    for edge in standard_bc:
+        assert math.isclose(
+            par_bc_chunk[edge], standard_bc[edge], abs_tol=1e-6
+        ), f"Edge {edge} mismatch: {par_bc_chunk[edge]} vs {standard_bc[edge]}"
diff --git a/nx_parallel/interface.py b/nx_parallel/interface.py
index 38af8c7..480ea37 100644
--- a/nx_parallel/interface.py
+++ b/nx_parallel/interface.py
@@ -1,6 +1,8 @@
+from functools import wraps
 from operator import attrgetter
 import networkx as nx
 from nx_parallel import algorithms
+from nx_parallel.utils.chunk import execute_parallel
 
 __all__ = ["BackendInterface", "ParallelGraph"]
 
@@ -65,13 +67,25 @@ def is_directed(self):
     def __str__(self):
         return f"Parallel{self.graph_object}"
 
+    def __getattr__(self, attr):
+        """Delegate attribute access to the underlying NetworkX graph."""
+        return getattr(self.graph_object, attr)
+
+    def __getstate__(self):
+        """Support pickling by returning the state of the underlying graph."""
+        return self.graph_object
+
+    def __setstate__(self, state):
+        """Support unpickling by restoring the underlying graph."""
+        self.graph_object = state
+
 
 def assign_algorithms(cls):
     """Class decorator to assign algorithms to the class attributes."""
     for attr in ALGORITHMS:
-        # get the function name by parsing the module hierarchy
         func_name = attr.rsplit(".", 1)[-1]
-        setattr(cls, func_name, attrgetter(attr)(algorithms))
+        func = attrgetter(attr)(algorithms)
+        setattr(cls, func_name, staticmethod(func))
     return cls
 
diff --git a/nx_parallel/tests/test_get_chunks.py b/nx_parallel/tests/test_get_chunks.py
index c8e63f9..514b168 100644
--- a/nx_parallel/tests/test_get_chunks.py
+++ b/nx_parallel/tests/test_get_chunks.py
@@ -1,5 +1,3 @@
-# smoke tests for all functions supporting `get_chunks` kwarg
-
 import inspect
 import importlib
 import random
@@ -10,7 +8,7 @@
 import nx_parallel as nxp
 
 
-def get_all_functions(package_name="nx_parallel"):
+def get_all_functions(package_name="nx_parallel.algorithms"):
     """Returns a dict keyed by function names to its arguments.
     This function constructs a dictionary keyed by the function
@@ -32,8 +30,8 @@ def get_functions_with_get_chunks():
     """Returns a list of function names with the `get_chunks` kwarg."""
     all_funcs = get_all_functions()
     get_chunks_funcs = []
-    for func in all_funcs:
-        if "get_chunks" in all_funcs[func]["args"]:
+    for func, params in all_funcs.items():
+        if "get_chunks" in params["args"]:
             get_chunks_funcs.append(func)
     return get_chunks_funcs
 
@@ -47,6 +45,15 @@ def random_chunking(nodes):
         num_in_chunk = max(len(_nodes) // num_chunks, 1)
         return nxp.chunks(_nodes, num_in_chunk)
 
+    # Define a simple process_func for testing
+    def process_func(G, chunk, **kwargs):
+        # Example: Return the degree of each node in the chunk
+        return {node: G.degree(node) for node in chunk}
+
+    # Define a simple iterator_func for testing
+    def iterator_func(G):
+        return G.nodes()
+
     get_chunks_funcs = get_functions_with_get_chunks()
     ignore_funcs = [
         "number_of_isolates",
@@ -62,19 +69,24 @@
     G = nx.fast_gnp_random_graph(50, 0.6, seed=42)
     H = nxp.ParallelGraph(G)
     for func in get_chunks_funcs:
-        print(func)
         if func not in ignore_funcs:
             if func in tournament_funcs:
                 G = nx.tournament.random_tournament(50, seed=42)
                 H = nxp.ParallelGraph(G)
-                c1 = getattr(nxp, func)(H)
-                c2 = getattr(nxp, func)(H, get_chunks=random_chunking)
+                c1 = getattr(nxp, func)(H, process_func, iterator_func)
+                c2 = getattr(nxp, func)(
+                    H, process_func, iterator_func, get_chunks=random_chunking
+                )
                 assert c1 == c2
             else:
-                c1 = getattr(nxp, func)(H)
-                c2 = getattr(nxp, func)(H, get_chunks=random_chunking)
+                c1 = getattr(nxp, func)(H, process_func, iterator_func)
+                c2 = getattr(nxp, func)(
+                    H, process_func, iterator_func, get_chunks=random_chunking
+                )
                 if isinstance(c1, types.GeneratorType):
-                    c1, c2 = dict(c1), dict(c2)
+                    c1, c2 = list(c1), list(
+                        c2
+                    )  # Convert generators to lists for comparison
                 if func in chk_dict_vals:
                     for i in range(len(G.nodes)):
                         assert math.isclose(c1[i], c2[i], abs_tol=1e-16)
diff --git a/nx_parallel/utils/chunk.py b/nx_parallel/utils/chunk.py
index 7906048..ff07f40 100644
--- a/nx_parallel/utils/chunk.py
+++ b/nx_parallel/utils/chunk.py
@@ -6,25 +6,28 @@
 import nx_parallel as nxp
 from joblib import Parallel, delayed
 
-__all__ = ["chunks", "get_n_jobs", "execute_parallel"]
+__all__ = ["parallel_config", "chunks", "get_n_jobs", "execute_parallel"]
 
 _joblib_config = (
     threading.local()
-)  # thread-local storage ensures that parallel configs are thread-safe and do not interfere with each other during concurrent executions.
+)  # thread-local storage ensures that parallel configs are thread-safe and do not
+# interfere with each other during concurrent executions.
 
 
 @contextmanager
 def parallel_config(**kwargs):
-    """
-    Context manager to set Joblib's Parallel configurations in thread-local storage.
+    """Context manager to temporarily override Joblib's Parallel configurations.
 
     Parameters
     ----------
     **kwargs : dict
-        Keyword arguments corresponding to Joblib's Parallel parameters (e.g., backend, verbose).
+        Keyword arguments corresponding to Joblib's Parallel parameters
+        (e.g., backend, verbose). These overrides are temporary and confined
+        to the current thread.
""" - original_kwargs = getattr(_joblib_config, "parallel_kwargs", {}) - _joblib_config.parallel_kwargs = kwargs + original_kwargs = getattr(_joblib_config, "parallel_kwargs", {}).copy() + _joblib_config.parallel_kwargs = {**original_kwargs, **kwargs} + try: yield finally: @@ -49,13 +52,17 @@ def get_n_jobs(n_jobs=None): active configuration system or modifying the passed-in value, similar to joblib's behavior. - - If running under pytest, it returns 2 jobs. + - If running under pytest, it returns 2 jobs when n_jobs is None. - If the `active` configuration in NetworkX's config is `True`, `n_jobs` is extracted from the NetworkX config. - Otherwise, `n_jobs` is obtained from joblib's active backend. - `ValueError` is raised if `n_jobs` is 0. """ - if "PYTEST_CURRENT_TEST" in os.environ: + parallel_kwargs = getattr(_joblib_config, "parallel_kwargs", {}) + if n_jobs is None and "n_jobs" in parallel_kwargs: + n_jobs = parallel_kwargs["n_jobs"] + + if n_jobs is None and "PYTEST_CURRENT_TEST" in os.environ: return 2 if n_jobs is None: @@ -84,8 +91,7 @@ def execute_parallel( get_chunks="chunks", **kwargs, ): - """ - Helper function to execute a processing function in parallel over chunks of data. + """Helper function to execute a processing function in parallel over chunks of data. Parameters ---------- @@ -93,12 +99,13 @@ def execute_parallel( The graph on which the algorithm operates. process_func : callable The function to process each chunk. Should accept (G, chunk, **kwargs). - iterator_func : callable, optional + iterator_func : callable A function that takes G and returns an iterable of data to process. get_chunks : str or callable, optional (default="chunks") Determines how to chunk the data. - If "chunks", chunks are created automatically based on the number of jobs. - - If callable, it should take the data iterable and return an iterable of chunks. + - If callable, it should take the data iterable and return an iterable of + chunks. **kwargs : dict Additional keyword arguments to pass to `process_func`. @@ -109,24 +116,40 @@ def execute_parallel( """ n_jobs = nxp.get_n_jobs() - # generate data using the iterator function data = iterator_func(G) - # handle chunking if get_chunks == "chunks": - # convert data to a list if it's a generator or other iterable data = list(data) data_chunks = nxp.chunks(data, max(len(data) // n_jobs, 1)) elif callable(get_chunks): data_chunks = get_chunks(data) else: raise ValueError( - "get_chunks must be 'chunks' or a callable that returns an iterable of chunks." + "get_chunks must be 'chunks' or a callable that returns an iterable of " + "chunks." 
) - # read parallel_kwargs from thread-local storage + # retrieve global backend ParallelConfig instance + config = nx.config.backends.parallel + + joblib_params = { + "backend": config.backend, + "n_jobs": n_jobs, + "verbose": config.verbose, + "temp_folder": config.temp_folder, + "max_nbytes": config.max_nbytes, + "mmap_mode": config.mmap_mode, + "prefer": config.prefer, + "require": config.require, + "inner_max_num_threads": config.inner_max_num_threads, + } + + # retrieve and apply overrides from parallel_config parallel_kwargs = getattr(_joblib_config, "parallel_kwargs", {}) + joblib_params.update(parallel_kwargs) + + joblib_params = {k: v for k, v in joblib_params.items() if v is not None} - return Parallel()( + return Parallel(**joblib_params)( delayed(process_func)(G, chunk, **kwargs) for chunk in data_chunks ) diff --git a/nx_parallel/utils/tests/test_chunk.py b/nx_parallel/utils/tests/test_chunk.py index 45ef9ae..cfc5046 100644 --- a/nx_parallel/utils/tests/test_chunk.py +++ b/nx_parallel/utils/tests/test_chunk.py @@ -1,4 +1,5 @@ import os +from unittest.mock import patch, call import pytest import networkx as nx import nx_parallel as nxp @@ -9,6 +10,11 @@ def test_get_n_jobs(): # Test with no n_jobs (default) with pytest.MonkeyPatch().context() as mp: mp.delitem(os.environ, "PYTEST_CURRENT_TEST", raising=False) + + # Ensure that the parallel config is inactive + nx.config.backends.parallel.active = False + nx.config.backends.parallel.n_jobs = None + assert nxp.get_n_jobs() == 1 # Test with n_jobs set to positive value @@ -16,19 +22,22 @@ def test_get_n_jobs(): # Test with n_jobs set to negative value assert nxp.get_n_jobs(-1) == os.cpu_count() - nx.config.backends.parallel.active = False - from joblib import parallel_config - parallel_config(n_jobs=3) - assert nxp.get_n_jobs() == 3 + # Mock joblib's active backend to return ('loky', 3) + with patch("joblib.parallel.get_active_backend", return_value=("loky", 3)): + assert nxp.get_n_jobs() == 3 + + # Test with n_jobs set in NetworkX config nx.config.backends.parallel.active = True nx.config.backends.parallel.n_jobs = 5 assert nxp.get_n_jobs() == 5 + # Test with n_jobs = 0 to raise a ValueError - try: + # Ensure that the parallel config is inactive to not override n_jobs + nx.config.backends.parallel.active = False + nx.config.backends.parallel.n_jobs = None + with pytest.raises(ValueError, match="n_jobs == 0 in Parallel has no meaning"): nxp.get_n_jobs(0) - except ValueError as e: - assert str(e) == "n_jobs == 0 in Parallel has no meaning" def test_chunks(): @@ -44,18 +53,224 @@ def test_chunks(): assert chunks_list == [(0, 1), (2, 3), (4, 5), (6, 7), (8, 9)] -def test_create_iterables(): - """Test `create_iterables` for different iterator types.""" - G = nx.fast_gnp_random_graph(50, 0.6, seed=42) +def test_execute_parallel_basic(): + """Basic test for `execute_parallel` to ensure it processes chunks correctly.""" + + G = nx.path_graph(10) + H = nxp.ParallelGraph(G) + + # Define a simple process_func that calculates the degree of each node in the chunk + def process_func(G, chunk, **kwargs): + return {node: G.degree(node) for node in chunk} + + # Define an iterator_func that returns all nodes + def iterator_func(G): + return list(G.nodes()) # Convert NodeView to list + + # Execute in parallel without overrides + results = nxp.execute_parallel( + G=H, + process_func=process_func, + iterator_func=iterator_func, + get_chunks="chunks", + ) + + combined_results = {} + for res in results: + combined_results.update(res) + + assert 
+    assert combined_results == dict(G.degree())
+
+
+def test_execute_parallel_with_overrides():
+    """Test `execute_parallel` with overridden parallel configuration."""
+
+    # Create a simple graph
+    G = nx.complete_graph(5)
+    H = nxp.ParallelGraph(G)
+
+    # Define a simple process_func that returns the list of nodes in the chunk
+    def process_func(G, chunk, **kwargs):
+        return list(chunk)
+
+    # Define an iterator_func that returns all nodes
+    def iterator_func(G):
+        return list(G.nodes())  # Convert NodeView to list
+
+    # Mock joblib.Parallel in the correct module
+    with patch("nx_parallel.utils.chunk.Parallel") as mock_parallel:
+        with nxp.parallel_config(n_jobs=2, backend="loky", verbose=5):
+            nxp.execute_parallel(
+                G=H,
+                process_func=process_func,
+                iterator_func=iterator_func,
+                get_chunks="chunks",
+            )
+
+        # Assert that Parallel was called with overridden parameters (excluding None values)
+        mock_parallel.assert_called_with(
+            backend="loky",
+            n_jobs=2,
+            verbose=5,
+            max_nbytes="1M",
+            mmap_mode="r",
+        )
+
+
+def test_execute_parallel_callable_chunks():
+    """Test `execute_parallel` with a custom callable for get_chunks."""
+
+    G = nx.cycle_graph(6)
+    H = nxp.ParallelGraph(G)
+
+    # Define a process_func that sums node numbers in the chunk
+    def process_func(G, chunk, **kwargs):
+        return sum(chunk)
+
+    # Define an iterator_func that returns all nodes as a list
+    def iterator_func(G):
+        return list(G.nodes())  # Convert NodeView to list
+
+    # Define a custom chunking function that groups nodes into chunks of size 2
+    def custom_chunking(data):
+        return [tuple(data[i : i + 2]) for i in range(0, len(data), 2)]
+
+    results = nxp.execute_parallel(
+        G=H,
+        process_func=process_func,
+        iterator_func=iterator_func,
+        get_chunks=custom_chunking,
+    )
+
+    # Expected sums: (0+1), (2+3), (4+5) => 1, 5, 9
+    expected_results = [1, 5, 9]
+
+    assert results == expected_results
+
+
+def test_parallel_config_override():
+    """Test that `parallel_config` correctly overrides config within its context."""
+
+    # Define a simple graph
+    G = nx.complete_graph(5)
+    H = nxp.ParallelGraph(G)
+
+    # Define a simple process_func that returns the list of nodes in the chunk
+    def process_func(G, chunk, **kwargs):
+        return list(chunk)
+
+    # Define an iterator_func that returns all nodes
+    def iterator_func(G):
+        return list(G.nodes())  # Convert NodeView to list
+
+    # Mock joblib.Parallel to capture the parameters it's called with
+    with patch("nx_parallel.utils.chunk.Parallel") as mock_parallel:
+        with nxp.parallel_config(backend="threading", n_jobs=2, verbose=10):
+            nxp.execute_parallel(
+                G=H,
+                process_func=process_func,
+                iterator_func=iterator_func,
+                get_chunks="chunks",
+            )
+
+        # Assert that Parallel was called with overridden parameters
+        mock_parallel.assert_called_with(
+            backend="threading",
+            n_jobs=2,
+            verbose=10,
+            max_nbytes="1M",
+            mmap_mode="r",
+        )
+
+
+def test_parallel_config_nested_overrides():
+    """Test that nested `parallel_config` contexts correctly handle overrides."""
+
+    G = nx.complete_graph(5)
+    H = nxp.ParallelGraph(G)
+
+    # Define a simple process_func
+    def process_func(G, chunk, **kwargs):
+        return list(chunk)
+
+    # Define an iterator_func
+    def iterator_func(G):
+        return list(G.nodes())
+
+    # Mock joblib.Parallel in the correct module
+    with patch("nx_parallel.utils.chunk.Parallel") as mock_parallel:
+        with nxp.parallel_config(backend="threading", n_jobs=2, verbose=10):
+            with nxp.parallel_config(n_jobs=4, verbose=5):
+                nxp.execute_parallel(
+                    G=H,
+                    process_func=process_func,
+                    iterator_func=iterator_func,
+                    get_chunks="chunks",
+                )
+            nxp.execute_parallel(
+                G=H,
+                process_func=process_func,
+                iterator_func=iterator_func,
+                get_chunks="chunks",
+            )
+
+        # Extract only the instantiation calls to Parallel
+        instantiation_calls = [c for c in mock_parallel.call_args_list if c[0] or c[1]]
+
+        # Adjust expected calls to exclude parameters with None values
+        expected_calls = [
+            call(
+                backend="threading",
+                n_jobs=4,
+                verbose=5,
+                max_nbytes="1M",
+                mmap_mode="r",
+            ),
+            call(
+                backend="threading",
+                n_jobs=2,
+                verbose=10,
+                max_nbytes="1M",
+                mmap_mode="r",
+            ),
+        ]
+
+        assert instantiation_calls == expected_calls
+
+
+def test_parallel_config_thread_safety(monkeypatch):
+    """Test that `parallel_config` overrides are thread-safe."""
+
+    import threading
+
+    # Remove 'PYTEST_CURRENT_TEST' from os.environ
+    monkeypatch.delenv("PYTEST_CURRENT_TEST", raising=False)
+
+    # Define a function to run in a thread
+    def thread_func(backend, n_jobs, results, index):
+        with nxp.parallel_config(backend=backend, n_jobs=n_jobs):
+            # Simulate some operation that uses get_n_jobs
+            job_count = nxp.get_n_jobs()
+            # Retrieve backend from thread-local storage
+            parallel_kwargs = getattr(
+                nxp.utils.chunk._joblib_config, "parallel_kwargs", {}
+            )
+            backend_used = parallel_kwargs.get(
+                "backend", nx.config.backends.parallel.backend
+            )
+            results[index] = (backend_used, job_count)
+
+    results = {}
 
-    # Test node iterator
-    iterable = nxp.create_iterables(G, "node", 4)
-    assert len(list(iterable)) == 4
+    # Start two threads with different configurations
+    t1 = threading.Thread(target=thread_func, args=("loky", 5, results, 1))
+    t2 = threading.Thread(target=thread_func, args=("threading", 3, results, 2))
 
-    # Test edge iterator
-    iterable = nxp.create_iterables(G, "edge", 4)
-    assert len(list(iterable)) == 4
+    t1.start()
+    t2.start()
+    t1.join()
+    t2.join()
 
-    # Test isolate iterator (G has no isolates, so this should be empty)
-    iterable = nxp.create_iterables(G, "isolate", 4)
-    assert len(list(iterable)) == 0
+    # Ensure that each thread received its own overrides
+    assert results[1] == ("loky", 5)
+    assert results[2] == ("threading", 3)
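
The patch above also fixes the precedence with which `get_n_jobs` resolves the job count: an explicit argument first, then a thread-local `parallel_config` override, then the pytest fallback of 2 (now applied only when nothing was supplied), then the NetworkX config, then joblib's active backend. A hedged worked example of that ordering, assuming it runs outside a pytest process with the parallel backend config active:

    import networkx as nx
    import nx_parallel as nxp

    nx.config.backends.parallel.active = True
    nx.config.backends.parallel.n_jobs = 5
    assert nxp.get_n_jobs() == 5      # config value applies when nothing overrides it

    with nxp.parallel_config(n_jobs=2):
        assert nxp.get_n_jobs() == 2  # thread-local override takes precedence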
From 427e5e7ab0b7a1023babed21f217af37e97d7b92 Mon Sep 17 00:00:00 2001
From: dpys
Date: Thu, 17 Oct 2024 14:39:38 -0400
Subject: [PATCH 08/19] fix: failing unit tests, redundant / incomplete joblib
 param propagation from ParallelConfig

---
 _nx_parallel/__init__.py             | 4 ++--
 nx_parallel/interface.py             | 2 --
 nx_parallel/tests/test_get_chunks.py | 5 +++--
 3 files changed, 5 insertions(+), 6 deletions(-)

diff --git a/_nx_parallel/__init__.py b/_nx_parallel/__init__.py
index 449682c..3e644f1 100644
--- a/_nx_parallel/__init__.py
+++ b/_nx_parallel/__init__.py
@@ -84,7 +84,7 @@ def get_info():
         },
     },
     "betweenness_centrality": {
-        "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L20",
+        "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L19",
         "additional_docs": "The parallel computation is implemented by dividing the nodes into chunks and computing betweenness centrality for each chunk concurrently.",
         "additional_parameters": {
             'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n_jobs` number of chunks."
@@ -98,7 +98,7 @@ def get_info():
         },
     },
     "edge_betweenness_centrality": {
-        "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L96",
+        "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L100",
         "additional_docs": "The parallel computation is implemented by dividing the nodes into chunks and computing edge betweenness centrality for each chunk concurrently.",
         "additional_parameters": {
             'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n_jobs` number of chunks."
diff --git a/nx_parallel/interface.py b/nx_parallel/interface.py
index 480ea37..9dc904a 100644
--- a/nx_parallel/interface.py
+++ b/nx_parallel/interface.py
@@ -1,8 +1,6 @@
-from functools import wraps
 from operator import attrgetter
 import networkx as nx
 from nx_parallel import algorithms
-from nx_parallel.utils.chunk import execute_parallel
 
 __all__ = ["BackendInterface", "ParallelGraph"]
 
diff --git a/nx_parallel/tests/test_get_chunks.py b/nx_parallel/tests/test_get_chunks.py
index 514b168..db6245a 100644
--- a/nx_parallel/tests/test_get_chunks.py
+++ b/nx_parallel/tests/test_get_chunks.py
@@ -84,8 +84,9 @@ def iterator_func(G):
                     H, process_func, iterator_func, get_chunks=random_chunking
                 )
                 if isinstance(c1, types.GeneratorType):
-                    c1, c2 = list(c1), list(
-                        c2
+                    c1, c2 = (
+                        list(c1),
+                        list(c2),
                     )  # Convert generators to lists for comparison
                 if func in chk_dict_vals:
                     for i in range(len(G.nodes)):

From 1615582a3423f2ad44547509d8e72e2cbf4439fd Mon Sep 17 00:00:00 2001
From: dpys
Date: Thu, 17 Oct 2024 14:50:57 -0400
Subject: [PATCH 09/19] fix: change get_chunks default from edges -> nodes to
 see if tests pass

---
 nx_parallel/algorithms/centrality/betweenness.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/nx_parallel/algorithms/centrality/betweenness.py b/nx_parallel/algorithms/centrality/betweenness.py
index 233049a..ff5b4f8 100644
--- a/nx_parallel/algorithms/centrality/betweenness.py
+++ b/nx_parallel/algorithms/centrality/betweenness.py
@@ -103,7 +103,7 @@ def edge_betweenness_centrality(
     normalized=True,
     weight=None,
     seed=None,
-    get_chunks="edges",
+    get_chunks="nodes",
     **kwargs,
 ):
     """The parallel computation is implemented by dividing the nodes into chunks and

From 8853417285335dc375798c2081edd9044730261f Mon Sep 17 00:00:00 2001
From: dpys
Date: Thu, 17 Oct 2024 15:13:21 -0400
Subject: [PATCH 10/19] chore: prevent superfluous changes to __init__

---
 _nx_parallel/__init__.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/_nx_parallel/__init__.py b/_nx_parallel/__init__.py
index 3e644f1..449682c 100644
--- a/_nx_parallel/__init__.py
+++ b/_nx_parallel/__init__.py
@@ -84,7 +84,7 @@ def get_info():
         },
     },
     "betweenness_centrality": {
-        "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L19",
+        "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L20",
         "additional_docs": "The parallel computation is implemented by dividing the nodes into chunks and computing betweenness centrality for each chunk concurrently.",
         "additional_parameters": {
             'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n_jobs` number of chunks."
@@ -98,7 +98,7 @@ def get_info():
         },
     },
     "edge_betweenness_centrality": {
-        "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L100",
+        "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L96",
         "additional_docs": "The parallel computation is implemented by dividing the nodes into chunks and computing edge betweenness centrality for each chunk concurrently.",
         "additional_parameters": {
             'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n_jobs` number of chunks."
From 312743c367494bf9cc9d5e7f366d50d653930443 Mon Sep 17 00:00:00 2001
From: dpys
Date: Thu, 17 Oct 2024 15:14:31 -0400
Subject: [PATCH 11/19] chore: prevent superfluous changes to __init__

---
 _nx_parallel/__init__.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/_nx_parallel/__init__.py b/_nx_parallel/__init__.py
index 449682c..3e644f1 100644
--- a/_nx_parallel/__init__.py
+++ b/_nx_parallel/__init__.py
@@ -84,7 +84,7 @@ def get_info():
         },
     },
     "betweenness_centrality": {
-        "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L20",
+        "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L19",
         "additional_docs": "The parallel computation is implemented by dividing the nodes into chunks and computing betweenness centrality for each chunk concurrently.",
         "additional_parameters": {
             'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n_jobs` number of chunks."
@@ -98,7 +98,7 @@ def get_info():
         },
     },
     "edge_betweenness_centrality": {
-        "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L96",
+        "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L100",
         "additional_docs": "The parallel computation is implemented by dividing the nodes into chunks and computing edge betweenness centrality for each chunk concurrently.",
         "additional_parameters": {
             'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n_jobs` number of chunks."
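
For context on the next patch: patch 09 changed `edge_betweenness_centrality`'s default to `get_chunks="nodes"`, but `execute_parallel` still only recognized the `"chunks"` sentinel, so the default path raised `ValueError`. Patch 12 below widens the accepted strings; after it, both spellings route through the same automatic chunking (a sketch, reusing the `process_func`/`iterator_func` shapes from the earlier example):

    # equivalent after patch 12: both select automatic chunking by n_jobs
    nxp.execute_parallel(G, process_func, iterator_func, get_chunks="chunks")
    nxp.execute_parallel(G, process_func, iterator_func, get_chunks="nodes")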
From e0db6bbc59218cbfbda4dff29ae1b02bb163fcb6 Mon Sep 17 00:00:00 2001
From: dpys
Date: Fri, 18 Oct 2024 10:32:38 -0400
Subject: [PATCH 12/19] fix: Accept 'nodes' as a Valid get_chunks Value

---
 nx_parallel/utils/chunk.py | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/nx_parallel/utils/chunk.py b/nx_parallel/utils/chunk.py
index ff07f40..27ad6b9 100644
--- a/nx_parallel/utils/chunk.py
+++ b/nx_parallel/utils/chunk.py
@@ -103,7 +103,8 @@ def execute_parallel(
         A function that takes G and returns an iterable of data to process.
     get_chunks : str or callable, optional (default="chunks")
         Determines how to chunk the data.
-        - If "chunks", chunks are created automatically based on the number of jobs.
+        - If "chunks" or "nodes", chunks are created automatically based on the
+          number of jobs.
         - If callable, it should take the data iterable and return an iterable of
           chunks.
     **kwargs : dict
@@ -118,15 +119,15 @@ def execute_parallel(
 
     data = iterator_func(G)
 
-    if get_chunks == "chunks":
+    if get_chunks in {"chunks", "nodes"}:
         data = list(data)
         data_chunks = nxp.chunks(data, max(len(data) // n_jobs, 1))
     elif callable(get_chunks):
         data_chunks = get_chunks(data)
     else:
         raise ValueError(
-            "get_chunks must be 'chunks' or a callable that returns an iterable of "
-            "chunks."
+            "get_chunks must be 'chunks', 'nodes', or a callable that returns an "
+            "iterable of chunks."
         )

From e4b9d9c2c172e52cd11b508cf83401ea3c7611a7 Mon Sep 17 00:00:00 2001
From: dpys
Date: Sat, 19 Oct 2024 19:43:12 -0400
Subject: [PATCH 13/19] fix: revert only functional change from last commit

---
 .../tests/test_betweenness_centrality.py | 15 ++++----------
 nx_parallel/interface.py                 |  6 +++---
 nx_parallel/tests/test_get_chunks.py     |  5 ++---
 nx_parallel/utils/tests/test_chunk.py    | 20 ++++---------------
 4 files changed, 13 insertions(+), 33 deletions(-)

diff --git a/nx_parallel/algorithms/centrality/tests/test_betweenness_centrality.py b/nx_parallel/algorithms/centrality/tests/test_betweenness_centrality.py
index c3e3556..9c50d8b 100644
--- a/nx_parallel/algorithms/centrality/tests/test_betweenness_centrality.py
+++ b/nx_parallel/algorithms/centrality/tests/test_betweenness_centrality.py
@@ -7,41 +7,34 @@ def test_edge_betweenness_centrality_get_chunks():
     def get_chunk(edges):
         num_chunks = nxp.get_n_jobs()
 
-        # Convert edges to a list to support slicing
         edges = list(edges)
 
         # Split edges into chunks without relying on precomputed centrality
         chunk_size = max(1, len(edges) // num_chunks)
         chunks = [edges[i : i + chunk_size] for i in range(0, len(edges), chunk_size)]
 
-        # Debugging: Print how edges are distributed among chunks
         print(f"Chunks distribution: {chunks}")
 
         return chunks
 
-    # Create a random graph
     G = nx.fast_gnp_random_graph(100, 0.1, directed=False)
-    H = nxp.ParallelGraph(G)  # Wrap the graph in ParallelGraph
+    H = nxp.ParallelGraph(G)
 
-    # Compute edge betweenness centrality using NetworkX
     ebc = nx.edge_betweenness_centrality(G, normalized=True)
 
-    # Debugging: Print the edge betweenness centrality values from NetworkX
     print(f"NetworkX Edge Betweenness Centrality: {ebc}")
 
-    # Instantiate the BackendInterface
     backend = nxp.BackendInterface()
 
     # Smoke test for edge_betweenness_centrality with custom get_chunks
     par_bc_chunk = backend.edge_betweenness_centrality(
-        H.graph_object,  # Pass the underlying NetworkX graph
-        get_chunks=get_chunk,  # Pass the custom get_chunks function
+        H.graph_object,
+        get_chunks=get_chunk,
     )
 
-    # Debugging: Print the results from parallel computation
     print(f"Parallel Computed Edge Betweenness Centrality: {par_bc_chunk}")
 
-    # Compare with standard NetworkX edge betweenness centrality
+    # Compare with standard edge betweenness centrality
     standard_bc = nx.edge_betweenness_centrality(G, normalized=True)
 
     for edge in standard_bc:
diff --git a/nx_parallel/interface.py b/nx_parallel/interface.py
index 9dc904a..c901ae8 100644
--- a/nx_parallel/interface.py
+++ b/nx_parallel/interface.py
@@ -81,9 +81,9 @@ def __setstate__(self, state):
 def assign_algorithms(cls):
     """Class decorator to assign algorithms to the class attributes."""
     for attr in ALGORITHMS:
-        func_name = attr.rsplit(".", 1)[-1]
-        func = attrgetter(attr)(algorithms)
-        setattr(cls, func_name, staticmethod(func))
+        setattr(
+            cls, attr.rsplit(".", 1)[-1], staticmethod(attrgetter(attr)(algorithms))
+        )
     return cls
 
diff --git a/nx_parallel/tests/test_get_chunks.py b/nx_parallel/tests/test_get_chunks.py
index db6245a..514b168 100644
--- a/nx_parallel/tests/test_get_chunks.py
+++ b/nx_parallel/tests/test_get_chunks.py
@@ -84,9 +84,8 @@ def iterator_func(G):
                     H, process_func, iterator_func, get_chunks=random_chunking
                 )
                 if isinstance(c1, types.GeneratorType):
-                    c1, c2 = (
-                        list(c1),
-                        list(c2),
+                    c1, c2 = list(c1), list(
+                        c2
                     )  # Convert generators to lists for comparison
                 if func in chk_dict_vals:
                     for i in range(len(G.nodes)):
diff --git a/nx_parallel/utils/tests/test_chunk.py b/nx_parallel/utils/tests/test_chunk.py
index cfc5046..480eae0 100644
--- a/nx_parallel/utils/tests/test_chunk.py
+++ b/nx_parallel/utils/tests/test_chunk.py
@@ -27,7 +27,7 @@ def test_get_n_jobs():
     with patch("joblib.parallel.get_active_backend", return_value=("loky", 3)):
         assert nxp.get_n_jobs() == 3
 
-    # Test with n_jobs set in NetworkX config
+    # Test with n_jobs set in NX config
     nx.config.backends.parallel.active = True
     nx.config.backends.parallel.n_jobs = 5
     assert nxp.get_n_jobs() == 5
@@ -59,13 +59,11 @@ def test_execute_parallel_basic():
     G = nx.path_graph(10)
     H = nxp.ParallelGraph(G)
 
-    # Define a simple process_func that calculates the degree of each node in the chunk
     def process_func(G, chunk, **kwargs):
         return {node: G.degree(node) for node in chunk}
 
-    # Define an iterator_func that returns all nodes
     def iterator_func(G):
-        return list(G.nodes())  # Convert NodeView to list
+        return list(G.nodes())
 
     # Execute in parallel without overrides
     results = nxp.execute_parallel(
@@ -85,17 +83,14 @@ def test_execute_parallel_with_overrides():
     """Test `execute_parallel` with overridden parallel configuration."""
 
-    # Create a simple graph
     G = nx.complete_graph(5)
     H = nxp.ParallelGraph(G)
 
-    # Define a simple process_func that returns the list of nodes in the chunk
     def process_func(G, chunk, **kwargs):
         return list(chunk)
 
-    # Define an iterator_func that returns all nodes
     def iterator_func(G):
-        return list(G.nodes())  # Convert NodeView to list
+        return list(G.nodes())
 
     # Mock joblib.Parallel in the correct module
     with patch("nx_parallel.utils.chunk.Parallel") as mock_parallel:
@@ -123,11 +118,9 @@ def test_execute_parallel_callable_chunks():
     G = nx.cycle_graph(6)
     H = nxp.ParallelGraph(G)
 
-    # Define a process_func that sums node numbers in the chunk
     def process_func(G, chunk, **kwargs):
         return sum(chunk)
 
-    # Define an iterator_func that returns all nodes as a list
     def iterator_func(G):
         return list(G.nodes())  # Convert NodeView to list
 
@@ -151,17 +144,14 @@ def test_parallel_config_override():
     """Test that `parallel_config` correctly overrides config within its context."""
 
-    # Define a simple graph
     G = nx.complete_graph(5)
     H = nxp.ParallelGraph(G)
 
-    # Define a simple process_func that returns the list of nodes in the chunk
     def process_func(G, chunk, **kwargs):
         return list(chunk)
 
-    # Define an iterator_func that returns all nodes
     def iterator_func(G):
-        return list(G.nodes())  # Convert NodeView to list
+        return list(G.nodes())
 
     # Mock joblib.Parallel to capture the parameters it's called with
     with patch("nx_parallel.utils.chunk.Parallel") as mock_parallel:
@@ -189,11 +179,9 @@ def test_parallel_config_nested_overrides():
     G = nx.complete_graph(5)
     H = nxp.ParallelGraph(G)
 
-    # Define a simple process_func
     def process_func(G, chunk, **kwargs):
         return list(chunk)
 
-    # Define an iterator_func
     def iterator_func(G):
         return list(G.nodes())
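
Patch 13 above keeps the `staticmethod` wrapper when collapsing `assign_algorithms` back to a one-liner, and the wrapper is load-bearing: a plain function assigned to a class attribute becomes a bound method on instance access, which would swallow the graph argument when tests call algorithms on a `BackendInterface()` instance. A self-contained illustration of that Python behavior (names are generic, not from the codebase):

    def algo(G):
        return f"ran on {G!r}"

    class Plain:
        algo = algo                # binds on instance access

    class Wrapped:
        algo = staticmethod(algo)  # no binding; calls pass through unchanged

    Wrapped().algo("G")  # returns "ran on 'G'"
    Plain().algo("G")    # TypeError: algo() takes 1 positional argument but 2 were given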
From 5c3be7019c3ed80123f20213d0f1b2f16dcabeb0 Mon Sep 17 00:00:00 2001
From: dpys
Date: Sat, 19 Oct 2024 19:43:29 -0400
Subject: [PATCH 14/19] fix: revert only functional change from last commit

---
 nx_parallel/tests/test_get_chunks.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/nx_parallel/tests/test_get_chunks.py b/nx_parallel/tests/test_get_chunks.py
index 514b168..db6245a 100644
--- a/nx_parallel/tests/test_get_chunks.py
+++ b/nx_parallel/tests/test_get_chunks.py
@@ -84,8 +84,9 @@ def iterator_func(G):
                     H, process_func, iterator_func, get_chunks=random_chunking
                 )
                 if isinstance(c1, types.GeneratorType):
-                    c1, c2 = list(c1), list(
-                        c2
+                    c1, c2 = (
+                        list(c1),
+                        list(c2),
                     )  # Convert generators to lists for comparison
                 if func in chk_dict_vals:
                     for i in range(len(G.nodes)):

From 0a12b19468c9f72af7384b06215940ebed7ff712 Mon Sep 17 00:00:00 2001
From: dpys
Date: Fri, 25 Oct 2024 12:04:52 -0400
Subject: [PATCH 15/19] feat: remove explicit dunder methods for handling
 state/ attr on ParallelGraph interface, instead making execute_parallel more
 dynamic

---
 nx_parallel/interface.py   | 12 ------------
 nx_parallel/utils/chunk.py |  7 +++++--
 2 files changed, 5 insertions(+), 14 deletions(-)

diff --git a/nx_parallel/interface.py b/nx_parallel/interface.py
index c901ae8..0f05caa 100644
--- a/nx_parallel/interface.py
+++ b/nx_parallel/interface.py
@@ -65,18 +65,6 @@ def is_directed(self):
     def __str__(self):
         return f"Parallel{self.graph_object}"
 
-    def __getattr__(self, attr):
-        """Delegate attribute access to the underlying NetworkX graph."""
-        return getattr(self.graph_object, attr)
-
-    def __getstate__(self):
-        """Support pickling by returning the state of the underlying graph."""
-        return self.graph_object
-
-    def __setstate__(self, state):
-        """Support unpickling by restoring the underlying graph."""
-        self.graph_object = state
-
 
 def assign_algorithms(cls):
     """Class decorator to assign algorithms to the class attributes."""
diff --git a/nx_parallel/utils/chunk.py b/nx_parallel/utils/chunk.py
index 27ad6b9..c2ccb43 100644
--- a/nx_parallel/utils/chunk.py
+++ b/nx_parallel/utils/chunk.py
@@ -85,7 +85,7 @@ def get_n_jobs(n_jobs=None):
 
 
 def execute_parallel(
-    G: nx.Graph,
+    G,
     process_func,
     iterator_func,
     get_chunks="chunks",
@@ -95,7 +95,7 @@ def execute_parallel(
 
     Parameters
     ----------
-    G : networkx.Graph
+    G : networkx.Graph or ParallelGraph
         The graph on which the algorithm operates.
     process_func : callable
         The function to process each chunk. Should accept (G, chunk, **kwargs).
@@ -117,6 +117,9 @@ def execute_parallel(
     """
     n_jobs = nxp.get_n_jobs()
 
+    if hasattr(G, "graph_object"):
+        G = G.graph_object
+
     data = iterator_func(G)
 
     if get_chunks in {"chunks", "nodes"}:

From ae170435ee653dd539ae20383716a46b8caf6e6c Mon Sep 17 00:00:00 2001
From: dpys
Date: Fri, 25 Oct 2024 12:07:29 -0400
Subject: [PATCH 16/19] feat: remove stale hasattr check in betweenness

---
 nx_parallel/algorithms/centrality/betweenness.py | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/nx_parallel/algorithms/centrality/betweenness.py b/nx_parallel/algorithms/centrality/betweenness.py
index ff5b4f8..30f38e6 100644
--- a/nx_parallel/algorithms/centrality/betweenness.py
+++ b/nx_parallel/algorithms/centrality/betweenness.py
@@ -38,8 +38,6 @@ def betweenness_centrality(
         iterable `node_chunks`. The default chunking is done by slicing the
         `nodes` into `n_jobs` number of chunks.
""" - if hasattr(G, "graph_object"): - G = G.graph_object def process_func(G, chunk, weight, endpoints): return _betweenness_centrality_node_subset( @@ -118,8 +116,6 @@ def edge_betweenness_centrality( iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n_jobs` number of chunks. """ - if hasattr(G, "graph_object"): - G = G.graph_object def process_func(G, chunk, weight): return _edge_betweenness_centrality_node_subset(G, chunk, weight=weight) From 8c7ec8c53ee2d2f896003cd2fd9c2d4aa76ad56f Mon Sep 17 00:00:00 2001 From: dpys Date: Fri, 25 Oct 2024 12:11:23 -0400 Subject: [PATCH 17/19] feat: remove superfluous kwargs in betweenness --- nx_parallel/algorithms/centrality/betweenness.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/nx_parallel/algorithms/centrality/betweenness.py b/nx_parallel/algorithms/centrality/betweenness.py index 30f38e6..9693bd4 100644 --- a/nx_parallel/algorithms/centrality/betweenness.py +++ b/nx_parallel/algorithms/centrality/betweenness.py @@ -24,7 +24,6 @@ def betweenness_centrality( endpoints=False, seed=None, get_chunks="chunks", - **kwargs, ): """The parallel computation is implemented by dividing the nodes into chunks and computing betweenness centrality for each chunk concurrently. @@ -57,7 +56,6 @@ def iterator_func(G): get_chunks=get_chunks, weight=weight, endpoints=endpoints, - **kwargs, ) # Reducing partial solution @@ -102,7 +100,6 @@ def edge_betweenness_centrality( weight=None, seed=None, get_chunks="nodes", - **kwargs, ): """The parallel computation is implemented by dividing the nodes into chunks and computing edge betweenness centrality for each chunk concurrently. @@ -132,7 +129,6 @@ def iterator_func(G): iterator_func=iterator_func, get_chunks=get_chunks, weight=weight, - **kwargs, ) # Reducing partial solution From 5457cedc60d60b6d40fe9ee2b0e685cf3ba3ab7e Mon Sep 17 00:00:00 2001 From: dpys Date: Fri, 25 Oct 2024 12:41:11 -0400 Subject: [PATCH 18/19] fix: revert hasattr check --- nx_parallel/algorithms/centrality/betweenness.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/nx_parallel/algorithms/centrality/betweenness.py b/nx_parallel/algorithms/centrality/betweenness.py index 9693bd4..140164c 100644 --- a/nx_parallel/algorithms/centrality/betweenness.py +++ b/nx_parallel/algorithms/centrality/betweenness.py @@ -38,6 +38,9 @@ def betweenness_centrality( `nodes` into `n_jobs` number of chunks. """ + if hasattr(G, "graph_object"): + G = G.graph_object + def process_func(G, chunk, weight, endpoints): return _betweenness_centrality_node_subset( G, chunk, weight=weight, endpoints=endpoints @@ -113,6 +116,8 @@ def edge_betweenness_centrality( iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n_jobs` number of chunks. 
""" + if hasattr(G, "graph_object"): + G = G.graph_object def process_func(G, chunk, weight): return _edge_betweenness_centrality_node_subset(G, chunk, weight=weight) From 12561b17c441c40d4fc2d6f6d63a6ca155e15e53 Mon Sep 17 00:00:00 2001 From: dpys Date: Fri, 25 Oct 2024 12:43:07 -0400 Subject: [PATCH 19/19] fix: revert hasattr check --- _nx_parallel/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/_nx_parallel/__init__.py b/_nx_parallel/__init__.py index 3e644f1..488a6a9 100644 --- a/_nx_parallel/__init__.py +++ b/_nx_parallel/__init__.py @@ -98,7 +98,7 @@ def get_info(): }, }, "edge_betweenness_centrality": { - "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L100", + "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L99", "additional_docs": "The parallel computation is implemented by dividing the nodes into chunks and computing edge betweenness centrality for each chunk concurrently.", "additional_parameters": { 'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n_jobs` number of chunks."