diff --git a/biobalm/control.py b/biobalm/control.py index 1f836ea..86ceb45 100644 --- a/biobalm/control.py +++ b/biobalm/control.py @@ -5,13 +5,14 @@ from __future__ import annotations +from functools import reduce from itertools import combinations, product -from typing import Literal, cast +from typing import Iterator, Literal, cast import networkx as nx # type: ignore from biodivine_aeon import AsynchronousGraph, BooleanNetwork -from biobalm.space_utils import is_subspace, percolate_space +from biobalm.space_utils import intersect, is_subspace, percolate_space from biobalm.succession_diagram import SuccessionDiagram from biobalm.types import BooleanSpace, ControlOverrides, SubspaceSuccession @@ -158,6 +159,19 @@ def __str__(self): else: return "unknown strategy: " + self.__repr__() + def all_control_strategies(self) -> Iterator[ControlOverrides]: + """ + Returns all possible combinations of `ControlOverrides` sequences that + can be used to execute this `Intervention`. + + Internally, an intervention consists of multiple control steps that + need to be taken sequentially. For each step in the sequence, an intervention + can have multiple options of how to execute it. With this method, + we can generate the actual sequences that arise by combining all the + available options for each step. + """ + return map(lambda x: list(x), product(*self._control)) + def succession_control( succession_diagram: SuccessionDiagram, @@ -166,6 +180,7 @@ def succession_control( max_drivers_per_succession_node: int | None = None, forbidden_drivers: set[str] | None = None, successful_only: bool = True, + skip_feedforward_successions: bool = False, ) -> list[Intervention]: """ Performs succession-diagram control to reach a target subspace. @@ -280,7 +295,10 @@ def succession_control( interventions: list[Intervention] = [] successions = successions_to_target( - succession_diagram, target=target, expand_diagram=True + succession_diagram, + target=target, + expand_diagram=True, + skip_feedforward_successions=skip_feedforward_successions, ) for succession in successions: @@ -303,16 +321,17 @@ def successions_to_target( succession_diagram: SuccessionDiagram, target: BooleanSpace, expand_diagram: bool = True, + skip_feedforward_successions: bool = False, ) -> list[SubspaceSuccession]: """Find lists of nested trap spaces (successions) that lead to the specified target subspace. Generally, it is not necessary to call this function directly, as it is automatically invoked by - :func:`succession_control`. It is primarily - provided in the public API for testing and benchmarking purposes, or in the - case that the user wants to implement a custom strategy to identify - succession drivers rather than relying on + :func:`succession_control`. It is + primarily provided in the public API for testing and benchmarking purposes, + or in the case that the user wants to implement a custom strategy to + identify succession drivers rather than relying on :func:`drivers_of_succession`. Parameters @@ -324,6 +343,12 @@ def successions_to_target( expand_diagram: bool Whether to ensure that the succession diagram is expanded enough to capture all paths to the target (default: True). + skip_feedforward_successions: bool + Whether to skip redundant successions (default: False). Skipping these + can reduce the number of interventions to test, yielding performance + improvements, but can also cause the algorithm to miss some valid + interventions, particularly in cases when the order of intervention + application is important. Returns ------- @@ -332,6 +357,11 @@ def successions_to_target( nested trap spaces that specify the target. """ successions: list[SubspaceSuccession] = [] + # Tracks the combined perturbation that needs to be applied for the whole + # succession to take effect. We use this to detect which successions are + # redundant when they can be replaced by a succession with a subset + # signature. Used when skip_feedforward_successions is True + succession_signatures: list[BooleanSpace] = [] # expand the succession_diagram toward the target if expand_diagram: @@ -339,9 +369,34 @@ def successions_to_target( target=target, ) + # these contradict the target or have a motif avoidant attractor that isn't + # in full agreement with the target + hot_lava_nodes: set[int] = set() + + descendant_map: dict[int, set[int]] = {} + for s in succession_diagram.node_ids(): + is_consistent = intersect(succession_diagram.node_data(s)["space"], target) + is_goal = is_subspace(succession_diagram.node_data(s)["space"], target) + is_minimal = succession_diagram.node_is_minimal(s) + if not is_consistent or (not is_goal and is_minimal): + hot_lava_nodes.add(s) + + descendant_map[s] = set(nx.descendants(succession_diagram.dag, s)) # type: ignore + descendant_map[s].add(s) # for our purposes, s is its own descendant + found_valid_target_node = False for s in succession_diagram.node_ids(): - fixed_vars = succession_diagram.node_data(s)["space"] - if not is_subspace(fixed_vars, target): + # a node is a valid end point if all + # 1) its descendents (including itself) are not "hot lava" + # 2) it has a parent that is or can reach "hot lava" (otherwise, we can + # just control to the parent) note that all nodes are either cold lava + # or hot lava, but not both or neither + if descendant_map[s] & hot_lava_nodes: + continue + found_valid_target_node = True + if not any( + descendant_map[p] & hot_lava_nodes + for p in succession_diagram.dag.predecessors(s) # type: ignore + ): continue for path in cast( @@ -352,11 +407,37 @@ def successions_to_target( target=s, ), ): - succession = [ - succession_diagram.edge_stable_motif(x, y, reduced=True) + motif_list = [ + succession_diagram.edge_all_stable_motifs(x, y, reduced=True) for x, y in zip(path[:-1], path[1:]) ] - successions.append(succession) + for succession_tuple in product(*motif_list): + succession = list(succession_tuple) + if skip_feedforward_successions: + signature = reduce(lambda x, y: x | y, succession) + # First, check if any existing successions can be eliminated + # because they are redundant w.r.t. to this succession. + # (`reversed` is important here, because that way a delete + # only impacts indices that we already processed) + skip_completely = False + for i in reversed(range(len(succession_signatures))): + existing_signature = succession_signatures[i] + if is_subspace(signature, existing_signature): + # The current `path` is already superseded by a path in successions. + skip_completely = True + break + if is_subspace(existing_signature, signature): + # A path in successions is made redundant by the current path. + del succession_signatures[i] + del successions[i] + if skip_completely: + continue + + succession_signatures.append(signature) + successions.append(succession) + + if found_valid_target_node and len(successions) == 0: + successions = [[]] return successions diff --git a/biobalm/succession_diagram.py b/biobalm/succession_diagram.py index d74f6e8..282a6a1 100644 --- a/biobalm/succession_diagram.py +++ b/biobalm/succession_diagram.py @@ -1087,6 +1087,22 @@ def node_percolated_petri_net( return percolated_pn + def edge_all_stable_motifs( + self, parent_id: int, child_id: int, reduced: bool = False + ) -> list[BooleanSpace]: + """ + Similar to `edge_stable_motif`, but returns all motifs associated with an edge. + """ + all_motifs: list[BooleanSpace] = cast(list[BooleanSpace], self.dag.edges[parent_id, child_id]["all_motifs"]) # type: ignore + if reduced: + result: list[BooleanSpace] = [] + node_space = self.node_data(parent_id)["space"] + for m in all_motifs: + result.append({k: v for k, v in m.items() if k not in node_space}) + return result + else: + return all_motifs + def edge_stable_motif( self, parent_id: int, child_id: int, reduced: bool = False ) -> BooleanSpace: @@ -1641,5 +1657,7 @@ def _ensure_edge(self, parent_id: int, child_id: int, stable_motif: BooleanSpace # can be reached through multiple stable motifs. Not sure how to # approach these... but this is probably good enough for now. if not self.dag.has_edge(parent_id, child_id): # type: ignore - self.dag.add_edge(parent_id, child_id, motif=stable_motif) # type: ignore + self.dag.add_edge(parent_id, child_id, motif=stable_motif, all_motifs=[stable_motif]) # type: ignore + else: + self.dag.edges[parent_id, child_id]["all_motifs"].append(stable_motif) # type: ignore self._update_node_depth(child_id, parent_id) diff --git a/tests/control_test.py b/tests/control_test.py index 3f2f95d..0f7808a 100644 --- a/tests/control_test.py +++ b/tests/control_test.py @@ -305,3 +305,36 @@ def test_size_restriction(): sd, target, max_drivers_per_succession_node=1, successful_only=True ) assert len(interventions) == 0 + + +def test_no_control_needed(): + rules = """ + a, b + b, a | c + c, !a + """ + target: BooleanSpace = {"a": 1, "b": 1, "c": 0} + + sd = SuccessionDiagram.from_rules(rules) + sd.build() + interventions = succession_control(sd, target) + + assert len(interventions) == 1 + intervention = interventions[0] + assert intervention.successful + assert intervention.control == [] + + +def test_no_control_possible(): + rules = """ + a, b + b, a | c + c, !a + """ + target: BooleanSpace = {"a": 0, "b": 1, "c": 0} + + sd = SuccessionDiagram.from_rules(rules) + sd.build() + interventions = succession_control(sd, target) + + assert len(interventions) == 0