diff --git a/conftest.py b/conftest.py index fcc64569..deeb411d 100644 --- a/conftest.py +++ b/conftest.py @@ -1,17 +1,27 @@ +# type: ignore import os -from biodivine_aeon import BooleanNetwork # type: ignore -# The purpose of this file is to detect tests with `network_file` as input and -# then supply these tests with networks from `bbm-bnet-inputs-true` up to a certain -# network size. This network size can be configured using `--networksize` and -# its default value is 20. + +from biodivine_aeon import BooleanNetwork + +# The purpose of this file is to detect tests with `network_file` as input and +# then supply these tests with networks from `bbm-bnet-inputs-true` up to a +# certain network size. This network size can be configured using +# `--networksize` and its default value is 20. # We intentionally test on the `-inputs-true` models as opposed to `-inputs-identity`, # as having fixed inputs ensures there are not too many trap spaces, fixed points, etc. + def pytest_addoption(parser): - parser.addoption("--networksize", action="store", default="20", help="Only check networks up to this size.") + parser.addoption( + "--networksize", + action="store", + default="20", + help="Only check networks up to this size.", + ) + -def pytest_generate_tests(metafunc): +def pytest_generate_tests(metafunc): if "network_file" in metafunc.fixturenames: size = int(metafunc.config.getoption("networksize")) models = [] @@ -24,4 +34,4 @@ def pytest_generate_tests(metafunc): if bn.num_vars() > size: continue models.append(path) - metafunc.parametrize("network_file", models) \ No newline at end of file + metafunc.parametrize("network_file", models) diff --git a/nfvsmotifs/SuccessionDiagram.py b/nfvsmotifs/SuccessionDiagram.py index d48dc52f..2b00e2fe 100644 --- a/nfvsmotifs/SuccessionDiagram.py +++ b/nfvsmotifs/SuccessionDiagram.py @@ -1,24 +1,23 @@ from __future__ import annotations -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any, cast if TYPE_CHECKING: - 
from typing import Set, Iterator + from typing import Iterator from biodivine_aeon import BooleanNetwork import networkx as nx # type: ignore -from nfvsmotifs.interaction_graph_utils import feedback_vertex_set -from nfvsmotifs.petri_net_translation import network_to_petrinet -from nfvsmotifs.space_utils import percolate_space, space_unique_key -from nfvsmotifs.trappist_core import trappist - +from nfvsmotifs._sd_algorithms.compute_attractor_seeds import compute_attractor_seeds +from nfvsmotifs._sd_algorithms.expand_attractor_seeds import expand_attractor_seeds from nfvsmotifs._sd_algorithms.expand_bfs import expand_bfs from nfvsmotifs._sd_algorithms.expand_dfs import expand_dfs from nfvsmotifs._sd_algorithms.expand_minimal_spaces import expand_minimal_spaces -from nfvsmotifs._sd_algorithms.compute_attractor_seeds import compute_attractor_seeds -from nfvsmotifs._sd_algorithms.expand_attractor_seeds import expand_attractor_seeds from nfvsmotifs._sd_algorithms.expand_to_target import expand_to_target +from nfvsmotifs.interaction_graph_utils import feedback_vertex_set +from nfvsmotifs.petri_net_translation import network_to_petrinet +from nfvsmotifs.space_utils import percolate_space, space_unique_key +from nfvsmotifs.trappist_core import trappist # Enables helpful "progress" messages. DEBUG = False @@ -26,57 +25,70 @@ class SuccessionDiagram: """ - `SuccessionDiagram` (SD) is a directed acyclic graph representing the structure of trap spaces - induced by a particular `BooleanNetwork`. The root of the graph is the whole state space (after - percolation of constant values). The leaf nodes are individual minimal trap spaces. Each path - from the root to a leaf represents a succession of gradually more restrictive trap spaces. - - There are several additional "features" implemented by `SuccessionDiagram` that allow us to - implement more advanced (and efficient) algorithms: - - The succession diagram can be expanded lazily. 
Each node is initially a *stub* node, - meaning none of its child nodes are known. A stub can be then *expanded* into a full node - by computing the *stable motifs* for the associated trap space. - - Each node can be annotated with *attractor seed states*, i.e. states that are known to - cover all network attractors within that space. + `SuccessionDiagram` (SD) is a directed acyclic graph representing the + structure of trap spaces induced by a particular `BooleanNetwork`. The root + of the graph is the whole state space (after percolation of constant + values). The leaf nodes are individual minimal trap spaces. Each path from + the root to a leaf represents a succession of gradually more restrictive + trap spaces. + + There are several additional "features" implemented by `SuccessionDiagram` + that allow us to implement more advanced (and efficient) algorithms: + - The succession diagram can be expanded lazily. Each node is initially + a *stub* node, + meaning none of its child nodes are known. A stub can be then *expanded* + into a full node by computing the *stable motifs* for the associated + trap space. - Each node can be annotated with *attractor seed states*, + i.e. states that are known to cover all network attractors within that + space. Overview of the `SuccessionDiagram` API: - Introspection of the whole diagram: * `SuccessionDiagram.root()`: the integer ID of the root node. - * `SuccessionDiagram.depth()`: The depth (hight?) of the current succession diagram. - * `SuccessionDiagram.node_ids() / .stub_ids() / .expanded_ids()`: Iterates over the - corresponding *node IDs* managed by this SD. - * `SuccessionDiagram.is_subgraph() / .is_isomorphic()`: Compares two succession diagrams. + * `SuccessionDiagram.depth()`: The depth (hight?) of the current + succession diagram. + * `SuccessionDiagram.node_ids() / .stub_ids() / .expanded_ids()`: + Iterates over the + corresponding *node IDs* managed by this SD. 
* + `SuccessionDiagram.is_subgraph() / .is_isomorphic()`: Compares two + succession diagrams. - Inspecting individual nodes: - * `SuccessionDiagram.node_depth(id)`: The depth (length of a maximal path from the - root) of the given node. - * `SuccessionDiagram.node_space(id)`: The trap space associated with the given node. - * `SuccessionDiagram.node_is_minimal(id)`: Checks if the given node is a minimal - trap space: i.e. it is expanded and has no successor nodes. - * `SuccessionDiagram.node_is_expaned(id)`: Check if the given node is expanded, i.e. - its successor nodes are computed. - * `SuccessionDiagram.node_successors(id, compute=True/False)`: The list of successor - node IDs for the given node (can be computed if not yet known, in which case the - node becomes expanded). - * `SuccessionDiagram.node_attractor_seeds(id, compute=True/False)`: The list of - "attractor seed states" associated with the given node (if these are not known yet, - they can be computed). - * `SuccessionDiagram.edge_stable_motif(id, id)`: Obtain the stable motif which enables - the edge between the two nodes. - - There are several "expand procedures" that can explore a larger part of the SD at once, + * `SuccessionDiagram.node_depth(id)`: The depth (length of a maximal + path from the + root) of the given node. * `SuccessionDiagram.node_space(id)`: The + trap space associated with the given node. * + `SuccessionDiagram.node_is_minimal(id)`: Checks if the given node is + a minimal trap space: i.e. it is expanded and has no successor + nodes. * `SuccessionDiagram.node_is_expaned(id)`: Check if the given + node is expanded, i.e. its successor nodes are computed. * + `SuccessionDiagram.node_successors(id, compute=True/False)`: The + list of successor node IDs for the given node (can be computed if + not yet known, in which case the node becomes expanded). 
* + `SuccessionDiagram.node_attractor_seeds(id, compute=True/False)`: + The list of "attractor seed states" associated with the given node + (if these are not known yet, they can be computed). * + `SuccessionDiagram.edge_stable_motif(id, id)`: Obtain the stable + motif which enables the edge between the two nodes. + - There are several "expand procedures" that can explore a larger part + of the SD at once, typically with a specific goal in mind: - * `SuccessionDiagram.expand_bfs() / .expand_dfs()`: Expands the whole SD up to a certain - upper bound on size/depth. - * `SuccessionDiagram.expand_minimal_traps()`: Expands the SD such that each minimal trap - space is reachable by at least one path from the root. + * `SuccessionDiagram.expand_bfs() / .expand_dfs()`: Expands the + whole SD up to a certain + upper bound on size/depth. * + `SuccessionDiagram.expand_minimal_traps()`: Expands the SD such that + each minimal trap space is reachable by at least one path from the + root. + + *Other internal implementation notes:* - *Other internal implementation notes:* - - *The node IDs are assumed to be a continuous range of integers. If this breaks at any point, - please make sure to check the correctness of the new implementation thoroughly.* + *The node IDs are assumed to be a continuous range of integers. If this + breaks at any point, please make sure to check the correctness of the new + implementation thoroughly.* - *There is no way to remove or otherwise "contract" the succession diagram: Once a node is expanded, - it stays expanded. At the time of writing, there does not appear to be a need for deleting SD - nodes and it greatly simplifies reasoning about correctness.* + *There is no way to remove or otherwise "contract" the succession diagram: + Once a node is expanded, it stays expanded. 
At the time of writing, there + does not appear to be a need for deleting SD nodes and it greatly simplifies + reasoning about correctness.* """ @@ -91,11 +103,11 @@ def __init__(self, network: BooleanNetwork): ) # find_minimum_NFVS(network) # A directed acyclic graph representing the succession diagram. self.G = nx.DiGraph() - # A dictionary used for uniqueness checks on the nodes of the succession diagram. - # See `SuccessionDiagram.ensure_node` for details. + # A dictionary used for uniqueness checks on the nodes of the succession + # diagram. See `SuccessionDiagram.ensure_node` for details. self.node_indices: dict[int, int] = {} - # Create an un-expanded root node. + # Create an un-expanded root node. self._ensure_node(None, {}) def __len__(self) -> int: @@ -117,8 +129,8 @@ def depth(self) -> int: Depth is counted from zero (root has depth zero). """ d = 0 - for node in self.G.nodes(): # type: ignore[reportUnknownVariableType] # noqa - d = max(d, self.node_depth(int(node))) # type: ignore[reportUnknownArgumentType] # noqa + for node in cast(set[int], self.G.nodes()): + d = max(d, self.node_depth(int(node))) return d def node_ids(self) -> Iterator[int]: @@ -146,44 +158,49 @@ def expanded_ids(self) -> Iterator[int]: def minimal_trap_spaces(self) -> list[int]: """ - List of node IDs that represent the minimal trap spaces within this succession diagram. + List of node IDs that represent the minimal trap spaces within this + succession diagram. Note that stub nodes do not count as minimal! """ - return [ i for i in self.expanded_ids() if self.node_is_minimal(i) ] + return [i for i in self.expanded_ids() if self.node_is_minimal(i)] def find_node(self, node_space: dict[str, int]) -> int | None: """ - Return the ID of the node matching the provided `node_space`, or `None` if no such - node exists in this succession diagram. + Return the ID of the node matching the provided `node_space`, or `None` + if no such node exists in this succession diagram. 
""" try: key = space_unique_key(node_space, self.network) if key in self.node_indices: i = self.node_indices[key] - # This assertion could be violated if a user gives a node space that is not based - # on the same network as this succession diagram. + # This assertion could be violated if a user gives a node space + # that is not based on the same network as this succession + # diagram. assert node_space == self.node_space(i) return i else: return None except RuntimeError: - # If the user gives us a space that uses variables not used by this network, - # we should get an error that we can catch and report that no such space exists here. - return None + # If the user gives us a space that uses variables not used by this + # network, we should get an error that we can catch and report that + # no such space exists here. + return None def is_subgraph(self, other: SuccessionDiagram) -> bool: """ - Returns `True` if this succession diagram is a subgraph of the `other` succession diagram. - - Note that this function works even for diagrams based on different Boolean - networks, as long as both succession diagrams only depend on the same subset of - network variables. + Returns `True` if this succession diagram is a subgraph of the `other` + succession diagram. - WARNING: This does not take into account the stable motifs on individual edges. Just the - subspaces associated with nodes and the presence of edges between nodes. - """ - # Every stub node is reachable through an expanded node and + Note that this function works even for diagrams based on different + Boolean networks, as long as both succession diagrams only depend on the + same subset of network variables. + + WARNING: This does not take into account the stable motifs on individual + edges. Just the subspaces associated with nodes and the presence of + edges between nodes. + """ + # Every stub node is reachable through an expanded node and # thus will be checked by the following code. 
for i in self.expanded_ids(): other_i = other.find_node(self.node_space(i)) @@ -191,7 +208,7 @@ def is_subgraph(self, other: SuccessionDiagram) -> bool: return False my_successors = self.node_successors(i) other_successors = [] - if other.node_is_expanded(other_i): + if other.node_is_expanded(other_i): other_successors = other.node_successors(other_i) for my_s in my_successors: @@ -204,12 +221,13 @@ def is_isomorphic(self, other: SuccessionDiagram) -> bool: """ Returns `True` if the two succession diagrams are isomorphic. - Note that this function works even for diagrams based on different Boolean - networks, as long as both succession diagrams only depend on the same subset of - network variables. + Note that this function works even for diagrams based on different + Boolean networks, as long as both succession diagrams only depend on the + same subset of network variables. - WARNING: This does not take into account the stable motifs on individual edges. Just the - subspaces associated with nodes and the presence of edges between nodes. + WARNING: This does not take into account the stable motifs on individual + edges. Just the subspaces associated with nodes and the presence of + edges between nodes. """ return self.is_subgraph(other) and other.is_subgraph(self) @@ -217,8 +235,8 @@ def node_depth(self, node_id: int) -> int: """ Get the depth associated with the provided `node_id`. The depth is counted as the longest path from the root node to the given node. - """ - return self.G.nodes[node_id]["depth"] # type: ignore[reportUnknownVariableType] # noqa + """ + return cast(int, self.G.nodes[node_id]["depth"]) def node_space(self, node_id: int) -> dict[str, int]: """ @@ -227,102 +245,117 @@ def node_space(self, node_id: int) -> dict[str, int]: Note that this is the space *after* percolation. Hence it can hold that `|node_space(child)| < |node_space(parent)| + |stable_motif(parent, child)|`. 
""" - return self.G.nodes[node_id]["space"] # type: ignore[reportUnknownVariableType] # noqa + return cast(dict[str, int], self.G.nodes[node_id]["space"]) def node_is_expanded(self, node_id: int) -> bool: """ True if the successors of the given node are already computed. """ - return self.G.nodes[node_id]["expanded"] + return cast(bool, self.G.nodes[node_id]["expanded"]) def node_is_minimal(self, node_id: int) -> bool: """ - True if the node is expanded and it has no successors, i.e. it is a minimal trap space. + True if the node is expanded and it has no successors, i.e. it is a + minimal trap space. """ - is_leaf: bool = self.G.out_degree(node_id) == 0 # type: ignore[reportUnknownMemberType, reportUnknownVariableType] # noqa - return is_leaf and self.G.nodes[node_id]["expanded"] # type: ignore[reportUnknownVariableType] # noqa + is_leaf: bool = self.G.out_degree(node_id) == 0 # type: ignore + return is_leaf and self.G.nodes[node_id]["expanded"] # type: ignore def node_successors(self, node_id: int, compute: bool = False) -> list[int]: """ - Return the successor nodes for the given `node_id`. If the node is already expanded, known results - are simply returned. If the node is not expanded, but `compute` is set to `True`, then the node - is expanded and the newly computed results are returned. If the node is not expanded and `compute` - is set to `False`, the method raises a `KeyError` exception. + Return the successor nodes for the given `node_id`. If the node is + already expanded, known results are simply returned. If the node is not + expanded, but `compute` is set to `True`, then the node is expanded and + the newly computed results are returned. If the node is not expanded and + `compute` is set to `False`, the method raises a `KeyError` exception. - The default behaviour intentionally does not compute successors to prevent "accidental complexity". + The default behaviour intentionally does not compute successors to + prevent "accidental complexity". 
- WARNING: We do not guarantee the order of the returned nodes. If you need the successors in a - repeatable order, you should sort the list manually. + WARNING: We do not guarantee the order of the returned nodes. If you + need the successors in a repeatable order, you should sort the list + manually. - Also note that if the given `node_id` already has associated attractor data but is not expanded, - this data will be deleted as it is no longer up to date. + Also note that if the given `node_id` already has associated attractor + data but is not expanded, this data will be deleted as it is no longer + up to date. """ - node = self.G.nodes[node_id] + node = cast(dict[str, Any], self.G.nodes[node_id]) if not node["expanded"] and not compute: raise KeyError(f"Node {node_id} is not expanded.") if not node["expanded"]: self._expand_one_node(node_id) - - return list(self.G.successors(node_id)) - def node_attractor_seeds(self, node_id: int, compute: bool = False) -> list[dict[str, int]]: + return list(self.G.successors(node_id)) # type: ignore + + def node_attractor_seeds( + self, node_id: int, compute: bool = False + ) -> list[dict[str, int]]: """ - Return the list of attractor seed states corresponding to the given `node_id`. Similar to - `node_successors`, the method either computes the data if unknown, or throws an exception, - depending on the `compute` flag. + Return the list of attractor seed states corresponding to the given + `node_id`. Similar to `node_successors`, the method either computes the + data if unknown, or throws an exception, depending on the `compute` + flag. - Note that you can compute attractor seeds for stub nodes, but (a) these attractors are not - guaranteed to be unique (i.e. you can "discover" the same attractor in multiple stub nodes, - if the stub nodes intersect), and (b) this data is erased if the stub node is expanded - later on. 
+ Note that you can compute attractor seeds for stub nodes, but (a) these + attractors are not guaranteed to be unique (i.e. you can "discover" the + same attractor in multiple stub nodes, if the stub nodes intersect), and + (b) this data is erased if the stub node is expanded later on. """ - node = self.G.nodes[node_id] + node = cast(dict[str, Any], self.G.nodes[node_id]) + + attractors = cast(list[dict[str, int]] | None, node["attractors"]) - attractors = node["attractors"] - if attractors is None and not compute: raise KeyError(f"Attractor data not computed for node {node_id}.") - + if attractors is None: attractors = compute_attractor_seeds(self, node_id) - node["attractors"] = attractors + node["attractors"] = attractors - return attractors + return attractors def edge_stable_motif(self, parent_id: int, child_id: int) -> dict[str, int]: """ - Return the *stable motif* associated with the specified parent-child edge. + Return the *stable motif* associated with the specified parent-child + edge. - This corresponds to the maximal trap space within the `parent_id` node that, after percolation, - yields the `child_id` node. + This corresponds to the maximal trap space within the `parent_id` node + that, after percolation, yields the `child_id` node. """ - return self.G.edges[parent_id, child_id]["motif"] # type: ignore[reportUnknownVariableType] # noqa + return cast(dict[str, int], self.G.edges[parent_id, child_id]["motif"]) def expand_bfs( - self, - node_id: int | None = None, - bfs_level_limit: int | None = None, + self, + node_id: int | None = None, + bfs_level_limit: int | None = None, size_limit: int | None = None, ) -> bool: """ Explore the succession diagram in a BFS manner. - - If `node_id` is given, initiate BFS from this node. Otherwise use root. - - If `bfs_level_limit` is given, this is the last "level" (distance from the initial node) - of nodes that should be expanded (any subsequent child nodes are left unexplored). 
- - If `size_limit` is given, the procedure stops once `SuccessionDiagram` exceeds the given size. - - With default settings, the method will explore the whole succession diagram without any restrictions. - - The method returns `True` if the whole exploration was completed and `False` if it was terminated - early based on one of the aforementioned conditions. - - Note that the procedure also explores nodes that are already expanded. I.e. if all nodes at levels - 0,1,2 are expanded, but there are stub nodes on level 3, the procedure will still discover and - expand these stub nodes (assuming sufficient level and size limit). - - Also note that the `size_limit` is only a soft limit: for each node, we always have to create all - child nodes when expanding it. Hence the procedure can only check the condition in between - expanding new nodes. + - If `node_id` is given, initiate BFS from this node. Otherwise use + root. + - If `bfs_level_limit` is given, this is the last "level" (distance + from the initial node) + of nodes that should be expanded (any subsequent child nodes are + left unexplored). - If `size_limit` is given, the procedure stops + once `SuccessionDiagram` exceeds the given size. + + With default settings, the method will explore the whole succession + diagram without any restrictions. + + The method returns `True` if the whole exploration was completed and + `False` if it was terminated early based on one of the aforementioned + conditions. + + Note that the procedure also explores nodes that are already expanded. + I.e. if all nodes at levels 0,1,2 are expanded, but there are stub nodes + on level 3, the procedure will still discover and expand these stub + nodes (assuming sufficient level and size limit). + + Also note that the `size_limit` is only a soft limit: for each node, we + always have to create all child nodes when expanding it. Hence the + procedure can only check the condition in between expanding new nodes. 
""" return expand_bfs(self, node_id, bfs_level_limit, size_limit) @@ -335,82 +368,90 @@ def expand_dfs( """ Similar to `expand_bfs`, but uses DFS instead of BFS. - The only major difference is the `dfs_stack_limit` which restricts the size of the DFS stack. - Nodes that would appear "deeper" in the stack than this limit are left unexpanded. Note that - this stack size is technically *some* form of distance from the initial node, but not necessarily - the minimal distance. + The only major difference is the `dfs_stack_limit` which restricts the + size of the DFS stack. Nodes that would appear "deeper" in the stack + than this limit are left unexpanded. Note that this stack size is + technically *some* form of distance from the initial node, but not + necessarily the minimal distance. """ return expand_dfs(self, node_id, dfs_stack_limit, size_limit) def expand_minimal_spaces(self, size_limit: int | None = None) -> bool: """ - Expands the succession diagram in a way that guarantees every minimal trap space to be reachable - from the root node, but otherwise (greedily) avoids unnecesary expansion of nodes whenever possible. + Expands the succession diagram in a way that guarantees every minimal + trap space to be reachable from the root node, but otherwise (greedily) + avoids unnecesary expansion of nodes whenever possible. - The algorithm is loosely based on `expand_bfs` implementation, but on each BFS level only expands the - first node that still contains some minimal trap space not covered by a previously expanded node - at that level. + The algorithm is loosely based on `expand_bfs` implementation, but on + each BFS level only expands the first node that still contains some + minimal trap space not covered by a previously expanded node at that + level. - The resulting succession diagram construction is deterministic, but can vary if some nodes are already - expanded initially. 
In such case, the procedure still tries to avoid expanding unnecessary nodes, - which means existing expanded nodes can be prioritised over the "canonical" ones. + The resulting succession diagram construction is deterministic, but can + vary if some nodes are already expanded initially. In such case, the + procedure still tries to avoid expanding unnecessary nodes, which means + existing expanded nodes can be prioritised over the "canonical" ones. """ return expand_minimal_spaces(self, size_limit) def expand_attractor_seeds(self, size_limit: int | None = None) -> bool: """ - Expands the succession diagram such that for every asynchronous attractor, there is - at least one expanded trap space which is the minimal trap space containing this attractor. - In other words, the procedure expands the succession diagram as little as possible, but + Expands the succession diagram such that for every asynchronous + attractor, there is at least one expanded trap space which is the + minimal trap space containing this attractor. In other words, the + procedure expands the succession diagram as little as possible, but ensures that every attractor is "as easy to identify" as possible. - After this procedure, it is sufficient to search for attractors in expanded nodes. - Note that this method does not perform exact attractor identification. It is possible - that some nodes are expanded spuriously and the succession diagram is thus larger - than necessary. + After this procedure, it is sufficient to search for attractors in + expanded nodes. Note that this method does not perform exact attractor + identification. It is possible that some nodes are expanded spuriously + and the succession diagram is thus larger than necessary. 
""" return expand_attractor_seeds(self, size_limit) - def expand_to_target(self, target: dict[str, int], size_limit: int | None = None) -> bool: + def expand_to_target( + self, target: dict[str, int], size_limit: int | None = None + ) -> bool: """ - Expands the succession diagram using BFS in such a way that only nodes which intersect - `target` but are not fully contained in it are expanded. - - This is used for example in control, as it ensures that all possible branches of the - succession diagram relevant for a particular "target subspace" are expanded as much - as necessary, but not more. + Expands the succession diagram using BFS in such a way that only nodes + which intersect `target` but are not fully contained in it are expanded. + + This is used for example in control, as it ensures that all possible + branches of the succession diagram relevant for a particular "target + subspace" are expanded as much as necessary, but not more. """ return expand_to_target(self, target, size_limit) def _update_node_depth(self, node_id: int, parent_id: int): """ - An internal method that updates the depth of a node based on a specific parent node. - This assumes that there is an edge from `parent` to `node_id`. + An internal method that updates the depth of a node based on a specific + parent node. This assumes that there is an edge from `parent` to + `node_id`. Note that the depth can only increase. """ assert self.G.edges[parent_id, node_id] is not None - parent_depth = self.G.nodes[parent_id]["depth"] - current_depth = self.G.nodes[node_id]["depth"] + parent_depth = cast(int, self.G.nodes[parent_id]["depth"]) + current_depth = cast(int, self.G.nodes[node_id]["depth"]) self.G.nodes[node_id]["depth"] = max(current_depth, parent_depth + 1) def _expand_one_node(self, node_id: int): """ An internal method that expands a single node of the succession diagram. 
- This entails computing the maximal trap spaces within the node (stable motifs) - and creating a node for the result (if it does not exist yet). + This entails computing the maximal trap spaces within the node (stable + motifs) and creating a node for the result (if it does not exist yet). If the node is already expanded, the method does nothing. - If there are already some attractor data for this node (stub nodes can have - associated attractor data), this data is erased. + If there are already some attractor data for this node (stub nodes can + have associated attractor data), this data is erased. """ - node = self.G.nodes[node_id] + node = cast(dict[str, Any], self.G.nodes[node_id]) if node["expanded"]: return - node["expanded"] = True + node["expanded"] = True node["attractors"] = None current_space = node["space"] @@ -430,8 +471,11 @@ def _expand_one_node(self, node_id: int): ensure_subspace=current_space, ) - # Sort the spaces based on a unique key in case trappist is not always sorted deterministically. - sub_spaces = sorted(sub_spaces, key=lambda space: space_unique_key(space, self.network)) + # Sort the spaces based on a unique key in case trappist is not always + # sorted deterministically. + sub_spaces = sorted( + sub_spaces, key=lambda space: space_unique_key(space, self.network) + ) if len(sub_spaces) == 0: if DEBUG: @@ -442,21 +486,22 @@ def _expand_one_node(self, node_id: int): print(f"[{node_id}] Found sub-spaces: {len(sub_spaces)}") for sub_space in sub_spaces: - child_id = self._ensure_node(node_id, sub_space) # type: ignore[reportUnusedVariable] # noqa + child_id = self._ensure_node(node_id, sub_space) if DEBUG: print(f"[{node_id}] Created edge into node {child_id}.") def _ensure_node(self, parent_id: int | None, stable_motif: dict[str, int]) -> int: """ - Internal method that ensures the provided node is present in this succession diagram as a - child of the given `parent_id`. 
+ Internal method that ensures the provided node is present in this + succession diagram as a child of the given `parent_id`. - The `stable_motif` is an "initial" trap space that is then percolated to compute the actual - fixed variables for this node. The method also updates the depth of the child node if necessary. + The `stable_motif` is an "initial" trap space that is then percolated to + compute the actual fixed variables for this node. The method also + updates the depth of the child node if necessary. - If the `parent_id` is not given, no edge is created and depth is considered to be zero - (i.e. the node is the root). + If the `parent_id` is not given, no edge is created and depth is + considered to be zero (i.e. the node is the root). """ fixed_vars, _ = percolate_space( @@ -468,26 +513,25 @@ def _ensure_node(self, parent_id: int | None, stable_motif: dict[str, int]) -> i child_id = None if key not in self.node_indices: child_id = self.G.number_of_nodes() - self.G.add_node( + self.G.add_node( # type: ignore child_id, - id = child_id, # Just in case we ever need it within the "node data" dictionary. - space = fixed_vars, - depth = 0, - expanded = False, - attractors = None, + id=child_id, # In case we ever need it within the "node data" dictionary. + space=fixed_vars, + depth=0, + expanded=False, + attractors=None, ) self.node_indices[key] = child_id else: child_id = self.node_indices[key] - assert child_id is not None - + if parent_id is not None: - # TODO: It seems that there are some networks where the same child can be reached - # through multiple stable motifs. Not sure how to approach these... but this is - # probably good enough for now. - self.G.add_edge(parent_id, child_id, motif=stable_motif) + # TODO: It seems that there are some networks where the same child + # can be reached through multiple stable motifs. Not sure how to + # approach these... but this is probably good enough for now. 
+ self.G.add_edge(parent_id, child_id, motif=stable_motif) # type: ignore self._update_node_depth(child_id, parent_id) return child_id diff --git a/nfvsmotifs/_sd_algorithms/__init__.py b/nfvsmotifs/_sd_algorithms/__init__.py index 7f502494..d31fddff 100644 --- a/nfvsmotifs/_sd_algorithms/__init__.py +++ b/nfvsmotifs/_sd_algorithms/__init__.py @@ -1,11 +1,12 @@ """ -This is an internal module which contains the implementations of the more involved -algorithms related to `SuccessionDiagram` construction and manipulation. The idea -is that any implementation that requires some helper methods or data structures -should be present here instead of `SuccessionDiagram.py` to avoid mixing of code -used by different algorithms and bloating the core succession diagram implementation. +This is an internal module which contains the implementations of the more +involved algorithms related to `SuccessionDiagram` construction and +manipulation. The idea is that any implementation that requires some helper +methods or data structures should be present here instead of +`SuccessionDiagram.py` to avoid mixing of code used by different algorithms and +bloating the core succession diagram implementation. -The algorithms within this module here should only use the public API of -the `SuccessionDiagram` to avoid violating its invariants. The actual algorithms -are then "re-exported" as a public methods of the `SuccessionDiagram` class. -""" \ No newline at end of file +The algorithms within this module here should only use the public API of the +`SuccessionDiagram` to avoid violating its invariants. The actual algorithms are +then "re-exported" as a public methods of the `SuccessionDiagram` class. 
+""" diff --git a/nfvsmotifs/_sd_algorithms/compute_attractor_seeds.py b/nfvsmotifs/_sd_algorithms/compute_attractor_seeds.py index 2794164f..81abbe7a 100644 --- a/nfvsmotifs/_sd_algorithms/compute_attractor_seeds.py +++ b/nfvsmotifs/_sd_algorithms/compute_attractor_seeds.py @@ -5,21 +5,27 @@ if TYPE_CHECKING: from nfvsmotifs.SuccessionDiagram import SuccessionDiagram -from nfvsmotifs.motif_avoidant import detect_motif_avoidant_attractors, make_retained_set -from nfvsmotifs.terminal_restriction_space import get_terminal_restriction_space -from nfvsmotifs.trappist_core import compute_fixed_point_reduced_STG, trappist import nfvsmotifs +import nfvsmotifs.SuccessionDiagram +from nfvsmotifs.motif_avoidant import ( + detect_motif_avoidant_attractors, + make_retained_set, +) +from nfvsmotifs.terminal_restriction_space import get_terminal_restriction_space +from nfvsmotifs.trappist_core import compute_fixed_point_reduced_STG + def compute_attractor_seeds( sd: SuccessionDiagram, node_id: int, ) -> list[dict[str, int]]: """ - Compute the list of vertices such that each attractor within the subspace of the given `node_id` is covered by - exactly one vertex. + Compute the list of vertices such that each attractor within the subspace of + the given `node_id` is covered by exactly one vertex. - If the node is a stub, the result covers the whole subspace. If the node is expanded, the result only covers - the "immediate" subspace without the subspaces of the child nodes. + If the node is a stub, the result covers the whole subspace. If the node is + expanded, the result only covers the "immediate" subspace without the + subspaces of the child nodes. """ if nfvsmotifs.SuccessionDiagram.DEBUG: @@ -28,13 +34,14 @@ def compute_attractor_seeds( node_space = sd.node_space(node_id) if len(node_space) == sd.network.num_vars(): - # This node is a fixed-point. + # This node is a fixed-point. return [node_space] - # Compute the list of child spaces if the node is expanded. 
Otherwise "pretend" that there are no children. + # Compute the list of child spaces if the node is expanded. Otherwise + # "pretend" that there are no children. child_spaces = [] if sd.node_is_expanded(node_id): - child_spaces = [ sd.node_space(s) for s in sd.node_successors(node_id) ] + child_spaces = [sd.node_space(s) for s in sd.node_successors(node_id)] # Fix everything in the NFVS to zero, as long as # it isn't already fixed by our `node_space`. @@ -43,7 +50,7 @@ def compute_attractor_seeds( # the space is a trap and this will remove the corresponding unnecessary # Petri net transitions. retained_set = make_retained_set(sd.network, sd.nfvs, node_space, child_spaces) - + if len(retained_set) == sd.network.num_vars() and len(child_spaces) == 0: # There is only a single attractor remaining here, # and its "seed" is the retained set. @@ -62,7 +69,7 @@ def compute_attractor_seeds( retained_set, ensure_subspace=node_space, avoid_subspaces=child_spaces, - ) + ) if nfvsmotifs.SuccessionDiagram.DEBUG: print(f"[{node_id}] Found {len(candidate_seeds)} seed candidates.") @@ -78,7 +85,7 @@ def compute_attractor_seeds( candidate_seeds, terminal_restriction_space, max_iterations=1000, - is_in_an_mts=len(child_spaces)==0 + is_in_an_mts=len(child_spaces) == 0, ) return attractors diff --git a/nfvsmotifs/_sd_algorithms/expand_attractor_seeds.py b/nfvsmotifs/_sd_algorithms/expand_attractor_seeds.py index 623fd1aa..fcc89b42 100644 --- a/nfvsmotifs/_sd_algorithms/expand_attractor_seeds.py +++ b/nfvsmotifs/_sd_algorithms/expand_attractor_seeds.py @@ -5,24 +5,29 @@ if TYPE_CHECKING: from nfvsmotifs.SuccessionDiagram import SuccessionDiagram -from nfvsmotifs.trappist_core import compute_fixed_point_reduced_STG +import nfvsmotifs +import nfvsmotifs.SuccessionDiagram from nfvsmotifs.motif_avoidant import make_retained_set from nfvsmotifs.space_utils import intersect -import nfvsmotifs +from nfvsmotifs.trappist_core import compute_fixed_point_reduced_STG + def 
expand_attractor_seeds(sd: SuccessionDiagram, size_limit: int | None = None): """ See `SuccessionDiagram.expand_attractor_seeds` for documentation. """ - # First, expand the succession diagram such that all minimal trap spaces are found. - # This reduces the amount of work performed in this algorithm, because for every attractor - # in a minimal trap space, we already have the closest trap space, now we just need to - # do the same for (potential) motif-avoidant attractors. + # First, expand the succession diagram such that all minimal trap spaces are + # found. This reduces the amount of work performed in this algorithm, + # because for every attractor in a minimal trap space, we already have the + # closest trap space, now we just need to do the same for (potential) + # motif-avoidant attractors. sd.expand_minimal_spaces(size_limit) if nfvsmotifs.SuccessionDiagram.DEBUG: - print("Minimal trap space expansion finished. Proceeding to attractor expansion.") + print( + "Minimal trap space expansion finished. Proceeding to attractor expansion." + ) root = sd.root() seen = set([root]) @@ -37,21 +42,23 @@ def expand_attractor_seeds(sd: SuccessionDiagram, size_limit: int | None = None) return False successors = sd.node_successors(node, compute=True) - successors = sorted(successors, reverse=True) # For determinism! + successors = sorted(successors, reverse=True) # For determinism! # (reversed because we explore the list from the back) - - node_space = sd.node_space(node) # Retrieve the stable motifs of children that are already expanded. 
- expanded_children = [ x for x in sd.node_successors(node) if sd.node_is_expanded(x) ] - expanded_motifs = [ sd.edge_stable_motif(node, child) for child in expanded_children ] - - # Now, we skip all successors that are either already seen, or that + expanded_children = [ + x for x in sd.node_successors(node) if sd.node_is_expanded(x) + ] + expanded_motifs = [ + sd.edge_stable_motif(node, child) for child in expanded_children + ] + + # Now, we skip all successors that are either already seen, or that # do not contain any candidate states for motif-avoidant attractors. while len(successors) > 0: if successors[-1] in seen: - # The next node was already seen on stack. We can thus skip it and continue - # to the next one. + # The next node was already seen on stack. We can thus skip it + # and continue to the next one. successors.pop() continue if sd.node_is_expanded(successors[-1]): @@ -65,8 +72,10 @@ def expand_attractor_seeds(sd: SuccessionDiagram, size_limit: int | None = None) successor_space = sd.node_space(successors[-1]) retained_set = make_retained_set(sd.network, sd.nfvs, successor_space) - avoid_or_none = [ intersect(successor_space, child) for child in expanded_motifs ] - avoid = [ x for x in avoid_or_none if x is not None ] + avoid_or_none = [ + intersect(successor_space, child) for child in expanded_motifs + ] + avoid = [x for x in avoid_or_none if x is not None] successor_seeds = compute_fixed_point_reduced_STG( sd.petri_net, @@ -77,14 +86,16 @@ def expand_attractor_seeds(sd: SuccessionDiagram, size_limit: int | None = None) ) if len(successor_seeds) == 0: - # At this point, we know that this successor is not expanded and there are either - # no candidate states in it, or all candidate states are already covered by some - # other expanded successor. + # At this point, we know that this successor is not expanded and + # there are either no candidate states in it, or all candidate + # states are already covered by some other expanded successor. 
successors.pop() continue if nfvsmotifs.SuccessionDiagram.DEBUG: - print(f"[{node}] Found successor with new attractor candidate seeds. Expand node {successors[-1]}.") + print( + f"[{node}] Found successor with new attractor candidate seeds. Expand node {successors[-1]}." + ) break @@ -100,5 +111,5 @@ def expand_attractor_seeds(sd: SuccessionDiagram, size_limit: int | None = None) stack.append((node, successors)) # Push the successor onto the stack. stack.append((s, None)) - - return True \ No newline at end of file + + return True diff --git a/nfvsmotifs/_sd_algorithms/expand_bfs.py b/nfvsmotifs/_sd_algorithms/expand_bfs.py index b756180d..9a2be00c 100644 --- a/nfvsmotifs/_sd_algorithms/expand_bfs.py +++ b/nfvsmotifs/_sd_algorithms/expand_bfs.py @@ -5,11 +5,12 @@ if TYPE_CHECKING: from nfvsmotifs.SuccessionDiagram import SuccessionDiagram + def expand_bfs( - sd: SuccessionDiagram, - node_id: int | None = None, - bfs_level_limit: int | None = None, - size_limit: int | None = None + sd: SuccessionDiagram, + node_id: int | None = None, + bfs_level_limit: int | None = None, + size_limit: int | None = None, ) -> bool: """ See `SuccessionDiagram.expand_bfs` for documentation. @@ -18,12 +19,12 @@ def expand_bfs( if node_id is None: node_id = sd.root() - seen = set() + seen: set[int] = set() seen.add(node_id) level_id = 0 current_level = [node_id] - next_level = [] + next_level: list[int] = [] while len(current_level) > 0: for node in current_level: @@ -42,9 +43,9 @@ def expand_bfs( if s not in seen: seen.add(s) next_level.append(s) - + # The level is explored. Check if this exceeds the level limit. - if (bfs_level_limit is not None) and (level_id >= bfs_level_limit): + if (bfs_level_limit is not None) and (level_id >= bfs_level_limit): # Level limit reached. 
return False diff --git a/nfvsmotifs/_sd_algorithms/expand_dfs.py b/nfvsmotifs/_sd_algorithms/expand_dfs.py index d113d613..d9d59f53 100644 --- a/nfvsmotifs/_sd_algorithms/expand_dfs.py +++ b/nfvsmotifs/_sd_algorithms/expand_dfs.py @@ -5,6 +5,7 @@ if TYPE_CHECKING: from nfvsmotifs.SuccessionDiagram import SuccessionDiagram + def expand_dfs( sd: SuccessionDiagram, node_id: int | None = None, @@ -18,7 +19,7 @@ def expand_dfs( if node_id is None: node_id = sd.root() - seen = set() + seen: set[int] = set() seen.add(node_id) stack: list[tuple[int, list[int] | None]] = [(node_id, None)] @@ -32,9 +33,9 @@ def expand_dfs( if (size_limit is not None) and (len(sd) >= size_limit): # Size limit reached. return False - + successors = sd.node_successors(node, compute=True) - successors = sorted(successors, reverse=True) # For determinism! + successors = sorted(successors, reverse=True) # For determinism! # (reversed because we explore the list from the back) # Remove all immediate successors that are already visited. @@ -48,7 +49,7 @@ def expand_dfs( if (dfs_stack_limit is not None) and (len(stack) >= dfs_stack_limit): # We cannot push any successor nodes because it would exceed # the stack limit. As such, we can just continue with the next - # item on the stack. however, we must remember that we skipped + # item on the stack. however, we must remember that we skipped # some nodes and the result is thus incomplete. result_is_complete = False continue @@ -58,6 +59,6 @@ def expand_dfs( # Push the node back with the remaining successors. stack.append((node, successors)) # Push the successor onto the stack. 
- stack.append((s, None)) + stack.append((s, None)) - return result_is_complete \ No newline at end of file + return result_is_complete diff --git a/nfvsmotifs/_sd_algorithms/expand_minimal_spaces.py b/nfvsmotifs/_sd_algorithms/expand_minimal_spaces.py index fba25046..7fc4c9a8 100644 --- a/nfvsmotifs/_sd_algorithms/expand_minimal_spaces.py +++ b/nfvsmotifs/_sd_algorithms/expand_minimal_spaces.py @@ -5,10 +5,10 @@ if TYPE_CHECKING: from nfvsmotifs.SuccessionDiagram import SuccessionDiagram - from nfvsmotifs.space_utils import is_subspace from nfvsmotifs.trappist_core import trappist + def expand_minimal_spaces(sd: SuccessionDiagram, size_limit: int | None = None) -> bool: """ See `SuccessionDiagram.expand_minimal_spaces` for documentation. @@ -18,7 +18,7 @@ def expand_minimal_spaces(sd: SuccessionDiagram, size_limit: int | None = None) root = sd.root() - seen = set([root]) + seen = set([root]) stack: list[tuple[int, list[int] | None]] = [(root, None)] @@ -29,18 +29,18 @@ def expand_minimal_spaces(sd: SuccessionDiagram, size_limit: int | None = None) if (size_limit is not None) and (len(sd) >= size_limit): # Size limit reached. return False - + successors = sd.node_successors(node, compute=True) - successors = sorted(successors, reverse=True) # For determinism! + successors = sorted(successors, reverse=True) # For determinism! # (reversed because we explore the list from the back) node_space = sd.node_space(node) # Remove all immediate successors that are already visited or those who - # do not cover any new minimal trap space. - while len(successors) > 0: + # do not cover any new minimal trap space. 
+ while len(successors) > 0: if successors[-1] in seen: - successors.pop() + successors.pop() continue if len([s for s in minimal_traps if is_subspace(s, node_space)]) == 0: successors.pop() @@ -56,7 +56,7 @@ def expand_minimal_spaces(sd: SuccessionDiagram, size_limit: int | None = None) continue # At this point, we know that `s` is not visited and it contains - # at least one minimal trap space that does not appear in the + # at least one minimal trap space that does not appear in the # succession diagram yet. s = successors.pop() diff --git a/nfvsmotifs/_sd_algorithms/expand_to_target.py b/nfvsmotifs/_sd_algorithms/expand_to_target.py index 5c189edf..e033bd21 100644 --- a/nfvsmotifs/_sd_algorithms/expand_to_target.py +++ b/nfvsmotifs/_sd_algorithms/expand_to_target.py @@ -5,9 +5,12 @@ if TYPE_CHECKING: from nfvsmotifs.SuccessionDiagram import SuccessionDiagram -from nfvsmotifs.space_utils import is_subspace, intersect +from nfvsmotifs.space_utils import intersect, is_subspace -def expand_to_target(sd: SuccessionDiagram, target: dict[str, int], size_limit: int | None = None): + +def expand_to_target( + sd: SuccessionDiagram, target: dict[str, int], size_limit: int | None = None +): """ See `SuccessionDiagram.exapnd_to_target` for documentation. """ @@ -17,7 +20,7 @@ def expand_to_target(sd: SuccessionDiagram, target: dict[str, int], size_limit: level_id = 0 current_level = [root] - next_level = [] + next_level: list[int] = [] while len(current_level) > 0: for node in current_level: @@ -45,7 +48,6 @@ def expand_to_target(sd: SuccessionDiagram, target: dict[str, int], size_limit: # Add successors to the next level and to the seen set. 
for s in successors: - if s not in seen: seen.add(s) next_level.append(s) @@ -55,4 +57,4 @@ def expand_to_target(sd: SuccessionDiagram, target: dict[str, int], size_limit: current_level = next_level next_level = [] - return True \ No newline at end of file + return True diff --git a/nfvsmotifs/control.py b/nfvsmotifs/control.py new file mode 100644 index 00000000..6543442a --- /dev/null +++ b/nfvsmotifs/control.py @@ -0,0 +1,367 @@ +from __future__ import annotations + +from itertools import combinations, product +from typing import cast + +import networkx as nx # type: ignore +from biodivine_aeon import BooleanNetwork + +from nfvsmotifs.space_utils import percolate_space +from nfvsmotifs.SuccessionDiagram import SuccessionDiagram + +SuccessionType = list[dict[str, int]] # sequence of stable motifs +ControlType = list[dict[str, int]] # ways of locking in an individual stable motif + + +def controls_are_equal(a: ControlType, b: ControlType) -> bool: + return set(frozenset(x.items()) for x in a) == set(frozenset(x.items()) for x in b) + + +class Intervention: + def __init__( + self, control: list[ControlType], strategy: str, succession: SuccessionType + ): + self._control = control + self._strategy = strategy + self._succession = succession + self._successful = not any(not c for c in control) + + @property + def control(self): + return self._control + + @property + def strategy(self): + return self._strategy + + @property + def succession(self): + return self._succession + + @property + def successful(self): + return self._successful + + def is_equivalent(self, other: Intervention) -> bool: + if self.strategy != other.strategy: + return False + + # if using external drivers, the succession matters because it + # determines how long you have to maintain temporary controls + if self.strategy == "all": + if self.succession != other.succession: + return False + + if len(self.control) != len(other.control): + return False + + for d1, d2 in zip(self.control, 
other.control): + if not controls_are_equal(d1, d2): + return False + + return True + + def __eq__(self, other: object): + if not isinstance(other, Intervention): + return False + + # if the strategy is "all", then is_equivalent will handle the + # succession comparison + if self.strategy != "all": + if self.succession != other.succession: + return False + + if not self.is_equivalent(other): + return False + + return True + + def __repr__(self): + return ( + f"Intervention(" + f"{self.control}," + f"{self.strategy}," + f"{self.succession}," + f"{self.successful})" + ) + + def __str__(self): + succession_string = ( + f"Intervention is {'' if self.successful else 'UN'}SUCCESSFUL operating on\n" + + "\n".join(map(str, self.succession)) + + "\noverride\n" + ) + if self.strategy == "internal": + return succession_string + " and \n".join( + f"({' or '.join(map(str,motif_control))})" + for motif_control in self.control + ) + elif self.strategy == "all": + return succession_string + "temporarily, and then \n".join( + f"({' or '.join(map(str,motif_control))})" + for motif_control in self.control + ) + else: + return "unknown strategy: " + self.__repr__() + + +def succession_control( + bn: BooleanNetwork, + target: dict[str, int], + strategy: str = "internal", + succession_diagram: SuccessionDiagram | None = None, + max_drivers_per_succession_node: int | None = None, + forbidden_drivers: set[str] | None = None, + successful_only: bool = True, +) -> list[Intervention]: + """_summary_ + + Parameters + ---------- + bn : BooleanNetwork + The network to analyze, which contains the Boolean update functions. + target : dict[str, int] + The target subspace. + strategy : str, optional + The searching strategy to use to look for driver nodes. Options are + 'internal' (default), 'all'. + succession_diagram : SuccessionDiagram | None, optional + The succession diagram from which successions will be extracted. If + `None`, then a succession diagram will be generated from `bn`. 
+ max_drivers_per_succession_node: int | None = None, + The maximum number of drivers that will be tested for a succession + diagram node. If `None`, then a number of drivers up to the size of the + succession diagram node's stable motif will be tested + forbidden_drivers: set[str] | None + A set of forbidden drivers that will not be overridden for control. If + `None`, then all nodes are candidates for control. + successful_only: bool + Whether to only return successful interventions (default: `True`). + + Returns + ------- + list[Intervention] + A list of control intervention objects. Note that when `successful_only` + is `False`, returned interventions may be unsuccessful if + `max_drivers_per_succession_node` is set too small, or crucial nodes are + included in `forbidden_drivers`. To test, examine the `successful` + property of the intervention. + """ + interventions: list[Intervention] = [] + + if succession_diagram is None: + succession_diagram = SuccessionDiagram(bn) + + successions = successions_to_target( + succession_diagram, target=target, expand_diagram=True + ) + + for succession in successions: + controls = drivers_of_succession( + bn, + succession, + strategy=strategy, + max_drivers_per_succession_node=max_drivers_per_succession_node, + forbidden_drivers=forbidden_drivers, + ) + intervention = Intervention(controls, strategy, succession) + + if not successful_only or intervention.successful: + interventions.append(intervention) + + return interventions + + +def successions_to_target( + succession_diagram: SuccessionDiagram, + target: dict[str, int], + expand_diagram: bool = True, +) -> list[SuccessionType]: + """Find lists of nested trap spaces (successions) that lead to the + specified target subspace. + + Parameters + ---------- + succession_diagram : SuccessionDiagram + The succession diagram from which successions will be extracted. + target : dict[str, int] + The target subspace. 
+ expand_diagram: bool + Whether to ensure that the succession diagram is expanded enough to + capture all paths to the target (default: True). + + Returns + ------- + list[SuccessionType] + A list of successions, where each succession is a list of sequentially + nested trap spaces that specify the target. + """ + successions: list[SuccessionType] = [] + + # expand the succession_diagram toward the target + if expand_diagram: + succession_diagram.expand_to_target( + target=target, + ) + + for s in cast(list[int], succession_diagram.G.nodes()): + fixed_vars = cast(dict[str, int], succession_diagram.G.nodes[s]["space"]) + is_consistent = not any( + k in target and target[k] != v for k, v in fixed_vars.items() + ) + is_last_needed = set(target) <= set(fixed_vars) + + if not is_consistent or not is_last_needed: + continue + + for path in cast( + list[list[int]], + nx.all_simple_paths( # type: ignore + succession_diagram.G, + source=succession_diagram.root(), + target=s, + ), + ): + succession = [ + cast(dict[str, int], succession_diagram.G.edges[x, y]["motif"]) + for x, y in zip(path[:-1], path[1:]) + ] + successions.append(succession) + + return successions + + +def drivers_of_succession( + bn: BooleanNetwork, + succession: list[dict[str, int]], + strategy: str = "internal", + max_drivers_per_succession_node: int | None = None, + forbidden_drivers: set[str] | None = None, +) -> list[ControlType]: + """Find driver nodes of a list of sequentially nested trap spaces + + Parameters + ---------- + bn : BooleanNetwork + The network to analyze, which contains the Boolean update functions. + succession : list[dict[str, int]] + A list of sequentially nested trap spaces that specify the target. + strategy: str + The searching strategy to use to look for driver nodes. Options are + 'internal' (default), 'all'. + max_drivers_per_succession_node: int | None = None, + The maximum number of drivers that will be tested for a succession + diagram node. 
If `None`, then a number of drivers up to the size of the + succession diagram node's stable motif will be tested + forbidden_drivers: set[str] | None + A set of forbidden drivers that will not be overridden for control. If + `None`, then all nodes are candidates for control. + + Returns + ------- + list[ControlType] + A list of controls. Each control is a list of lists of driver sets, + represented as state dictionaries. Each list item corresponds to a list + of drivers for the corresponding trap space in the succession. + """ + control_strategies: list[ControlType] = [] + assume_fixed: dict[str, int] = {} + for ts in succession: + control_strategies.append( + find_drivers( + bn, + ts, + strategy=strategy, + assume_fixed=assume_fixed, + max_drivers_per_succession_node=max_drivers_per_succession_node, + forbidden_drivers=forbidden_drivers, + ) + ) + ldoi, _ = percolate_space(bn, ts | assume_fixed, strict_percolation=False) + assume_fixed.update(ldoi) + + return control_strategies + + +def find_drivers( + bn: BooleanNetwork, + target_trap_space: dict[str, int], + strategy: str = "internal", + assume_fixed: dict[str, int] | None = None, + max_drivers_per_succession_node: int | None = None, + forbidden_drivers: set[str] | None = None, +) -> ControlType: + """Finds drivers of a given target trap space + + Parameters + ---------- + bn : BooleanNetwork + The network to analyze, which contains the Boolean update functions. + target_trap_space : dict[str, int] + The trap space we want to find drivers for. + strategy: str + The searching strategy to use to look for driver nodes. Options are + 'internal' (default), 'all'. + assume_fixed: dict[str,int] | None + A dictionary of fixed variables that should be assumed to be fixed. + max_drivers_per_succession_node: int | None = None, + The maximum number of drivers that will be tested for a succession + diagram node. 
If `None`, then a number of drivers up to the size of the + succession diagram node's stable motif will be tested + forbidden_drivers: set[str] | None + A set of forbidden drivers that will not be overridden for control. If + `None`, then all nodes are candidates for control. + + Returns + ------- + ControlType + A list of internal driver sets, represented as state dictionaries. If + empty, then no drivers are found. This can happen if + `max_drivers_per_succession_node` is not `None`, or if all controls + require nodes in `forbidden_drivers`. + """ + if assume_fixed is None: + assume_fixed = {} + if forbidden_drivers is None: + forbidden_drivers = set() + + target_trap_space_inner = { + k: v for k, v in target_trap_space.items() if k not in assume_fixed + } + + if strategy == "internal": + driver_pool = set(target_trap_space_inner) - forbidden_drivers + elif strategy == "all": + driver_pool = ( + set(bn.get_variable_name(id) for id in bn.variables()) - forbidden_drivers + ) + else: + raise ValueError("Unknown driver search strategy") + + if max_drivers_per_succession_node is None: + max_drivers_per_succession_node = len(target_trap_space_inner) + + drivers: ControlType = [] + for driver_set_size in range(max_drivers_per_succession_node + 1): + for driver_set in combinations(driver_pool, driver_set_size): + if any(set(d) <= set(driver_set) for d in drivers): + continue + + if strategy == "internal": + driver_dict = {k: target_trap_space_inner[k] for k in driver_set} + ldoi, _ = percolate_space( + bn, driver_dict | assume_fixed, strict_percolation=False + ) + if target_trap_space.items() <= ldoi.items(): + drivers.append(driver_dict) + elif strategy == "all": + for vals in product([0, 1], repeat=driver_set_size): + driver_dict = { + driver: value for driver, value in zip(driver_set, vals) + } + ldoi, _ = percolate_space( + bn, driver_dict | assume_fixed, strict_percolation=False + ) + if target_trap_space.items() <= ldoi.items(): + drivers.append(driver_dict) + 
return drivers diff --git a/nfvsmotifs/motif_avoidant.py b/nfvsmotifs/motif_avoidant.py index 2b79ac8f..524a0b86 100644 --- a/nfvsmotifs/motif_avoidant.py +++ b/nfvsmotifs/motif_avoidant.py @@ -1,7 +1,6 @@ from __future__ import annotations import random -from copy import deepcopy from functools import reduce from typing import TYPE_CHECKING @@ -21,8 +20,8 @@ ) if TYPE_CHECKING: - from pyeda.boolalg.bdd import BinaryDecisionDiagram from biodivine_aeon import BooleanNetwork + from pyeda.boolalg.bdd import BinaryDecisionDiagram """ @@ -39,30 +38,31 @@ def make_retained_set( """ Calculate the retained set. - The retained set is technically a space-like object that describes the variables which have - to be fixed in order for the network to lose any complex attractors. However, note that this - really means changing the update functions. I.e. this is not a trap space that only contains - fixed-points, but a description of how the network must be modified to remove complex + The retained set is technically a space-like object that describes the + variables which have to be fixed in order for the network to lose any + complex attractors. However, note that this really means changing the update + functions. I.e. this is not a trap space that only contains fixed-points, + but a description of how the network must be modified to remove complex attractors. - Finally, the construction guarantees that any complex attractor of the old network will - manifest as at least one fixed-point in the new network. + Finally, the construction guarantees that any complex attractor of the old + network will manifest as at least one fixed-point in the new network. """ if child_spaces is None: child_spaces = [] - # Initially, the retained set only contains the fixed values from the - # current node space (this elimiantes unnecessary Petri net transitions - # for values which we already proved are constant). 
- # - # In the following code, we then extend the retained set based on the model's NFVS - # and the current child spaces. + # Initially, the retained set only contains the fixed values from the + # current node space (this eliminates unnecessary Petri net transitions for + # values which we already proved are constant). + # + # In the following code, we then extend the retained set based on the + # model's NFVS and the current child spaces. retained_set = space.copy() - - # First, if there are any child spaces present, we extend the retained set with the - # values from the one that has the least amount of fixed variables shared with the NFVS. + # First, if there are any child spaces present, we extend the retained set + # with the values from the one that has the least amount of fixed variables + # shared with the NFVS. if len(child_spaces) > 0: # Find the child space that has the fewest nodes in common with the NFVS: least_common_child_space = child_spaces[0] @@ -76,9 +76,9 @@ def make_retained_set( for x in least_common_child_space: if (x not in retained_set) and (x in nfvs): retained_set[x] = least_common_child_space[x] - - # Then, set the remaining NFVS variables based on the majority output value in the update - # function of the relevant variable. + + # Then, set the remaining NFVS variables based on the majority output value + # in the update function of the relevant variable. 
for x in nfvs: if x in retained_set: continue @@ -88,8 +88,8 @@ def make_retained_set( input_count = len(list(pyeda_fx.support)) half_count = pow(2, input_count - 1) - sat_count = pyeda_fx.satisfy_count() - + sat_count = pyeda_fx.satisfy_count() # type: ignore + if sat_count > half_count: retained_set[x] = 1 else: @@ -97,6 +97,7 @@ def make_retained_set( return retained_set + def detect_motif_avoidant_attractors( network: BooleanNetwork, petri_net: DiGraph, @@ -107,12 +108,16 @@ def detect_motif_avoidant_attractors( is_in_an_mts: bool = False, ) -> list[dict[str, int]]: """ - Compute a sub-list of `candidates` which correspond to motif-avoidant attractors. - Other method inputs: - - `network` and `petri_net` represent the model in which the property should be checked. - - `terminal_restriction_space` is a symbolic set of states which contains all motif avoidant - attractors (i.e. if a candidate state can leave this set, the candidate cannot be an attractor). - - `max_iterations` specifies how much time should be spent on the "simpler" preprocessing + Compute a sub-list of `candidates` which correspond to motif-avoidant + attractors. Other method inputs: + - `network` and `petri_net` represent the model in which the property + should be checked. + - `terminal_restriction_space` is a symbolic set of states which contains + all motif avoidant + attractors (i.e. if a candidate state can leave this set, the candidate + cannot be an attractor). + - `max_iterations` specifies how much time should be spent on the "simpler" + preprocessing before applying a more complete method. 
""" if ensure_subspace is None: @@ -130,7 +135,7 @@ def detect_motif_avoidant_attractors( terminal_restriction_space, max_iterations, ensure_subspace=ensure_subspace, - is_in_an_mts=is_in_an_mts + is_in_an_mts=is_in_an_mts, ) if len(candidates) == 0: @@ -148,7 +153,7 @@ def _preprocess_candidates( terminal_restriction_space: BinaryDecisionDiagram, max_iterations: int, ensure_subspace: dict[str, int] | None = None, - is_in_an_mts: bool = False + is_in_an_mts: bool = False, ) -> list[dict[str, int]]: """ A fast but incomplete method for eliminating spurious attractor candidates. @@ -179,7 +184,7 @@ def _preprocess_candidates( continue var_name = network.get_variable_name(varID) variables.append(var_name) - function_expression = network.get_update_function(varID) + function_expression = network.get_update_function(varID) function_bdd = expr2bdd(aeon_to_pyeda(function_expression)) update_functions[var_name] = function_bdd @@ -192,16 +197,16 @@ def _preprocess_candidates( # is a minimal trap or not. In previous work, this was shown to work # well, but in the future we need to better document the resoning # behind these two algorithms. - if is_in_an_mts == False: + if not is_in_an_mts: # Copy is sufficient because we won't be modifying the states within the set. candidates_dnf = candidates.copy() - filtered_candidates = [] - for state in candidates: + filtered_candidates: list[dict[str, int]] = [] + for state in candidates: # Remove the state from the candidates. If we can prove that is # is not an attractor, we will put it back. candidates_dnf = remove_state_from_dnf(candidates_dnf, state) - simulation = state.copy() # A copy of the state that we can overwrite. + simulation = state.copy() # A copy of the state that we can overwrite. is_valid_candidate = True for _ in range(max_iterations): # Advance all variables by one step in random order. 
@@ -214,7 +219,7 @@ def _preprocess_candidates( if dnf_function_is_true(candidates_dnf, simulation): # The state can reach some other state in the candidate # set. This does not mean it cannot be an attractor, but - # it means it is sufficient to keep considering + # it means it is sufficient to keep considering # the remaining candidates. is_valid_candidate = False break @@ -231,12 +236,12 @@ def _preprocess_candidates( # into the candidate set. candidates_dnf.append(state) filtered_candidates.append(state) - + return filtered_candidates else: filtered_candidates = [] - for i in range(max_iterations): - generator.shuffle(variables) + for _ in range(max_iterations): + generator.shuffle(variables) candidates_dnf = candidates.copy() filtered_candidates = [] @@ -260,6 +265,7 @@ def _preprocess_candidates( return filtered_candidates + def _filter_candidates( petri_net: DiGraph, candidates: list[dict[str, int]], diff --git a/nfvsmotifs/space_utils.py b/nfvsmotifs/space_utils.py index e12ddea3..5402d0c0 100644 --- a/nfvsmotifs/space_utils.py +++ b/nfvsmotifs/space_utils.py @@ -275,16 +275,17 @@ def expression_to_space_list(expression: Expression) -> list[dict[str, int]]: return sub_spaces + def space_unique_key(space: dict[str, int], network: BooleanNetwork) -> int: """ - Computes an integer which is a unique representation of the provided `space` + Computes an integer which is a unique representation of the provided `space` (with respect to the given `network`). - This integer key can be used instead of the original `space` in places where - dictionaries are not allowed, such as a key within a larger dictionary, or + This integer key can be used instead of the original `space` in places where + dictionaries are not allowed, such as a key within a larger dictionary, or a sorting key. 
- Note that when used for sorting, this key essentially implements a particular + Note that when used for sorting, this key essentially implements a particular form of lexicographic ordering on spaces. This is always a total ordering (there is no ambiguity). """ diff --git a/nfvsmotifs/trappist_core.py b/nfvsmotifs/trappist_core.py index 5ccc41af..e3963f77 100644 --- a/nfvsmotifs/trappist_core.py +++ b/nfvsmotifs/trappist_core.py @@ -31,9 +31,9 @@ def trappist_async( avoid_subspaces: list[dict[str, int]] | None = None, ): """ - The same as the `trappist` method, but instead of returning a list of spaces as a result, the - spaces are returned to the supplied `on_solution` callback. You can stop the enumeration by - returning `False` from this callback. + The same as the `trappist` method, but instead of returning a list of spaces + as a result, the spaces are returned to the supplied `on_solution` callback. + You can stop the enumeration by returning `False` from this callback. """ if ensure_subspace is None: ensure_subspace = {} @@ -91,21 +91,26 @@ def trappist( avoid_subspaces: list[dict[str, int]] | None = None, ) -> list[dict[str, int]]: """ - Solve the given `problem` for the given `network` using the Trappist algorithm, internally relying on the - Python bindings of the `clingo` ASP solver. + Solve the given `problem` for the given `network` using the Trappist + algorithm, internally relying on the Python bindings of the `clingo` ASP + solver. Arguments: - - `network`: Can be either a `BooleanNetwork`, or a Petri net (`DiGraph`) compatible with the encoding - in `petri_net_translation` module. The behaviour is undefined for other `DiGraph` instances. - - `problem`: `min` minimum trap spaces; `max` maximum trap spaces; `fix` fixed points. Default: `min`. - - `reverse_time`: If `True`, a time-reversed network should be considered. Default: `False`. - - `solution_limit`: If given, the result is limited to the given number of solutions. Default: `None`. 
- - The result is a list of spaces represented as dictionaries. If you want to avoid enumerating all solutions - explicitly as one list, you can use `trappist_async` which has a similar API but can yield solutions - one by one. - - Finally, recall that the supplied network must have its names sanitized (see `petri_net_translation` module). + - `network`: Can be either a `BooleanNetwork`, or a Petri net + (`DiGraph`) compatible with the encoding + in `petri_net_translation` module. The behaviour is undefined for other + `DiGraph` instances. - `problem`: `min` minimum trap spaces; `max` + maximum trap spaces; `fix` fixed points. Default: `min`. - + `reverse_time`: If `True`, a time-reversed network should be considered. + Default: `False`. - `solution_limit`: If given, the result is limited to + the given number of solutions. Default: `None`. + + The result is a list of spaces represented as dictionaries. If you want to + avoid enumerating all solutions explicitly as one list, you can use + `trappist_async` which has a similar API but can yield solutions one by one. + + Finally, recall that the supplied network must have its names sanitized (see + `petri_net_translation` module). """ if ensure_subspace is None: ensure_subspace = {} @@ -159,24 +164,29 @@ def _create_clingo_constraints( optimize_source_variables: list[str] | None = None, ) -> Control: """ - Translate the given Petri net (represented as a `DiGraph`; see also `petri_net_translation` - module for details) into a logic program that solves the given problem type. This logic - program is then added to the `Control` object provided by `clingo`. - - - The `problem` arugment specifies one of the three problem types: `min` (minimum trap spaces), - `max` (maximum trap spaces) and `fix` (fixed-points). - - If `reverse_time` is true, the problem is solved for a time-reversed problem. - - Argument `ensure_subspace` is a space in which all results must be included. 
- - Argument `avoid_subspaces` is a list of spaces that must be avoided by all solutions. - - Argument `optimize_source_variables` designates variables for which a `*` solution should be - disregarded when computing maximum trap spaces. - - Finally, note that when `ensure_subspace` or `avoid_subspaces` is included, the result is - maximal/minimal within the resulting space of solutions, not globally. For example, if specify - some `ensure_subspace` and `problem=max`, then the result is maximal *within* that subspace, - not globally. Furthermore, the result can still be a *superspace* of the `avoid_subspaces` - argument. For example, if you specify that you want to avoid a particular fixed-point, - a globally non-minimal trap space that contains this fixed-point can be still included. + Translate the given Petri net (represented as a `DiGraph`; see also + `petri_net_translation` module for details) into a logic program that solves + the given problem type. This logic program is then added to the `Control` + object provided by `clingo`. + + - The `problem` arugment specifies one of the three problem types: `min` + (minimum trap spaces), + `max` (maximum trap spaces) and `fix` (fixed-points). - If `reverse_time` + is true, the problem is solved for a time-reversed problem. - Argument + `ensure_subspace` is a space in which all results must be included. - + Argument `avoid_subspaces` is a list of spaces that must be avoided by all + solutions. - Argument `optimize_source_variables` designates variables for + which a `*` solution should be disregarded when computing maximum trap + spaces. + + Finally, note that when `ensure_subspace` or `avoid_subspaces` is included, + the result is maximal/minimal within the resulting space of solutions, not + globally. For example, if specify some `ensure_subspace` and `problem=max`, + then the result is maximal *within* that subspace, not globally. 
+ Furthermore, the result can still be a *superspace* of the + `avoid_subspaces` argument. For example, if you specify that you want to + avoid a particular fixed-point, a globally non-minimal trap space that + contains this fixed-point can be still included. """ if ensure_subspace is None: ensure_subspace = {} @@ -284,8 +294,9 @@ def _clingo_model_to_fixed_point(model: Model) -> dict[str, int]: # but just in case. assert variable not in space - # Note that this is opposite to the case of trap spaces. If "positive" symbol - # appears in the solution, we fix the value to "1". Otherwise, we fix the value to "0". + # Note that this is opposite to the case of trap spaces. If "positive" + # symbol appears in the solution, we fix the value to "1". Otherwise, we + # fix the value to "0". space[variable] = 1 if is_positive else 0 @@ -420,14 +431,14 @@ def compute_fixed_point_reduced_STG( solution_limit: int | None = None, ) -> list[dict[str, int]]: """ - This method computes the fixed points of the given Petri-net-encoded Boolean network. - This makes it possible to modify the Petri net instead of re-encoding the BN repeatedly - for multiple subsequnet queries. - - The arguments `ensure_subspace`, `avoid_subspaces`, and `solution_limit` work exactly - the same way as in the `trappist` method. Meanwhile, the `retained_set` argument is - applied as a restriction on the transitions of the Petri net, forcing given variables - to retain the specified values. + This method computes the fixed points of the given Petri-net-encoded Boolean + network. This makes it possible to modify the Petri net instead of + re-encoding the BN repeatedly for multiple subsequnet queries. + + The arguments `ensure_subspace`, `avoid_subspaces`, and `solution_limit` + work exactly the same way as in the `trappist` method. Meanwhile, the + `retained_set` argument is applied as a restriction on the transitions of + the Petri net, forcing given variables to retain the specified values. 
""" results: list[dict[str, int]] = [] diff --git a/pyproject.toml b/pyproject.toml index 39d85861..a2afde82 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,8 +4,8 @@ dependencies = [ 'biodivine_aeon ==0.2.0a4', 'clingo ==5.6.2', 'networkx==2.8.8', - 'pyeda==0.28.0', # requires that the python-dev package for your python version is also installed, e.g. python3.11-dev - 'pypint==1.6.2' + 'pyeda==0.28.0', # requires that the python-dev package for your python version is also installed, e.g. python3.11-dev + 'pypint==1.6.2', ] dynamic = ["version"] @@ -30,4 +30,8 @@ multi_line_output = 3 include_trailing_comma = true force_grid_wrap = 0 line_length = 88 -profile = "black" \ No newline at end of file +profile = "black" + +[tool.pytest.ini_options] +testpaths = ["tests"] +addopts = "--networksize=10" diff --git a/stubs/biodivine_aeon/__init__.pyi b/stubs/biodivine_aeon/__init__.pyi index 1462fbb8..9610ba50 100644 --- a/stubs/biodivine_aeon/__init__.pyi +++ b/stubs/biodivine_aeon/__init__.pyi @@ -1587,4 +1587,5 @@ class SymbolicAsyncGraph: TODO """ + def __init__(self, bn: BooleanNetwork) -> None: ... ... diff --git a/tests/clingo_test.py b/tests/clingo_test.py index 99da098d..3575ba99 100644 --- a/tests/clingo_test.py +++ b/tests/clingo_test.py @@ -1,17 +1,18 @@ import subprocess + def test_clingo(): """ - This just verifies that we have clingo installed in a way that - Trappist needs. + This just verifies that we have clingo installed in a way that + Trappist needs. 
""" result = subprocess.run( [ "clingo", - "--version", + "--version", ], capture_output=False, text=True, ) - assert result.returncode == 0 \ No newline at end of file + assert result.returncode == 0 diff --git a/tests/control_test.py b/tests/control_test.py new file mode 100644 index 00000000..9ffb7f62 --- /dev/null +++ b/tests/control_test.py @@ -0,0 +1,316 @@ +from biodivine_aeon import BooleanNetwork + +from nfvsmotifs.control import ( + Intervention, + controls_are_equal, + drivers_of_succession, + succession_control, + successions_to_target, +) +from nfvsmotifs.SuccessionDiagram import SuccessionDiagram + + +def test_intervention_equality_and_equivalence(): + i1 = Intervention([[{"Y": 0}, {"X": 0}]], "internal", [{}]) + i2 = Intervention([[{"X": 0}, {"Y": 0}]], "internal", [{}]) + assert i1 == i2 + + i1 = Intervention([[{"Y": 0}, {"X": 0}]], "internal", [{"A": 0}]) + i2 = Intervention([[{"X": 0}, {"Y": 0}]], "internal", [{}]) + assert i1 != i2 + assert i1.is_equivalent(i2) + + i1 = Intervention([[{"Y": 0}, {"X": 0}]], "all", [{}]) + i2 = Intervention([[{"X": 0}, {"Y": 0}]], "all", [{}]) + assert i1 == i2 + + i1 = Intervention([[{"Y": 0}, {"X": 0}]], "all", [{"A": 0}]) + i2 = Intervention([[{"X": 0}, {"Y": 0}]], "all", [{}]) + assert i1 != i2 + assert not i1.is_equivalent(i2) + + +def test_basic_succession_control(): + bn = BooleanNetwork.from_bnet( + """ + S, S + A, S | B + B, A + C, A | D + D, C + E, false + """ + ) + target_succession = [ + {"S": 0, "E": 0}, + {"S": 0, "E": 0, "A": 0, "B": 0}, + {"S": 0, "E": 0, "A": 0, "B": 0, "C": 1, "D": 1}, + ] + + cs = [[{"S": 0}], [{"A": 0}, {"B": 0}], [{"C": 1}, {"D": 1}]] + + drivers = drivers_of_succession(bn, target_succession) + assert all([controls_are_equal(a, b) for a, b in zip(cs, drivers)]) + + target_succession = [ + {"S": 0, "E": 0}, + {"S": 0, "E": 0, "C": 1, "D": 1}, + {"S": 0, "E": 0, "A": 0, "B": 0, "C": 1, "D": 1}, + ] + + cs = [[{"S": 0}], [{"C": 1}, {"D": 1}], [{"A": 0}, {"B": 0}]] + + drivers = 
drivers_of_succession(bn, target_succession) + assert all([controls_are_equal(a, b) for a, b in zip(cs, drivers)]) + + target_succession = [ + {"E": 1}, + {"S": 0, "E": 1}, + {"S": 0, "E": 1, "C": 1, "D": 1}, + {"S": 0, "E": 1, "A": 0, "B": 0, "C": 1, "D": 1}, + ] + + cs = [[{"E": 1}], [{"S": 0}], [{"C": 1}, {"D": 1}], [{"A": 0}, {"B": 0}]] + + drivers = drivers_of_succession(bn, target_succession) + assert all([controls_are_equal(a, b) for a, b in zip(cs, drivers)]) + + +def test_basic_succession_finding(): + bn = BooleanNetwork.from_bnet( + """ + S, S + A, S | B + B, A + C, A | D + D, C + E, false + """ + ) + target_successions = [ + [ + {"S": 0, "E": 0}, + {"S": 0, "E": 0, "A": 0, "B": 0}, + {"S": 0, "E": 0, "A": 0, "B": 0, "C": 1, "D": 1}, + ], + [ + {"S": 0, "E": 0}, + {"S": 0, "E": 0, "C": 1, "D": 1}, + {"S": 0, "E": 0, "A": 0, "B": 0, "C": 1, "D": 1}, + ], + ] + target = {"S": 0, "E": 0, "A": 0, "B": 0, "C": 1, "D": 1} + succession_diagram = SuccessionDiagram(bn) + + successions = successions_to_target(succession_diagram, target) + + successions_hashed = set( + tuple(frozenset(ts.items()) for ts in succession) for succession in successions + ) + targets_hashed = set( + tuple(frozenset(ts.items()) for ts in succession) + for succession in target_successions + ) + + assert targets_hashed == successions_hashed + + +def test_internal_succession_control(): + bn = BooleanNetwork.from_bnet( + """ + S, S + A, S | B + B, A + C, A | D + D, C + E, false + """ + ) + target = {"S": 0, "E": 0, "A": 0, "B": 0, "C": 1, "D": 1} + + true_controls = [ + [[{"S": 0}], [{"A": 0}, {"B": 0}], [{"C": 1}, {"D": 1}]], + [[{"S": 0}], [{"C": 1}, {"D": 1}], [{"A": 0}, {"B": 0}]], + ] + + true_successions = [ + [ + {"S": 0, "E": 0}, + {"S": 0, "E": 0, "A": 0, "B": 0}, + {"S": 0, "E": 0, "A": 0, "B": 0, "C": 1, "D": 1}, + ], + [ + {"S": 0, "E": 0}, + {"S": 0, "E": 0, "C": 1, "D": 1}, + {"S": 0, "E": 0, "A": 0, "B": 0, "C": 1, "D": 1}, + ], + ] + + true_interventions = [ + Intervention(c, 
"internal", s) for c, s in zip(true_controls, true_successions) + ] + + interventions = succession_control(bn, target) + + assert len(interventions) == len(true_interventions) + for intervention in interventions: + assert intervention in true_interventions + + +def test_all_succession_control(): + bn = BooleanNetwork.from_bnet( + """ + S, S + A, S | B + B, A + C, A | D + D, C + E, false + """ + ) + target = {"S": 0, "E": 0, "A": 0, "B": 0, "C": 1, "D": 1} + + true_controls = [ + [[{"S": 0}], [{"A": 0}, {"B": 0}], [{"C": 1}, {"D": 1}]], + [[{"S": 0}], [{"A": 1}, {"B": 1}, {"C": 1}, {"D": 1}], [{"A": 0}, {"B": 0}]], + ] + + true_successions = [ + [ + {"S": 0, "E": 0}, + {"S": 0, "E": 0, "A": 0, "B": 0}, + {"S": 0, "E": 0, "A": 0, "B": 0, "C": 1, "D": 1}, + ], + [ + {"S": 0, "E": 0}, + {"S": 0, "E": 0, "C": 1, "D": 1}, + {"S": 0, "E": 0, "A": 0, "B": 0, "C": 1, "D": 1}, + ], + ] + + true_interventions = [ + Intervention(c, "all", s) for c, s in zip(true_controls, true_successions) + ] + + interventions = succession_control(bn, target, strategy="all") + + assert len(interventions) == len(true_interventions) + for intervention in interventions: + assert intervention in true_interventions + + +def test_forbidden_drivers(): + bn = BooleanNetwork.from_bnet( + """ + A, B & C + B, A & C + C, A & B + """ + ) + target = {"A": 1, "B": 1, "C": 1} + + # Test with no forbidden drivers first + true_controls = [[[{"A": 1, "B": 1}, {"A": 1, "C": 1}, {"B": 1, "C": 1}]]] + true_successions = [[{"A": 1, "B": 1, "C": 1}]] + + true_interventions = [ + Intervention(c, "internal", s) for c, s in zip(true_controls, true_successions) + ] + + interventions = succession_control(bn, target) + + assert len(interventions) == len(true_interventions) + for intervention in interventions: + assert intervention in true_interventions + assert intervention.successful + + # Test with forbidden driver; case with solution + forbidden_drivers = set("A") + + true_controls = [[[{"B": 1, "C": 1}]]] + 
true_successions = [[{"A": 1, "B": 1, "C": 1}]] + + true_interventions = [ + Intervention(c, "internal", s) for c, s in zip(true_controls, true_successions) + ] + + interventions = succession_control(bn, target, forbidden_drivers=forbidden_drivers) + + assert len(interventions) == len(true_interventions) + for intervention in interventions: + assert intervention in true_interventions + assert intervention.successful + + # Test with forbidden driver; case without solution + forbidden_drivers: set[str] = set(("A", "B")) # type: ignore + + true_controls = [[[]]] # type: ignore + true_successions = [[{"A": 1, "B": 1, "C": 1}]] + + true_interventions = [ + Intervention(c, "internal", s) for c, s in zip(true_controls, true_successions) # type: ignore + ] + + # do not show failed solution (default) + interventions = succession_control(bn, target, forbidden_drivers=forbidden_drivers) + assert len(interventions) == 0 + + # show failed solution + interventions = succession_control( + bn, target, forbidden_drivers=forbidden_drivers, successful_only=False + ) + + assert len(interventions) == len(true_interventions) + for intervention in interventions: + assert intervention in true_interventions + assert not intervention.successful + + +def test_size_restriction(): + bn = BooleanNetwork.from_bnet( + """ + A, B & C + B, A & C + C, A & B + """ + ) + target = {"A": 1, "B": 1, "C": 1} + + # Test with no restrictions + true_controls = [[[{"A": 1, "B": 1}, {"A": 1, "C": 1}, {"B": 1, "C": 1}]]] + true_successions = [[{"A": 1, "B": 1, "C": 1}]] + + true_interventions = [ + Intervention(c, "internal", s) for c, s in zip(true_controls, true_successions) + ] + + interventions = succession_control(bn, target) + + assert len(interventions) == len(true_interventions) + for intervention in interventions: + assert intervention in true_interventions + assert intervention.successful + + # Test with size restriction; no solution exists + true_controls = [[[]]] # type: ignore + true_successions = 
[[{"A": 1, "B": 1, "C": 1}]] + + true_interventions = [ + Intervention(c, "internal", s) for c, s in zip(true_controls, true_successions) # type: ignore + ] + + # show the failed solution + interventions = succession_control( + bn, target, max_drivers_per_succession_node=1, successful_only=False + ) + + assert len(interventions) == len(true_interventions) + for intervention in interventions: + assert intervention in true_interventions + assert not intervention.successful + + # do not show the failed solution (default) + interventions = succession_control( + bn, target, max_drivers_per_succession_node=1, successful_only=True + ) + assert len(interventions) == 0 diff --git a/tests/drivers_test.py b/tests/drivers_test.py index 28a532aa..48d9fbd4 100644 --- a/tests/drivers_test.py +++ b/tests/drivers_test.py @@ -1,41 +1,67 @@ -from nfvsmotifs.drivers import * +from nfvsmotifs.drivers import ( + BooleanNetwork, + find_single_drivers, + find_single_node_LDOIs, +) + def test_find_single_node_LDOIs(): - bn = BooleanNetwork.from_bnet(""" + bn = BooleanNetwork.from_bnet( + """ S, S A, S | B B, A C, A | D D, C E, false - """) + """ + ) LDOIs = find_single_node_LDOIs(bn) - assert LDOIs[('S', 0)] == {'S': 0} - assert LDOIs[('S', 1)] == {'S': 1, 'A': 1, 'B': 1, 'C': 1, 'D': 1} - assert LDOIs[('A', 0)] == {'B': 0} - assert LDOIs[('A', 1)] == {'A': 1, 'B': 1, 'C': 1, 'D': 1} - assert LDOIs[('B', 0)] == {} - assert LDOIs[('B', 1)] == {'B': 1, 'A': 1, 'C': 1, 'D': 1} - assert LDOIs[('C', 0)] == {'D': 0} - assert LDOIs[('C', 1)] == {'C': 1, 'D': 1} - assert LDOIs[('D', 0)] == {} - assert LDOIs[('D', 1)] == {'D': 1, 'C': 1} + assert LDOIs[("S", 0)] == {"S": 0} + assert LDOIs[("S", 1)] == {"S": 1, "A": 1, "B": 1, "C": 1, "D": 1} + assert LDOIs[("A", 0)] == {"B": 0} + assert LDOIs[("A", 1)] == {"A": 1, "B": 1, "C": 1, "D": 1} + assert LDOIs[("B", 0)] == {} + assert LDOIs[("B", 1)] == {"B": 1, "A": 1, "C": 1, "D": 1} + assert LDOIs[("C", 0)] == {"D": 0} + assert LDOIs[("C", 1)] == {"C": 
1, "D": 1} + assert LDOIs[("D", 0)] == {} + assert LDOIs[("D", 1)] == {"D": 1, "C": 1} + def test_find_single_drivers(): - bn = BooleanNetwork.from_bnet(""" + bn = BooleanNetwork.from_bnet( + """ S, S A, S | B B, A C, A | D D, C E, true - """) + """ + ) LDOIs = find_single_node_LDOIs(bn) - assert find_single_drivers({'A':0, 'B':0}, bn) == {('A', 0)} - assert find_single_drivers({'A':0, 'B':0}, bn, LDOIs) == {('A', 0)} - assert find_single_drivers({'A':1, 'B':1}, bn) == {('A', 1), ('B', 1), ('S', 1)} - assert find_single_drivers({'A':1, 'B':1}, bn, LDOIs) == {('A', 1), ('B', 1), ('S', 1)} - assert find_single_drivers({'C':0, 'D':0}, bn) == {('C', 0)} - assert find_single_drivers({'C':0, 'D':0}, bn, LDOIs) == {('C', 0)} - assert find_single_drivers({'C':1, 'D':1}, bn) == {('A', 1), ('B', 1), ('C', 1), ('D', 1), ('S', 1)} - assert find_single_drivers({'C':1, 'D':1}, bn, LDOIs) == {('A', 1), ('B', 1), ('C', 1), ('D', 1), ('S', 1)} + assert find_single_drivers({"A": 0, "B": 0}, bn) == {("A", 0)} + assert find_single_drivers({"A": 0, "B": 0}, bn, LDOIs) == {("A", 0)} + assert find_single_drivers({"A": 1, "B": 1}, bn) == {("A", 1), ("B", 1), ("S", 1)} + assert find_single_drivers({"A": 1, "B": 1}, bn, LDOIs) == { + ("A", 1), + ("B", 1), + ("S", 1), + } + assert find_single_drivers({"C": 0, "D": 0}, bn) == {("C", 0)} + assert find_single_drivers({"C": 0, "D": 0}, bn, LDOIs) == {("C", 0)} + assert find_single_drivers({"C": 1, "D": 1}, bn) == { + ("A", 1), + ("B", 1), + ("C", 1), + ("D", 1), + ("S", 1), + } + assert find_single_drivers({"C": 1, "D": 1}, bn, LDOIs) == { + ("A", 1), + ("B", 1), + ("C", 1), + ("D", 1), + ("S", 1), + } diff --git a/tests/interaction_graph_utils_test.py b/tests/interaction_graph_utils_test.py index 78157c3c..c0aeb951 100644 --- a/tests/interaction_graph_utils_test.py +++ b/tests/interaction_graph_utils_test.py @@ -1,33 +1,43 @@ -import os -from biodivine_aeon import BooleanNetwork # type:ignore -from networkx import DiGraph # type:ignore -from 
nfvsmotifs.interaction_graph_utils import infer_signed_interaction_graph, feedback_vertex_set, independent_cycles, find_minimum_NFVS +from biodivine_aeon import BooleanNetwork # type:ignore +from networkx import DiGraph # type:ignore + +from nfvsmotifs.interaction_graph_utils import ( + feedback_vertex_set, + find_minimum_NFVS, + independent_cycles, + infer_signed_interaction_graph, +) + def test_ig_inference(): - bn = BooleanNetwork.from_bnet(""" + bn = BooleanNetwork.from_bnet( + """ # Just a normal function. b, a | !b # Contradiciton on `a` - the regulation should not appear in the result # Also, non-monotonic dependence on b and c. a, (a & !a) | (b <=> c) c, c - """) + """ + ) ig = infer_signed_interaction_graph(bn) - edges = { edge:ig.get_edge_data(edge[0], edge[1])['sign'] for edge in ig.edges } - assert len(edges) == 5 - assert edges[('a', 'b')] == "+" - assert edges[('b', 'b')] == "-" - assert edges[('b', 'a')] == "?" - assert edges[('c', 'a')] == "?" - assert edges[('c', 'c')] == "+" - assert ('a', 'a') not in edges + edges = {edge: ig.get_edge_data(edge[0], edge[1])["sign"] for edge in ig.edges} # type: ignore + assert len(edges) == 5 # type: ignore + assert edges[("a", "b")] == "+" + assert edges[("b", "b")] == "-" + assert edges[("b", "a")] == "?" + assert edges[("c", "a")] == "?" + assert edges[("c", "c")] == "+" + assert ("a", "a") not in edges -# There should be a negative cycle between b_1 and b_2, + +# There should be a negative cycle between b_1 and b_2, # a positive cycle between d_1 and d_2, and a negative cycle # between d_1, d_2, and d_3. Other nodes are not on cycles # except for e, which has a positive self-loop. 
-CYCLES_BN = BooleanNetwork.from_aeon(""" +CYCLES_BN = BooleanNetwork.from_aeon( + """ a -> c b_1 -> b_2 b_2 -| b_1 @@ -39,28 +49,32 @@ def test_ig_inference(): d_2 -> d_1 d_1 -> d_2 e -> e - """) + """ +) CYCLES_DIGRAPH = DiGraph() -CYCLES_DIGRAPH.add_nodes_from(["a", "b_1", "b_2", "c", "d_1", "d_2", "d_3", "e"]) -CYCLES_DIGRAPH.add_edges_from([ - ("a", "c", {'sign': '+'}), - ("b_1", "b_2", {'sign': '+'}), - ("b_2", "b_1", {'sign': '-'}), - ("b_2", "c", {'sign': '+'}), - ("c", "d_2", {'sign': '+'}), - ("c", "e", {'sign': '+'}), - ("d_1", "d_3", {'sign': '+'}), - ("d_3", "d_2", {'sign': '-'}), - ("d_2", "d_1", {'sign': '+'}), - ("d_1", "d_2", {'sign': '+'}), - ("e", "e", {'sign': '+'}), -]) +CYCLES_DIGRAPH.add_nodes_from(["a", "b_1", "b_2", "c", "d_1", "d_2", "d_3", "e"]) # type: ignore +CYCLES_DIGRAPH.add_edges_from( # type: ignore + [ + ("a", "c", {"sign": "+"}), + ("b_1", "b_2", {"sign": "+"}), + ("b_2", "b_1", {"sign": "-"}), + ("b_2", "c", {"sign": "+"}), + ("c", "d_2", {"sign": "+"}), + ("c", "e", {"sign": "+"}), + ("d_1", "d_3", {"sign": "+"}), + ("d_3", "d_2", {"sign": "-"}), + ("d_2", "d_1", {"sign": "+"}), + ("d_1", "d_2", {"sign": "+"}), + ("e", "e", {"sign": "+"}), + ] +) + def test_fvs(): fvs = feedback_vertex_set(CYCLES_BN) - nfvs = feedback_vertex_set(CYCLES_BN, parity='negative') - pfvs = feedback_vertex_set(CYCLES_BN, parity='positive') + nfvs = feedback_vertex_set(CYCLES_BN, parity="negative") + pfvs = feedback_vertex_set(CYCLES_BN, parity="positive") assert len(fvs) == 3 assert len(nfvs) == 2 @@ -81,29 +95,34 @@ def test_fvs(): assert ("b_1" in fvs) or ("b_2" in fvs) assert ("b_1" in nfvs) or ("b_2" in nfvs) assert ("b_1" not in pfvs) and ("b_2" not in pfvs) - + # With "d_*", its a bit more complicated: # "d_1" or "d_2" must be in fvs and also pfvs, but in nfvs, "d_3" # is also sufficient as the "d_1 --- d_2" cycle is positive. 
assert ("d_1" in fvs) or ("d_2" in fvs) assert ("d_1" in nfvs) or ("d_2" in nfvs) or ("d_3" in nfvs) assert ("d_1" in pfvs) or ("d_2" in pfvs) - + # Check that the `DiGraph` results are the same as `BooleanNetwork` results. dg_fvs = feedback_vertex_set(CYCLES_DIGRAPH) - dg_nfvs = feedback_vertex_set(CYCLES_DIGRAPH, parity='negative') - dg_pfvs = feedback_vertex_set(CYCLES_DIGRAPH, parity='positive') + dg_nfvs = feedback_vertex_set(CYCLES_DIGRAPH, parity="negative") + dg_pfvs = feedback_vertex_set(CYCLES_DIGRAPH, parity="positive") assert fvs == dg_fvs assert nfvs == dg_nfvs assert pfvs == dg_pfvs + def test_subgraph_fvs(): - # We only keep the two cycles consisting of "d_*". The "b_*" cycle + # We only keep the two cycles consisting of "d_*". The "b_*" cycle # and "e" self-loop are not considered. fvs = feedback_vertex_set(CYCLES_BN, subgraph=["a", "b_1", "d_1", "d_2", "d_3"]) - pfvs = feedback_vertex_set(CYCLES_BN, parity='positive', subgraph=["a", "b_1", "d_1", "d_2", "d_3"]) - nfvs = feedback_vertex_set(CYCLES_BN, parity='negative', subgraph=["a", "b_1", "d_1", "d_2", "d_3"]) + pfvs = feedback_vertex_set( + CYCLES_BN, parity="positive", subgraph=["a", "b_1", "d_1", "d_2", "d_3"] + ) + nfvs = feedback_vertex_set( + CYCLES_BN, parity="negative", subgraph=["a", "b_1", "d_1", "d_2", "d_3"] + ) assert len(fvs) == 1 assert len(pfvs) == 1 @@ -112,15 +131,16 @@ def test_subgraph_fvs(): assert ("d_1" in nfvs) or ("d_2" in nfvs) or ("d_3" in nfvs) assert ("d_1" in pfvs) or ("d_2" in pfvs) + def test_ic(): ic = independent_cycles(CYCLES_BN) - n_ic = independent_cycles(CYCLES_BN, parity='negative') - p_ic = independent_cycles(CYCLES_BN, parity='positive') + n_ic = independent_cycles(CYCLES_BN, parity="negative") + p_ic = independent_cycles(CYCLES_BN, parity="positive") assert len(ic) == 3 assert len(n_ic) == 2 assert len(p_ic) == 2 - + # e is the shortes positive (and overall) cycle, so should be first assert ic[0] == ["e"] assert p_ic[0] == ["e"] @@ -139,12 +159,12 @@ def 
test_ic(): assert set(ic[1]) == set(["d_1", "d_2"]) or set(ic[2]) == set(["d_1", "d_2"]) # Check that the `DiGraph` results are the same as `BooleanNetwork` results. - # Note that these are not necessarily entirely equivalent, as the DiGraph - # seems to store the nodes/edges in a hashmap, resulting in + # Note that these are not necessarily entirely equivalent, as the DiGraph + # seems to store the nodes/edges in a hashmap, resulting in # not-quite-deterministic ordering and possibly different results (I think?). - dg_ic = independent_cycles(CYCLES_DIGRAPH) - dg_n_ic = independent_cycles(CYCLES_DIGRAPH, parity='negative') - dg_p_ic = independent_cycles(CYCLES_DIGRAPH, parity='positive') + dg_ic = independent_cycles(CYCLES_DIGRAPH) # type: ignore + dg_n_ic = independent_cycles(CYCLES_DIGRAPH, parity="negative") # type: ignore + dg_p_ic = independent_cycles(CYCLES_DIGRAPH, parity="positive") # type: ignore print(ic) print(dg_ic) @@ -155,11 +175,15 @@ def test_ic(): def test_subgraph_ic(): - # We only keep the two cycles consisting of "d_*". The "b_*" cycle + # We only keep the two cycles consisting of "d_*". The "b_*" cycle # and "e" self-loop are not considered. ic = independent_cycles(CYCLES_BN, subgraph=["a", "b_1", "d_1", "d_2", "d_3"]) - p_ic = independent_cycles(CYCLES_BN, parity='positive', subgraph=["a", "b_1", "d_1", "d_2", "d_3"]) - n_ic = independent_cycles(CYCLES_BN, parity='negative', subgraph=["a", "b_1", "d_1", "d_2", "d_3"]) + p_ic = independent_cycles( + CYCLES_BN, parity="positive", subgraph=["a", "b_1", "d_1", "d_2", "d_3"] + ) + n_ic = independent_cycles( + CYCLES_BN, parity="negative", subgraph=["a", "b_1", "d_1", "d_2", "d_3"] + ) assert len(ic) == 1 assert len(p_ic) == 1 @@ -174,7 +198,8 @@ def test_fvs_accuracy_CASCADE3(): Compare results of AEON and mtsNFVS on computing an negative feedback vertex set of the CASCADE3 model . Note that the result of mtsNFVS is not deterministic. 
""" - bn_real = BooleanNetwork.from_bnet(""" + bn_real = BooleanNetwork.from_bnet( + """ ABL1, (ATM & !RB1) ACVR1, BMPR2 ADAM17, ERK_f @@ -351,23 +376,25 @@ def test_fvs_accuracy_CASCADE3(): YAP_TAZ, (!BTRC & (!CSNK1_f & !LATS_f)) mTORC1_c, (!AKT1S1 & ((!RHEB & RSK_f) | RHEB)) mTORC2_c, ((!PIK3CA & (!S6K_f & TSC_f)) | (PIK3CA & !S6K_f)) - """) + """ + ) nfvs_mtsNFVS = find_minimum_NFVS(bn_real) - assert len(nfvs_mtsNFVS) <= 19 # the result of mtsNFVS is 19 + assert len(nfvs_mtsNFVS) <= 19 # the result of mtsNFVS is 19 for _i in range(10): nfvs = find_minimum_NFVS(bn_real) assert nfvs == nfvs_mtsNFVS - + def test_fvs_accuracy_SIPC(): """ Compare results of AEON and mtsNFVS on computing an negative feedback vertex set of the SIPC model . Note that the result of mtsNFVS is not deterministic. """ - bn_real = BooleanNetwork.from_bnet(""" + bn_real = BooleanNetwork.from_bnet( + """ AKT, ((!HSPs&(PIP3&!PTCH1))|(HSPs&!PTCH1)) AMPK, ((!AMP_ATP&(!ATM&(!ATR&(!EGFR&(!FGFR3&HIF1)))))|((!AMP_ATP&(!ATM&(ATR&(!EGFR&!FGFR3))))|((!AMP_ATP&(ATM&(!EGFR&!FGFR3)))|(AMP_ATP&(!EGFR&!FGFR3))))) AMP_ATP, !Nutrients @@ -484,13 +511,13 @@ def test_fvs_accuracy_SIPC(): p53, ((!Acidosis&(!BCL2&(!CHK1_2&(!HIF1&(!HSPs&(!MDM2&(!p14ARF&p38)))))))|((!Acidosis&(!BCL2&(!CHK1_2&(!HIF1&(!HSPs&(!MDM2&p14ARF))))))|((!Acidosis&(!BCL2&(!CHK1_2&(HIF1&(!HSPs&!MDM2)))))|((!Acidosis&(!BCL2&(CHK1_2&(!HSPs&!MDM2))))|(Acidosis&(!BCL2&(!HSPs&!MDM2))))))) p70S6kab, ((!mTORC2&PDK1)|mTORC2) p90RSK, ((!ERK&PDK1)|ERK) - """) + """ + ) nfvs_mtsNFVS = find_minimum_NFVS(bn_real) - assert len(nfvs_mtsNFVS) <= 13 # the result of mtsNFVS is 13 - + assert len(nfvs_mtsNFVS) <= 13 # the result of mtsNFVS is 13 + for _i in range(10): nfvs = find_minimum_NFVS(bn_real) assert nfvs == nfvs_mtsNFVS - diff --git a/tests/motif_avoidant_test.py b/tests/motif_avoidant_test.py index 7dc96699..82bb5bf6 100644 --- a/tests/motif_avoidant_test.py +++ b/tests/motif_avoidant_test.py @@ -1,18 +1,24 @@ -from biodivine_aeon import BooleanNetwork # 
type:ignore -from nfvsmotifs.motif_avoidant import _preprocess_candidates, _filter_candidates, _Pint_reachability -from nfvsmotifs.state_utils import state_list_to_bdd +from biodivine_aeon import BooleanNetwork # type:ignore + +from nfvsmotifs.motif_avoidant import _filter_candidates # type: ignore +from nfvsmotifs.motif_avoidant import _Pint_reachability # type: ignore +from nfvsmotifs.motif_avoidant import _preprocess_candidates # type: ignore from nfvsmotifs.petri_net_translation import network_to_petrinet +from nfvsmotifs.state_utils import state_list_to_bdd + def test_preprocessing_ssf_not_optimal(): - bn = BooleanNetwork.from_bnet(""" + bn = BooleanNetwork.from_bnet( + """ x1, (x1 & x2) | (!x1 & !x2) x2, (x1 & x2) | (!x1 & !x2) - """) - - s0 = {'x1': 0, 'x2': 0} - s1 = {'x1': 0, 'x2': 1} - s2 = {'x1': 1, 'x2': 0} - s3 = {'x1': 1, 'x2': 1} + """ + ) + + s0 = {"x1": 0, "x2": 0} + s1 = {"x1": 0, "x2": 1} + s2 = {"x1": 1, "x2": 0} + # s3 = {"x1": 1, "x2": 1} """ This BN has one minimal trap space: 11. @@ -29,32 +35,38 @@ def test_preprocessing_ssf_not_optimal(): If b_1 = 1 and b_2 = 1, then F = [01, 10]. 
""" - # F = {00} - F = [s0] - F = _preprocess_candidates(bn, F, terminal_restriction_space, 1000) - assert len(F) == 1 + # candidates_F = {00} + candidates_F = [s0] + candidates_F = _preprocess_candidates( + bn, candidates_F, terminal_restriction_space, 1000 + ) + assert len(candidates_F) == 1 + + # candidates_F = {01, 10} + candidates_F = [s1, s2] + candidates_F = _preprocess_candidates( + bn, candidates_F, terminal_restriction_space, 1000 + ) + assert len(candidates_F) == 1 - # F = {01, 10} - F = [s1, s2] - F = _preprocess_candidates(bn, F, terminal_restriction_space, 1000) - assert len(F) == 1 - def test_preprocessing_ssf_optimal(): - bn = BooleanNetwork.from_bnet(""" + bn = BooleanNetwork.from_bnet( + """ A, !B B, !A C, A | B - """) - - s0 = {'A': 0, 'B': 0, 'C': 0} - s1 = {'A': 0, 'B': 0, 'C': 1} - s2 = {'A': 0, 'B': 1, 'C': 0} - s3 = {'A': 0, 'B': 1, 'C': 1} - s4 = {'A': 1, 'B': 0, 'C': 0} - s5 = {'A': 1, 'B': 0, 'C': 1} - s6 = {'A': 1, 'B': 1, 'C': 0} - s7 = {'A': 1, 'B': 1, 'C': 1} + """ + ) + + s0 = {"A": 0, "B": 0, "C": 0} + # s1 = {"A": 0, "B": 0, "C": 1} + s2 = {"A": 0, "B": 1, "C": 0} + # s3 = {"A": 0, "B": 1, "C": 1} + s4 = {"A": 1, "B": 0, "C": 0} + # s5 = {"A": 1, "B": 0, "C": 1} + # s6 = {"A": 1, "B": 1, "C": 0} + s7 = {"A": 1, "B": 1, "C": 1} """ This BN has two minimal trap spaces: 101 + 011. @@ -70,58 +82,64 @@ def test_preprocessing_ssf_optimal(): Then F = [000]. 
""" - F = [s0] - F = _preprocess_candidates(bn, F, terminal_restriction_space, 1000) - assert len(F) == 0 + candidates_F = [s0] + candidates_F = _preprocess_candidates( + bn, candidates_F, terminal_restriction_space, 1000 + ) + assert len(candidates_F) == 0 def test_ABNReach_current_version(): - bn = BooleanNetwork.from_bnet(""" + bn = BooleanNetwork.from_bnet( + """ x1, (x1 & x2) | (!x1 & !x2) x2, (x1 & x2) | (!x1 & !x2) x3, x3 | !x3 - """) - - s0 = {'x1': 0, 'x2': 0, 'x3': 1} - s1 = {'x1': 0, 'x2': 1, 'x3': 1} - s2 = {'x1': 1, 'x2': 0, 'x3': 1} - s3 = {'x1': 1, 'x2': 1, 'x3': 1} + """ + ) + + s0 = {"x1": 0, "x2": 0, "x3": 1} + s1 = {"x1": 0, "x2": 1, "x3": 1} + # s2 = {"x1": 1, "x2": 0, "x3": 1} + s3 = {"x1": 1, "x2": 1, "x3": 1} petri_net = network_to_petrinet(bn) joint_target_set = state_list_to_bdd([s3]) is_reachable = _Pint_reachability(petri_net, s0, joint_target_set) - assert is_reachable == False # 00 does not reach 11, Pint cannot determinem but Mole can + assert ( + is_reachable is False + ) # 00 does not reach 11, Pint cannot determinem but Mole can joint_target_set = state_list_to_bdd([s0]) is_reachable = _Pint_reachability(petri_net, s3, joint_target_set) - assert is_reachable == False # 11 does not reach 00 + assert is_reachable is False # 11 does not reach 00 joint_target_set = state_list_to_bdd([s1]) is_reachable = _Pint_reachability(petri_net, s0, joint_target_set) - assert is_reachable == True # 00 reaches 01 + assert is_reachable is True # 00 reaches 01 joint_target_set = state_list_to_bdd([s1, s3]) is_reachable = _Pint_reachability(petri_net, s0, joint_target_set) - assert is_reachable == True # 00 reaches 01 + assert is_reachable is True # 00 reaches 01 def test_FilteringProcess(): - bn = BooleanNetwork.from_bnet(""" + bn = BooleanNetwork.from_bnet( + """ x1, (x1 & x2) | (!x1 & !x2) x2, (x1 & x2) | (!x1 & !x2) - """) - - s0 = {'x1': 0, 'x2': 0} - s1 = {'x1': 0, 'x2': 1} - s2 = {'x1': 1, 'x2': 0} - s3 = {'x1': 1, 'x2': 1} + """ + ) + + s0 = 
{"x1": 0, "x2": 0} + s1 = {"x1": 0, "x2": 1} + s2 = {"x1": 1, "x2": 0} + # s3 = {"x1": 1, "x2": 1} terminal_res_space = state_list_to_bdd([s0, s1, s2]) petri_net = network_to_petrinet(bn) - F = [s1, s2] # Candidate set after finishing preprocessing + F = [s1, s2] # Candidate set after finishing preprocessing list_motif_avoidant_atts = _filter_candidates(petri_net, F, terminal_res_space) - assert len(list_motif_avoidant_atts) == 1 # a motif-avoidant attractor {00, 01, 10} - - + assert len(list_motif_avoidant_atts) == 1 # a motif-avoidant attractor {00, 01, 10} diff --git a/tests/petri_net_translation_test.py b/tests/petri_net_translation_test.py index 3b1e78ad..e09cbcc2 100644 --- a/tests/petri_net_translation_test.py +++ b/tests/petri_net_translation_test.py @@ -1,7 +1,14 @@ -from nfvsmotifs.petri_net_translation import sanitize_network_names, network_to_petrinet, extract_variable_names -from biodivine_aeon import RegulatoryGraph, BooleanNetwork # type: ignore -from networkx import DiGraph, is_isomorphic # type: ignore +# type: ignore import pytest +from biodivine_aeon import BooleanNetwork, RegulatoryGraph # type: ignore +from networkx import DiGraph, is_isomorphic # type: ignore + +from nfvsmotifs.petri_net_translation import ( + extract_variable_names, + network_to_petrinet, + sanitize_network_names, +) + def test_sanitization(): rg = RegulatoryGraph([r"a_45[x]", r"b12{z}", "c[", "c]"]) @@ -22,22 +29,26 @@ def test_sanitization(): assert bn.get_variable_name(c1) == "c_" assert bn.get_variable_name(c2) == "c__id4" + def test_sanitization_failing(): rg = RegulatoryGraph(["x_", "x_id2", "x["]) bn = BooleanNetwork(rg) try: bn = sanitize_network_names(bn) pytest.fail("This network should not be sanitizeable.") - except: + except: # noqa: E722 pass + def test_translation(): - # A very very simple network for which we know how the + # A very very simple network for which we know how the # translation should look like. 
- bn = BooleanNetwork.from_bnet(""" + bn = BooleanNetwork.from_bnet( + """ A, !A & B B, !B & !A - """) + """ + ) expected = DiGraph() expected.add_node("b0_A", kind="place") @@ -56,7 +67,7 @@ def test_translation(): expected.add_node("tr_A_down_1", kind="transition") expected.add_edge("b1_A", "tr_A_down_1") expected.add_edge("tr_A_down_1", "b0_A") - + # B goes up if B=0 and A=0. expected.add_node("tr_B_up_1", kind="transition") expected.add_edge("b0_B", "tr_B_up_1") @@ -71,4 +82,4 @@ def test_translation(): pn = network_to_petrinet(bn) assert ["A", "B"] == extract_variable_names(pn) - assert is_isomorphic(pn, expected) \ No newline at end of file + assert is_isomorphic(pn, expected) diff --git a/tests/space_utils_test.py b/tests/space_utils_test.py index 19cc0899..d2f88678 100644 --- a/tests/space_utils_test.py +++ b/tests/space_utils_test.py @@ -1,104 +1,137 @@ -from biodivine_aeon import BooleanNetwork # type: ignore -from pyeda.inter import * # type: ignore +from typing import cast + +from biodivine_aeon import BooleanNetwork # type: ignore +from pyeda.boolalg.expr import Expression, expr + +from nfvsmotifs.space_utils import ( + expression_to_space_list, + is_subspace, + is_syntactic_trap_space, + percolate_network, + percolate_pyeda_expression, + percolate_space, + space_unique_key, +) -from nfvsmotifs.space_utils import * def test_is_subspace(): - assert is_subspace({'x': 0, 'y': 1}, {'x': 0}) - assert not is_subspace({'x': 1, 'y': 0}, {'x':0, 'y':0}) - assert is_subspace({'x': 0, 'y': 1}, {'x': 0}) - assert not is_subspace({'x': 1, 'y': 0}, {'x':0, 'y':0}) + assert is_subspace({"x": 0, "y": 1}, {"x": 0}) + assert not is_subspace({"x": 1, "y": 0}, {"x": 0, "y": 0}) + assert is_subspace({"x": 0, "y": 1}, {"x": 0}) + assert not is_subspace({"x": 1, "y": 0}, {"x": 0, "y": 0}) + def test_expression_percolation(): - e = expr("(a & ~x) | (a & y)") + e = cast(Expression, expr("(a & ~x) | (a & y)")) + + assert expr("a") == percolate_pyeda_expression(e, {"x": 0, "y": 
1}) + assert expr(False) == percolate_pyeda_expression(e, {"a": 0}) + assert expr("a") == percolate_pyeda_expression(e, {"x": 0, "y": 1}) + assert expr(False) == percolate_pyeda_expression(e, {"a": 0}) - assert expr("a") == percolate_pyeda_expression(e, {'x': 0, 'y': 1}) - assert expr(False) == percolate_pyeda_expression(e, {'a': 0}) - assert expr("a") == percolate_pyeda_expression(e, {'x': 0, 'y': 1}) - assert expr(False) == percolate_pyeda_expression(e, {'a': 0}) def test_space_percolation(): - bn = BooleanNetwork.from_bnet(""" + bn = BooleanNetwork.from_bnet( + """ a, b b, c c, a - """) + """ + ) - assert {'a': 0, 'b': 0, 'c': 0} == percolate_space(bn, {'a': 0})[0] - assert {} == percolate_space(bn, {'a': 0})[1] - assert is_syntactic_trap_space(bn, {'a': 0, 'b': 0, 'c': 0}) - assert {'a': 1, 'b': 1, 'c': 1} == percolate_space(bn, {'a': 1})[0] - assert {} == percolate_space(bn, {'a': 1})[1] - assert is_syntactic_trap_space(bn, {'a': 1, 'b': 1, 'c': 1}) + assert {"a": 0, "b": 0, "c": 0} == percolate_space(bn, {"a": 0})[0] + assert {} == percolate_space(bn, {"a": 0})[1] + assert is_syntactic_trap_space(bn, {"a": 0, "b": 0, "c": 0}) + assert {"a": 1, "b": 1, "c": 1} == percolate_space(bn, {"a": 1})[0] + assert {} == percolate_space(bn, {"a": 1})[1] + assert is_syntactic_trap_space(bn, {"a": 1, "b": 1, "c": 1}) - bn = BooleanNetwork.from_bnet(""" + bn = BooleanNetwork.from_bnet( + """ a, b b, !c c, a - """) + """ + ) + + assert {"a": 0, "b": 0, "c": 0} == percolate_space( + bn, {"a": 0, "b": 0, "c": 0}, strict_percolation=False + )[0] + assert {"a": 0, "c": 0} == percolate_space( + bn, {"a": 0, "b": 0, "c": 0}, strict_percolation=True + )[0] - assert {'a': 0, 'b': 0, 'c': 0} == percolate_space(bn, {'a': 0, 'b': 0, 'c': 0},strict_percolation=False)[0] - assert {'a': 0, 'c': 0} == percolate_space(bn, {'a': 0, 'b': 0, 'c': 0},strict_percolation=True)[0] - # The conflict is on b. The rest is fine. 
- assert {'b': 1} == percolate_space(bn, {'a': 0, 'b': 0, 'c': 0})[1] - assert not is_syntactic_trap_space(bn, {'a': 0}) + assert {"b": 1} == percolate_space(bn, {"a": 0, "b": 0, "c": 0})[1] + assert not is_syntactic_trap_space(bn, {"a": 0}) assert is_syntactic_trap_space(bn, {}) - bn = BooleanNetwork.from_bnet(""" + bn = BooleanNetwork.from_bnet( + """ a, !b b, a c, a & c & d | b & !c | c & !d d, !a | d - """) - assert {'b': 1, 'c': 1} == percolate_space(bn, {'a': 1})[0] + """ + ) + assert {"b": 1, "c": 1} == percolate_space(bn, {"a": 1})[0] + def test_constant_percolation(): - bn = BooleanNetwork.from_bnet(""" + bn = BooleanNetwork.from_bnet( + """ a, true b, b c, a | b - """) + """ + ) - assert { 'a': 1, 'c': 1 } == percolate_space(bn, {}, strict_percolation=False)[0] - assert { 'a': 1 } == percolate_space(bn, {'a': 0}, strict_percolation=False)[1] + assert {"a": 1, "c": 1} == percolate_space(bn, {}, strict_percolation=False)[0] + assert {"a": 1} == percolate_space(bn, {"a": 0}, strict_percolation=False)[1] assert {} == percolate_space(bn, {}, strict_percolation=True)[0] - assert {} == percolate_space(bn, {'a': 0}, strict_percolation=True)[1] - + assert {} == percolate_space(bn, {"a": 0}, strict_percolation=True)[1] + + def test_network_percolation(): - bn = BooleanNetwork.from_bnet(""" + bn = BooleanNetwork.from_bnet( + """ a, c & b b, !a c, c - """) - - percolated_bn = percolate_network(bn, percolate_space(bn, {'c': 0})[0]) - percolated_bn = percolate_network(bn, percolate_space(bn, {'c': 0})[0]) + """ + ) + + percolated_bn = percolate_network(bn, percolate_space(bn, {"c": 0})[0]) + percolated_bn = percolate_network(bn, percolate_space(bn, {"c": 0})[0]) assert "false" == percolated_bn.get_update_function("c") assert "false" == percolated_bn.get_update_function("a") assert "true" == percolated_bn.get_update_function("b") - percolated_bn = percolate_network(bn, percolate_space(bn, {'c': 1})[0]) - percolated_bn = percolate_network(bn, percolate_space(bn, {'c': 
1})[0]) + percolated_bn = percolate_network(bn, percolate_space(bn, {"c": 1})[0]) + percolated_bn = percolate_network(bn, percolate_space(bn, {"c": 1})[0]) assert "true" == percolated_bn.get_update_function("c") assert "b" == percolated_bn.get_update_function("a") assert "!a" == percolated_bn.get_update_function("b") + def test_expression_to_spaces(): - e = expr("(a & c) | (~d & (a | c)) | f") + e = cast(Expression, expr("(a & c) | (~d & (a | c)) | f")) spaces = expression_to_space_list(e) - assert {'f': 1} in spaces - assert {'a': 1, 'c': 1} in spaces - assert {'d': 0, 'a': 1} in spaces - assert {'d': 0, 'c': 1} in spaces - assert {'a': 1, 'c': 0} not in spaces + assert {"f": 1} in spaces + assert {"a": 1, "c": 1} in spaces + assert {"d": 0, "a": 1} in spaces + assert {"d": 0, "c": 1} in spaces + assert {"a": 1, "c": 0} not in spaces + def test_space_unique_key(): - bn = BooleanNetwork.from_bnet(""" + bn = BooleanNetwork.from_bnet( + """ a, c & b b, !a c, c - """) + """ + ) assert space_unique_key({"a": 1}, bn) == space_unique_key({"a": 1}, bn) - assert space_unique_key({"a": 1}, bn) != space_unique_key({"b": 1}, bn) \ No newline at end of file + assert space_unique_key({"a": 1}, bn) != space_unique_key({"b": 1}, bn) diff --git a/tests/succession_diagram_test.py b/tests/succession_diagram_test.py index abe6c748..8601e204 100644 --- a/tests/succession_diagram_test.py +++ b/tests/succession_diagram_test.py @@ -1,79 +1,113 @@ -from nfvsmotifs.SuccessionDiagram import SuccessionDiagram -from biodivine_aeon import BooleanNetwork, SymbolicAsyncGraph, find_attractors # type: ignore import sys -import nfvsmotifs import unittest +from biodivine_aeon import find_attractors # type: ignore +from biodivine_aeon import BooleanNetwork, SymbolicAsyncGraph # type: ignore + +import nfvsmotifs +import nfvsmotifs.SuccessionDiagram +from nfvsmotifs.SuccessionDiagram import SuccessionDiagram + # This just ensures that the debug outputs are a part of the test output. 
nfvsmotifs.SuccessionDiagram.DEBUG = True -class SDTest(unittest.TestCase): + +class SuccessionDiagramTest(unittest.TestCase): def test_succession_diagram_structure(self): - bn = BooleanNetwork.from_bnet(""" + bn = BooleanNetwork.from_bnet( + """ x1, x2 x2, x1 x3, !x3 - """) - - SD = SuccessionDiagram(bn) - SD.expand_bfs() - assert SD.G.number_of_nodes() == 3 - assert SD.G.number_of_edges() == 2 - assert max(d['depth'] for n,d in SD.G.nodes(data=True)) == 1 - assert SD.depth() == 1 - assert max([SD.node_depth(i) for i in SD.node_ids()]) == SD.depth() - assert sum(1 for _ in SD.node_ids()) == len(SD) - assert sum(1 for _ in SD.expanded_ids()) == len(SD) - assert len(SD.minimal_trap_spaces()) == 2 - assert SD.find_node({"x1": 1, "x2": 0}) is None - - SD_one = SD - - bn = BooleanNetwork.from_bnet(""" + """ + ) + + succession_diagram = SuccessionDiagram(bn) + succession_diagram.expand_bfs() + assert succession_diagram.G.number_of_nodes() == 3 + assert succession_diagram.G.number_of_edges() == 2 # type: ignore + assert max(d["depth"] for _, d in succession_diagram.G.nodes(data=True)) == 1 # type: ignore + assert succession_diagram.depth() == 1 + assert ( + max( + [ + succession_diagram.node_depth(i) + for i in succession_diagram.node_ids() + ] + ) + == succession_diagram.depth() + ) + assert sum(1 for _ in succession_diagram.node_ids()) == len(succession_diagram) + assert sum(1 for _ in succession_diagram.expanded_ids()) == len( + succession_diagram + ) + assert len(succession_diagram.minimal_trap_spaces()) == 2 + assert succession_diagram.find_node({"x1": 1, "x2": 0}) is None + + succession_diagram_one = succession_diagram + + bn = BooleanNetwork.from_bnet( + """ a, b b, a c, a & c & d | b & !c | c & !d d, !a | d | c - """) - - SD = SuccessionDiagram(bn) + """ + ) + + succession_diagram = SuccessionDiagram(bn) # Initially, nothing is expanded so this should cause an error. 
with self.assertRaises(KeyError): - SD.node_successors(SD.root()) - + succession_diagram.node_successors(succession_diagram.root()) + # Also, attractors are initially unknown too. with self.assertRaises(KeyError): - SD.node_attractor_seeds(SD.root()) + succession_diagram.node_attractor_seeds(succession_diagram.root()) # Expand the root manually and check that iterators work correctly. - SD.node_successors(SD.root(), compute=True) - assert sum(1 for _ in SD.stub_ids()) == len(SD) - 1 - assert sum(1 for _ in SD.expanded_ids()) == 1 + succession_diagram.node_successors(succession_diagram.root(), compute=True) + assert ( + sum(1 for _ in succession_diagram.stub_ids()) == len(succession_diagram) - 1 + ) + assert sum(1 for _ in succession_diagram.expanded_ids()) == 1 # Then expand the whole thing. - SD.expand_bfs() - assert SD.G.number_of_nodes() == 4 - assert SD.G.number_of_edges() == 5 - assert max(d['depth'] for n,d in SD.G.nodes(data=True)) == 2 - assert SD.depth() == 2 - assert max([SD.node_depth(i) for i in SD.node_ids()]) == SD.depth() - assert sum(1 for _ in SD.node_ids()) == len(SD) - assert sum(1 for _ in SD.expanded_ids()) == len(SD) - assert len(SD.minimal_trap_spaces()) == 2 - assert SD.find_node({"a": 1, "b": 0}) is None + succession_diagram.expand_bfs() + assert succession_diagram.G.number_of_nodes() == 4 + assert succession_diagram.G.number_of_edges() == 5 # type: ignore + assert max(d["depth"] for _, d in succession_diagram.G.nodes(data=True)) == 2 # type: ignore + assert succession_diagram.depth() == 2 + assert ( + max( + [ + succession_diagram.node_depth(i) + for i in succession_diagram.node_ids() + ] + ) + == succession_diagram.depth() + ) + assert sum(1 for _ in succession_diagram.node_ids()) == len(succession_diagram) + assert sum(1 for _ in succession_diagram.expanded_ids()) == len( + succession_diagram + ) + assert len(succession_diagram.minimal_trap_spaces()) == 2 + assert succession_diagram.find_node({"a": 1, "b": 0}) is None # The comparison 
functions should work even if the diagrams # are not based on the same network. - assert not SD.is_subgraph(SD_one) - assert not SD_one.is_subgraph(SD) - assert not SD.is_isomorphic(SD_one) + assert not succession_diagram.is_subgraph(succession_diagram_one) + assert not succession_diagram_one.is_subgraph(succession_diagram) + assert not succession_diagram.is_isomorphic(succession_diagram_one) + + succession_diagram_partial = SuccessionDiagram(bn) + succession_diagram_partial.node_successors( + succession_diagram.root(), compute=True + ) - SD_partial = SuccessionDiagram(bn) - SD_partial.node_successors(SD.root(), compute=True) + assert succession_diagram_partial.is_subgraph(succession_diagram) + assert not succession_diagram.is_subgraph(succession_diagram_partial) - assert SD_partial.is_subgraph(SD) - assert not SD.is_subgraph(SD_partial) def test_expansion_depth_limit_bfs(): bn = BooleanNetwork.from_file("bbm-bnet-inputs-true/033.bnet") @@ -82,7 +116,8 @@ def test_expansion_depth_limit_bfs(): assert not sd.expand_bfs(bfs_level_limit=3) assert sd.expand_bfs(bfs_level_limit=10) assert len(sd) == 432 - + + def test_expansion_depth_limit_dfs(): bn = BooleanNetwork.from_file("bbm-bnet-inputs-true/033.bnet") @@ -91,6 +126,7 @@ def test_expansion_depth_limit_dfs(): assert sd.expand_dfs(dfs_stack_limit=10) assert len(sd) == 432 + def test_expansion_size_limit_bfs(): bn = BooleanNetwork.from_file("bbm-bnet-inputs-true/033.bnet") @@ -99,6 +135,7 @@ def test_expansion_size_limit_bfs(): assert sd.expand_bfs(size_limit=500) assert len(sd) == 432 + def test_expansion_size_limit_dfs(): bn = BooleanNetwork.from_file("bbm-bnet-inputs-true/033.bnet") @@ -107,9 +144,11 @@ def test_expansion_size_limit_dfs(): assert sd.expand_dfs(size_limit=500) assert len(sd) == 432 + # TODO: add tests for a wider variety of networks -def test_expansion_comparisons(network_file): + +def test_expansion_comparisons(network_file: str): # Compare the succession diagrams for various expansion methods. 
nfvsmotifs.SuccessionDiagram.DEBUG = True NODE_LIMIT = 100 @@ -119,14 +158,14 @@ def test_expansion_comparisons(network_file): bn = BooleanNetwork.from_file(network_file) bn = bn.infer_regulatory_graph() - + sd_bfs = SuccessionDiagram(bn) bfs_success = sd_bfs.expand_bfs(bfs_level_limit=DEPTH_LIMIT, size_limit=NODE_LIMIT) sd_dfs = SuccessionDiagram(bn) dfs_success = sd_dfs.expand_dfs(dfs_stack_limit=DEPTH_LIMIT, size_limit=NODE_LIMIT) if not (bfs_success and dfs_success): - # SD is too large for this test. + # succession_diagram is too large for this test. return sd_min = SuccessionDiagram(bn) @@ -144,8 +183,8 @@ def test_expansion_comparisons(network_file): # This will go through the minimal trap spaces of this network # and try to only expand towards this minimum trap space as a target. - # This should always create a SD with exactly one minimal trap space, - # as the rest + # This should always create a succession_diagram with exactly one minimal trap space, + # as the rest for min_trap in sd_bfs.minimal_trap_spaces(): space = sd_bfs.node_space(min_trap) @@ -155,9 +194,11 @@ def test_expansion_comparisons(network_file): assert sd_target.is_subgraph(sd_bfs) assert len(sd_target.minimal_trap_spaces()) == 1 -def test_attractor_detection(network_file): + +def test_attractor_detection(network_file: str): # TODO: Once attractor detection is faster, we should increase this limit. - # Right now, checking attractors in larger succession diagrams would often time out our CI. + # Right now, checking attractors in larger succession diagrams would often + # time out our CI. NODE_LIMIT = 100 # This is unfortunately necessary for PyEDA Boolean expression parser (for now). @@ -166,9 +207,10 @@ def test_attractor_detection(network_file): # TODO: Remove these once method is fast enough. 
print(network_file) if network_file.endswith("146.bnet"): - # For this model, we can compute the 100 SD nodes, but it takes a very long time - # and the SD is larger, so we wouldn't get to attractor computation anyway. - NODE_LIMIT = 10 + # For this model, we can compute the 100 succession_diagram nodes, but + # it takes a very long time and the succession_diagram is larger, so we + # wouldn't get to attractor computation anyway. + NODE_LIMIT = 10 # type: ignore bn = BooleanNetwork.from_file(network_file) bn = bn.infer_regulatory_graph() @@ -178,21 +220,21 @@ def test_attractor_detection(network_file): sd = SuccessionDiagram(bn) fully_expanded = sd.expand_bfs(bfs_level_limit=1000, size_limit=NODE_LIMIT) - # SD must be fully expanded, otherwise we may miss some results. - # If SD is not fully expanded, we just skip this network. + # succession_diagram must be fully expanded, otherwise we may miss some results. + # If succession_diagram is not fully expanded, we just skip this network. if not fully_expanded: return - # TODO: Remove these once method is fast enough. + # TODO: Remove these once method is fast enough. if network_file.endswith("075.bnet"): - # It seems that with current NFVS, the clingo fixed-point part takes too long. There are - # better NFVS-es that we could try, but we first need to make the NFVS algorithm deterministic. + # It seems that with current NFVS, the clingo fixed-point part takes too + # long. There are better NFVS-es that we could try, but we first need to + # make the NFVS algorithm deterministic. return - # Compute attractors in diagram nodes. # TODO: There will probably be a method that does this in one "go". - nfvs_attractors = [] + nfvs_attractors: list[dict[str, int]] = [] for i in sd.node_ids(): attr = sd.node_attractor_seeds(i, compute=True) for a in attr: @@ -202,36 +244,38 @@ def test_attractor_detection(network_file): nfvs_attractors += attr # Compute symbolic attractors using AEON. 
- symbolic_attractors = find_attractors(stg) + symbolic_attractors = find_attractors(stg) # type: ignore # Check that every "seed" returned by SuccessionDiagram appears in # some symbolic attractor, and that every symbolic attractor contains # at most one such "seed" state. for seed in nfvs_attractors: - symbolic_seed = stg.fix_subspace({ k:bool(v) for k,v in seed.items() }) + symbolic_seed = stg.fix_subspace({k: bool(v) for k, v in seed.items()}) # type: ignore found = None - + # The "seed" state must have a symbolic attractor (and that # attractor mustn't have been removed yet). - for i in range(len(symbolic_attractors)): - if symbolic_seed.is_subset(symbolic_attractors[i]): + for i in range(len(symbolic_attractors)): # type: ignore + if symbolic_seed.is_subset(symbolic_attractors[i]): # type: ignore found = i assert found is not None - symbolic_attractors.pop(found) + symbolic_attractors.pop(found) # type: ignore print("Attractors:", len(nfvs_attractors)) # All symbolic attractors must be covered by some seed at this point. - assert len(symbolic_attractors) == 0 + assert len(symbolic_attractors) == 0 # type: ignore + -def test_attractor_expansion(network_file): - # This test is similar to the "test attractor detection" function above, - # but it will perform only a partial expansion of the succession diagram, - # which is hopefully faster. +def test_attractor_expansion(network_file: str): + # This test is similar to the "test attractor detection" function above, but + # it will perform only a partial expansion of the succession diagram, which + # is hopefully faster. # TODO: Once attractor detection is faster, we should increase this limit. - # Right now, checking attractors in larger succession diagrams would often time out our CI. + # Right now, checking attractors in larger succession diagrams would often + # time out our CI. NODE_LIMIT = 100 # This is unfortunately necessary for PyEDA Boolean expression parser (for now). 
@@ -240,9 +284,10 @@ def test_attractor_expansion(network_file): # TODO: Remove these once method is fast enough. print(network_file) if network_file.endswith("146.bnet"): - # For this model, we can compute the 100 SD nodes, but it takes a very long time - # and the SD is larger, so we wouldn't get to attractor computation anyway. - NODE_LIMIT = 10 + # For this model, we can compute the 100 succession_diagram nodes, but + # it takes a very long time and the succession_diagram is larger, so we + # wouldn't get to attractor computation anyway. + NODE_LIMIT = 10 # type: ignore bn = BooleanNetwork.from_file(network_file) bn = bn.infer_regulatory_graph() @@ -252,21 +297,21 @@ def test_attractor_expansion(network_file): sd = SuccessionDiagram(bn) fully_expanded = sd.expand_attractor_seeds(size_limit=NODE_LIMIT) - # SD must be fully expanded, otherwise we may miss some results. - # If SD is not fully expanded, we just skip this network. + # succession_diagram must be fully expanded, otherwise we may miss some results. + # If succession_diagram is not fully expanded, we just skip this network. if not fully_expanded: return - # TODO: Remove these once method is fast enough. + # TODO: Remove these once method is fast enough. if network_file.endswith("075.bnet"): - # It seems that with current NFVS, the clingo fixed-point part takes too long. There are - # better NFVS-es that we could try, but we first need to make the NFVS algorithm deterministic. + # It seems that with current NFVS, the clingo fixed-point part takes too + # long. There are better NFVS-es that we could try, but we first need to + # make the NFVS algorithm deterministic. return - # Compute attractors in diagram nodes. # TODO: There will probably be a method that does this in one "go". - nfvs_attractors = [] + nfvs_attractors: list[dict[str, int]] = [] # This is an important change compared to the original test: Here, we only # care about expanded nodes, everything else is ignored. 
for i in sd.expanded_ids(): @@ -278,25 +323,25 @@ def test_attractor_expansion(network_file): nfvs_attractors += attr # Compute symbolic attractors using AEON. - symbolic_attractors = find_attractors(stg) + symbolic_attractors = find_attractors(stg) # type: ignore # Check that every "seed" returned by SuccessionDiagram appears in # some symbolic attractor, and that every symbolic attractor contains # at most one such "seed" state. for seed in nfvs_attractors: - symbolic_seed = stg.fix_subspace({ k:bool(v) for k,v in seed.items() }) + symbolic_seed = stg.fix_subspace({k: bool(v) for k, v in seed.items()}) # type: ignore found = None - + # The "seed" state must have a symbolic attractor (and that # attractor mustn't have been removed yet). - for i in range(len(symbolic_attractors)): - if symbolic_seed.is_subset(symbolic_attractors[i]): + for i in range(len(symbolic_attractors)): # type: ignore + if symbolic_seed.is_subset(symbolic_attractors[i]): # type: ignore found = i assert found is not None - symbolic_attractors.pop(found) + symbolic_attractors.pop(found) # type: ignore print("Attractors:", len(nfvs_attractors)) # All symbolic attractors must be covered by some seed at this point. 
- assert len(symbolic_attractors) == 0 \ No newline at end of file + assert len(symbolic_attractors) == 0 # type: ignore diff --git a/tests/terminal_restriction_space_test.py b/tests/terminal_restriction_space_test.py index 63665e14..e99a9ed4 100644 --- a/tests/terminal_restriction_space_test.py +++ b/tests/terminal_restriction_space_test.py @@ -1,46 +1,80 @@ -from biodivine_aeon import BooleanNetwork # type: ignore +from biodivine_aeon import BooleanNetwork # type: ignore + +from nfvsmotifs.terminal_restriction_space import ( + get_self_neg_tr_trap_spaces, + get_terminal_restriction_space, + state_list_to_bdd, +) from nfvsmotifs.trappist_core import trappist -from nfvsmotifs.terminal_restriction_space import * + def test_tr_trap_spaces(): """ TODO: need to make a test using real models """ - bn = BooleanNetwork.from_bnet(""" + bn = BooleanNetwork.from_bnet( + """ A, B B, A | B - """) + """ + ) tr_trap_spaces = trappist(bn, problem="max", reverse_time=True) - assert {'A': 0, 'B': 1} in tr_trap_spaces - assert {'B': 0} in tr_trap_spaces - assert [{'A': 0, 'B': 1}] == get_self_neg_tr_trap_spaces(bn) + assert {"A": 0, "B": 1} in tr_trap_spaces + assert {"B": 0} in tr_trap_spaces + assert [{"A": 0, "B": 1}] == get_self_neg_tr_trap_spaces(bn) + def test_get_terminal_restriction_space(): - network = BooleanNetwork.from_bnet(""" + network = BooleanNetwork.from_bnet( + """ A, !A & !B | C B, !A & !B | C C, A & B - """) - stable_motifs = [{'A':1, 'B':1, 'C':1}] + """ + ) + stable_motifs = [{"A": 1, "B": 1, "C": 1}] + + trs = get_terminal_restriction_space( + stable_motifs, + network, + ensure_subspace={}, + use_single_node_drivers=True, + use_tr_trapspaces=True, + ) - TRS = get_terminal_restriction_space(stable_motifs, network, ensure_subspace={}, use_single_node_drivers = True, use_tr_trapspaces = True) + assert trs == state_list_to_bdd( + [{"A": 0, "B": 0, "C": 0}, {"A": 1, "B": 0, "C": 0}, {"A": 0, "B": 1, "C": 0}] + ) - assert TRS == 
state_list_to_bdd([{'A':0,'B':0,'C':0},{'A':1,'B':0,'C':0},{'A':0,'B':1,'C':0}]) def test_get_terminal_restriction_space2(): - network = BooleanNetwork.from_bnet(""" + network = BooleanNetwork.from_bnet( + """ A, !D | (A & !B & C) B, E & !(A & !B & C) C, F | (A & !B & C) D, C E, A F, B - """) - stable_motifs = [{'A':1, 'B':0, 'C':1}] + """ + ) + stable_motifs = [{"A": 1, "B": 0, "C": 1}] - TRS = get_terminal_restriction_space(stable_motifs, network, ensure_subspace={}, use_single_node_drivers = True, use_tr_trapspaces = True) + trs = get_terminal_restriction_space( + stable_motifs, + network, + ensure_subspace={}, + use_single_node_drivers=True, + use_tr_trapspaces=True, + ) - assert TRS == state_list_to_bdd([{'A':0},{'B':1},{'C':0},]) \ No newline at end of file + assert trs == state_list_to_bdd( + [ + {"A": 0}, + {"B": 1}, + {"C": 0}, + ] + ) diff --git a/tests/trappist_core_test.py b/tests/trappist_core_test.py index 4ee7035d..2b47fc71 100644 --- a/tests/trappist_core_test.py +++ b/tests/trappist_core_test.py @@ -1,15 +1,22 @@ # type: ignore -from biodivine_aeon import BooleanNetwork, SymbolicAsyncGraph, FixedPoints # type: ignore -from nfvsmotifs.trappist_core import trappist, compute_fixed_point_reduced_STG +import sys + +from biodivine_aeon import ( # type: ignore + BooleanNetwork, + FixedPoints, + SymbolicAsyncGraph, +) + from nfvsmotifs.aeon_utils import remove_static_constraints -from nfvsmotifs.space_utils import is_syntactic_trap_space from nfvsmotifs.petri_net_translation import network_to_petrinet -import sys +from nfvsmotifs.space_utils import is_syntactic_trap_space +from nfvsmotifs.trappist_core import compute_fixed_point_reduced_STG, trappist # TODO: Right now, this is necessary to correctly parse some of the larger models # using PyEDA. In the future, we should ideally use a parser that does not have this problem. 
sys.setrecursionlimit(100_000) + def test_network_minimum_traps(network_file): bn = BooleanNetwork.from_file(network_file) bn = remove_static_constraints(bn) @@ -18,7 +25,9 @@ def test_network_minimum_traps(network_file): min_max_traps = trappist(bn, problem="min") + trappist(bn, problem="max") pn = network_to_petrinet(bn) - min_max_traps_pre_encoded = trappist(pn, problem="min") + trappist(pn, problem="max") + min_max_traps_pre_encoded = trappist(pn, problem="min") + trappist( + pn, problem="max" + ) assert min_max_traps == min_max_traps_pre_encoded @@ -30,11 +39,12 @@ def test_network_minimum_traps(network_file): if is_syntactic_trap_space(bn, trap): continue # Then a proper symbolic check that should be reliable every time. - symbolic_space = stg.fix_subspace({ x: bool(int(trap[x])) for x in trap }) + symbolic_space = stg.fix_subspace({x: bool(int(trap[x])) for x in trap}) if stg.is_trap_set(symbolic_space): continue raise Exception(f"Failed on {network_file}: {trap} is not a trap space.") + def test_network_fixed_points(network_file): # Verify that the fixed-points of the test models are the same # as when computing using BDDs. @@ -46,57 +56,87 @@ def test_network_fixed_points(network_file): trappist_fixed_points = trappist(bn, problem="fix") for fixed_point in trappist_fixed_points: # Convert trappist result to a symbolic singleton set. - vertex = stg.fix_subspace({ x: bool(int(fixed_point[x])) for x in fixed_point }) + vertex = stg.fix_subspace({x: bool(int(fixed_point[x])) for x in fixed_point}) # Check that the fixed-point has been found, and remove it. - assert vertex.is_subset(symbolic_fixed_points), \ - f"Failed on {network_file}: {fixed_point} is not in symbolic fixed points." + assert vertex.is_subset( + symbolic_fixed_points + ), f"Failed on {network_file}: {fixed_point} is not in symbolic fixed points." symbolic_fixed_points = symbolic_fixed_points.minus(vertex) # In the end, all fixed-points must have been found. 
- assert symbolic_fixed_points.is_empty(), \ - f"Failed on {network_file}: Some symbolic fixed points not detected by trappist." + assert ( + symbolic_fixed_points.is_empty() + ), f"Failed on {network_file}: Some symbolic fixed points not detected by trappist." + def test_network_fixed_point_reduced_STG(): - # Validate the function for computing fixed points of the reduced STG + # Validate the function for computing fixed points of the reduced STG # on a single small input. - bn = BooleanNetwork.from_bnet(""" + bn = BooleanNetwork.from_bnet( + """ x1, (x1 & x2) | (!x1 & !x2) x2, (x1 & x2) | (!x1 & !x2) - """) + """ + ) petri_net = network_to_petrinet(bn) - avoid_subspace_1 = {"x1" : 1, "x2" : 1} + avoid_subspace_1 = {"x1": 1, "x2": 1} avoid_subspace_2 = {} - avoid_subspace_3 = {"x2" : 1} + avoid_subspace_3 = {"x2": 1} ensure_subspace_1 = {} - ensure_subspace_2 = {"x1" : 0, "x2" : 0} + ensure_subspace_2 = {"x1": 0, "x2": 0} - - retained_set = {"x1" : 0, "x2" : 0} + retained_set = {"x1": 0, "x2": 0} candidate_set = compute_fixed_point_reduced_STG(petri_net, retained_set) - assert len(candidate_set) == 2 # candidate_set = {00, 11} - - candidate_set = compute_fixed_point_reduced_STG(petri_net, retained_set, ensure_subspace=ensure_subspace_1, avoid_subspaces=[avoid_subspace_1]) - assert len(candidate_set) == 1 # candidate_set = {00} - - candidate_set = compute_fixed_point_reduced_STG(petri_net, retained_set, ensure_subspace=ensure_subspace_1, avoid_subspaces=[avoid_subspace_2]) - assert len(candidate_set) == 0 # candidate_set = empty - - candidate_set = compute_fixed_point_reduced_STG(petri_net, retained_set, ensure_subspace=ensure_subspace_2) - assert len(candidate_set) == 1 # candidate_set = {00} - - retained_set = {"x1" : 1, "x2" : 1} + assert len(candidate_set) == 2 # candidate_set = {00, 11} + + candidate_set = compute_fixed_point_reduced_STG( + petri_net, + retained_set, + ensure_subspace=ensure_subspace_1, + avoid_subspaces=[avoid_subspace_1], + ) + assert 
len(candidate_set) == 1 # candidate_set = {00} + + candidate_set = compute_fixed_point_reduced_STG( + petri_net, + retained_set, + ensure_subspace=ensure_subspace_1, + avoid_subspaces=[avoid_subspace_2], + ) + assert len(candidate_set) == 0 # candidate_set = empty + + candidate_set = compute_fixed_point_reduced_STG( + petri_net, retained_set, ensure_subspace=ensure_subspace_2 + ) + assert len(candidate_set) == 1 # candidate_set = {00} + + retained_set = {"x1": 1, "x2": 1} candidate_set = compute_fixed_point_reduced_STG(petri_net, retained_set) - assert len(candidate_set) == 3 # candidate_set = {01, 10, 11} - - candidate_set = compute_fixed_point_reduced_STG(petri_net, retained_set, ensure_subspace=ensure_subspace_1, avoid_subspaces=[avoid_subspace_1]) - assert len(candidate_set) == 2 # candidate_set = {01, 10} - - candidate_set = compute_fixed_point_reduced_STG(petri_net, retained_set, ensure_subspace=ensure_subspace_1, avoid_subspaces=[avoid_subspace_2]) - assert len(candidate_set) == 0 # candidate_set = empty - - candidate_set = compute_fixed_point_reduced_STG(petri_net, retained_set, ensure_subspace=ensure_subspace_1, avoid_subspaces=[avoid_subspace_3]) - assert len(candidate_set) == 1 # candidate_set = {10} - + assert len(candidate_set) == 3 # candidate_set = {01, 10, 11} + + candidate_set = compute_fixed_point_reduced_STG( + petri_net, + retained_set, + ensure_subspace=ensure_subspace_1, + avoid_subspaces=[avoid_subspace_1], + ) + assert len(candidate_set) == 2 # candidate_set = {01, 10} + + candidate_set = compute_fixed_point_reduced_STG( + petri_net, + retained_set, + ensure_subspace=ensure_subspace_1, + avoid_subspaces=[avoid_subspace_2], + ) + assert len(candidate_set) == 0 # candidate_set = empty + + candidate_set = compute_fixed_point_reduced_STG( + petri_net, + retained_set, + ensure_subspace=ensure_subspace_1, + avoid_subspaces=[avoid_subspace_3], + ) + assert len(candidate_set) == 1 # candidate_set = {10}