WIP: Refactor Parallel Graph Algorithms to Use a Centralized Parallel Configuration with Flexible Iterators #86

Open · wants to merge 19 commits into base: main
4 changes: 2 additions & 2 deletions _nx_parallel/__init__.py
@@ -84,7 +84,7 @@ def get_info():
             },
         },
         "betweenness_centrality": {
-            "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L20",
+            "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L19",
             "additional_docs": "The parallel computation is implemented by dividing the nodes into chunks and computing betweenness centrality for each chunk concurrently.",
             "additional_parameters": {
                 'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n_jobs` number of chunks."
@@ -98,7 +98,7 @@ def get_info():
             },
         },
         "edge_betweenness_centrality": {
-            "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L96",
+            "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L99",
             "additional_docs": "The parallel computation is implemented by dividing the nodes into chunks and computing edge betweenness centrality for each chunk concurrently.",
             "additional_parameters": {
                 'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n_jobs` number of chunks."
84 changes: 46 additions & 38 deletions nx_parallel/algorithms/centrality/betweenness.py
@@ -1,4 +1,3 @@
-from joblib import Parallel, delayed
 from networkx.algorithms.centrality.betweenness import (
     _accumulate_basic,
     _accumulate_endpoints,
@@ -38,31 +37,35 @@ def betweenness_centrality(
     iterable `node_chunks`. The default chunking is done by slicing the
     `nodes` into `n_jobs` number of chunks.
     """
 
     if hasattr(G, "graph_object"):
         G = G.graph_object
 
-    if k is None:
-        nodes = G.nodes
-    else:
-        nodes = seed.sample(list(G.nodes), k)
-
-    n_jobs = nxp.get_n_jobs()
+    def process_func(G, chunk, weight, endpoints):
+        return _betweenness_centrality_node_subset(
+            G, chunk, weight=weight, endpoints=endpoints
+        )
 
-    if get_chunks == "chunks":
-        node_chunks = nxp.create_iterables(G, "node", n_jobs, nodes)
-    else:
-        node_chunks = get_chunks(nodes)
-
-    bt_cs = Parallel()(
-        delayed(_betweenness_centrality_node_subset)(G, chunk, weight, endpoints)
-        for chunk in node_chunks
+    def iterator_func(G):
+        if k is None:
+            return G.nodes
+        else:
+            return seed.sample(list(G.nodes), k)
+
+    bt_cs = nxp.utils.chunk.execute_parallel(
+        G,
+        process_func=process_func,
+        iterator_func=iterator_func,
+        get_chunks=get_chunks,
+        weight=weight,
+        endpoints=endpoints,
     )
 
     # Reducing partial solution
-    bt_c = bt_cs[0]
-    for bt in bt_cs[1:]:
-        for n in bt:
-            bt_c[n] += bt[n]
+    bt_c = {}
+    for bt in bt_cs:
+        for n, value in bt.items():
+            bt_c[n] = bt_c.get(n, 0.0) + value
 
     betweenness = _rescale(
         bt_c,
@@ -94,7 +97,12 @@ def _betweenness_centrality_node_subset(G, nodes, weight=None, endpoints=False):
 @nxp._configure_if_nx_active()
 @py_random_state(4)
 def edge_betweenness_centrality(
-    G, k=None, normalized=True, weight=None, seed=None, get_chunks="chunks"
+    G,
+    k=None,
+    normalized=True,
+    weight=None,
+    seed=None,
+    get_chunks="nodes",
 ):
     """The parallel computation is implemented by dividing the nodes into chunks and
     computing edge betweenness centrality for each chunk concurrently.
@@ -111,28 +119,28 @@ def edge_betweenness_centrality(
     if hasattr(G, "graph_object"):
         G = G.graph_object
 
-    if k is None:
-        nodes = G.nodes
-    else:
-        nodes = seed.sample(list(G.nodes), k)
+    def process_func(G, chunk, weight):
+        return _edge_betweenness_centrality_node_subset(G, chunk, weight=weight)
 
-    n_jobs = nxp.get_n_jobs()
-
-    if get_chunks == "chunks":
-        node_chunks = nxp.create_iterables(G, "node", n_jobs, nodes)
-    else:
-        node_chunks = get_chunks(nodes)
-
-    bt_cs = Parallel()(
-        delayed(_edge_betweenness_centrality_node_subset)(G, chunk, weight)
-        for chunk in node_chunks
+    def iterator_func(G):
+        if k is None:
+            return G.nodes
+        else:
+            return seed.sample(list(G.nodes), k)
+
+    bt_cs = nxp.utils.chunk.execute_parallel(
+        G,
+        process_func=process_func,
+        iterator_func=iterator_func,
+        get_chunks=get_chunks,
+        weight=weight,
     )
 
     # Reducing partial solution
-    bt_c = bt_cs[0]
-    for bt in bt_cs[1:]:
-        for e in bt:
-            bt_c[e] += bt[e]
+    bt_c = {}
+    for partial_bt in bt_cs:
+        for edge, value in partial_bt.items():
+            bt_c[edge] = bt_c.get(edge, 0.0) + value
 
     for n in G:  # remove nodes to only return edges
         del bt_c[n]
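The diff above replaces the per-algorithm joblib boilerplate with a single call to nxp.utils.chunk.execute_parallel. As a reading aid, here is a minimal sketch of what such a helper could look like, inferred purely from the call sites in this diff; the names execute_parallel, process_func, iterator_func, and get_chunks come from the PR, while the body below is illustrative and may differ from the PR's actual implementation in nx_parallel/utils/chunk.py:

# Illustrative sketch only, reconstructed from the call sites in this diff.
from joblib import Parallel, delayed

import nx_parallel as nxp


def execute_parallel(G, process_func, iterator_func, get_chunks="chunks", **kwargs):
    """Chunk iterator_func(G) and apply process_func to each chunk in parallel."""
    n_jobs = nxp.get_n_jobs()
    items = list(iterator_func(G))
    if get_chunks == "chunks":
        # Default: slice the items into n_jobs roughly equal chunks.
        chunks = nxp.create_iterables(G, "node", n_jobs, items)
    else:
        chunks = get_chunks(items)
    # Extra keyword arguments (e.g. weight, endpoints) are forwarded to process_func.
    return Parallel(n_jobs=n_jobs)(
        delayed(process_func)(G, chunk, **kwargs) for chunk in chunks
    )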
@@ -3,33 +3,41 @@
 import math
 
 
-def test_betweenness_centrality_get_chunks():
-    def get_chunk(nodes):
+def test_edge_betweenness_centrality_get_chunks():
+    def get_chunk(edges):
         num_chunks = nxp.get_n_jobs()
-        nodes_ebc = {i: 0 for i in nodes}
-        for i in ebc:
-            nodes_ebc[i[0]] += ebc[i]
-            nodes_ebc[i[1]] += ebc[i]
-
-        sorted_nodes = sorted(nodes_ebc.items(), key=lambda x: x[1], reverse=True)
+        edges = list(edges)
 
-        chunks = [[] for _ in range(num_chunks)]
-        chunk_sums = [0] * num_chunks
+        # Split edges into chunks without relying on precomputed centrality
+        chunk_size = max(1, len(edges) // num_chunks)
+        chunks = [edges[i : i + chunk_size] for i in range(0, len(edges), chunk_size)]
 
-        for node, value in sorted_nodes:
-            min_chunk_index = chunk_sums.index(min(chunk_sums))
-            chunks[min_chunk_index].append(node)
-            chunk_sums[min_chunk_index] += value
+        print(f"Chunks distribution: {chunks}")
 
         return chunks
 
     G = nx.fast_gnp_random_graph(100, 0.1, directed=False)
     H = nxp.ParallelGraph(G)
-    ebc = nx.edge_betweenness_centrality(G)
-    par_bc_chunk = nxp.betweenness_centrality(H, get_chunks=get_chunk)  # smoke test
-    par_bc = nxp.betweenness_centrality(H)
 
-    for i in range(len(G.nodes)):
-        assert math.isclose(par_bc[i], par_bc_chunk[i], abs_tol=1e-16)
-    # get_chunk is faster than default (for big graphs)
-    # G = nx.bipartite.random_graph(400, 700, 0.8, seed=5, directed=False)
+    ebc = nx.edge_betweenness_centrality(G, normalized=True)
+
+    print(f"NetworkX Edge Betweenness Centrality: {ebc}")
+
+    backend = nxp.BackendInterface()
+
+    # Smoke test for edge_betweenness_centrality with custom get_chunks
+    par_bc_chunk = backend.edge_betweenness_centrality(
+        H.graph_object,
+        get_chunks=get_chunk,
+    )
+
+    print(f"Parallel Computed Edge Betweenness Centrality: {par_bc_chunk}")
+
+    # Compare with standard edge betweenness centrality
+    standard_bc = nx.edge_betweenness_centrality(G, normalized=True)
+
+    for edge in standard_bc:
+        assert math.isclose(
+            par_bc_chunk[edge], standard_bc[edge], abs_tol=1e-6
+        ), f"Edge {edge} mismatch: {par_bc_chunk[edge]} vs {standard_bc[edge]}"
6 changes: 3 additions & 3 deletions nx_parallel/interface.py
@@ -69,9 +69,9 @@ def __str__(self):
 def assign_algorithms(cls):
     """Class decorator to assign algorithms to the class attributes."""
     for attr in ALGORITHMS:
-        # get the function name by parsing the module hierarchy
-        func_name = attr.rsplit(".", 1)[-1]
-        setattr(cls, func_name, attrgetter(attr)(algorithms))
+        setattr(
+            cls, attr.rsplit(".", 1)[-1], staticmethod(attrgetter(attr)(algorithms))
+        )
     return cls


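The staticmethod wrapper added here matters: a plain function stored as a class attribute becomes a bound method at attribute access, so calling it through a backend instance would implicitly prepend the instance to the arguments. A minimal, self-contained illustration with hypothetical names:

def echo(x):
    return x


class Backend:
    pass


Backend.plain = echo                  # looked up on an instance, this binds as a method
Backend.wrapped = staticmethod(echo)  # stays a plain function

b = Backend()
print(b.wrapped(3))  # prints 3
# b.plain(3) would raise TypeError: echo() takes 1 positional argument but 2 were
# given, because `b` is passed implicitly as the first argument.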
35 changes: 24 additions & 11 deletions nx_parallel/tests/test_get_chunks.py
@@ -1,5 +1,3 @@
-# smoke tests for all functions supporting `get_chunks` kwarg
-
 import inspect
 import importlib
 import random
@@ -10,7 +8,7 @@
 import nx_parallel as nxp
 
 
-def get_all_functions(package_name="nx_parallel"):
+def get_all_functions(package_name="nx_parallel.algorithms"):
"""Returns a dict keyed by function names to its arguments.

This function constructs a dictionary keyed by the function
@@ -32,8 +30,8 @@ def get_functions_with_get_chunks():
"""Returns a list of function names with the `get_chunks` kwarg."""
all_funcs = get_all_functions()
get_chunks_funcs = []
for func in all_funcs:
if "get_chunks" in all_funcs[func]["args"]:
for func, params in all_funcs.items():
if "get_chunks" in params["args"]:
get_chunks_funcs.append(func)
return get_chunks_funcs

@@ -47,6 +45,15 @@ def random_chunking(nodes):
         num_in_chunk = max(len(_nodes) // num_chunks, 1)
         return nxp.chunks(_nodes, num_in_chunk)
 
+    # Define a simple process_func for testing
+    def process_func(G, chunk, **kwargs):
+        # Example: return the degree of each node in the chunk
+        return {node: G.degree(node) for node in chunk}
+
+    # Define a simple iterator_func for testing
+    def iterator_func(G):
+        return G.nodes()
+
     get_chunks_funcs = get_functions_with_get_chunks()
     ignore_funcs = [
         "number_of_isolates",
@@ -62,19 +69,25 @@ def random_chunking(nodes):
     G = nx.fast_gnp_random_graph(50, 0.6, seed=42)
     H = nxp.ParallelGraph(G)
     for func in get_chunks_funcs:
+        print(func)
         if func not in ignore_funcs:
             if func in tournament_funcs:
                 G = nx.tournament.random_tournament(50, seed=42)
                 H = nxp.ParallelGraph(G)
-                c1 = getattr(nxp, func)(H)
-                c2 = getattr(nxp, func)(H, get_chunks=random_chunking)
+                c1 = getattr(nxp, func)(H, process_func, iterator_func)
+                c2 = getattr(nxp, func)(
+                    H, process_func, iterator_func, get_chunks=random_chunking
+                )
                 assert c1 == c2
             else:
-                c1 = getattr(nxp, func)(H)
-                c2 = getattr(nxp, func)(H, get_chunks=random_chunking)
+                c1 = getattr(nxp, func)(H, process_func, iterator_func)
+                c2 = getattr(nxp, func)(
+                    H, process_func, iterator_func, get_chunks=random_chunking
+                )
                 if isinstance(c1, types.GeneratorType):
-                    c1, c2 = dict(c1), dict(c2)
+                    c1, c2 = (
+                        list(c1),
+                        list(c2),
+                    )  # Convert generators to lists for comparison
                 if func in chk_dict_vals:
                     for i in range(len(G.nodes)):
                         assert math.isclose(c1[i], c2[i], abs_tol=1e-16)
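For context on random_chunking above: it computes a chunk size and delegates to nxp.chunks, which slices a sequence into consecutive chunks of that size. A small standalone illustration, assuming that size-based semantics (consistent with how the test calls it):

import nx_parallel as nxp

nodes = list(range(10))
# chunks(iterable, n) yields successive slices of length n; the last may be shorter.
print([list(c) for c in nxp.chunks(nodes, 3)])
# expected: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]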