diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 6f92426b..db8d8d59 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -24,7 +24,7 @@ jobs: - name: Install packages run: | pip install --upgrade pip - pip install pre-commit joblib networkx + pip install -e ."[developer]" - name: Lint run: pre-commit run --all-files --show-diff-on-failure --color always diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index b672532f..3160eae9 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -2,6 +2,13 @@ # pre-commit install repos: + - repo: local + hooks: + - id: update-get_info + name: Update function info + entry: sh _nx_parallel/script.sh + language: system + pass_filenames: false - repo: https://github.com/adamchainz/blacken-docs rev: 1.16.0 hooks: diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index d13a3bc4..8d538c34 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -113,7 +113,7 @@ def parallel_func(G, nx_arg, additional_backend_arg_1, additional_backend_arg_2= In parallel computing, "chunking" refers to dividing a large task into smaller, more manageable chunks that can be processed simultaneously by multiple computing units, such as CPU cores or distributed computing nodes. It's like breaking down a big task into smaller pieces so that multiple workers can work on different pieces at the same time, and in the case of nx-parallel, this usually speeds up the overall process. -The default chunking in nx-parallel is done by first determining the number of available CPU cores and then allocating the nodes (or edges or any other iterator) per chunk by dividing the total number of nodes by the total CPU cores available. (ref. [chunk.py](./nx_parallel/utils/chunk.py)). This default chunking can be overridden by the user by passing a custom `get_chunks` function to the algorithm as a kwarg. While adding a new algorithm, you can change this default chunking, if necessary (ref. 
[PR](https://github.com/networkx/nx-parallel/pull/33)). Also, when [the `config` PR](https://github.com/networkx/networkx/pull/7225) is merged in networkx, and the `config` will be added to nx-parallel, then the user would be able to control the number of CPU cores they would want to use and then the chunking would be done accordingly. +The default chunking in nx-parallel is done by slicing the list of nodes (or edges or any other iterator) into `n_jobs` chunks (ref. [chunk.py](./nx_parallel/utils/chunk.py)). By default, `n_jobs` is `None`. To learn how to modify the value of `n_jobs` and other config options, refer to [`Config.md`](./Config.md). The default chunking can be overridden by the user by passing a custom `get_chunks` function to the algorithm as a kwarg. While adding a new algorithm, you can change this default chunking, if necessary (ref. [PR](https://github.com/networkx/nx-parallel/pull/33)). ## General guidelines on adding a new algorithm diff --git a/Config.md b/Config.md new file mode 100644 index 00000000..4d3ed3a3 --- /dev/null +++ b/Config.md @@ -0,0 +1,156 @@ +# Configuring nx-parallel + +`nx-parallel` provides flexible parallel computing capabilities, allowing you to control settings like `backend`, `n_jobs`, `verbose`, and more. This can be done through two configuration systems: `joblib` and `NetworkX`. This guide explains how to configure `nx-parallel` using both systems. + +## 1. Setting configs using `joblib.parallel_config` + +`nx-parallel` relies on [`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html) for parallel computing. You can adjust its settings through the [`joblib.parallel_config`](https://joblib.readthedocs.io/en/latest/generated/joblib.parallel_config.html) class provided by `joblib`. For more details, check out the official [joblib documentation](https://joblib.readthedocs.io/en/latest/parallel.html).
+ +### 1.1 Usage + +```python +from joblib import parallel_config + +# Setting global configs +parallel_config(n_jobs=3, verbose=50) +nx.square_clustering(H) + +# Setting configs in a context +with parallel_config(n_jobs=7, verbose=0): + nx.square_clustering(H) +``` + +Please refer to the [official joblib documentation](https://joblib.readthedocs.io/en/latest/generated/joblib.parallel_config.html) to better understand the config parameters. + +Note: Ensure that `nx.config.backends.parallel.active = False` when using `joblib` for configuration, as NetworkX configurations will override `joblib.parallel_config` settings if `active` is `True`. + +## 2. Setting configs using `networkx`'s configuration system for backends + +To use NetworkX’s configuration system in `nx-parallel`, you must set the `active` flag (in `nx.config.backends.parallel`) to `True`. + +### 2.1 Configs in NetworkX for backends + +When you import NetworkX, it automatically sets default configurations for all installed backends, including `nx-parallel`. + +```python +import networkx as nx + +print(nx.config) +``` + +Output: + +``` +NetworkXConfig( + backend_priority=[], + backends=Config( + parallel=ParallelConfig( + active=False, + backend="loky", + n_jobs=None, + verbose=0, + temp_folder=None, + max_nbytes="1M", + mmap_mode="r", + prefer=None, + require=None, + inner_max_num_threads=None, + backend_params={}, + ) + ), + cache_converted_graphs=True, +) +``` + +As you can see in the above output, by default, `active` is set to `False`. So, to enable NetworkX configurations for `nx-parallel`, set `active` to `True`. Please refer to [NetworkX's official backend and config docs](https://networkx.org/documentation/latest/reference/backends.html) for more on the NetworkX configuration system.
+ +### 2.2 Usage + +```python +# enabling networkx's config for nx-parallel +nx.config.backends.parallel.active = True + +# Setting global configs +nxp_config = nx.config.backends.parallel +nxp_config.n_jobs = 3 +nxp_config.verbose = 50 + +nx.square_clustering(H) + +# Setting config in a context +with nxp_config(n_jobs=7, verbose=0): + nx.square_clustering(H) +``` + +The configuration parameters are the same as `joblib.parallel_config`, so you can refer to the [official joblib documentation](https://joblib.readthedocs.io/en/latest/generated/joblib.parallel_config.html) to better understand these config parameters. + +### 2.3 How Does NetworkX's Configuration Work in nx-parallel? + +In `nx-parallel`, there's a `_configure_if_nx_active` decorator applied to all algorithms. This decorator checks the value of `active` (in `nx.config.backends.parallel`) and then accordingly uses the appropriate configuration system (`joblib` or `networkx`). If `active=True`, it extracts the configs from `nx.config.backends.parallel`, passes them to a `joblib.parallel_config` context manager, and calls the function within that context. Otherwise, it simply calls the function. + +## 3. Comparing NetworkX and Joblib Configuration Systems + +### 3.1 Using Both Systems Simultaneously + +You can use both NetworkX’s configuration system and `joblib.parallel_config` together in `nx-parallel`. However, it’s important to understand their interaction.
+ +Example: + +```py +# Enable NetworkX configuration +nx.config.backends.parallel.active = True +nx.config.backends.parallel.n_jobs = 6 + +# Global Joblib configuration +joblib.parallel_config(backend="threading") + +with joblib.parallel_config(n_jobs=4, verbose=55): + # NetworkX config for nx-parallel + # backend="loky", n_jobs=6, verbose=0 + nx.square_clustering(G, backend="parallel") + + # Joblib config for other parallel tasks + # backend="threading", n_jobs=4, verbose=55 + joblib.Parallel()(joblib.delayed(sqrt)(i**2) for i in range(10)) +``` + +- **NetworkX Configurations for nx-parallel**: When calling functions within `nx-parallel`, NetworkX’s configurations will override those specified by Joblib. For example, the `nx.square_clustering` function will use the `n_jobs=6` setting from `nx.config.backends.parallel`, regardless of any Joblib settings within the same context. + +- **Joblib Configurations for Other Code**: For any other parallel code outside of `nx-parallel`, such as a direct call to `joblib.Parallel`, the configurations specified within the Joblib context will be applied. + +This behavior ensures that `nx-parallel` functions consistently use NetworkX’s settings when enabled, while still allowing Joblib configurations to apply to non-NetworkX parallel tasks. + +**Key Takeaway**: When both systems are used together, NetworkX's configuration (`nx.config.backends.parallel`) takes precedence for `nx-parallel` functions. To avoid unexpected behavior, ensure that the `active` setting aligns with your intended configuration system. + +### 3.2 Key Differences + +- **Parameter Handling**: The main difference is how `backend_params` are passed. 
Since NetworkX configurations are stored as a [`@dataclass`](https://docs.python.org/3/library/dataclasses.html), we need to pass them as a dictionary, whereas in `joblib.parallel_config` you can just pass them along with the other configurations, as shown below: + + ```py + nx.config.backends.parallel.backend_params = {"max_nbytes": None} + joblib.parallel_config(backend="loky", max_nbytes=None) + ``` + +- **Default Behavior**: By default, `nx-parallel` looks for configs in `joblib.parallel_config` unless `nx.config.backends.parallel.active` is set to `True`. + +### 3.3 When Should You Use Which System? + +When the only networkx backend you're using is `nx-parallel`, either the NetworkX or the `joblib` configuration system can be used, depending on your preference. + +But when working with multiple NetworkX backends, it's crucial to ensure compatibility among the backends to avoid conflicts between different configurations. In such cases, using NetworkX's configuration system to configure `nx-parallel` is recommended. This approach helps maintain consistency across backends. For example: + +```python +nx.config.backend_priority = ["another_nx_backend", "parallel"] +nx.config.backends.another_nx_backend.config_1 = "xyz" +joblib.parallel_config(n_jobs=7, verbose=50) + +nx.square_clustering(G) +``` + +In this example, if `another_nx_backend` also internally utilizes `joblib.Parallel` (without exposing it to the user) within its implementation of the `square_clustering` algorithm, then the `nx-parallel` configurations set by `joblib.parallel_config` will influence the internal `joblib.Parallel` used by `another_nx_backend`. To prevent unexpected behavior, it is advisable to configure these settings through the NetworkX configuration system. + +**Future Synchronization:** We are working on synchronizing both configuration systems so that changes in one system automatically reflect in the other.
This started with [PR#68](https://github.com/networkx/nx-parallel/pull/68), which introduced a unified context manager for `nx-parallel`. For more details on the challenges of creating a compatibility layer to keep both systems in sync, refer to [Issue#76](https://github.com/networkx/nx-parallel/issues/76). + +If you have feedback or suggestions, feel free to open an issue or submit a pull request. + +Thank you :) diff --git a/README.md b/README.md index 76bd175d..7c09a1f0 100644 --- a/README.md +++ b/README.md @@ -4,26 +4,26 @@ nx-parallel is a NetworkX backend that uses joblib for parallelization. This pro ## Algorithms in nx-parallel -- [number_of_isolates](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/isolate.py#L8) -- [square_clustering](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/cluster.py#L10) -- [local_efficiency](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/efficiency_measures.py#L9) -- [closeness_vitality](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/vitality.py#L9) -- [is_reachable](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/tournament.py#L10) -- [tournament_is_strongly_connected](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/tournament.py#L54) -- [all_pairs_node_connectivity](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/connectivity/connectivity.py#L17) -- [approximate_all_pairs_node_connectivity](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/approximation/connectivity.py#L12) -- [betweenness_centrality](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L19) -- [edge_betweenness_centrality](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L94) -- 
[node_redundancy](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/bipartite/redundancy.py#L11) -- [all_pairs_dijkstra](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/weighted.py#L28) -- [all_pairs_dijkstra_path_length](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/weighted.py#L71) -- [all_pairs_dijkstra_path](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/weighted.py#L121) -- [all_pairs_bellman_ford_path_length](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/weighted.py#L164) -- [all_pairs_bellman_ford_path](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/weighted.py#L209) -- [johnson](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/weighted.py#L252) -- [all_pairs_all_shortest_paths](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/generic.py#L10) -- [all_pairs_shortest_path_length](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/unweighted.py#L18) -- [all_pairs_shortest_path](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/unweighted.py#L62) +- [all_pairs_all_shortest_paths](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/generic.py#L11) +- [all_pairs_bellman_ford_path](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/weighted.py#L212) +- [all_pairs_bellman_ford_path_length](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/weighted.py#L168) +- [all_pairs_dijkstra](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/weighted.py#L29) +- 
[all_pairs_dijkstra_path](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/weighted.py#L124) +- [all_pairs_dijkstra_path_length](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/weighted.py#L73) +- [all_pairs_node_connectivity](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/connectivity/connectivity.py#L18) +- [all_pairs_shortest_path](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/unweighted.py#L63) +- [all_pairs_shortest_path_length](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/unweighted.py#L19) +- [approximate_all_pairs_node_connectivity](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/approximation/connectivity.py#L13) +- [betweenness_centrality](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L20) +- [closeness_vitality](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/vitality.py#L10) +- [edge_betweenness_centrality](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L96) +- [is_reachable](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/tournament.py#L13) +- [johnson](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/weighted.py#L256) +- [local_efficiency](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/efficiency_measures.py#L10) +- [node_redundancy](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/bipartite/redundancy.py#L12) +- [number_of_isolates](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/isolate.py#L9) +- [square_clustering](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/cluster.py#L11) +- 
[tournament_is_strongly_connected](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/tournament.py#L59)
Script used to generate the above list @@ -107,6 +107,12 @@ Note that for all functions inside `nx_code.py` that do not have an nx-parallel import networkx as nx import nx_parallel as nxp +# enabling networkx's config for nx-parallel +nx.config.backends.parallel.active = True + +# setting `n_jobs` (by default, `n_jobs=None`) +nx.config.backends.parallel.n_jobs = 4 + G = nx.path_graph(4) H = nxp.ParallelGraph(G) @@ -121,10 +127,10 @@ nxp.betweenness_centrality(G) # method 4 : using nx-parallel implementation with ParallelGraph object nxp.betweenness_centrality(H) - -# output : {0: 0.0, 1: 0.6666666666666666, 2: 0.6666666666666666, 3: 0.0} ``` +For more on how to play with configurations in nx-parallel, refer to [Config.md](./Config.md)! Additionally, refer to [NetworkX's official backend and config docs](https://networkx.org/documentation/latest/reference/backends.html) for more on the functionalities provided by networkx for backends and configs, like logging, `backend_priority`, etc. Another way to configure nx-parallel is by using [`joblib.parallel_config`](https://joblib.readthedocs.io/en/latest/generated/joblib.parallel_config.html). + ### Notes 1. Some functions in networkx have the same name but different implementations, so to avoid these name conflicts at the time of dispatching networkx differentiates them by specifying the `name` parameter in the `_dispatchable` decorator of such algorithms. So, `method 3` and `method 4` are not recommended. But, you can use them if you know the correct `name`.
For example: diff --git a/_nx_parallel/__init__.py b/_nx_parallel/__init__.py index af79e551..449682ce 100644 --- a/_nx_parallel/__init__.py +++ b/_nx_parallel/__init__.py @@ -1,5 +1,7 @@ # This file was automatically generated by update_get_info.py +from .config import _config + def get_info(): """Return a dictionary with information about the package.""" @@ -8,164 +10,147 @@ def get_info(): "project": "nx-parallel", "package": "nx_parallel", "url": "https://github.com/networkx/nx-parallel", - "short_summary": "Parallel backend for NetworkX algorithms", + "short_summary": "A networkx backend that uses joblib to run graph algorithms in parallel. Find the nx-parallel's configuration guide `here `_", + "default_config": _config, "functions": { - "number_of_isolates": { - "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/isolate.py#L8", - "additional_docs": "The parallel computation is implemented by dividing the list of isolated nodes into chunks and then finding the length of each chunk in parallel and then adding all the lengths at the end.", + "all_pairs_all_shortest_paths": { + "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/generic.py#L11", + "additional_docs": "The parallel implementation first divides the nodes into chunks and then creates a generator to lazily compute all shortest paths between all nodes for each node in `node_chunk`, and then employs joblib's `Parallel` function to execute these computations in parallel across `n_jobs` number of CPU cores.", "additional_parameters": { - 'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the isolated nodes as input and returns an iterable `isolate_chunks`. The default chunking is done by slicing the `isolates` into `n` chunks, where `n` is the total number of CPU cores available." 
+ 'get_chunks : str, function (default = "chunks")': "A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `G.nodes` into `n_jobs` number of chunks." }, }, - "square_clustering": { - "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/cluster.py#L10", - "additional_docs": "The nodes are chunked into `node_chunks` and then the square clustering coefficient for all `node_chunks` are computed in parallel over all available CPU cores.", + "all_pairs_bellman_ford_path": { + "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/weighted.py#L212", + "additional_docs": "The parallel implementation first divides the nodes into chunks and then creates a generator to lazily compute shortest paths for each node_chunk, and then employs joblib's `Parallel` function to execute these computations in parallel across `n_jobs` number of CPU cores.", "additional_parameters": { - 'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes (or nbunch) as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n` chunks, where `n` is the number of CPU cores." + 'get_chunks : str, function (default = "chunks")': "A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `G.nodes` into `n_jobs` number of chunks." 
}, }, - "local_efficiency": { - "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/efficiency_measures.py#L9", - "additional_docs": "The parallel computation is implemented by dividing the nodes into chunks and then computing and adding global efficiencies of all node in all chunks, in parallel, and then adding all these sums and dividing by the total number of nodes at the end.", + "all_pairs_bellman_ford_path_length": { + "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/weighted.py#L168", + "additional_docs": "The parallel implementation first divides the nodes into chunks and then creates a generator to lazily compute shortest paths lengths for each node in `node_chunk`, and then employs joblib's `Parallel` function to execute these computations in parallel across `n_jobs` number of CPU cores.", "additional_parameters": { - 'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n` chunks, where `n` is the total number of CPU cores available." + 'get_chunks : str, function (default = "chunks")': "A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `G.nodes` into `n_jobs` number of chunks." }, }, - "closeness_vitality": { - "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/vitality.py#L9", - "additional_docs": "The parallel computation is implemented only when the node is not specified. 
The closeness vitality for each node is computed concurrently.", + "all_pairs_dijkstra": { + "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/weighted.py#L29", + "additional_docs": "The parallel implementation first divides the nodes into chunks and then creates a generator to lazily compute shortest paths and lengths for each `node_chunk`, and then employs joblib's `Parallel` function to execute these computations in parallel across `n_jobs` number of CPU cores.", "additional_parameters": { - 'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n` chunks, where `n` is the total number of CPU cores." + 'get_chunks : str, function (default = "chunks")': "A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `G.nodes` into `n_jobs` number of chunks." }, }, - "is_reachable": { - "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/tournament.py#L12", - "additional_docs": "The function parallelizes the calculation of two neighborhoods of vertices in `G` and checks closure conditions for each neighborhood subset in parallel.", + "all_pairs_dijkstra_path": { + "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/weighted.py#L124", + "additional_docs": "The parallel implementation first divides the nodes into chunks and then creates a generator to lazily compute shortest paths for each `node_chunk`, and then employs joblib's `Parallel` function to execute these computations in parallel across `n_jobs` number of CPU cores.", "additional_parameters": { - 'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. 
The default chunking is done by slicing the `nodes` into `n` chunks, where `n` is the total number of CPU cores available." + 'get_chunks : str, function (default = "chunks")': "A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `G.nodes` into `n_jobs` number of chunks." }, }, - "tournament_is_strongly_connected": { - "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/tournament.py#L59", - "additional_docs": "The parallel computation is implemented by dividing the nodes into chunks and then checking whether each node is reachable from each other node in parallel.", + "all_pairs_dijkstra_path_length": { + "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/weighted.py#L73", + "additional_docs": "The parallel implementation first divides the nodes into chunks and then creates a generator to lazily compute shortest paths lengths for each node in `node_chunk`, and then employs joblib's `Parallel` function to execute these computations in parallel across `n_jobs` number of CPU cores.", "additional_parameters": { - 'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n` chunks, where `n` is the total number of CPU cores available." + 'get_chunks : str, function (default = "chunks")': "A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `G.nodes` into `n_jobs` number of chunks." 
}, }, "all_pairs_node_connectivity": { - "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/connectivity/connectivity.py#L17", - "additional_docs": "The parallel implementation first divides a list of all permutation (in case of directed graphs) and combinations (in case of undirected graphs) of `nbunch` into chunks and then creates a generator to lazily compute the local node connectivities for each chunk, and then employs joblib's `Parallel` function to execute these computations in parallel across all available CPU cores. At the end, the results are aggregated into a single dictionary and returned.", - "additional_parameters": { - 'get_chunks : str, function (default = "chunks")': "A function that takes in `list(iter_func(nbunch, 2))` as input and returns an iterable `pairs_chunks`, here `iter_func` is `permutations` in case of directed graphs and `combinations` in case of undirected graphs. The default is to create chunks by slicing the list into `n` chunks, where `n` is the number of CPU cores, such that size of each chunk is atmost 10, and at least 1." - }, - }, - "approximate_all_pairs_node_connectivity": { - "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/approximation/connectivity.py#L12", - "additional_docs": "The parallel implementation first divides the a list of all permutation (in case of directed graphs) and combinations (in case of undirected graphs) of `nbunch` into chunks and then creates a generator to lazily compute the local node connectivities for each chunk, and then employs joblib's `Parallel` function to execute these computations in parallel across all available CPU cores. 
At the end, the results are aggregated into a single dictionary and returned.", - "additional_parameters": { - 'get_chunks : str, function (default = "chunks")': "A function that takes in `list(iter_func(nbunch, 2))` as input and returns an iterable `pairs_chunks`, here `iter_func` is `permutations` in case of directed graphs and `combinations` in case of undirected graphs. The default is to create chunks by slicing the list into `n` chunks, where `n` is the number of CPU cores, such that size of each chunk is atmost 10, and at least 1." - }, - }, - "betweenness_centrality": { - "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L19", - "additional_docs": "The parallel computation is implemented by dividing the nodes into chunks and computing betweenness centrality for each chunk concurrently.", + "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/connectivity/connectivity.py#L18", + "additional_docs": "The parallel implementation first divides a list of all permutation (in case of directed graphs) and combinations (in case of undirected graphs) of `nbunch` into chunks and then creates a generator to lazily compute the local node connectivities for each chunk, and then employs joblib's `Parallel` function to execute these computations in parallel across `n_jobs` number of CPU cores. At the end, the results are aggregated into a single dictionary and returned.", "additional_parameters": { - 'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n` chunks, where `n` is the number of CPU cores." 
+ 'get_chunks : str, function (default = "chunks")': "A function that takes in `list(iter_func(nbunch, 2))` as input and returns an iterable `pairs_chunks`, here `iter_func` is `permutations` in case of directed graphs and `combinations` in case of undirected graphs. The default is to create chunks by slicing the list into `n_jobs` number of chunks, such that size of each chunk is atmost 10, and at least 1." }, }, - "edge_betweenness_centrality": { - "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L94", - "additional_docs": "The parallel computation is implemented by dividing the nodes into chunks and computing edge betweenness centrality for each chunk concurrently.", + "all_pairs_shortest_path": { + "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/unweighted.py#L63", + "additional_docs": "The parallel implementation first divides the nodes into chunks and then creates a generator to lazily compute shortest paths for each `node_chunk`, and then employs joblib's `Parallel` function to execute these computations in parallel across `n_jobs` number of CPU cores.", "additional_parameters": { - 'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n` chunks, where `n` is the number of CPU cores." + 'get_chunks : str, function (default = "chunks")': "A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `G.nodes` into `n_jobs` number of chunks." 
}, }, - "node_redundancy": { - "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/bipartite/redundancy.py#L11", - "additional_docs": "In the parallel implementation we divide the nodes into chunks and compute the node redundancy coefficients for all `node_chunk` in parallel.", + "all_pairs_shortest_path_length": { + "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/unweighted.py#L19", + "additional_docs": "The parallel implementation first divides the nodes into chunks and then creates a generator to lazily compute shortest paths lengths for each node in `node_chunk`, and then employs joblib's `Parallel` function to execute these computations in parallel across `n_jobs` number of CPU cores.", "additional_parameters": { - 'get_chunks : str, function (default = "chunks")': "A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `G.nodes` (or `nodes`) into `n` chunks, where `n` is the number of CPU cores." + 'get_chunks : str, function (default = "chunks")': "A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `G.nodes` into `n_jobs` number of chunks." 
}, }, - "all_pairs_dijkstra": { - "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/weighted.py#L28", - "additional_docs": "The parallel implementation first divides the nodes into chunks and then creates a generator to lazily compute shortest paths and lengths for each `node_chunk`, and then employs joblib's `Parallel` function to execute these computations in parallel across all available CPU cores.", + "approximate_all_pairs_node_connectivity": { + "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/approximation/connectivity.py#L13", + "additional_docs": "The parallel implementation first divides the a list of all permutation (in case of directed graphs) and combinations (in case of undirected graphs) of `nbunch` into chunks and then creates a generator to lazily compute the local node connectivities for each chunk, and then employs joblib's `Parallel` function to execute these computations in parallel across `n_jobs` number of CPU cores. At the end, the results are aggregated into a single dictionary and returned.", "additional_parameters": { - 'get_chunks : str, function (default = "chunks")': "A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `G.nodes` into `n` chunks, where `n` is the number of CPU cores." + 'get_chunks : str, function (default = "chunks")': "A function that takes in `list(iter_func(nbunch, 2))` as input and returns an iterable `pairs_chunks`, here `iter_func` is `permutations` in case of directed graphs and `combinations` in case of undirected graphs. The default is to create chunks by slicing the list into `n_jobs` chunks, such that size of each chunk is atmost 10, and at least 1." 
}, }, - "all_pairs_dijkstra_path_length": { - "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/weighted.py#L71", - "additional_docs": "The parallel implementation first divides the nodes into chunks and then creates a generator to lazily compute shortest paths lengths for each node in `node_chunk`, and then employs joblib's `Parallel` function to execute these computations in parallel across all available CPU cores.", + "betweenness_centrality": { + "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L20", + "additional_docs": "The parallel computation is implemented by dividing the nodes into chunks and computing betweenness centrality for each chunk concurrently.", "additional_parameters": { - 'get_chunks : str, function (default = "chunks")': "A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `G.nodes` into `n` chunks, where `n` is the number of CPU cores." + 'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n_jobs` number of chunks." }, }, - "all_pairs_dijkstra_path": { - "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/weighted.py#L121", - "additional_docs": "The parallel implementation first divides the nodes into chunks and then creates a generator to lazily compute shortest paths for each `node_chunk`, and then employs joblib's `Parallel` function to execute these computations in parallel across all available CPU cores.", + "closeness_vitality": { + "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/vitality.py#L10", + "additional_docs": "The parallel computation is implemented only when the node is not specified. 
The closeness vitality for each node is computed concurrently.", "additional_parameters": { - 'get_chunks : str, function (default = "chunks")': "A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `G.nodes` into `n` chunks, where `n` is the number of CPU cores." + 'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n_jobs` number of chunks." }, }, - "all_pairs_bellman_ford_path_length": { - "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/weighted.py#L164", - "additional_docs": "The parallel implementation first divides the nodes into chunks and then creates a generator to lazily compute shortest paths lengths for each node in `node_chunk`, and then employs joblib's `Parallel` function to execute these computations in parallel across all available CPU cores.", + "edge_betweenness_centrality": { + "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L96", + "additional_docs": "The parallel computation is implemented by dividing the nodes into chunks and computing edge betweenness centrality for each chunk concurrently.", "additional_parameters": { - 'get_chunks : str, function (default = "chunks")': "A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `G.nodes` into `n` chunks, where `n` is the number of CPU cores." + 'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n_jobs` number of chunks." 
}, }, - "all_pairs_bellman_ford_path": { - "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/weighted.py#L209", - "additional_docs": "The parallel implementation first divides the nodes into chunks and then creates a generator to lazily compute shortest paths for each node_chunk, and then employs joblib's `Parallel` function to execute these computations in parallel across all available CPU cores.", + "is_reachable": { + "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/tournament.py#L13", + "additional_docs": "The function parallelizes the calculation of two neighborhoods of vertices in `G` and checks closure conditions for each neighborhood subset in parallel.", "additional_parameters": { - 'get_chunks : str, function (default = "chunks")': "A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `G.nodes` into `n` chunks, where `n` is the number of CPU cores." + 'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n_jobs` number of chunks." }, }, "johnson": { - "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/weighted.py#L252", + "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/weighted.py#L256", "additional_docs": "The parallel computation is implemented by dividing the nodes into chunks and computing the shortest paths using Johnson's Algorithm for each chunk in parallel.", "additional_parameters": { - 'get_chunks : str, function (default = "chunks")': "A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. 
The default chunking is done by slicing the `G.nodes` into `n` chunks, where `n` is the number of CPU cores." + 'get_chunks : str, function (default = "chunks")': "A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `G.nodes` into `n_jobs` number of chunks." }, }, - "all_pairs_all_shortest_paths": { - "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/generic.py#L10", - "additional_docs": "The parallel implementation first divides the nodes into chunks and then creates a generator to lazily compute all shortest paths between all nodes for each node in `node_chunk`, and then employs joblib's `Parallel` function to execute these computations in parallel across all available CPU cores.", + "local_efficiency": { + "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/efficiency_measures.py#L10", + "additional_docs": "The parallel computation is implemented by dividing the nodes into chunks and then computing and adding global efficiencies of all node in all chunks, in parallel, and then adding all these sums and dividing by the total number of nodes at the end.", "additional_parameters": { - 'get_chunks : str, function (default = "chunks")': "A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `G.nodes` into `n` chunks, where `n` is the number of CPU cores." + 'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n_jobs` number of chunks." 
}, }, - "all_pairs_shortest_path_length": { - "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/unweighted.py#L18", - "additional_docs": "The parallel implementation first divides the nodes into chunks and then creates a generator to lazily compute shortest paths lengths for each node in `node_chunk`, and then employs joblib's `Parallel` function to execute these computations in parallel across all available CPU cores.", + "node_redundancy": { + "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/bipartite/redundancy.py#L12", + "additional_docs": "In the parallel implementation we divide the nodes into chunks and compute the node redundancy coefficients for all `node_chunk` in parallel.", "additional_parameters": { - 'get_chunks : str, function (default = "chunks")': "A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `G.nodes` into `n` chunks, where `n` is the number of CPU cores." + 'get_chunks : str, function (default = "chunks")': "A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `G.nodes` (or `nodes`) into `n_jobs` number of chunks." 
}, }, - "all_pairs_shortest_path": { - "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/unweighted.py#L63", - "additional_docs": "The parallel implementation first divides the nodes into chunks and then creates a generator to lazily compute shortest paths for each `node_chunk`, and then employs joblib's `Parallel` function to execute these computations in parallel across all available CPU cores.", + "number_of_isolates": { + "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/isolate.py#L9", + "additional_docs": "The parallel computation is implemented by dividing the list of isolated nodes into chunks and then finding the length of each chunk in parallel and then adding all the lengths at the end.", "additional_parameters": { - 'get_chunks : str, function (default = "chunks")': "A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `G.nodes` into `n` chunks, where `n` is the number of CPU cores." + 'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the isolated nodes as input and returns an iterable `isolate_chunks`. The default chunking is done by slicing the `isolates` into `n_jobs` number of chunks." 
}, }, - "chunks": { - "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/utils/chunk.py#L8", - "additional_docs": "Divides an iterable into chunks of size n", - "additional_parameters": None, - }, - "cpu_count": { - "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/utils/chunk.py#L18", - "additional_docs": "Returns the number of logical CPUs or cores", - "additional_parameters": None, + "square_clustering": { + "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/cluster.py#L11", + "additional_docs": "The nodes are chunked into `node_chunks` and then the square clustering coefficient for all `node_chunks` are computed in parallel over `n_jobs` number of CPU cores.", + "additional_parameters": { + 'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes (or nbunch) as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n_jobs` number of chunks." + }, }, - "create_iterables": { - "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/utils/chunk.py#L26", - "additional_docs": "Creates an iterable of function inputs for parallel computation based on the provided iterator type.", + "tournament_is_strongly_connected": { + "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/tournament.py#L59", + "additional_docs": "The parallel computation is implemented by dividing the nodes into chunks and then checking whether each node is reachable from each other node in parallel.", "additional_parameters": { - "G : NetworkX graph": "iterator : str Type of iterator. Valid values are 'node', 'edge', 'isolate'", - "iterable : Iterable": "An iterable of function inputs.", + 'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. 
The default chunking is done by slicing the `nodes` into `n_jobs` number of chunks." }, }, }, diff --git a/_nx_parallel/config.py b/_nx_parallel/config.py new file mode 100644 index 00000000..4f7ff495 --- /dev/null +++ b/_nx_parallel/config.py @@ -0,0 +1,25 @@ +from networkx.utils.configs import Config +from typing import Union +from dataclasses import dataclass, field + +__all__ = [ + "_config", +] + + +@dataclass +class ParallelConfig(Config): + active: bool = False + backend: str = "loky" + n_jobs: int = None + verbose: int = 0 + temp_folder: str = None + max_nbytes: Union[int, str] = "1M" + mmap_mode: str = "r" + prefer: str = None + require: str = None + inner_max_num_threads: int = None + backend_params: dict = field(default_factory=dict) + + +_config = ParallelConfig() diff --git a/_nx_parallel/script.sh b/_nx_parallel/script.sh index e57323d9..cb8b3389 100644 --- a/_nx_parallel/script.sh +++ b/_nx_parallel/script.sh @@ -1,6 +1,6 @@ #!/bin/bash -python _nx_parallel/update_get_info.py +python3 _nx_parallel/update_get_info.py ruff format "_nx_parallel/temp__init__.py" diff --git a/_nx_parallel/update_get_info.py b/_nx_parallel/update_get_info.py index 96346654..6611dd4d 100644 --- a/_nx_parallel/update_get_info.py +++ b/_nx_parallel/update_get_info.py @@ -20,6 +20,8 @@ def get_funcs_info(): nx_parallel_dir = os.path.join(os.getcwd(), "nx_parallel") for root, dirs, files in os.walk(nx_parallel_dir): + if root[-29:] == "nx-parallel/nx_parallel/utils": + continue for file in files: if ( file.endswith(".py") @@ -34,7 +36,8 @@ def get_funcs_info(): "additional_docs": extract_add_docs(d[func]), "additional_parameters": extract_add_params(d[func]), } - return funcs + sorted_funcs = dict(sorted(funcs.items())) + return sorted_funcs def extract_docstrings_from_file(file_path): @@ -58,7 +61,9 @@ def extract_docstrings_from_file(file_path): and node.targets[0].id == "__all__" ): all_list = [ - expr.s for expr in node.value.elts if isinstance(expr, ast.Str) + expr.value + 
for expr in node.value.elts + if isinstance(expr, ast.Constant) ] elif isinstance(node, ast.FunctionDef): if all_list and node.name in all_list: @@ -136,6 +141,7 @@ def get_url(file_path, function_name): string = '''# This file was automatically generated by update_get_info.py +from .config import _config def get_info(): """Return a dictionary with information about the package.""" @@ -144,7 +150,8 @@ def get_info(): "project": "nx-parallel", "package": "nx_parallel", "url": "https://github.com/networkx/nx-parallel", - "short_summary": "Parallel backend for NetworkX algorithms", + "short_summary": "A networkx backend that uses joblib to run graph algorithms in parallel. Find the nx-parallel's configuration guide `here <https://github.com/networkx/nx-parallel/blob/main/Config.md>`_", + "default_config": _config, "functions": ''' with open("_nx_parallel/temp__init__.py", "w") as f: diff --git a/nx_parallel/__init__.py b/nx_parallel/__init__.py index ebaaffab..bc6e102b 100644 --- a/nx_parallel/__init__.py +++ b/nx_parallel/__init__.py @@ -1,5 +1,6 @@ +from .utils.decorators import _configure_if_nx_active +from .utils import * from .algorithms import * from .interface import * -from .utils import * __version__ = "0.3rc0.dev0" diff --git a/nx_parallel/algorithms/approximation/connectivity.py b/nx_parallel/algorithms/approximation/connectivity.py index 0a23ecd4..19b02f5b 100644 --- a/nx_parallel/algorithms/approximation/connectivity.py +++ b/nx_parallel/algorithms/approximation/connectivity.py @@ -9,6 +9,7 @@ ] +@nxp._configure_if_nx_active() def approximate_all_pairs_node_connectivity( G, nbunch=None, cutoff=None, get_chunks="chunks" ): @@ -16,8 +17,8 @@ def approximate_all_pairs_node_connectivity( of directed graphs) and combinations (in case of undirected graphs) of `nbunch` into chunks and then creates a generator to lazily compute the local node connectivities for each chunk, and then employs joblib's `Parallel` function to - execute these computations in parallel across all available CPU cores. 
At the end, - the results are aggregated into a single dictionary and returned. + execute these computations in parallel across `n_jobs` number of CPU cores. At the + end, the results are aggregated into a single dictionary and returned. Note, this function uses the name `approximate_all_pairs_node_connectivity` while dispatching to the backend in=mplementation. So, `nxp.all_pairs_node_connectivity` @@ -32,8 +33,8 @@ def approximate_all_pairs_node_connectivity( A function that takes in `list(iter_func(nbunch, 2))` as input and returns an iterable `pairs_chunks`, here `iter_func` is `permutations` in case of directed graphs and `combinations` in case of undirected graphs. The default - is to create chunks by slicing the list into `n` chunks, where `n` is the - number of CPU cores, such that size of each chunk is atmost 10, and at least 1. + is to create chunks by slicing the list into `n_jobs` chunks, such that size + of each chunk is atmost 10, and at least 1. """ if hasattr(G, "graph_object"): @@ -59,9 +60,9 @@ def _process_pair_chunk(pairs_chunk): ] pairs = list(iter_func(nbunch, 2)) - total_cores = nxp.cpu_count() + n_jobs = nxp.get_n_jobs() if get_chunks == "chunks": - num_in_chunk = max(min(len(pairs) // total_cores, 10), 1) + num_in_chunk = max(min(len(pairs) // n_jobs, 10), 1) pairs_chunks = nxp.chunks(pairs, num_in_chunk) else: pairs_chunks = get_chunks(pairs) @@ -70,7 +71,7 @@ def _process_pair_chunk(pairs_chunk): delayed(_process_pair_chunk)(pairs_chunk) for pairs_chunk in pairs_chunks ) - for nc_chunk in Parallel(n_jobs=total_cores)(nc_chunk_generator): + for nc_chunk in Parallel()(nc_chunk_generator): for u, v, k in nc_chunk: all_pairs[u][v] = k if not directed: diff --git a/nx_parallel/algorithms/bipartite/redundancy.py b/nx_parallel/algorithms/bipartite/redundancy.py index 69da5713..adced3f7 100644 --- a/nx_parallel/algorithms/bipartite/redundancy.py +++ b/nx_parallel/algorithms/bipartite/redundancy.py @@ -8,6 +8,7 @@ __all__ = ["node_redundancy"] 
+@nxp._configure_if_nx_active() def node_redundancy(G, nodes=None, get_chunks="chunks"): """In the parallel implementation we divide the nodes into chunks and compute the node redundancy coefficients for all `node_chunk` in parallel. @@ -19,7 +20,7 @@ def node_redundancy(G, nodes=None, get_chunks="chunks"): get_chunks : str, function (default = "chunks") A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the - `G.nodes` (or `nodes`) into `n` chunks, where `n` is the number of CPU cores. + `G.nodes` (or `nodes`) into `n_jobs` number of chunks. """ if hasattr(G, "graph_object"): @@ -31,13 +32,13 @@ def node_redundancy(G, nodes=None, get_chunks="chunks"): "Cannot compute redundancy coefficient for a node" " that has fewer than two neighbors." ) - total_cores = nxp.cpu_count() + n_jobs = nxp.get_n_jobs() if get_chunks == "chunks": - num_in_chunk = max(len(nodes) // total_cores, 1) + num_in_chunk = max(len(nodes) // n_jobs, 1) node_chunks = nxp.chunks(nodes, num_in_chunk) else: node_chunks = get_chunks(nodes) - node_redundancies = Parallel(n_jobs=total_cores)( + node_redundancies = Parallel()( delayed( lambda G, node_chunk: [(v, _node_redundancy(G, v)) for v in node_chunk] )(G, node_chunk) diff --git a/nx_parallel/algorithms/centrality/betweenness.py b/nx_parallel/algorithms/centrality/betweenness.py index 29765796..3f396b1f 100644 --- a/nx_parallel/algorithms/centrality/betweenness.py +++ b/nx_parallel/algorithms/centrality/betweenness.py @@ -15,6 +15,7 @@ __all__ = ["betweenness_centrality", "edge_betweenness_centrality"] +@nxp._configure_if_nx_active() @py_random_state(5) def betweenness_centrality( G, @@ -35,7 +36,7 @@ def betweenness_centrality( get_chunks : str, function (default = "chunks") A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. 
The default chunking is done by slicing the - `nodes` into `n` chunks, where `n` is the number of CPU cores. + `nodes` into `n_jobs` number of chunks. """ if hasattr(G, "graph_object"): G = G.graph_object @@ -45,14 +46,14 @@ def betweenness_centrality( else: nodes = seed.sample(list(G.nodes), k) - total_cores = nxp.cpu_count() + n_jobs = nxp.get_n_jobs() if get_chunks == "chunks": - node_chunks = nxp.create_iterables(G, "node", total_cores, nodes) + node_chunks = nxp.create_iterables(G, "node", n_jobs, nodes) else: node_chunks = get_chunks(nodes) - bt_cs = Parallel(n_jobs=total_cores)( + bt_cs = Parallel()( delayed(_betweenness_centrality_node_subset)(G, chunk, weight, endpoints) for chunk in node_chunks ) @@ -90,6 +91,7 @@ def _betweenness_centrality_node_subset(G, nodes, weight=None, endpoints=False): return betweenness +@nxp._configure_if_nx_active() @py_random_state(4) def edge_betweenness_centrality( G, k=None, normalized=True, weight=None, seed=None, get_chunks="chunks" @@ -104,7 +106,7 @@ def edge_betweenness_centrality( get_chunks : str, function (default = "chunks") A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the - `nodes` into `n` chunks, where `n` is the number of CPU cores. + `nodes` into `n_jobs` number of chunks. 
""" if hasattr(G, "graph_object"): G = G.graph_object @@ -114,14 +116,14 @@ def edge_betweenness_centrality( else: nodes = seed.sample(list(G.nodes), k) - total_cores = nxp.cpu_count() + n_jobs = nxp.get_n_jobs() if get_chunks == "chunks": - node_chunks = nxp.create_iterables(G, "node", total_cores, nodes) + node_chunks = nxp.create_iterables(G, "node", n_jobs, nodes) else: node_chunks = get_chunks(nodes) - bt_cs = Parallel(n_jobs=total_cores)( + bt_cs = Parallel()( delayed(_edge_betweenness_centrality_node_subset)(G, chunk, weight) for chunk in node_chunks ) diff --git a/nx_parallel/algorithms/centrality/tests/test_betweenness_centrality.py b/nx_parallel/algorithms/centrality/tests/test_betweenness_centrality.py index dbc6d8a3..408ba05e 100644 --- a/nx_parallel/algorithms/centrality/tests/test_betweenness_centrality.py +++ b/nx_parallel/algorithms/centrality/tests/test_betweenness_centrality.py @@ -5,7 +5,7 @@ def test_betweenness_centrality_get_chunks(): def get_chunk(nodes): - num_chunks = nxp.cpu_count() + num_chunks = nxp.get_n_jobs() nodes_ebc = {i: 0 for i in nodes} for i in ebc: nodes_ebc[i[0]] += ebc[i] diff --git a/nx_parallel/algorithms/cluster.py b/nx_parallel/algorithms/cluster.py index fca589dd..c758796a 100644 --- a/nx_parallel/algorithms/cluster.py +++ b/nx_parallel/algorithms/cluster.py @@ -7,10 +7,11 @@ ] +@nxp._configure_if_nx_active() def square_clustering(G, nodes=None, get_chunks="chunks"): """The nodes are chunked into `node_chunks` and then the square clustering - coefficient for all `node_chunks` are computed in parallel over all available - CPU cores. + coefficient for all `node_chunks` are computed in parallel over `n_jobs` number + of CPU cores. 
networkx.square_clustering: https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.cluster.square_clustering.html @@ -19,7 +20,7 @@ def square_clustering(G, nodes=None, get_chunks="chunks"): get_chunks : str, function (default = "chunks") A function that takes in a list of all the nodes (or nbunch) as input and returns an iterable `node_chunks`. The default chunking is done by slicing the - `nodes` into `n` chunks, where `n` is the number of CPU cores. + `nodes` into `n_jobs` number of chunks. """ def _compute_clustering_chunk(node_iter_chunk): @@ -47,15 +48,15 @@ def _compute_clustering_chunk(node_iter_chunk): else: node_iter = list(G.nbunch_iter(nodes)) - total_cores = nxp.cpu_count() + n_jobs = nxp.get_n_jobs() if get_chunks == "chunks": - num_in_chunk = max(len(node_iter) // total_cores, 1) + num_in_chunk = max(len(node_iter) // n_jobs, 1) node_iter_chunks = nxp.chunks(node_iter, num_in_chunk) else: node_iter_chunks = get_chunks(node_iter) - result = Parallel(n_jobs=total_cores)( + result = Parallel()( delayed(_compute_clustering_chunk)(node_iter_chunk) for node_iter_chunk in node_iter_chunks ) diff --git a/nx_parallel/algorithms/connectivity/connectivity.py b/nx_parallel/algorithms/connectivity/connectivity.py index 756d5059..223e4573 100644 --- a/nx_parallel/algorithms/connectivity/connectivity.py +++ b/nx_parallel/algorithms/connectivity/connectivity.py @@ -14,12 +14,13 @@ ] +@nxp._configure_if_nx_active() def all_pairs_node_connectivity(G, nbunch=None, flow_func=None, get_chunks="chunks"): """The parallel implementation first divides a list of all permutation (in case of directed graphs) and combinations (in case of undirected graphs) of `nbunch` into chunks and then creates a generator to lazily compute the local node connectivities for each chunk, and then employs joblib's `Parallel` function to - execute these computations in parallel across all available CPU cores. 
At the end, + execute these computations in parallel across `n_jobs` number of CPU cores. At the end, the results are aggregated into a single dictionary and returned. networkx.all_pairs_node_connectivity : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.connectivity.connectivity.all_pairs_node_connectivity.html @@ -30,8 +31,8 @@ def all_pairs_node_connectivity(G, nbunch=None, flow_func=None, get_chunks="chun A function that takes in `list(iter_func(nbunch, 2))` as input and returns an iterable `pairs_chunks`, here `iter_func` is `permutations` in case of directed graphs and `combinations` in case of undirected graphs. The default - is to create chunks by slicing the list into `n` chunks, where `n` is the - number of CPU cores, such that size of each chunk is atmost 10, and at least 1. + is to create chunks by slicing the list into `n_jobs` number of chunks, such + that size of each chunk is atmost 10, and at least 1. """ if hasattr(G, "graph_object"): @@ -61,9 +62,9 @@ def _process_pair_chunk(pairs_chunk): ] pairs = list(iter_func(nbunch, 2)) - total_cores = nxp.cpu_count() + n_jobs = nxp.get_n_jobs() if get_chunks == "chunks": - num_in_chunk = max(min(len(pairs) // total_cores, 10), 1) + num_in_chunk = max(min(len(pairs) // n_jobs, 10), 1) pairs_chunks = nxp.chunks(pairs, num_in_chunk) else: pairs_chunks = get_chunks(pairs) @@ -72,7 +73,7 @@ def _process_pair_chunk(pairs_chunk): delayed(_process_pair_chunk)(pairs_chunk) for pairs_chunk in pairs_chunks ) - for nc_chunk in Parallel(n_jobs=total_cores)(nc_chunk_generator): + for nc_chunk in Parallel()(nc_chunk_generator): for u, v, k in nc_chunk: all_pairs[u][v] = k if not directed: diff --git a/nx_parallel/algorithms/efficiency_measures.py b/nx_parallel/algorithms/efficiency_measures.py index 8c88baef..62572571 100644 --- a/nx_parallel/algorithms/efficiency_measures.py +++ b/nx_parallel/algorithms/efficiency_measures.py @@ -6,6 +6,7 @@ __all__ = ["local_efficiency"] 
+@nxp._configure_if_nx_active() def local_efficiency(G, get_chunks="chunks"): """The parallel computation is implemented by dividing the nodes into chunks and then computing and adding global efficiencies of all node @@ -19,7 +20,7 @@ def local_efficiency(G, get_chunks="chunks"): get_chunks : str, function (default = "chunks") A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` - into `n` chunks, where `n` is the total number of CPU cores available. + into `n_jobs` number of chunks. """ def _local_efficiency_node_subset(G, chunk): @@ -28,15 +29,15 @@ def _local_efficiency_node_subset(G, chunk): if hasattr(G, "graph_object"): G = G.graph_object - total_cores = nxp.cpu_count() + n_jobs = nxp.get_n_jobs() if get_chunks == "chunks": - num_in_chunk = max(len(G.nodes) // total_cores, 1) + num_in_chunk = max(len(G.nodes) // n_jobs, 1) node_chunks = list(nxp.chunks(G.nodes, num_in_chunk)) else: node_chunks = get_chunks(G.nodes) - efficiencies = Parallel(n_jobs=total_cores)( + efficiencies = Parallel()( delayed(_local_efficiency_node_subset)(G, chunk) for chunk in node_chunks ) return sum(efficiencies) / len(G) diff --git a/nx_parallel/algorithms/isolate.py b/nx_parallel/algorithms/isolate.py index 977996d7..525b7e5d 100644 --- a/nx_parallel/algorithms/isolate.py +++ b/nx_parallel/algorithms/isolate.py @@ -5,6 +5,7 @@ __all__ = ["number_of_isolates"] +@nxp._configure_if_nx_active() def number_of_isolates(G, get_chunks="chunks"): """The parallel computation is implemented by dividing the list of isolated nodes into chunks and then finding the length of each chunk in parallel @@ -17,21 +18,19 @@ def number_of_isolates(G, get_chunks="chunks"): get_chunks : str, function (default = "chunks") A function that takes in a list of all the isolated nodes as input and returns an iterable `isolate_chunks`. 
The default chunking is done by slicing the `isolates` - into `n` chunks, where `n` is the total number of CPU cores available. + into `n_jobs` number of chunks. """ if hasattr(G, "graph_object"): G = G.graph_object - cpu_count = nxp.cpu_count() + n_jobs = nxp.get_n_jobs() isolates_list = list(nx.isolates(G)) if get_chunks == "chunks": - num_in_chunk = max(len(isolates_list) // cpu_count, 1) + num_in_chunk = max(len(isolates_list) // n_jobs, 1) isolate_chunks = nxp.chunks(isolates_list, num_in_chunk) else: isolate_chunks = get_chunks(isolates_list) - results = Parallel(n_jobs=cpu_count)( - delayed(len)(chunk) for chunk in isolate_chunks - ) + results = Parallel()(delayed(len)(chunk) for chunk in isolate_chunks) return sum(results) diff --git a/nx_parallel/algorithms/shortest_paths/generic.py b/nx_parallel/algorithms/shortest_paths/generic.py index 4cba68a7..4901f49b 100644 --- a/nx_parallel/algorithms/shortest_paths/generic.py +++ b/nx_parallel/algorithms/shortest_paths/generic.py @@ -7,13 +7,14 @@ ] +@nxp._configure_if_nx_active() def all_pairs_all_shortest_paths( G, weight=None, method="dijkstra", get_chunks="chunks" ): """The parallel implementation first divides the nodes into chunks and then creates a generator to lazily compute all shortest paths between all nodes for each node in `node_chunk`, and then employs joblib's `Parallel` function to - execute these computations in parallel across all available CPU cores. + execute these computations in parallel across `n_jobs` number of CPU cores. networkx.single_source_all_shortest_paths : https://networkx.org/documentation/latest/reference/algorithms/generated/networkx.algorithms.shortest_paths.generic.single_source_all_shortest_paths.html @@ -22,7 +23,7 @@ def all_pairs_all_shortest_paths( get_chunks : str, function (default = "chunks") A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. 
The default chunking is done by slicing the - `G.nodes` into `n` chunks, where `n` is the number of CPU cores. + `G.nodes` into `n_jobs` number of chunks. """ def _process_node_chunk(node_chunk): @@ -40,10 +41,10 @@ def _process_node_chunk(node_chunk): G = G.graph_object nodes = G.nodes - total_cores = nxp.cpu_count() + n_jobs = nxp.get_n_jobs() if get_chunks == "chunks": - num_in_chunk = max(len(nodes) // total_cores, 1) + num_in_chunk = max(len(nodes) // n_jobs, 1) node_chunks = nxp.chunks(nodes, num_in_chunk) else: node_chunks = get_chunks(nodes) @@ -52,6 +53,6 @@ def _process_node_chunk(node_chunk): delayed(_process_node_chunk)(node_chunk) for node_chunk in node_chunks ) - for path_chunk in Parallel(n_jobs=total_cores)(paths_chunk_generator): + for path_chunk in Parallel()(paths_chunk_generator): for path in path_chunk: yield path diff --git a/nx_parallel/algorithms/shortest_paths/unweighted.py b/nx_parallel/algorithms/shortest_paths/unweighted.py index d1930e6b..455b9584 100644 --- a/nx_parallel/algorithms/shortest_paths/unweighted.py +++ b/nx_parallel/algorithms/shortest_paths/unweighted.py @@ -15,11 +15,12 @@ ] +@nxp._configure_if_nx_active() def all_pairs_shortest_path_length(G, cutoff=None, get_chunks="chunks"): """The parallel implementation first divides the nodes into chunks and then creates a generator to lazily compute shortest paths lengths for each node in `node_chunk`, and then employs joblib's `Parallel` function to execute these - computations in parallel across all available CPU cores. + computations in parallel across `n_jobs` number of CPU cores. 
networkx.single_source_shortest_path_length : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.shortest_paths.unweighted.all_pairs_shortest_path_length.html @@ -28,7 +29,7 @@ def all_pairs_shortest_path_length(G, cutoff=None, get_chunks="chunks"): get_chunks : str, function (default = "chunks") A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the - `G.nodes` into `n` chunks, where `n` is the number of CPU cores. + `G.nodes` into `n_jobs` number of chunks. """ def _process_node_chunk(node_chunk): @@ -41,10 +42,10 @@ def _process_node_chunk(node_chunk): G = G.graph_object nodes = G.nodes - total_cores = nxp.cpu_count() + n_jobs = nxp.get_n_jobs() if get_chunks == "chunks": - num_in_chunk = max(len(nodes) // total_cores, 1) + num_in_chunk = max(len(nodes) // n_jobs, 1) node_chunks = nxp.chunks(nodes, num_in_chunk) else: node_chunks = get_chunks(nodes) @@ -53,18 +54,17 @@ def _process_node_chunk(node_chunk): delayed(_process_node_chunk)(node_chunk) for node_chunk in node_chunks ) - for path_length_chunk in Parallel(n_jobs=nxp.cpu_count())( - path_lengths_chunk_generator - ): + for path_length_chunk in Parallel()(path_lengths_chunk_generator): for path_length in path_length_chunk: yield path_length +@nxp._configure_if_nx_active() def all_pairs_shortest_path(G, cutoff=None, get_chunks="chunks"): """The parallel implementation first divides the nodes into chunks and then creates a generator to lazily compute shortest paths for each `node_chunk`, and then employs joblib's `Parallel` function to execute these computations in - parallel across all available CPU cores. + parallel across `n_jobs` number of CPU cores. 
networkx.single_source_shortest_path : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.shortest_paths.unweighted.all_pairs_shortest_path.html @@ -73,7 +73,7 @@ def all_pairs_shortest_path(G, cutoff=None, get_chunks="chunks"): get_chunks : str, function (default = "chunks") A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the - `G.nodes` into `n` chunks, where `n` is the number of CPU cores. + `G.nodes` into `n_jobs` number of chunks. """ def _process_node_chunk(node_chunk): @@ -86,10 +86,10 @@ def _process_node_chunk(node_chunk): G = G.graph_object nodes = G.nodes - total_cores = nxp.cpu_count() + n_jobs = nxp.get_n_jobs() if get_chunks == "chunks": - num_in_chunk = max(len(nodes) // total_cores, 1) + num_in_chunk = max(len(nodes) // n_jobs, 1) node_chunks = nxp.chunks(nodes, num_in_chunk) else: node_chunks = get_chunks(nodes) @@ -98,6 +98,6 @@ def _process_node_chunk(node_chunk): delayed(_process_node_chunk)(node_chunk) for node_chunk in node_chunks ) - for path_chunk in Parallel(n_jobs=nxp.cpu_count())(paths_chunk_generator): + for path_chunk in Parallel()(paths_chunk_generator): for path in path_chunk: yield path diff --git a/nx_parallel/algorithms/shortest_paths/weighted.py b/nx_parallel/algorithms/shortest_paths/weighted.py index 53bda1b0..e45a2523 100644 --- a/nx_parallel/algorithms/shortest_paths/weighted.py +++ b/nx_parallel/algorithms/shortest_paths/weighted.py @@ -25,11 +25,12 @@ ] +@nxp._configure_if_nx_active() def all_pairs_dijkstra(G, cutoff=None, weight="weight", get_chunks="chunks"): """The parallel implementation first divides the nodes into chunks and then creates a generator to lazily compute shortest paths and lengths for each `node_chunk`, and then employs joblib's `Parallel` function to execute these - computations in parallel across all available CPU cores. 
+ computations in parallel across `n_jobs` number of CPU cores. networkx.all_pairs_dijkstra : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.shortest_paths.weighted.all_pairs_dijkstra.html#all-pairs-dijkstra @@ -38,7 +39,7 @@ def all_pairs_dijkstra(G, cutoff=None, weight="weight", get_chunks="chunks"): get_chunks : str, function (default = "chunks") A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the - `G.nodes` into `n` chunks, where `n` is the number of CPU cores. + `G.nodes` into `n_jobs` number of chunks. """ def _process_node_chunk(node_chunk): @@ -51,10 +52,10 @@ def _process_node_chunk(node_chunk): G = G.graph_object nodes = G.nodes - total_cores = nxp.cpu_count() + n_jobs = nxp.get_n_jobs() if get_chunks == "chunks": - num_in_chunk = max(len(nodes) // total_cores, 1) + num_in_chunk = max(len(nodes) // n_jobs, 1) node_chunks = nxp.chunks(nodes, num_in_chunk) else: node_chunks = get_chunks(nodes) @@ -63,18 +64,19 @@ def _process_node_chunk(node_chunk): delayed(_process_node_chunk)(node_chunk) for node_chunk in node_chunks ) - for path_chunk in Parallel(n_jobs=nxp.cpu_count())(paths_chunk_generator): + for path_chunk in Parallel()(paths_chunk_generator): for path in path_chunk: yield path +@nxp._configure_if_nx_active() def all_pairs_dijkstra_path_length( G, cutoff=None, weight="weight", get_chunks="chunks" ): """The parallel implementation first divides the nodes into chunks and then creates a generator to lazily compute shortest paths lengths for each node in `node_chunk`, and then employs joblib's `Parallel` function to execute these - computations in parallel across all available CPU cores. + computations in parallel across `n_jobs` number of CPU cores. 
networkx.all_pairs_dijkstra_path_length : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.shortest_paths.weighted.all_pairs_dijkstra_path_length.html @@ -83,7 +85,7 @@ def all_pairs_dijkstra_path_length( get_chunks : str, function (default = "chunks") A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the - `G.nodes` into `n` chunks, where `n` is the number of CPU cores. + `G.nodes` into `n_jobs` number of chunks. """ def _process_node_chunk(node_chunk): @@ -101,10 +103,10 @@ def _process_node_chunk(node_chunk): G = G.graph_object nodes = G.nodes - total_cores = nxp.cpu_count() + n_jobs = nxp.get_n_jobs() if get_chunks == "chunks": - num_in_chunk = max(len(nodes) // total_cores, 1) + num_in_chunk = max(len(nodes) // n_jobs, 1) node_chunks = nxp.chunks(nodes, num_in_chunk) else: node_chunks = get_chunks(nodes) @@ -113,16 +115,17 @@ def _process_node_chunk(node_chunk): delayed(_process_node_chunk)(node_chunk) for node_chunk in node_chunks ) - for path_chunk in Parallel(n_jobs=nxp.cpu_count())(paths_chunk_generator): + for path_chunk in Parallel()(paths_chunk_generator): for path in path_chunk: yield path +@nxp._configure_if_nx_active() def all_pairs_dijkstra_path(G, cutoff=None, weight="weight", get_chunks="chunks"): """The parallel implementation first divides the nodes into chunks and then creates a generator to lazily compute shortest paths for each `node_chunk`, and then employs joblib's `Parallel` function to execute these computations in - parallel across all available CPU cores. + parallel across `n_jobs` number of CPU cores. 
networkx.all_pairs_dijkstra_path : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.shortest_paths.weighted.all_pairs_dijkstra_path.html @@ -131,7 +134,7 @@ def all_pairs_dijkstra_path(G, cutoff=None, weight="weight", get_chunks="chunks" get_chunks : str, function (default = "chunks") A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the - `G.nodes` into `n` chunks, where `n` is the number of CPU cores. + `G.nodes` into `n_jobs` number of chunks. """ def _process_node_chunk(node_chunk): @@ -144,10 +147,10 @@ def _process_node_chunk(node_chunk): G = G.graph_object nodes = G.nodes - total_cores = nxp.cpu_count() + n_jobs = nxp.get_n_jobs() if get_chunks == "chunks": - num_in_chunk = max(len(nodes) // total_cores, 1) + num_in_chunk = max(len(nodes) // n_jobs, 1) node_chunks = nxp.chunks(nodes, num_in_chunk) else: node_chunks = get_chunks(nodes) @@ -156,16 +159,17 @@ def _process_node_chunk(node_chunk): delayed(_process_node_chunk)(node_chunk) for node_chunk in node_chunks ) - for path_chunk in Parallel(n_jobs=nxp.cpu_count())(paths_chunk_generator): + for path_chunk in Parallel()(paths_chunk_generator): for path in path_chunk: yield path +@nxp._configure_if_nx_active() def all_pairs_bellman_ford_path_length(G, weight="weight", get_chunks="chunks"): """The parallel implementation first divides the nodes into chunks and then creates a generator to lazily compute shortest paths lengths for each node in `node_chunk`, and then employs joblib's `Parallel` function to execute these - computations in parallel across all available CPU cores. + computations in parallel across `n_jobs` number of CPU cores. 
networkx.all_pairs_bellman_ford_path_length : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.shortest_paths.weighted.all_pairs_bellman_ford_path_length.html @@ -174,7 +178,7 @@ def all_pairs_bellman_ford_path_length(G, weight="weight", get_chunks="chunks"): get_chunks : str, function (default = "chunks") A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the - `G.nodes` into `n` chunks, where `n` is the number of CPU cores. + `G.nodes` into `n_jobs` number of chunks. """ def _process_node_chunk(node_chunk): @@ -187,10 +191,10 @@ def _process_node_chunk(node_chunk): G = G.graph_object nodes = G.nodes - total_cores = nxp.cpu_count() + n_jobs = nxp.get_n_jobs() if get_chunks == "chunks": - num_in_chunk = max(len(nodes) // total_cores, 1) + num_in_chunk = max(len(nodes) // n_jobs, 1) node_chunks = nxp.chunks(nodes, num_in_chunk) else: node_chunks = get_chunks(nodes) @@ -199,18 +203,17 @@ def _process_node_chunk(node_chunk): delayed(_process_node_chunk)(node_chunk) for node_chunk in node_chunks ) - for path_length_chunk in Parallel(n_jobs=nxp.cpu_count())( - path_lengths_chunk_generator - ): + for path_length_chunk in Parallel()(path_lengths_chunk_generator): for path_length in path_length_chunk: yield path_length +@nxp._configure_if_nx_active() def all_pairs_bellman_ford_path(G, weight="weight", get_chunks="chunks"): """The parallel implementation first divides the nodes into chunks and then creates a generator to lazily compute shortest paths for each node_chunk, and then employs joblib's `Parallel` function to execute these computations in - parallel across all available CPU cores. + parallel across `n_jobs` number of CPU cores. 
networkx.all_pairs_bellman_ford_path : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.shortest_paths.weighted.all_pairs_bellman_ford_path.html @@ -219,7 +222,7 @@ def all_pairs_bellman_ford_path(G, weight="weight", get_chunks="chunks"): get_chunks : str, function (default = "chunks") A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the - `G.nodes` into `n` chunks, where `n` is the number of CPU cores. + `G.nodes` into `n_jobs` number of chunks. """ def _process_node_chunk(node_chunk): @@ -232,10 +235,10 @@ def _process_node_chunk(node_chunk): G = G.graph_object nodes = G.nodes - total_cores = nxp.cpu_count() + n_jobs = nxp.get_n_jobs() if get_chunks == "chunks": - num_in_chunk = max(len(nodes) // total_cores, 1) + num_in_chunk = max(len(nodes) // n_jobs, 1) node_chunks = nxp.chunks(nodes, num_in_chunk) else: node_chunks = get_chunks(nodes) @@ -244,11 +247,12 @@ def _process_node_chunk(node_chunk): delayed(_process_node_chunk)(node_chunk) for node_chunk in node_chunks ) - for path_chunk in Parallel(n_jobs=nxp.cpu_count())(paths_chunk_generator): + for path_chunk in Parallel()(paths_chunk_generator): for path in path_chunk: yield path +@nxp._configure_if_nx_active() def johnson(G, weight="weight", get_chunks="chunks"): """The parallel computation is implemented by dividing the nodes into chunks and computing the shortest paths using Johnson's Algorithm @@ -261,7 +265,7 @@ def johnson(G, weight="weight", get_chunks="chunks"): get_chunks : str, function (default = "chunks") A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the - `G.nodes` into `n` chunks, where `n` is the number of CPU cores. + `G.nodes` into `n_jobs` number of chunks. 
""" if hasattr(G, "graph_object"): G = G.graph_object @@ -286,14 +290,12 @@ def dist_path(v): def _johnson_subset(chunk): return {node: dist_path(node) for node in chunk} - total_cores = nxp.cpu_count() + n_jobs = nxp.get_n_jobs() if get_chunks == "chunks": - num_in_chunk = max(len(G.nodes) // total_cores, 1) + num_in_chunk = max(len(G.nodes) // n_jobs, 1) node_chunks = nxp.chunks(G.nodes, num_in_chunk) else: node_chunks = get_chunks(G.nodes) - results = Parallel(n_jobs=total_cores)( - delayed(_johnson_subset)(chunk) for chunk in node_chunks - ) + results = Parallel()(delayed(_johnson_subset)(chunk) for chunk in node_chunks) return {v: d_path for result_chunk in results for v, d_path in result_chunk.items()} diff --git a/nx_parallel/algorithms/tournament.py b/nx_parallel/algorithms/tournament.py index 8064ae77..bf9da86a 100644 --- a/nx_parallel/algorithms/tournament.py +++ b/nx_parallel/algorithms/tournament.py @@ -9,6 +9,7 @@ ] +@nxp._configure_if_nx_active() def is_reachable(G, s, t, get_chunks="chunks"): """The function parallelizes the calculation of two neighborhoods of vertices in `G` and checks closure conditions for each @@ -21,7 +22,7 @@ def is_reachable(G, s, t, get_chunks="chunks"): get_chunks : str, function (default = "chunks") A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` - into `n` chunks, where `n` is the total number of CPU cores available. + into `n_jobs` number of chunks. 
""" def two_neighborhood_close(G, chunk): @@ -41,21 +42,20 @@ def is_closed(G, nodes): if hasattr(G, "graph_object"): G = G.graph_object - cpu_count = nxp.cpu_count() + n_jobs = nxp.get_n_jobs() if get_chunks == "chunks": - num_in_chunk = max(len(G) // cpu_count, 1) + num_in_chunk = max(len(G) // n_jobs, 1) node_chunks = nxp.chunks(G, num_in_chunk) else: node_chunks = get_chunks(G) return all( - Parallel(n_jobs=cpu_count)( - delayed(two_neighborhood_close)(G, chunk) for chunk in node_chunks - ) + Parallel()(delayed(two_neighborhood_close)(G, chunk) for chunk in node_chunks) ) +@nxp._configure_if_nx_active() def tournament_is_strongly_connected(G, get_chunks="chunks"): """The parallel computation is implemented by dividing the nodes into chunks and then checking whether each node is reachable from each @@ -72,7 +72,7 @@ def tournament_is_strongly_connected(G, get_chunks="chunks"): get_chunks : str, function (default = "chunks") A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` - into `n` chunks, where `n` is the total number of CPU cores available. + into `n_jobs` number of chunks. 
""" if hasattr(G, "graph_object"): G = G.graph_object @@ -80,15 +80,15 @@ def tournament_is_strongly_connected(G, get_chunks="chunks"): def is_reachable_subset(G, chunk): return all(nx.tournament.is_reachable(G, u, v) for v in chunk for u in G) - cpu_count = nxp.cpu_count() + n_jobs = nxp.get_n_jobs() if get_chunks == "chunks": - num_in_chunk = max(min(len(G) // cpu_count, 10), 1) + num_in_chunk = max(min(len(G) // n_jobs, 10), 1) node_chunks = nxp.chunks(G, num_in_chunk) else: node_chunks = get_chunks(G) - results = Parallel(n_jobs=cpu_count)( + results = Parallel()( delayed(is_reachable_subset)(G, chunk) for chunk in node_chunks ) return all(results) diff --git a/nx_parallel/algorithms/vitality.py b/nx_parallel/algorithms/vitality.py index d0a57986..c44b11d2 100644 --- a/nx_parallel/algorithms/vitality.py +++ b/nx_parallel/algorithms/vitality.py @@ -6,6 +6,7 @@ __all__ = ["closeness_vitality"] +@nxp._configure_if_nx_active() def closeness_vitality( G, node=None, weight=None, wiener_index=None, get_chunks="chunks" ): @@ -19,7 +20,7 @@ def closeness_vitality( get_chunks : str, function (default = "chunks") A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the - `nodes` into `n` chunks, where `n` is the total number of CPU cores. + `nodes` into `n_jobs` number of chunks. 
""" def closeness_vitality_chunk_subset(chunk): @@ -35,10 +36,10 @@ def closeness_vitality_chunk_subset(chunk): after = nx.wiener_index(G.subgraph(set(G) - {node}), weight=weight) return wiener_index - after - total_cores = nxp.cpu_count() + n_jobs = nxp.get_n_jobs() if get_chunks == "chunks": - num_in_chunk = max(len(G) // total_cores, 1) + num_in_chunk = max(len(G) // n_jobs, 1) node_chunks = nxp.chunks(G.nodes, num_in_chunk) else: node_chunks = get_chunks(G.nodes) @@ -47,7 +48,7 @@ def closeness_vitality_chunk_subset(chunk): nx.closeness_vitality, G, weight=weight, wiener_index=wiener_index ) - result = Parallel(n_jobs=total_cores)( + result = Parallel()( delayed(closeness_vitality_chunk_subset)(chunk) for chunk in node_chunks ) return {k: v for d in result for k, v in d.items()} diff --git a/nx_parallel/tests/test_get_chunks.py b/nx_parallel/tests/test_get_chunks.py index 88057f7f..fbf6f3ed 100644 --- a/nx_parallel/tests/test_get_chunks.py +++ b/nx_parallel/tests/test_get_chunks.py @@ -42,7 +42,7 @@ def random_chunking(nodes): _nodes = list(nodes).copy() random.seed(42) random.shuffle(_nodes) - num_chunks = nxp.cpu_count() + num_chunks = nxp.get_n_jobs() num_in_chunk = max(len(_nodes) // num_chunks, 1) return nxp.chunks(_nodes, num_in_chunk) diff --git a/nx_parallel/utils/__init__.py b/nx_parallel/utils/__init__.py index 253f731e..8fe88645 100644 --- a/nx_parallel/utils/__init__.py +++ b/nx_parallel/utils/__init__.py @@ -1 +1,2 @@ from .chunk import * +from .decorators import * diff --git a/nx_parallel/utils/chunk.py b/nx_parallel/utils/chunk.py index 9c75c2c9..3ac11171 100644 --- a/nx_parallel/utils/chunk.py +++ b/nx_parallel/utils/chunk.py @@ -2,7 +2,7 @@ import os import networkx as nx -__all__ = ["chunks", "cpu_count", "create_iterables"] +__all__ = ["chunks", "get_n_jobs", "create_iterables"] def chunks(iterable, n): @@ -15,12 +15,25 @@ def chunks(iterable, n): yield x -def cpu_count(): - """Returns the number of logical CPUs or cores""" - # Check if we 
are running under pytest +def get_n_jobs(n_jobs=None): + """Returns the positive value of `n_jobs`.""" if "PYTEST_CURRENT_TEST" in os.environ: return 2 - return os.cpu_count() + else: + if nx.config.backends.parallel.active: + n_jobs = nx.config.backends.parallel.n_jobs + else: + from joblib.parallel import get_active_backend + + n_jobs = get_active_backend()[1] + n_cpus = os.cpu_count() + if n_jobs is None: + return 1 + if n_jobs < 0: + return n_cpus + n_jobs + 1 + if n_jobs == 0: + raise ValueError("n_jobs == 0 in Parallel has no meaning") + return int(n_jobs) def create_iterables(G, iterator, n_cores, list_of_iterator=None): diff --git a/nx_parallel/utils/decorators.py b/nx_parallel/utils/decorators.py new file mode 100644 index 00000000..c5bb7f84 --- /dev/null +++ b/nx_parallel/utils/decorators.py @@ -0,0 +1,36 @@ +from dataclasses import asdict +from joblib import parallel_config +import networkx as nx +from functools import wraps +import os + +__all__ = [ + "_configure_if_nx_active", +] + + +def _configure_if_nx_active(): + """Decorator to set the configuration for the parallel computation + of the nx-parallel algorithms.""" + + def decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + if ( + nx.config.backends.parallel.active + or "PYTEST_CURRENT_TEST" in os.environ + ): + # to activate nx config system in nx_parallel use: + # `nx.config.backends.parallel.active = True` + config_dict = asdict(nx.config.backends.parallel) + del config_dict["active"] + config_dict.update(config_dict["backend_params"]) + del config_dict["backend_params"] + with parallel_config(**config_dict): + return func(*args, **kwargs) + else: + return func(*args, **kwargs) + + return wrapper + + return decorator diff --git a/pyproject.toml b/pyproject.toml index 831e2cac..c67712a5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,7 +47,8 @@ parallel = "_nx_parallel:get_info" [project.optional-dependencies] developer = [ - 'pre-commit', + 'pre-commit==3.8.0', + 
'ruff==0.1.8', ] test = [ 'pytest>=7.2', @@ -85,7 +86,7 @@ line-length = 88 target-version = 'py310' [tool.ruff.lint.per-file-ignores] -"__init__.py" = ['I', 'F403'] +"__init__.py" = ['I', 'F403', 'F401'] [tool.ruff.format] docstring-code-format = true