From 5f5bf1c06778394e7d94508e0090fc58edc430c8 Mon Sep 17 00:00:00 2001 From: iopapamanoglou Date: Wed, 4 Sep 2024 11:55:51 +0200 Subject: [PATCH] WIP --- examples/iterative_design_nand.py | 25 +- examples/signal_processing.py | 3 +- src/faebryk/core/graphinterface.py | 13 +- src/faebryk/core/module.py | 94 +++++ src/faebryk/core/moduleinterface.py | 86 +++-- src/faebryk/core/node.py | 163 +++++++-- src/faebryk/core/parameter.py | 18 +- src/faebryk/core/util.py | 343 +----------------- src/faebryk/exporters/netlist/graph.py | 5 +- .../parameters/parameters_to_file.py | 6 +- .../pcb/layout/heuristic_decoupling.py | 8 +- .../exporters/pcb/layout/heuristic_pulls.py | 8 +- src/faebryk/exporters/pcb/routing/routing.py | 6 +- src/faebryk/exporters/pcb/routing/util.py | 19 +- .../exporters/visualize/interactive_graph.py | 7 +- .../CBM9002A_56ILG_Reference_Design.py | 4 +- src/faebryk/library/ESP32_C3.py | 15 +- src/faebryk/library/ElectricLogic.py | 11 +- src/faebryk/library/ElectricLogicGate.py | 6 +- src/faebryk/library/Electrical.py | 13 + src/faebryk/library/Footprint.py | 4 +- src/faebryk/library/Net.py | 10 +- src/faebryk/library/Pad.py | 7 +- src/faebryk/library/Powered_Relay.py | 4 +- src/faebryk/library/USB_C_PSU_Vertical.py | 11 +- .../has_pcb_routing_strategy_manual.py | 4 +- src/faebryk/libs/app/manufacturing.py | 3 +- src/faebryk/libs/app/parameters.py | 4 +- src/faebryk/libs/app/pcb.py | 7 +- src/faebryk/libs/examples/buildutil.py | 3 +- src/faebryk/libs/examples/pickers.py | 3 +- src/faebryk/libs/picker/jlcpcb/jlcpcb.py | 5 +- src/faebryk/libs/picker/picker.py | 22 +- src/faebryk/libs/util.py | 92 ++++- test/core/test_hierarchy_connect.py | 13 +- test/core/test_parameters.py | 3 +- test/core/test_performance.py | 12 +- test/core/test_util.py | 8 +- 38 files changed, 493 insertions(+), 575 deletions(-) diff --git a/examples/iterative_design_nand.py b/examples/iterative_design_nand.py index 305c2f15..91038587 100644 --- a/examples/iterative_design_nand.py +++ b/examples/iterative_design_nand.py @@ -18,11 +18,7 @@ import faebryk.library._F as F from faebryk.core.module import Module -from faebryk.core.util import ( - get_all_nodes_with_trait, - specialize_interface, - specialize_module, -) +from faebryk.core.util import get_all_nodes_with_trait from faebryk.libs.brightness import TypicalLuminousIntensity from faebryk.libs.examples.buildutil import apply_design_to_pcb from faebryk.libs.library import L @@ -89,10 +85,10 @@ def App(): logic_in.connect_via(switch, on) # bring logic signals into electrical domain - e_in = specialize_interface(logic_in, F.ElectricLogic()) - e_out = specialize_interface(logic_out, F.ElectricLogic()) - e_on = specialize_interface(on, F.ElectricLogic()) - e_off = specialize_interface(off, F.ElectricLogic()) + e_in = logic_in.specialize(F.ElectricLogic()) + e_out = logic_out.specialize(F.ElectricLogic()) + e_on = on.specialize(F.ElectricLogic()) + e_off = off.specialize(F.ElectricLogic()) e_in.reference.connect(power) e_out.reference.connect(power) e_on.reference.connect(power) @@ -103,13 +99,12 @@ def App(): e_out.connect(led.logic_in) - nxor = specialize_module(xor, XOR_with_NANDS()) - battery = specialize_module(power_source, F.Battery()) + nxor = xor.specialize(XOR_with_NANDS()) + battery = power_source.specialize(F.Battery()) - el_switch = specialize_module(switch, F.Switch(F.ElectricLogic)()) + el_switch = switch.specialize(F.Switch(F.ElectricLogic)()) e_switch = F.Switch(F.Electrical)() - e_switch = specialize_module( - el_switch, + e_switch = el_switch.specialize( e_switch, matrix=[(e, el.signal) for e, el in zip(e_switch.unnamed, el_switch.unnamed)], ) @@ -136,7 +131,7 @@ def App(): # packages single nands as explicit IC nand_ic = F.TI_CD4011BE() for ic_nand, xor_nand in zip(nand_ic.gates, nxor.nands): - specialize_module(xor_nand, ic_nand) + xor_nand.specialize(ic_nand) app.add(nand_ic) diff --git a/examples/signal_processing.py b/examples/signal_processing.py index abe3fc63..5f8201a7 100644 --- a/examples/signal_processing.py +++ b/examples/signal_processing.py @@ -11,7 +11,6 @@ import faebryk.library._F as F from faebryk.core.module import Module -from faebryk.core.util import specialize_module from faebryk.libs.examples.buildutil import apply_design_to_pcb from faebryk.libs.logging import setup_basic_logging from faebryk.libs.units import P @@ -30,7 +29,7 @@ def __preinit__(self) -> None: self.lowpass.response.merge(F.Filter.Response.LOWPASS) # Specialize - special = specialize_module(self.lowpass, F.FilterElectricalLC()) + special = self.lowpass.specialize(F.FilterElectricalLC()) # Construct special.get_trait(F.has_construction_dependency).construct() diff --git a/src/faebryk/core/graphinterface.py b/src/faebryk/core/graphinterface.py index 943e8d7d..568d8458 100644 --- a/src/faebryk/core/graphinterface.py +++ b/src/faebryk/core/graphinterface.py @@ -1,7 +1,7 @@ # This file is part of the faebryk project # SPDX-License-Identifier: MIT import logging -from typing import TYPE_CHECKING, Mapping, Optional +from typing import TYPE_CHECKING, Mapping, Optional, cast from typing_extensions import Self, deprecated @@ -105,6 +105,17 @@ def __repr__(self) -> str: else "| " ) + def get_connected_nodes[T: Node](self, types: type[T]) -> set[T]: + from faebryk.core.link import LinkDirect + + return { + n + for n in Node.get_nodes_from_gifs( + g for g, t in self.edges.items() if isinstance(t, LinkDirect) + ) + if isinstance(n, types) + } + class GraphInterfaceHierarchical(GraphInterface): def __init__(self, is_parent: bool) -> None: diff --git a/src/faebryk/core/module.py b/src/faebryk/core/module.py index f4137f98..380a8e7f 100644 --- a/src/faebryk/core/module.py +++ b/src/faebryk/core/module.py @@ -1,12 +1,16 @@ # This file is part of the faebryk project # SPDX-License-Identifier: MIT import logging +from typing import TYPE_CHECKING, Iterable from faebryk.core.graphinterface import GraphInterface from faebryk.core.node import Node from faebryk.core.trait import Trait from faebryk.libs.util import unique_ref +if TYPE_CHECKING: + from faebryk.core.moduleinterface import ModuleInterface + logger = logging.getLogger(__name__) @@ -34,3 +38,93 @@ def get_most_special(self) -> "Module": len(specialest_next) == 1 ), f"Ambiguous specialest {specialest_next} for {self}" return next(iter(specialest_next)) + + def get_children_modules( + self: Node, most_special: bool = True, direct_only: bool = False + ): + out = self.get_children(direct_only=direct_only, types=Module) + if most_special: + out = {n.get_most_special() for n in out} + return out + + def specialize[T: Module]( + self, + special: T, + matrix: list[tuple["ModuleInterface", "ModuleInterface"]] | None = None, + attach_to: Node | None = None, + ) -> T: + from faebryk.core.parameter import Parameter + + logger.debug(f"Specializing Module {self} with {special}" + " " + "=" * 20) + + def get_node_prop_matrix[N: Node](sub_type: type[N]): + return list(self.zip_children_by_name_with(special, sub_type).values()) + + if matrix is None: + matrix = get_node_prop_matrix(ModuleInterface) + + # TODO add warning if not all src interfaces used + + param_matrix = get_node_prop_matrix(Parameter) + + for src, dst in matrix: + if src is None: + continue + if dst is None: + raise Exception(f"Special module misses interface: {src.get_name()}") + src.specialize(dst) + + for src, dst in param_matrix: + if src is None: + continue + if dst is None: + raise Exception(f"Special module misses parameter: {src.get_name()}") + dst.merge(src) + + # TODO this cant work + # for t in self.traits: + # # TODO needed? + # if special.has_trait(t.trait): + # continue + # special.add(t) + + self.specialized.connect(special.specializes) + + # Attach to new parent + has_parent = special.get_parent() is not None + assert not has_parent or attach_to is None + if not has_parent: + if attach_to: + attach_to.add(special, container=attach_to.specialized) + else: + gen_parent = self.get_parent() + if gen_parent: + gen_parent[0].add(special, name=f"{gen_parent[1]}_specialized") + + return special + + @staticmethod + def connect_all_interfaces_by_name( + src: Iterable["Module"] | "Module", + dst: Iterable["Module"] | "Module", + allow_partial: bool = False, + ): + if isinstance(src, Module): + src = [src] + if isinstance(dst, Module): + dst = [dst] + + for src_, dst_ in zip(src, dst): + for k, (src_m, dst_m) in src_.zip_children_by_name_with( + dst_, ModuleInterface + ).items(): + if src_m is None or dst_m is None: + if not allow_partial: + raise Exception(f"Node with name {k} not present in both") + continue + src_m.connect(dst_m) + + def connect_interfaces_by_name(self, *dst: "Module", allow_partial: bool = False): + type(self).connect_all_interfaces_by_name( + self, dst, allow_partial=allow_partial + ) diff --git a/src/faebryk/core/moduleinterface.py b/src/faebryk/core/moduleinterface.py index ae3f0c7c..32a7cc9b 100644 --- a/src/faebryk/core/moduleinterface.py +++ b/src/faebryk/core/moduleinterface.py @@ -22,7 +22,7 @@ ) from faebryk.core.node import Node from faebryk.core.trait import Trait -from faebryk.libs.util import once, print_stack +from faebryk.libs.util import cast_assert, once, print_stack logger = logging.getLogger(__name__) @@ -137,11 +137,32 @@ class _LinkDirectShallowMif( def __preinit__(self) -> None: ... + @staticmethod + def _get_connected(gif: GraphInterface): + assert isinstance(gif.node, ModuleInterface) + connections = gif.edges.items() + + # check if ambiguous links between mifs + assert len(connections) == len({c[0] for c in connections}) + + return { + cast_assert(ModuleInterface, s.node): link + for s, link in connections + if s.node is not gif.node + } + + def get_connected(self): + return self._get_connected(self.connected) + + def get_specialized(self): + return self._get_connected(self.specialized) + + def get_specializes(self): + return self._get_connected(self.specializes) + def _connect_siblings_and_connections( self, other: "ModuleInterface", linkcls: type[Link] ) -> Self: - from faebryk.core.util import get_connected_mifs_with_link - if other is self: return self @@ -158,15 +179,19 @@ def _connect_siblings_and_connections( logger.debug(f"MIF connection: {self} to {other}") def cross_connect( - s_group: dict[ModuleInterface, type[Link]], - d_group: dict[ModuleInterface, type[Link]], + s_group: dict[ModuleInterface, type[Link] | Link], + d_group: dict[ModuleInterface, type[Link] | Link], hint=None, ): if logger.isEnabledFor(logging.DEBUG) and hint is not None: logger.debug(f"Connect {hint} {s_group} -> {d_group}") for s, slink in s_group.items(): + if isinstance(slink, Link): + slink = type(slink) for d, dlink in d_group.items(): + if isinstance(dlink, Link): + dlink = type(dlink) # can happen while connection trees are resolving if s is d: continue @@ -174,25 +199,14 @@ def cross_connect( s._connect_across_hierarchies(d, linkcls=link) - def _get_connected_mifs(gif: GraphInterface): - return {k: type(v) for k, v in get_connected_mifs_with_link(gif).items()} - # Connect to all connections - s_con = _get_connected_mifs(self.connected) | {self: linkcls} - d_con = _get_connected_mifs(other.connected) | {other: linkcls} + s_con = self.get_connected() | {self: linkcls} + d_con = other.get_connected() | {other: linkcls} cross_connect(s_con, d_con, "connections") # Connect to all siblings - s_sib = ( - _get_connected_mifs(self.specialized) - | _get_connected_mifs(self.specializes) - | {self: linkcls} - ) - d_sib = ( - _get_connected_mifs(other.specialized) - | _get_connected_mifs(other.specializes) - | {other: linkcls} - ) + s_sib = self.get_specialized() | self.get_specializes() | {self: linkcls} + d_sib = other.get_specialized() | other.get_specializes() | {other: linkcls} cross_connect(s_sib, d_sib, "siblings") return self @@ -202,12 +216,12 @@ def _on_connect(self, other: "ModuleInterface"): ... def _try_connect_down(self, other: "ModuleInterface", linkcls: type[Link]) -> None: - from faebryk.core.util import zip_children_by_name - if not isinstance(other, type(self)): return - for _, (src, dst) in zip_children_by_name(self, other, ModuleInterface).items(): + for _, (src, dst) in self.zip_children_by_name_with( + other, ModuleInterface + ).items(): if src is None or dst is None: continue src.connect(dst, linkcls=linkcls) @@ -233,12 +247,10 @@ def _is_connected(a, b): assert isinstance(b, ModuleInterface) return a.is_connected_to(b) - from faebryk.core.util import zip_children_by_name - connection_map = [ (src_i, dst_i, _is_connected(src_i, dst_i)) - for src_i, dst_i in zip_children_by_name( - src_m, dst_m, sub_type=ModuleInterface + for src_i, dst_i in src_m.zip_children_by_name_with( + dst_m, sub_type=ModuleInterface ).values() ] @@ -311,12 +323,15 @@ def get_direct_connections(self) -> set["ModuleInterface"]: if isinstance(gif.node, ModuleInterface) and gif.node is not self } - def connect(self, other: Self, linkcls=None) -> Self: + def connect(self: Self, *other: Self, linkcls=None) -> Self: # TODO consider some type of check at the end within the graph instead # assert type(other) is type(self) if linkcls is None: linkcls = LinkDirect - return self._connect_siblings_and_connections(other, linkcls=linkcls) + + for o in other: + self._connect_siblings_and_connections(o, linkcls=linkcls) + return other[-1] if other else self def connect_via( self, bridge: Node | Sequence[Node], other: Self | None = None, linkcls=None @@ -338,3 +353,16 @@ def connect_shallow(self, other: Self) -> Self: def is_connected_to(self, other: "ModuleInterface"): return self.connected.is_connected(other.connected) + + def specialize[T: ModuleInterface](self, special: T) -> T: + logger.debug(f"Specializing MIF {self} with {special}") + + assert isinstance(special, type(self)) + + # This is doing the heavy lifting + self.connect(special) + + # Establish sibling relationship + self.specialized.connect(special.specializes) + + return special diff --git a/src/faebryk/core/node.py b/src/faebryk/core/node.py index 9e64c7c1..8908a108 100644 --- a/src/faebryk/core/node.py +++ b/src/faebryk/core/node.py @@ -2,7 +2,16 @@ # SPDX-License-Identifier: MIT import logging from itertools import chain -from typing import TYPE_CHECKING, Any, Callable, Iterable, Type, get_args, get_origin +from typing import ( + TYPE_CHECKING, + Any, + Callable, + Iterable, + Type, + cast, + get_args, + get_origin, +) from deprecated import deprecated from more_itertools import partition @@ -17,10 +26,14 @@ from faebryk.libs.exceptions import FaebrykException from faebryk.libs.util import ( KeyErrorNotFound, + NotNone, + PostInitCaller, + Tree, cast_assert, find, times, try_avoid_endless_recursion, + zip_dicts_by_key, ) if TYPE_CHECKING: @@ -91,14 +104,6 @@ def __() -> T: return _ -# ----------------------------------------------------------------------------- -class PostInitCaller(type): - def __call__(cls, *args, **kwargs): - obj = type.__call__(cls, *args, **kwargs) - obj.__post_init__(*args, **kwargs) - return obj - - class NodeException(FaebrykException): def __init__(self, node: "Node", *args: object) -> None: super().__init__(*args) @@ -118,6 +123,9 @@ def __init__(self, node: "Node", other: "Node", *args: object) -> None: class NodeNoParent(NodeException): ... +# ----------------------------------------------------------------------------- + + class Node(FaebrykLibObject, metaclass=PostInitCaller): runtime_anon: list["Node"] runtime: dict[str, "Node"] @@ -423,6 +431,8 @@ def get_full_name(self, types: bool = False): else: return ".".join([f"{name}" for _, name in hierarchy]) + # printing ------------------------------------------------------------------------- + @try_avoid_endless_recursion def __str__(self) -> str: return f"<{self.get_full_name(types=True)}>" @@ -432,6 +442,17 @@ def __repr__(self) -> str: id_str = f"(@{hex(id(self))})" if ID_REPR else "" return f"<{self.get_full_name(types=True)}>{id_str}" + def pretty_params(self) -> str: + from faebryk.core.parameter import Parameter + + params = { + NotNone(p.get_parent())[1]: p.get_most_narrow() + for p in self.get_children(direct_only=True, types=Parameter) + } + params_str = "\n".join(f"{k}: {v}" for k, v in params.items()) + + return params_str + # Trait stuff ---------------------------------------------------------------------- @deprecated("Just use add") @@ -477,13 +498,26 @@ def get_trait[V: "Trait"](self, trait: Type[V]) -> V: # Graph stuff ---------------------------------------------------------------------- - def get_node_direct_children_(self): + def _get_children_direct(self): return { gif.node for gif, link in self.get_graph().get_edges(self.children).items() if isinstance(link, LinkNamedParent) } + def _get_children_all(self, include_root: bool): + # TODO looks like get_node_tree is 2x faster + + out = self.bfs_node( + lambda x: isinstance(x, (GraphInterfaceSelf, GraphInterfaceHierarchical)) + and x is not self.parent, + ) + + if not include_root: + out.remove(self) + + return set(out) + def get_children[T: Node]( self, direct_only: bool, @@ -491,33 +525,58 @@ def get_children[T: Node]( include_root: bool = False, f_filter: Callable[[T], bool] | None = None, sort: bool = True, - ): + ) -> set[T]: if direct_only: - out = self.get_node_direct_children_() + out = self._get_children_direct() if include_root: out.add(self) else: - out = self.get_node_children_all(include_root=include_root) + out = self._get_children_all(include_root=include_root) - out = {n for n in out if isinstance(n, types) and (not f_filter or f_filter(n))} + if types is not Node or f_filter: + out = { + n for n in out if isinstance(n, types) and (not f_filter or f_filter(n)) + } + + out = cast(set[T], out) if sort: out = set(sorted(out, key=lambda n: n.get_name())) return out - def get_node_children_all(self, include_root=True) -> list["Node"]: - # TODO looks like get_node_tree is 2x faster + def get_tree[T: Node]( + self, + types: type[T] | tuple[type[T], ...], + include_root: bool = True, + f_filter: Callable[[T], bool] | None = None, + sort: bool = True, + ) -> Tree[T]: + out = self.get_children( + direct_only=True, + types=types, + f_filter=f_filter, + sort=sort, + ) - out = self.bfs_node( - lambda x: isinstance(x, (GraphInterfaceSelf, GraphInterfaceHierarchical)) - and x is not self.parent, + tree = Tree[T]( + { + n: n.get_tree( + types=types, + include_root=False, + f_filter=f_filter, + sort=sort, + ) + for n in out + } ) - if not include_root: - out.remove(self) + if include_root: + if isinstance(self, types): + if not f_filter or f_filter(self): + tree = Tree[T]({self: tree}) - return list(out) + return tree def bfs_node(self, filter: Callable[[GraphInterface], bool]): return Node.get_nodes_from_gifs( @@ -530,3 +589,63 @@ def get_nodes_from_gifs(gifs: Iterable[GraphInterface]): return {gif.node for gif in gifs} # TODO what is faster # return {n.node for n in gifs if isinstance(n, GraphInterfaceSelf)} + + @staticmethod + def get_direct_connected_nodes[T: Node]( + gif: GraphInterface, types: type[T] + ) -> set[T]: + from faebryk.core.link import LinkDirect + + out = Node.get_nodes_from_gifs( + g for g, t in gif.edges.items() if isinstance(t, LinkDirect) + ) + assert all(isinstance(n, types) for n in out) + return cast(set[T], out) + + # Hierarchy queries ---------------------------------------------------------------- + + def get_parent_f(self, filter_expr: Callable): + candidates = [p for p, _ in self.get_hierarchy() if filter_expr(p)] + if not candidates: + return None + return candidates[-1] + + def get_parent_of_type[T: Node](self, parent_type: type[T]) -> T | None: + return cast( + parent_type, self.get_parent_f(lambda p: isinstance(p, parent_type)) + ) + + def get_parent_with_trait[TR: Trait]( + self, trait: type[TR], include_self: bool = True + ): + hierarchy = self.get_hierarchy() + if not include_self: + hierarchy = hierarchy[:-1] + for parent, _ in reversed(hierarchy): + if parent.has_trait(trait): + return parent, parent.get_trait(trait) + raise KeyErrorNotFound(f"No parent with trait {trait} found") + + def get_first_child_of_type[U: Node](self, child_type: type[U]) -> U: + for level in self.get_tree(types=Node).iter_by_depth(): + for child in level: + if isinstance(child, child_type): + return child + raise KeyErrorNotFound(f"No child of type {child_type} found") + + # ---------------------------------------------------------------------------------- + def zip_children_by_name_with[N: Node]( + self, other: "Node", sub_type: type[N] + ) -> dict[str, tuple[N, N]]: + nodes = self, other + children = tuple( + Node.with_names( + n.get_children(direct_only=True, include_root=False, types=sub_type) + ) + for n in nodes + ) + return zip_dicts_by_key(*children) + + @staticmethod + def with_names[N: Node](nodes: Iterable[N]) -> dict[str, N]: + return {n.get_name(): n for n in nodes} diff --git a/src/faebryk/core/parameter.py b/src/faebryk/core/parameter.py index 3b245c5a..ea82754b 100644 --- a/src/faebryk/core/parameter.py +++ b/src/faebryk/core/parameter.py @@ -12,7 +12,7 @@ from faebryk.core.graphinterface import GraphInterface from faebryk.core.node import Node from faebryk.core.trait import Trait -from faebryk.libs.util import TwistArgs, is_type_pair, try_avoid_endless_recursion +from faebryk.libs.util import Tree, TwistArgs, is_type_pair, try_avoid_endless_recursion logger = logging.getLogger(__name__) @@ -392,10 +392,8 @@ def __str__(self) -> str: # return repr(narrowest) def get_narrowing_chain(self) -> list["Parameter"]: - from faebryk.core.util import get_direct_connected_nodes - out: list[Parameter] = [self] - narrowers = get_direct_connected_nodes(self.narrowed_by, Parameter) + narrowers = self.narrowed_by.get_connected_nodes(Parameter) if narrowers: assert len(narrowers) == 1, "Narrowing tree diverged" out += next(iter(narrowers)).get_narrowing_chain() @@ -403,12 +401,18 @@ def get_narrowing_chain(self) -> list["Parameter"]: return out def get_narrowed_siblings(self) -> set["Parameter"]: - from faebryk.core.util import get_direct_connected_nodes - - return get_direct_connected_nodes(self.narrows, Parameter) + return self.narrows.get_connected_nodes(Parameter) def __copy__(self) -> Self: return type(self)() def __deepcopy__(self, memo) -> Self: return self.__copy__() + + def get_tree_param(self, include_root: bool = True) -> Tree["Parameter"]: + out = Tree[Parameter]( + {p: p.get_tree_param() for p in self.get_narrowed_siblings()} + ) + if include_root: + out = Tree[Parameter]({self: out}) + return out diff --git a/src/faebryk/core/util.py b/src/faebryk/core/util.py index dac10cf8..cd85c33a 100644 --- a/src/faebryk/core/util.py +++ b/src/faebryk/core/util.py @@ -7,6 +7,7 @@ from typing import ( Callable, Iterable, + Sequence, cast, ) @@ -25,7 +26,7 @@ from faebryk.core.parameter import Parameter from faebryk.core.trait import Trait from faebryk.libs.units import Quantity, UnitsContainer, to_si_str -from faebryk.libs.util import NotNone, cast_assert, zip_dicts_by_key +from faebryk.libs.util import NotNone, Tree, cast_assert, zip_dicts_by_key, zip_exhaust logger = logging.getLogger(__name__) @@ -142,15 +143,6 @@ def node_projected_graph(g: Graph) -> set[Node]: return Node.get_nodes_from_gifs(g.subgraph_type(GraphInterfaceSelf)) -@deprecated("Use get_node_children_all") -def get_all_nodes(node: Node, include_root=False) -> list[Node]: - return node.get_node_children_all(include_root=include_root) - - -def get_all_modules(node: Node) -> set[Module]: - return {n for n in get_all_nodes(node) if isinstance(n, Module)} - - @deprecated("Use node_projected_graph or get_all_nodes_by/of/with") def get_all_nodes_graph(g: Graph): return node_projected_graph(g) @@ -191,96 +183,16 @@ def get_all_nodes_of_types(g: Graph, t: tuple[type[Node], ...]) -> set[Node]: return {n for n in node_projected_graph(g) if isinstance(n, t)} -def get_all_connected(gif: GraphInterface) -> list[tuple[GraphInterface, Link]]: - return list(gif.edges.items()) - - -def get_connected_mifs(gif: GraphInterface): - return set(get_connected_mifs_with_link(gif).keys()) - - -def get_connected_mifs_with_link(gif: GraphInterface): - assert isinstance(gif.node, ModuleInterface) - connections = get_all_connected(gif) - - # check if ambiguous links between mifs - assert len(connections) == len({c[0] for c in connections}) - - return { - cast_assert(ModuleInterface, s.node): link - for s, link in connections - if s.node is not gif.node - } - - def get_direct_connected_nodes[T: Node]( gif: GraphInterface, ty: type[T] = Node ) -> set[T]: out = Node.get_nodes_from_gifs( - g for g, t in get_all_connected(gif) if isinstance(t, LinkDirect) + g for g, t in gif.edges.items() if isinstance(t, LinkDirect) ) assert all(isinstance(n, ty) for n in out) return cast(set[T], out) -def get_net(mif: F.Electrical): - nets = { - net - for mif in get_connected_mifs(mif.connected) - if (net := get_parent_of_type(mif, F.Net)) is not None - } - - if not nets: - return None - - assert len(nets) == 1 - return next(iter(nets)) - - -@deprecated("Use get_node_direct_mods_or_mifs") -def get_node_direct_children(node: Node, include_mifs: bool = True): - return get_node_direct_mods_or_mifs(node, include_mifs=include_mifs) - - -def get_node_direct_mods_or_mifs(node: Node, include_mifs: bool = True): - types = (Module, ModuleInterface) if include_mifs else Module - return node.get_children(direct_only=True, types=types) - - -def get_node_tree( - node: Node, - include_mifs: bool = True, - include_root: bool = True, -) -> dict[Node, dict[Node, dict]]: - out = get_node_direct_mods_or_mifs(node, include_mifs=include_mifs) - - tree = { - n: get_node_tree(n, include_mifs=include_mifs, include_root=False) for n in out - } - - if include_root: - return {node: tree} - return tree - - -def iter_tree_by_depth(tree: dict[Node, dict]): - yield list(tree.keys()) - - # zip iterators, but if one iterators stops producing, the rest continue - def zip_exhaust(*args): - while True: - out = [next(a, None) for a in args] - out = [a for a in out if a] - if not out: - return - - yield out - - for level in zip_exhaust(*[iter_tree_by_depth(v) for v in tree.values()]): - # merge lists of parallel subtrees - yield [n for subtree in level for n in subtree] - - def get_mif_tree( obj: ModuleInterface | Module, ) -> dict[ModuleInterface, dict[ModuleInterface, dict]]: @@ -306,255 +218,6 @@ def get_name(k: ModuleInterface): return json.dumps(str_tree(tree), indent=4) -def get_param_tree(param: Parameter) -> list[tuple[Parameter, list]]: - out = [] - for p in param.get_narrowed_siblings(): - out.append((p, get_param_tree(p))) - return out - - -# -------------------------------------------------------------------------------------- - -# Connection utils --------------------------------------------------------------------- - - -def connect_interfaces_via_chain( - start: ModuleInterface, bridges: Iterable[Node], end: ModuleInterface, linkcls=None -): - last = start - for bridge in bridges: - last.connect(bridge.get_trait(F.can_bridge).get_in(), linkcls=linkcls) - last = bridge.get_trait(F.can_bridge).get_out() - last.connect(end, linkcls=linkcls) - - -def connect_all_interfaces[MIF: ModuleInterface]( - interfaces: Iterable[MIF], linkcls=None -): - interfaces = list(interfaces) - if not interfaces: - return - return connect_to_all_interfaces(interfaces[0], interfaces[1:], linkcls=linkcls) - # not needed with current connection implementation - # for i in interfaces: - # for j in interfaces: - # i.connect(j) - - -def connect_to_all_interfaces[MIF: ModuleInterface]( - source: MIF, targets: Iterable[MIF], linkcls=None -): - for i in targets: - source.connect(i, linkcls=linkcls) - return source - - -def connect_module_mifs_by_name( - src: Iterable[Module] | Module, - dst: Iterable[Module] | Module, - allow_partial: bool = False, -): - if isinstance(src, Module): - src = [src] - if isinstance(dst, Module): - dst = [dst] - - for src_, dst_ in zip(src, dst): - for k, (src_m, dst_m) in zip_children_by_name( - src_, dst_, ModuleInterface - ).items(): - if src_m is None or dst_m is None: - if not allow_partial: - raise Exception(f"Node with name {k} not present in both") - continue - src_m.connect(dst_m) - - -def reversed_bridge(bridge: Node): - class _reversed_bridge(Node): - def __init__(self) -> None: - super().__init__() - - bridge_trait = bridge.get_trait(F.can_bridge) - if_in = bridge_trait.get_in() - if_out = bridge_trait.get_out() - - self.add(F.can_bridge_defined(if_out, if_in)) - - return _reversed_bridge() - - -def zip_children_by_name[N: Node]( - node1: Node, node2: Node, sub_type: type[N] -) -> dict[str, tuple[N, N]]: - nodes = (node1, node2) - children = tuple( - with_names(n.get_children(direct_only=True, include_root=False, types=sub_type)) - for n in nodes - ) - return zip_dicts_by_key(*children) - - -# -------------------------------------------------------------------------------------- - -# Specialization ----------------------------------------------------------------------- - - -def specialize_interface[T: ModuleInterface]( - general: ModuleInterface, - special: T, -) -> T: - logger.debug(f"Specializing MIF {general} with {special}") - - # This is doing the heavy lifting - general.connect(special) - - # Establish sibling relationship - general.specialized.connect(special.specializes) - - return special - - -def specialize_module[T: Module]( - general: Module, - special: T, - matrix: list[tuple[ModuleInterface, ModuleInterface]] | None = None, - attach_to: Node | None = None, -) -> T: - logger.debug(f"Specializing Module {general} with {special}" + " " + "=" * 20) - - def get_node_prop_matrix[N: Node](sub_type: type[N]): - return list(zip_children_by_name(general, special, sub_type).values()) - - if matrix is None: - matrix = get_node_prop_matrix(ModuleInterface) - - # TODO add warning if not all src interfaces used - - param_matrix = get_node_prop_matrix(Parameter) - - for src, dst in matrix: - if src is None: - continue - if dst is None: - raise Exception(f"Special module misses interface: {src.get_name()}") - specialize_interface(src, dst) - - for src, dst in param_matrix: - if src is None: - continue - if dst is None: - raise Exception(f"Special module misses parameter: {src.get_name()}") - dst.merge(src) - - # TODO this cant work - # for t in general.traits: - # # TODO needed? - # if special.has_trait(t.trait): - # continue - # special.add(t) - - general.specialized.connect(special.specializes) - - # Attach to new parent - has_parent = special.get_parent() is not None - assert not has_parent or attach_to is None - if not has_parent: - if attach_to: - attach_to.add(special, container=attach_to.specialized) - else: - gen_parent = general.get_parent() - if gen_parent: - gen_parent[0].add(special, name=f"{gen_parent[1]}_specialized") - - return special - - -# -------------------------------------------------------------------------------------- - - -# Hierarchy queries -------------------------------------------------------------------- - - -def get_parent(node: Node, filter_expr: Callable): - candidates = [p for p, _ in node.get_hierarchy() if filter_expr(p)] - if not candidates: - return None - return candidates[-1] - - -def get_parent_of_type[T: Node](node: Node, parent_type: type[T]) -> T | None: - return cast(parent_type, get_parent(node, lambda p: isinstance(p, parent_type))) - - -def get_parent_with_trait[TR: Trait]( - node: Node, trait: type[TR], include_self: bool = True -): - hierarchy = node.get_hierarchy() - if not include_self: - hierarchy = hierarchy[:-1] - for parent, _ in reversed(hierarchy): - if parent.has_trait(trait): - return parent, parent.get_trait(trait) - raise ValueError("No parent with trait found") - - -def get_children_of_type[U: Node](node: Node, child_type: type[U]) -> list[U]: - return list(node.get_children(direct_only=False, types=child_type)) - - -def get_first_child_of_type[U: Node](node: Node, child_type: type[U]) -> U: - for level in iter_tree_by_depth(get_node_tree(node)): - for child in level: - if isinstance(child, child_type): - return child - raise ValueError("No child of type found") - - -# -------------------------------------------------------------------------------------- - -# Printing ----------------------------------------------------------------------------- - - -def pretty_params(node: Module | ModuleInterface) -> str: - params = { - NotNone(p.get_parent())[1]: p.get_most_narrow() - for p in node.get_children(direct_only=True, types=Parameter) - } - params_str = "\n".join(f"{k}: {v}" for k, v in params.items()) - - return params_str - - -def pretty_param_tree(param: Parameter) -> str: - # TODO this is def broken for actual trees - # TODO i think the repr automatically resolves - - tree = get_param_tree(param) - out = f"{param!r}" - next_levels = [tree] - while next_levels: - if any(next_levels): - out += indent("\n|\nv\n", " " * 12) - for next_level in next_levels: - for p, _ in next_level: - out += f"{p!r}" - next_levels = [ - children for next_level in next_levels for _, children in next_level - ] - - return out - - -def pretty_param_tree_top(param: Parameter) -> str: - arrow = indent("\n|\nv", prefix=" " * 12) - out = (arrow + "\n").join( - f"{param!r}| {len(param.get_narrowed_siblings())}x" - for param in param.get_narrowing_chain() - ) - return out - - # -------------------------------------------------------------------------------------- diff --git a/src/faebryk/exporters/netlist/graph.py b/src/faebryk/exporters/netlist/graph.py index ea1b3df5..f8270d91 100644 --- a/src/faebryk/exporters/netlist/graph.py +++ b/src/faebryk/exporters/netlist/graph.py @@ -91,12 +91,9 @@ def get_kicad_obj(self): def add_or_get_net(interface: F.Electrical): - from faebryk.core.util import get_connected_mifs - - mifs = get_connected_mifs(interface.connected) nets = { p[0] - for mif in mifs + for mif in interface.get_connected() if (p := mif.get_parent()) is not None and isinstance(p[0], F.Net) } if not nets: diff --git a/src/faebryk/exporters/parameters/parameters_to_file.py b/src/faebryk/exporters/parameters/parameters_to_file.py index 4e49f7ae..9c5938b2 100644 --- a/src/faebryk/exporters/parameters/parameters_to_file.py +++ b/src/faebryk/exporters/parameters/parameters_to_file.py @@ -12,14 +12,10 @@ def export_parameters_to_file(module: Module, path: Path): """Write all parameters of the given module to a file.""" - from faebryk.core.util import get_all_modules - # {module_name: [{param_name: param_value}, {param_name: param_value},...]} parameters = dict[str, list[dict[str, Parameter]]]() - for m in { - _m.get_most_special() for _m in get_all_modules(module.get_most_special()) - }: + for m in module.get_children_modules(): parameters[m.get_full_name(types=True).split(".", maxsplit=1)[-1]] = [ {param.get_full_name().split(".")[-1]: param} for param in m.get_children(direct_only=True, types=Parameter) diff --git a/src/faebryk/exporters/pcb/layout/heuristic_decoupling.py b/src/faebryk/exporters/pcb/layout/heuristic_decoupling.py index c8cbb0f0..fb288bfa 100644 --- a/src/faebryk/exporters/pcb/layout/heuristic_decoupling.py +++ b/src/faebryk/exporters/pcb/layout/heuristic_decoupling.py @@ -219,8 +219,6 @@ def __init__(self, params: Params | None = None): self._params = params or Params() def apply(self, *node: Node): - from faebryk.core.util import get_parent_of_type - # Remove nodes that have a position defined node = tuple( n @@ -230,18 +228,16 @@ def apply(self, *node: Node): for n in node: assert isinstance(n, F.Capacitor) - power = NotNone(get_parent_of_type(n, F.ElectricPower)) + power = NotNone(n.get_parent_of_type(F.ElectricPower)) place_next_to(power.hv, n, route=True, params=self._params) @staticmethod def find_module_candidates(node: Node): - from faebryk.core.util import get_parent_of_type - return node.get_children( direct_only=False, types=F.Capacitor, - f_filter=lambda c: get_parent_of_type(c, F.ElectricPower) is not None, + f_filter=lambda c: c.get_parent_of_type(F.ElectricPower) is not None, ) @classmethod diff --git a/src/faebryk/exporters/pcb/layout/heuristic_pulls.py b/src/faebryk/exporters/pcb/layout/heuristic_pulls.py index 3aaf4240..e9884ac2 100644 --- a/src/faebryk/exporters/pcb/layout/heuristic_pulls.py +++ b/src/faebryk/exporters/pcb/layout/heuristic_pulls.py @@ -20,8 +20,6 @@ def __init__(self, params: Params | None = None): self._params = params or Params() def apply(self, *node: Node): - from faebryk.core.util import get_parent_of_type - # Remove nodes that have a position defined node = tuple( n @@ -31,18 +29,16 @@ def apply(self, *node: Node): for n in node: assert isinstance(n, F.Resistor) - logic = NotNone(get_parent_of_type(n, F.ElectricLogic)) + logic = NotNone(n.get_parent_of_type(F.ElectricLogic)) place_next_to(logic.signal, n, route=True, params=self._params) @staticmethod def find_module_candidates(node: Node): - from faebryk.core.util import get_parent_of_type - return node.get_children( direct_only=False, types=F.Resistor, - f_filter=lambda c: get_parent_of_type(c, F.ElectricLogic) is not None, + f_filter=lambda c: c.get_parent_of_type(F.ElectricLogic) is not None, ) @classmethod diff --git a/src/faebryk/exporters/pcb/routing/routing.py b/src/faebryk/exporters/pcb/routing/routing.py index a91dee86..6d2d7aa0 100644 --- a/src/faebryk/exporters/pcb/routing/routing.py +++ b/src/faebryk/exporters/pcb/routing/routing.py @@ -233,10 +233,8 @@ def route_net(self, net: F.Net): arc=False, ) - def route_if_net(self, mif): - from faebryk.core.util import get_net - - net = get_net(mif) + def route_if_net(self, mif: F.Electrical): + net = mif.get_net() assert net is not None self.route_net(net) diff --git a/src/faebryk/exporters/pcb/routing/util.py b/src/faebryk/exporters/pcb/routing/util.py index fe9b6299..809cc333 100644 --- a/src/faebryk/exporters/pcb/routing/util.py +++ b/src/faebryk/exporters/pcb/routing/util.py @@ -109,9 +109,7 @@ def add(self, obj: Path.Obj): @property def net(self): - from faebryk.core.util import get_net - - net = get_net(self.net_) + net = self.net_.get_net() assert net return net @@ -177,18 +175,13 @@ def get_internal_nets_of_node( For Nets returns all connected mifs """ - from faebryk.core.util import ( - get_all_nodes, - get_connected_mifs, - get_net, - ) from faebryk.libs.util import groupby if isinstance(node, F.Net): - return {node: get_connected_mifs(node.part_of.connected)} + return {node: node.part_of.get_connected()} - mifs = {n for n in get_all_nodes(node) + [node] if isinstance(n, F.Electrical)} - nets = groupby(mifs, lambda mif: get_net(mif)) + mifs = node.get_children(include_root=True, direct_only=False, types=F.Electrical) + nets = groupby(mifs, lambda mif: mif.get_net()) return nets @@ -224,10 +217,8 @@ def group_pads_that_are_connected_already( def get_routes_of_pad(pad: F.Pad): - from faebryk.core.util import get_parent_of_type - return { route for mif in pad.pcb.get_direct_connections() - if (route := get_parent_of_type(mif, Route)) + if (route := mif.get_parent_of_type(Route)) } diff --git a/src/faebryk/exporters/visualize/interactive_graph.py b/src/faebryk/exporters/visualize/interactive_graph.py index 7726e811..5b78d9ee 100644 --- a/src/faebryk/exporters/visualize/interactive_graph.py +++ b/src/faebryk/exporters/visualize/interactive_graph.py @@ -4,9 +4,10 @@ import rich import rich.text -from faebryk.core.core import GraphInterface, Link, Node from faebryk.core.graph import Graph -from faebryk.core.util import get_all_connected +from faebryk.core.graphinterface import GraphInterface +from faebryk.core.link import Link +from faebryk.core.node import Node from faebryk.exporters.visualize.util import IDSet, generate_pastel_palette @@ -72,7 +73,7 @@ def _not_none(x): *( filter( _not_none, - (_link(link) for gif in G for _, link in get_all_connected(gif)), + (_link(link) for gif in G for link in gif.get_links()), ) ), *( diff --git a/src/faebryk/library/CBM9002A_56ILG_Reference_Design.py b/src/faebryk/library/CBM9002A_56ILG_Reference_Design.py index eeabab69..00573d34 100644 --- a/src/faebryk/library/CBM9002A_56ILG_Reference_Design.py +++ b/src/faebryk/library/CBM9002A_56ILG_Reference_Design.py @@ -48,9 +48,7 @@ class CBM9002A_56ILG_Reference_Design(Module): # ---------------------------------------- def __preinit__(self): gnd = self.vcc.lv - from faebryk.core.util import connect_module_mifs_by_name - - connect_module_mifs_by_name(self, self.mcu, allow_partial=True) + self.connect_interfaces_by_name(self.mcu, allow_partial=True) self.reset.signal.connect_via( self.reset_lowpass_cap, gnd diff --git a/src/faebryk/library/ESP32_C3.py b/src/faebryk/library/ESP32_C3.py index d18d115e..5213e3f5 100644 --- a/src/faebryk/library/ESP32_C3.py +++ b/src/faebryk/library/ESP32_C3.py @@ -36,8 +36,6 @@ class ESP32_C3(Module): ) def __preinit__(self): - from faebryk.core.util import connect_to_all_interfaces - x = self # https://www.espressif.com/sites/default/files/documentation/esp32-c3_technical_reference_manual_en.pdf#uart @@ -60,14 +58,11 @@ def __preinit__(self): ) # TODO: when configured as input # connect all grounds to eachother and power - connect_to_all_interfaces( - self.vdd3p3.lv, - [ - self.vdd3p3_cpu.lv, - self.vdd3p3_rtc.lv, - self.vdda.lv, - self.vdd_spi.lv, - ], + self.vdd3p3.lv.connect( + self.vdd3p3_cpu.lv, + self.vdd3p3_rtc.lv, + self.vdda.lv, + self.vdd_spi.lv, ) # connect decoupling caps to power domains diff --git a/src/faebryk/library/ElectricLogic.py b/src/faebryk/library/ElectricLogic.py index bc621e41..618ddb06 100644 --- a/src/faebryk/library/ElectricLogic.py +++ b/src/faebryk/library/ElectricLogic.py @@ -150,17 +150,12 @@ def set_weak(self, on: bool): @staticmethod def connect_all_references(ifs: Iterable["ElectricLogic"]) -> F.ElectricPower: - from faebryk.core.util import connect_all_interfaces - - out = connect_all_interfaces([x.reference for x in ifs]) - assert out - return out + return F.ElectricPower.connect(*[x.reference for x in ifs]) @staticmethod def connect_all_node_references( nodes: Iterable[Node], gnd_only=False ) -> F.ElectricPower: - from faebryk.core.util import connect_all_interfaces # TODO check if any child contains ElectricLogic which is not connected # e.g find them in graph and check if any has parent without "single reference" @@ -172,10 +167,10 @@ def connect_all_node_references( assert refs if gnd_only: - connect_all_interfaces({r.lv for r in refs}) + F.Electrical.connect(*{r.lv for r in refs}) return next(iter(refs)) - connect_all_interfaces(refs) + F.ElectricPower.connect(*refs) return next(iter(refs)) @classmethod diff --git a/src/faebryk/library/ElectricLogicGate.py b/src/faebryk/library/ElectricLogicGate.py index 8a46be26..7c4b80bf 100644 --- a/src/faebryk/library/ElectricLogicGate.py +++ b/src/faebryk/library/ElectricLogicGate.py @@ -19,14 +19,12 @@ def __init__( super().__init__(input_cnt, output_cnt, *functions) def __preinit__(self): - from faebryk.core.util import specialize_interface - self_logic = self for in_if_l, in_if_el in zip(self_logic.inputs, self.inputs): - specialize_interface(in_if_l, in_if_el) + in_if_l.specialize(in_if_el) for out_if_l, out_if_el in zip(self_logic.outputs, self.outputs): - specialize_interface(out_if_l, out_if_el) + out_if_l.specialize(out_if_el) @L.rt_field def inputs(self): diff --git a/src/faebryk/library/Electrical.py b/src/faebryk/library/Electrical.py index fb2046f0..de9b2c73 100644 --- a/src/faebryk/library/Electrical.py +++ b/src/faebryk/library/Electrical.py @@ -8,3 +8,16 @@ class Electrical(ModuleInterface): potential: F.TBD[Quantity] + + def get_net(self): + nets = { + net + for mif in self.get_connected() + if (net := mif.get_parent_of_type(F.Net)) is not None + } + + if not nets: + return None + + assert len(nets) == 1 + return next(iter(nets)) diff --git a/src/faebryk/library/Footprint.py b/src/faebryk/library/Footprint.py index f53ba3f5..5d5a606c 100644 --- a/src/faebryk/library/Footprint.py +++ b/src/faebryk/library/Footprint.py @@ -16,7 +16,5 @@ class TraitT(Trait): ... def get_footprint_of_parent( intf: ModuleInterface, ) -> "tuple[Node, Footprint]": - from faebryk.core.util import get_parent_with_trait - - parent, trait = get_parent_with_trait(intf, F.has_footprint) + parent, trait = intf.get_parent_with_trait(F.has_footprint) return parent, trait.get_footprint() diff --git a/src/faebryk/library/Net.py b/src/faebryk/library/Net.py index f08089d6..cecc387d 100644 --- a/src/faebryk/library/Net.py +++ b/src/faebryk/library/Net.py @@ -42,22 +42,18 @@ def get_name(_self): return _() def get_fps(self): - from faebryk.core.util import get_parent_of_type - return { pad: fp for mif in self.get_connected_interfaces() - if (fp := get_parent_of_type(mif, F.Footprint)) is not None - and (pad := get_parent_of_type(mif, F.Pad)) is not None + if (fp := mif.get_parent_of_type(F.Footprint)) is not None + and (pad := mif.get_parent_of_type(F.Pad)) is not None } # TODO should this be here? def get_connected_interfaces(self): - from faebryk.core.util import get_connected_mifs - return { mif - for mif in get_connected_mifs(self.part_of.connected) + for mif in self.part_of.get_connected() if isinstance(mif, type(self.part_of)) } diff --git a/src/faebryk/library/Pad.py b/src/faebryk/library/Pad.py index 4b44f00a..84e83d5b 100644 --- a/src/faebryk/library/Pad.py +++ b/src/faebryk/library/Pad.py @@ -4,6 +4,7 @@ import faebryk.library._F as F from faebryk.core.moduleinterface import ModuleInterface +from faebryk.libs.util import NotNone class Pad(ModuleInterface): @@ -42,8 +43,4 @@ def find_pad_for_intf_with_parent_that_has_footprint( return pads def get_fp(self) -> F.Footprint: - from faebryk.core.util import get_parent_of_type - - fp = get_parent_of_type(self, F.Footprint) - assert fp - return fp + return NotNone(self.get_parent_of_type(F.Footprint)) diff --git a/src/faebryk/library/Powered_Relay.py b/src/faebryk/library/Powered_Relay.py index 91dca3fe..2e3f070e 100644 --- a/src/faebryk/library/Powered_Relay.py +++ b/src/faebryk/library/Powered_Relay.py @@ -26,9 +26,7 @@ class Powered_Relay(Module): power: F.ElectricPower def __preinit__(self): - from faebryk.core.util import connect_module_mifs_by_name - - connect_module_mifs_by_name(self, self.relay, allow_partial=True) + self.connect_interfaces_by_name(self.relay, allow_partial=True) self.relay_driver.power_in.connect(self.power) self.relay_driver.logic_in.connect(self.enable) diff --git a/src/faebryk/library/USB_C_PSU_Vertical.py b/src/faebryk/library/USB_C_PSU_Vertical.py index 02626e07..9705ba48 100644 --- a/src/faebryk/library/USB_C_PSU_Vertical.py +++ b/src/faebryk/library/USB_C_PSU_Vertical.py @@ -22,8 +22,6 @@ class USB_C_PSU_Vertical(Module): fuse: F.Fuse def __preinit__(self): - from faebryk.core.util import connect_all_interfaces - self.gnd_capacitor.capacitance.merge(100 * P.nF) self.gnd_capacitor.rated_voltage.merge(16 * P.V) self.gnd_resistor.resistance.merge(1 * P.Mohm) @@ -44,12 +42,9 @@ def __preinit__(self): v5.connect(self.esd.usb[0].usb_if.buspower) # connect usb data - connect_all_interfaces( - [ - self.usb_connector.usb.usb_if.d, - self.usb.usb_if.d, - self.esd.usb[0].usb_if.d, - ] + self.usb.usb_if.d.connect( + self.usb_connector.usb.usb_if.d, + self.esd.usb[0].usb_if.d, ) # configure as ufp with 5V@max3A diff --git a/src/faebryk/library/has_pcb_routing_strategy_manual.py b/src/faebryk/library/has_pcb_routing_strategy_manual.py index 317706f6..60516e8d 100644 --- a/src/faebryk/library/has_pcb_routing_strategy_manual.py +++ b/src/faebryk/library/has_pcb_routing_strategy_manual.py @@ -31,15 +31,13 @@ def __init__( self.absolute = absolute def calculate(self, transformer: PCB_Transformer): - from faebryk.core.util import get_parent_with_trait - node = self.obj nets = get_internal_nets_of_node(node) relative_to = (self.relative_to or self.obj) if not self.absolute else None if relative_to: - pos = get_parent_with_trait(relative_to, F.has_pcb_position)[ + pos = relative_to.get_parent_with_trait(F.has_pcb_position)[ 1 ].get_position() for _, path in self.paths_rel: diff --git a/src/faebryk/libs/app/manufacturing.py b/src/faebryk/libs/app/manufacturing.py index a3070dda..b88b4676 100644 --- a/src/faebryk/libs/app/manufacturing.py +++ b/src/faebryk/libs/app/manufacturing.py @@ -5,7 +5,6 @@ from pathlib import Path from faebryk.core.module import Module -from faebryk.core.util import get_all_modules from faebryk.exporters.bom.jlcpcb import write_bom_jlcpcb from faebryk.exporters.pcb.kicad.artifacts import ( export_dxf, @@ -27,7 +26,7 @@ def export_pcba_artifacts(out: Path, pcb_path: Path, app: Module): logger.info("Exporting PCBA artifacts") - write_bom_jlcpcb(get_all_modules(app), out.joinpath("jlcpcb_bom.csv")) + write_bom_jlcpcb(app.get_children_modules(), out.joinpath("jlcpcb_bom.csv")) export_step(pcb_path, step_file=cad_path.joinpath("pcba.step")) export_glb(pcb_path, glb_file=cad_path.joinpath("pcba.glb")) export_dxf(pcb_path, dxf_file=cad_path.joinpath("pcba.dxf")) diff --git a/src/faebryk/libs/app/parameters.py b/src/faebryk/libs/app/parameters.py index 4af45c73..c8569520 100644 --- a/src/faebryk/libs/app/parameters.py +++ b/src/faebryk/libs/app/parameters.py @@ -17,8 +17,6 @@ def replace_tbd_with_any(module: Module, recursive: bool, loglvl: int | None = N :param module: The module to replace F.TBD instances in. :param recursive: If True, replace F.TBD instances in submodules as well. """ - from faebryk.core.util import get_all_modules - lvl = logger.getEffectiveLevel() if loglvl is not None: logger.setLevel(loglvl) @@ -33,5 +31,5 @@ def replace_tbd_with_any(module: Module, recursive: bool, loglvl: int | None = N logger.setLevel(lvl) if recursive: - for m in {_m.get_most_special() for _m in get_all_modules(module)}: + for m in module.get_children_modules(): replace_tbd_with_any(m, recursive=False, loglvl=loglvl) diff --git a/src/faebryk/libs/app/pcb.py b/src/faebryk/libs/app/pcb.py index dbd302d7..dc0b36f1 100644 --- a/src/faebryk/libs/app/pcb.py +++ b/src/faebryk/libs/app/pcb.py @@ -10,7 +10,6 @@ import faebryk.library._F as F from faebryk.core.graph import Graph from faebryk.core.module import Module -from faebryk.core.util import get_node_tree, iter_tree_by_depth from faebryk.exporters.pcb.kicad.transformer import PCB_Transformer from faebryk.exporters.pcb.routing.util import apply_route_in_pcb from faebryk.libs.app.kicad_netlist import write_netlist @@ -31,8 +30,7 @@ def apply_layouts(app: Module): ) ) - tree = get_node_tree(app) - for level in iter_tree_by_depth(tree): + for level in app.get_tree(types=Module).iter_by_depth(): for n in level: if n.has_trait(F.has_pcb_layout): n.get_trait(F.has_pcb_layout).apply() @@ -41,8 +39,7 @@ def apply_layouts(app: Module): def apply_routing(app: Module, transformer: PCB_Transformer): strategies: list[tuple[F.has_pcb_routing_strategy, int]] = [] - tree = get_node_tree(app) - for i, level in enumerate(list(iter_tree_by_depth(tree))): + for i, level in enumerate(app.get_tree(types=Module).iter_by_depth()): for n in level: if not n.has_trait(F.has_pcb_routing_strategy): continue diff --git a/src/faebryk/libs/examples/buildutil.py b/src/faebryk/libs/examples/buildutil.py index b86c64f6..ded6cd7f 100644 --- a/src/faebryk/libs/examples/buildutil.py +++ b/src/faebryk/libs/examples/buildutil.py @@ -7,7 +7,6 @@ import faebryk.libs.picker.lcsc as lcsc from faebryk.core.module import Module -from faebryk.core.util import get_all_modules from faebryk.libs.app.checks import run_checks from faebryk.libs.app.parameters import replace_tbd_with_any from faebryk.libs.app.pcb import apply_design @@ -52,7 +51,7 @@ def apply_design_to_pcb(m: Module): # TODO this can be prettier # picking ---------------------------------------------------------------- - modules = {n.get_most_special() for n in get_all_modules(m)} + modules = m.get_children_modules() try: JLCPCB_DB() for n in modules: diff --git a/src/faebryk/libs/examples/pickers.py b/src/faebryk/libs/examples/pickers.py index f211b18c..ad59baa4 100644 --- a/src/faebryk/libs/examples/pickers.py +++ b/src/faebryk/libs/examples/pickers.py @@ -10,7 +10,6 @@ import faebryk.library._F as F from faebryk.core.module import Module -from faebryk.core.util import specialize_module from faebryk.libs.app.parameters import replace_tbd_with_any from faebryk.libs.picker.lcsc import LCSC_Part from faebryk.libs.picker.picker import PickerOption, pick_module_by_params @@ -239,7 +238,7 @@ def pick_battery(module: F.Battery): if not isinstance(module, F.ButtonCell): bcell = F.ButtonCell() replace_tbd_with_any(bcell, recursive=False) - specialize_module(module, bcell) + module.specialize(bcell) bcell.add( F.has_multi_picker(0, F.has_multi_picker.FunctionPicker(pick_battery)) ) diff --git a/src/faebryk/libs/picker/jlcpcb/jlcpcb.py b/src/faebryk/libs/picker/jlcpcb/jlcpcb.py index 630b17b8..0ddfddeb 100644 --- a/src/faebryk/libs/picker/jlcpcb/jlcpcb.py +++ b/src/faebryk/libs/picker/jlcpcb/jlcpcb.py @@ -23,7 +23,6 @@ import faebryk.library._F as F from faebryk.core.module import Module from faebryk.core.parameter import Parameter -from faebryk.core.util import pretty_param_tree, pretty_params from faebryk.libs.e_series import ( E_SERIES_VALUES, ParamNotResolvedError, @@ -362,7 +361,7 @@ def attach( logger.debug( f"Attached component {self.partno} to module {module}: \n" f"{indent(str(params), ' '*4)}\n--->\n" - f"{indent(pretty_params(module), ' '*4)}" + f"{indent(module.pretty_params(), ' '*4)}" ) @@ -410,7 +409,7 @@ def filter_by_value( if logger.isEnabledFor(logging.DEBUG): logger.debug( - f"Filtering by value:\n{indent(pretty_param_tree(value), ' '*4)}" + f"Filtering by value:\n{indent(value.get_tree_param().pretty(), ' '*4)}" ) if isinstance(value, F.ANY): diff --git a/src/faebryk/libs/picker/picker.py b/src/faebryk/libs/picker/picker.py index 166eeec8..8321a3a4 100644 --- a/src/faebryk/libs/picker/picker.py +++ b/src/faebryk/libs/picker/picker.py @@ -87,8 +87,6 @@ def get_all_children(self): class PickErrorParams(PickError): def __init__(self, module: Module, options: list[PickerOption]): - from faebryk.core.util import pretty_params - self.options = options MAX = 5 @@ -101,7 +99,7 @@ def __init__(self, module: Module, options: list[PickerOption]): message = ( f"Could not find part for {module}" - f"\nwith params:\n{indent(pretty_params(module), ' '*4)}" + f"\nwith params:\n{indent(module.pretty_params(), ' '*4)}" f"\nin options:\n {indent(options_str, ' '*4)}" ) super().__init__(message, module) @@ -182,7 +180,7 @@ def pick_module_by_params(module: Module, options: Iterable[PickerOption]): def _get_mif_top_level_modules(mif: ModuleInterface) -> set[Module]: - return mif.get_children(direct_only=True, types=Module) | { + return Module.get_children_modules(mif, direct_only=True) | { m for nmif in mif.get_children(direct_only=True, types=ModuleInterface) for m in _get_mif_top_level_modules(nmif) @@ -194,18 +192,18 @@ def __init__(self): self.progress = Progress() self.task = self.progress.add_task("Picking", total=1) + @staticmethod + def _get_total(module: Module): + return len(module.get_children_modules()) + @classmethod def from_module(cls, module: Module) -> "PickerProgress": self = cls() - from faebryk.core.util import get_all_modules - - self.progress.update(self.task, total=len(get_all_modules(module))) + self.progress.update(self.task, total=cls._get_total(module)) return self def advance(self, module: Module): - from faebryk.core.util import get_all_modules - - self.progress.advance(self.task, len(get_all_modules(module))) + self.progress.advance(self.task, self._get_total(module)) @contextmanager def context(self): @@ -248,7 +246,7 @@ def get_not_picked(m: Module): if m.has_trait(has_part_picked): return out - children = m.get_children(direct_only=True, types=Module) + children = m.get_children_modules(direct_only=True) if not children: return out + [m] @@ -282,7 +280,7 @@ def _pick_part_recursively(module: Module, progress: PickerProgress | None = Non # if no children, raise # This whole logic will be so much easier if the recursive # picker is just a normal picker - if not module.get_children(direct_only=True, types=Module): + if not module.get_children_modules(direct_only=True): raise e if module.has_trait(has_part_picked): diff --git a/src/faebryk/libs/util.py b/src/faebryk/libs/util.py index 26b3d7a8..2a2cd7c4 100644 --- a/src/faebryk/libs/util.py +++ b/src/faebryk/libs/util.py @@ -18,6 +18,7 @@ List, Optional, Self, + Sequence, SupportsFloat, SupportsInt, Type, @@ -302,8 +303,8 @@ def round_str(value: SupportsFloat, n=8): return str(f).rstrip("0").rstrip(".") -def _print_stack(stack): - from colorama import Fore +def _print_stack(stack) -> Iterable[str]: + from rich.text import Text for frame_info in stack: frame = frame_info[0] @@ -313,29 +314,41 @@ def _print_stack(stack): continue # if frame_info.function not in ["_connect_across_hierarchies"]: # continue - yield ( - f"{Fore.RED} Frame in {frame_info.filename} at line {frame_info.lineno}:" - f"{Fore.BLUE} {frame_info.function} {Fore.RESET}" + yield str( + Text.assemble( + ( + f" Frame in {frame_info.filename} at line {frame_info.lineno}:", + "red", + ), + (f" {frame_info.function} ", "blue"), + ) ) def pretty_val(value): if isinstance(value, dict): import pprint - return ( - ("\n" if len(value) > 1 else "") - + pprint.pformat( - {pretty_val(k): pretty_val(v) for k, v in value.items()}, - indent=2, - width=120, - ) - ).replace("\n", f"\n {Fore.RESET}") + formatted = pprint.pformat( + {pretty_val(k): pretty_val(v) for k, v in value.items()}, + indent=2, + width=120, + ) + return ("\n" if len(value) > 1 else "") + indent( + str(Text(formatted)), " " * 4 + ) elif isinstance(value, type): return f"" - return value + return str(value) for name, value in frame.f_locals.items(): - yield f" {Fore.GREEN}{name}{Fore.RESET} = {pretty_val(value)}" + yield str( + Text.assemble( + (" ", ""), + (f"{name}", "green"), + (" = ", ""), + (pretty_val(value), ""), + ) + ) def print_stack(stack): @@ -791,3 +804,52 @@ def __call__(self, *args: P.args, **kwds: P.kwargs) -> Any: return result return _once() + + +class PostInitCaller(type): + def __call__(cls, *args, **kwargs): + obj = type.__call__(cls, *args, **kwargs) + obj.__post_init__(*args, **kwargs) + return obj + + +class Tree[T](dict[T, "Tree[T]"]): + def iter_by_depth(self) -> Iterable[Sequence[T]]: + yield list(self.keys()) + + for level in zip_exhaust(*[v.iter_by_depth() for v in self.values()]): + # merge lists of parallel subtrees + yield [n for subtree in level for n in subtree] + + def pretty(self) -> str: + # TODO this is def broken for actual trees + + out = "" + next_levels = [self] + while next_levels: + if any(next_levels): + out += indent("\n|\nv\n", " " * 12) + for next_level in next_levels: + for p, _ in next_level.items(): + out += f"{p!r}" + next_levels = [ + children + for next_level in next_levels + for _, children in next_level.items() + ] + + return out + + +# type Tree[T] = dict[T, "Tree[T]"] + + +# zip iterators, but if one iterators stops producing, the rest continue +def zip_exhaust(*args): + while True: + out = [next(a, None) for a in args] + out = [a for a in out if a] + if not out: + return + + yield out diff --git a/test/core/test_hierarchy_connect.py b/test/core/test_hierarchy_connect.py index 3df37346..fbdf2b59 100644 --- a/test/core/test_hierarchy_connect.py +++ b/test/core/test_hierarchy_connect.py @@ -10,7 +10,6 @@ from faebryk.core.link import LinkDirect, LinkDirectShallow, _TLinkDirectShallow from faebryk.core.module import Module from faebryk.core.moduleinterface import ModuleInterface -from faebryk.core.util import specialize_interface from faebryk.libs.library import L from faebryk.libs.util import print_stack, times @@ -200,8 +199,8 @@ class Specialized(ModuleInterface): ... mifs[0].connect(mifs[1]) mifs[1].connect(mifs[2]) - specialize_interface(mifs[0], mifs_special[0]) - specialize_interface(mifs[2], mifs_special[2]) + mifs[0].specialize(mifs_special[0]) + mifs[2].specialize(mifs_special[2]) self.assertTrue(mifs_special[0].is_connected_to(mifs_special[2])) @@ -212,8 +211,8 @@ class Specialized(ModuleInterface): ... mifs_special[0].connect(mifs_special[1]) mifs_special[1].connect(mifs_special[2]) - specialize_interface(mifs[0], mifs_special[0]) - specialize_interface(mifs[2], mifs_special[2]) + mifs[0].specialize(mifs_special[0]) + mifs[2].specialize(mifs_special[2]) self.assertTrue(mifs[0].is_connected_to(mifs[2])) @@ -226,8 +225,8 @@ class _Link(LinkDirectShallow(lambda link, gif: True)): ... mifs[0].connect(mifs[1], linkcls=_Link) mifs[1].connect(mifs[2]) - specialize_interface(mifs[0], mifs_special[0]) - specialize_interface(mifs[2], mifs_special[2]) + mifs[0].specialize(mifs_special[0]) + mifs[2].specialize(mifs_special[2]) self.assertIsInstance(mifs_special[0].is_connected_to(mifs_special[2]), _Link) diff --git a/test/core/test_parameters.py b/test/core/test_parameters.py index fd8fbd9c..9db7050d 100644 --- a/test/core/test_parameters.py +++ b/test/core/test_parameters.py @@ -9,7 +9,6 @@ from faebryk.core.core import logger as core_logger from faebryk.core.module import Module from faebryk.core.parameter import Parameter -from faebryk.core.util import specialize_module from faebryk.libs.units import P logger = logging.getLogger(__name__) @@ -342,7 +341,7 @@ def __preinit__(self) -> None: app = App() - bcell = specialize_module(app.battery, F.ButtonCell()) + bcell = app.battery.specialize(F.ButtonCell()) bcell.voltage.merge(3 * P.V) bcell.capacity.merge(F.Range.from_center(225 * P.mAh, 50 * P.mAh)) bcell.material.merge(F.ButtonCell.Material.Lithium) diff --git a/test/core/test_performance.py b/test/core/test_performance.py index e4883167..2564a5c0 100644 --- a/test/core/test_performance.py +++ b/test/core/test_performance.py @@ -12,6 +12,7 @@ from faebryk.core.graphinterface import GraphInterface from faebryk.core.module import Module from faebryk.core.moduleinterface import ModuleInterface +from faebryk.core.node import Node from faebryk.libs.library import L from faebryk.libs.util import times @@ -69,9 +70,7 @@ def __init__(self, timings: Times) -> None: def __preinit__(self): self._timings.add("setup") - core_util.connect_all_interfaces( - r.unnamed[0] for r in self.resistors - ) + F.Electrical.connect(*(r.unnamed[0] for r in self.resistors)) self._timings.add("connect") return App @@ -95,18 +94,19 @@ def _common_timings( timings.add("get_all_nodes_graph") for n in [app, app.resistors[0]]: + assert isinstance(n, Module) name = type(n).__name__[0] - n.get_node_children_all() + n.get_children(direct_only=False, types=Node) timings.add(f"get_node_children_all {name}") - core_util.get_node_tree(n) + n.get_tree(types=Node) timings.add(f"get_node_tree {name}") core_util.get_mif_tree(n) timings.add(f"get_mif_tree {name}") - core_util.get_node_direct_mods_or_mifs(n) + n.get_children(direct_only=True, types=Node) timings.add(f"get_module_direct_children {name}") n.get_children(direct_only=True, types=ModuleInterface) diff --git a/test/core/test_util.py b/test/core/test_util.py index 7b10f1fe..60f47941 100644 --- a/test/core/test_util.py +++ b/test/core/test_util.py @@ -2,11 +2,11 @@ # SPDX-License-Identifier: MIT import unittest +from typing import Sequence from faebryk.core.module import Module from faebryk.core.moduleinterface import ModuleInterface from faebryk.core.node import Node -from faebryk.core.util import get_node_tree, iter_tree_by_depth from faebryk.libs.library import L @@ -29,13 +29,13 @@ def __init__(self, depth: int): level_count = 5 n = N(level_count) - def assertEqual(n1: list[Node], n2: list[Node]): + def assertEqual(n1: Sequence[Node], n2: Sequence[Node]): n1s = list(sorted(n1, key=id)) n2s = list(sorted(n2, key=id)) self.assertEqual(n1s, n2s) - tree = get_node_tree(n) - levels = list(iter_tree_by_depth(tree)) + tree = n.get_tree(types=(Module, ModuleInterface)) + levels = list(tree.iter_by_depth()) print(tree) for i, le in enumerate(levels): print(i, le)