From 41ca94816d284490b1a57b48c1c92ab369bd1b76 Mon Sep 17 00:00:00 2001 From: iopapamanoglou Date: Wed, 4 Sep 2024 15:15:26 +0200 Subject: [PATCH] WIP Move graph funcs --- examples/iterative_design_nand.py | 3 +- src/faebryk/core/graphinterface.py | 40 ++++- src/faebryk/core/util.py | 147 ------------------ src/faebryk/exporters/esphome/esphome.py | 4 +- src/faebryk/exporters/netlist/graph.py | 3 +- src/faebryk/exporters/netlist/netlist.py | 5 +- .../exporters/pcb/kicad/transformer.py | 10 +- src/faebryk/exporters/pcb/routing/routing.py | 4 +- src/faebryk/libs/app/designators.py | 12 +- src/faebryk/libs/app/erc.py | 10 +- test/core/test_performance.py | 2 +- 11 files changed, 55 insertions(+), 185 deletions(-) delete mode 100644 src/faebryk/core/util.py diff --git a/examples/iterative_design_nand.py b/examples/iterative_design_nand.py index 91038587..d510dd78 100644 --- a/examples/iterative_design_nand.py +++ b/examples/iterative_design_nand.py @@ -18,7 +18,6 @@ import faebryk.library._F as F from faebryk.core.module import Module -from faebryk.core.util import get_all_nodes_with_trait from faebryk.libs.brightness import TypicalLuminousIntensity from faebryk.libs.examples.buildutil import apply_design_to_pcb from faebryk.libs.library import L @@ -120,7 +119,7 @@ def App(): app.add(c) # parametrizing - for _, t in get_all_nodes_with_trait(app.get_graph(), F.ElectricLogic.has_pulls): + for _, t in app.get_graph().nodes_with_trait(F.ElectricLogic.has_pulls): for pull_resistor in (r for r in t.get_pulls() if r): pull_resistor.resistance.merge(100 * P.kohm) power_source.power.voltage.merge(3 * P.V) diff --git a/src/faebryk/core/graphinterface.py b/src/faebryk/core/graphinterface.py index 742f16cd..cc0655cb 100644 --- a/src/faebryk/core/graphinterface.py +++ b/src/faebryk/core/graphinterface.py @@ -1,7 +1,7 @@ # This file is part of the faebryk project # SPDX-License-Identifier: MIT import logging -from typing import TYPE_CHECKING, Mapping, Optional +from typing import TYPE_CHECKING, Iterable, Mapping, Optional from typing_extensions import Self, deprecated @@ -15,11 +15,47 @@ if TYPE_CHECKING: from faebryk.core.node import Node + from faebryk.core.trait import Trait logger = logging.getLogger(__name__) -class Graph(GraphImpl["GraphInterface"]): ... +class Graph(GraphImpl["GraphInterface"]): + # Make all kinds of graph filtering functions so we can optimize them in the future + # Avoid letting user query all graph nodes always because quickly very slow + + def node_projection(self) -> set["Node"]: + return Node.get_nodes_from_gifs(self.subgraph_type(GraphInterfaceSelf)) + + def nodes_with_trait[T: "Trait"](self, trait: type[T]) -> list[tuple[Node, T]]: + return [ + (n, n.get_trait(trait)) + for n in self.node_projection() + if n.has_trait(trait) + ] + + # Waiting for python to add support for type mapping + def nodes_with_traits[*Ts]( + self, traits: tuple[*Ts] + ): # -> list[tuple[Node, tuple[*Ts]]]: + return [ + (n, tuple(n.get_trait(trait) for trait in traits)) + for n in self.node_projection() + if all(n.has_trait(trait) for trait in traits) + ] + + def nodes_by_names(self, names: Iterable[str]) -> list[tuple[Node, str]]: + return [ + (n, node_name) + for n in self.node_projection() + if (node_name := n.get_full_name()) in names + ] + + def nodes_of_type[T: Node](self, t: type[T]) -> set[T]: + return {n for n in self.node_projection() if isinstance(n, t)} + + def nodes_of_types(self, t: tuple[type[Node], ...]) -> set[Node]: + return {n for n in self.node_projection() if isinstance(n, t)} class GraphInterface(FaebrykLibObject): diff --git a/src/faebryk/core/util.py b/src/faebryk/core/util.py deleted file mode 100644 index aa5b8309..00000000 --- a/src/faebryk/core/util.py +++ /dev/null @@ -1,147 +0,0 @@ -# This file is part of the faebryk project -# SPDX-License-Identifier: MIT - -import logging -from typing import Iterable - -from typing_extensions import deprecated - -import faebryk.library._F as F -from faebryk.core.graphinterface import Graph, GraphInterfaceSelf -from faebryk.core.moduleinterface import ModuleInterface -from faebryk.core.node import Node -from faebryk.core.trait import Trait - -logger = logging.getLogger(__name__) - -# Graph Querying ----------------------------------------------------------------------- - - -# Make all kinds of graph filtering functions so we can optimize them in the future -# Avoid letting user query all graph nodes always because quickly very slow - - -def node_projected_graph(g: Graph) -> set[Node]: - """ - Don't call this directly, use get_all_nodes_by/of/with instead - """ - return Node.get_nodes_from_gifs(g.subgraph_type(GraphInterfaceSelf)) - - -@deprecated("Use node_projected_graph or get_all_nodes_by/of/with") -def get_all_nodes_graph(g: Graph): - return node_projected_graph(g) - - -def get_all_nodes_with_trait[T: Trait]( - g: Graph, trait: type[T] -) -> list[tuple[Node, T]]: - return [ - (n, n.get_trait(trait)) for n in node_projected_graph(g) if n.has_trait(trait) - ] - - -# Waiting for python to add support for type mapping -def get_all_nodes_with_traits[*Ts]( - g: Graph, traits: tuple[*Ts] -): # -> list[tuple[Node, tuple[*Ts]]]: - return [ - (n, tuple(n.get_trait(trait) for trait in traits)) - for n in node_projected_graph(g) - if all(n.has_trait(trait) for trait in traits) - ] - - -def get_all_nodes_by_names(g: Graph, names: Iterable[str]) -> list[tuple[Node, str]]: - return [ - (n, node_name) - for n in node_projected_graph(g) - if (node_name := n.get_full_name()) in names - ] - - -def get_all_nodes_of_type[T: Node](g: Graph, t: type[T]) -> set[T]: - return {n for n in node_projected_graph(g) if isinstance(n, t)} - - -def get_all_nodes_of_types(g: Graph, t: tuple[type[Node], ...]) -> set[Node]: - return {n for n in node_projected_graph(g) if isinstance(n, t)} - - -# -------------------------------------------------------------------------------------- - - -def use_interface_names_as_net_names(node: Node, name: str | None = None): - if not name: - p = node.get_parent() - assert p - name = p[1] - - name_prefix = node.get_full_name() - - el_ifs = node.get_children(types=F.Electrical, direct_only=False) - - # for el_if in el_ifs: - # print(el_if) - # print("=" * 80) - - # performance - resolved: set[ModuleInterface] = set() - - # get representative interfaces that determine the name of the Net - to_use: set[F.Electrical] = set() - for el_if in el_ifs: - # performance - if el_if in resolved: - continue - - connections = el_if.get_direct_connections() | {el_if} - - # skip ifs with Nets - if matched_nets := { # noqa: F841 - n - for c in connections - if (p := c.get_parent()) - and isinstance(n := p[0], F.Net) - and n.part_of in connections - }: - # logger.warning(f"Skipped, attached to Net: {el_if}: {matched_nets!r}") - resolved.update(connections) - continue - - group = {mif for mif in connections if mif in el_ifs} - - # heuristic: choose shortest name - picked = min(group, key=lambda x: len(x.get_full_name())) - to_use.add(picked) - - # for _el_if in group: - # print(_el_if if _el_if is not picked else f"{_el_if} <-") - # print("-" * 80) - - # performance - resolved.update(group) - - nets: dict[str, tuple[F.Net, F.Electrical]] = {} - for el_if in to_use: - net_name = f"{name}{el_if.get_full_name().removeprefix(name_prefix)}" - - # name collision - if net_name in nets: - net, other_el = nets[net_name] - raise Exception( - f"{el_if} resolves to {net_name} but not connected" - + f"\nwhile attaching nets to {node}={name} (connected via {other_el})" - + "\n" - + "\nConnections\n\t" - + "\n\t".join(map(str, el_if.get_direct_connections())) - + f"\n{'-'*80}" - + "\nNet Connections\n\t" - + "\n\t".join(map(str, net.part_of.get_direct_connections())) - ) - - net = F.Net() - net.add(F.has_overriden_name_defined(net_name)) - net.part_of.connect(el_if) - logger.debug(f"Created {net_name} for {el_if}") - nets[net_name] = net, el_if diff --git a/src/faebryk/exporters/esphome/esphome.py b/src/faebryk/exporters/esphome/esphome.py index 4496f9f4..e33cacfe 100644 --- a/src/faebryk/exporters/esphome/esphome.py +++ b/src/faebryk/exporters/esphome/esphome.py @@ -53,9 +53,7 @@ def merge_dicts(*dicts: dict) -> dict: def make_esphome_config(G: Graph) -> dict: - from faebryk.core.util import get_all_nodes_with_trait - - esphome_components = get_all_nodes_with_trait(G, F.has_esphome_config) + esphome_components = G.nodes_with_trait(F.has_esphome_config) esphome_config = merge_dicts(*[t.get_config() for _, t in esphome_components]) diff --git a/src/faebryk/exporters/netlist/graph.py b/src/faebryk/exporters/netlist/graph.py index f8270d91..a67c8e18 100644 --- a/src/faebryk/exporters/netlist/graph.py +++ b/src/faebryk/exporters/netlist/graph.py @@ -106,7 +106,6 @@ def add_or_get_net(interface: F.Electrical): def attach_nets_and_kicad_info(g: Graph): - from faebryk.core.util import get_all_nodes_with_trait # g has to be closed Gclosed = g @@ -116,7 +115,7 @@ def attach_nets_and_kicad_info(g: Graph): n: t.get_footprint() # TODO maybe nicer to just look for footprints # and get their respective components instead - for n, t in get_all_nodes_with_trait(Gclosed, F.has_footprint) + for n, t in Gclosed.nodes_with_trait(F.has_footprint) if isinstance(n, Module) } diff --git a/src/faebryk/exporters/netlist/netlist.py b/src/faebryk/exporters/netlist/netlist.py index 1e39a40c..149b2215 100644 --- a/src/faebryk/exporters/netlist/netlist.py +++ b/src/faebryk/exporters/netlist/netlist.py @@ -36,10 +36,9 @@ class Vertex: def make_t2_netlist_from_graph(G: Graph) -> T2Netlist: - from faebryk.core.util import get_all_nodes_of_type, get_all_nodes_with_trait from faebryk.exporters.netlist.graph import can_represent_kicad_footprint - nets = get_all_nodes_of_type(G, F.Net) + nets = G.nodes_of_type(F.Net) t2_nets = [ T2Netlist.Net( @@ -61,7 +60,7 @@ def make_t2_netlist_from_graph(G: Graph) -> T2Netlist: comps = { t.get_footprint().get_trait(can_represent_kicad_footprint).get_kicad_obj() - for _, t in get_all_nodes_with_trait(G, F.has_footprint) + for _, t in G.nodes_with_trait(F.has_footprint) } not_found = [ diff --git a/src/faebryk/exporters/pcb/kicad/transformer.py b/src/faebryk/exporters/pcb/kicad/transformer.py index 8411b1aa..7012b9ad 100644 --- a/src/faebryk/exporters/pcb/kicad/transformer.py +++ b/src/faebryk/exporters/pcb/kicad/transformer.py @@ -224,9 +224,7 @@ def attach(self): footprints = { (f.propertys["Reference"].value, f.name): f for f in self.pcb.footprints } - from faebryk.core.util import get_all_nodes_with_trait - - for node, fpt in get_all_nodes_with_trait(self.graph, F.has_footprint): + for node, fpt in self.graph.nodes_with_trait(F.has_footprint): if not node.has_trait(F.has_overriden_name): continue g_fp = fpt.get_footprint() @@ -692,11 +690,9 @@ def insert_zone(self, net: Net, layers: str | list[str], polygon: list[Point2D]) # Positioning ---------------------------------------------------------------------- def move_footprints(self): - from faebryk.core.util import get_all_nodes_with_traits - # position modules with defined positions - pos_mods = get_all_nodes_with_traits( - self.graph, (F.has_pcb_position, self.has_linked_kicad_footprint) + pos_mods = self.graph.nodes_with_traits( + (F.has_pcb_position, self.has_linked_kicad_footprint) ) logger.info(f"Positioning {len(pos_mods)} footprints") diff --git a/src/faebryk/exporters/pcb/routing/routing.py b/src/faebryk/exporters/pcb/routing/routing.py index 6d2d7aa0..57bdaf34 100644 --- a/src/faebryk/exporters/pcb/routing/routing.py +++ b/src/faebryk/exporters/pcb/routing/routing.py @@ -124,9 +124,7 @@ def draw_circle(self, coord: OutCoord, size=0.5, layer="User.9"): ) def route_all(self): - from faebryk.core.util import get_all_nodes_of_type - - nets = get_all_nodes_of_type(self.transformer.graph, F.Net) + nets = self.transformer.graph.nodes_of_type(F.Net) # TODO add net picking heuristic for net in nets: diff --git a/src/faebryk/libs/app/designators.py b/src/faebryk/libs/app/designators.py index 7692b62f..ca3f79ad 100644 --- a/src/faebryk/libs/app/designators.py +++ b/src/faebryk/libs/app/designators.py @@ -10,10 +10,6 @@ import faebryk.library._F as F from faebryk.core.graphinterface import Graph -from faebryk.core.util import ( - get_all_nodes_by_names, - get_all_nodes_with_trait, -) from faebryk.exporters.netlist.netlist import T2Netlist from faebryk.libs.kicad.fileformats import C_kicad_pcb_file from faebryk.libs.util import duplicates, get_key, groupby @@ -26,7 +22,7 @@ def attach_random_designators(graph: Graph): sorts nodes by path and then sequentially assigns designators """ - nodes = {n for n, _ in get_all_nodes_with_trait(graph, F.has_footprint)} + nodes = {n for n, _ in graph.nodes_with_trait(F.has_footprint)} in_use = { n.get_trait(F.has_designator).get_designator() @@ -77,7 +73,7 @@ def _get_first_hole(used: list[int]): def override_names_with_designators(graph: Graph): - for n, t in get_all_nodes_with_trait(graph, F.has_designator): + for n, t in graph.nodes_with_trait(F.has_designator): name = t.get_designator() if n.has_trait(F.has_overriden_name): logger.warning( @@ -102,7 +98,7 @@ def load_designators_from_netlist( matched_nodes = { node_name: (n, designators[node_name]) - for n, node_name in get_all_nodes_by_names(graph, designators.keys()) + for n, node_name in graph.nodes_by_names(designators.keys()) } for _, (n, designator) in matched_nodes.items(): @@ -126,7 +122,7 @@ def replace_faebryk_names_with_designators_in_kicad_pcb(graph: Graph, pcbfile: P pattern = re.compile(r"^(.*)\[[^\]]*\]$") translation = { n.get_full_name(): t.get_name() - for n, t in get_all_nodes_with_trait(graph, F.has_overriden_name) + for n, t in graph.nodes_with_trait(F.has_overriden_name) } for fp in pcb.kicad_pcb.footprints: diff --git a/src/faebryk/libs/app/erc.py b/src/faebryk/libs/app/erc.py index 85b68f69..dc52a3de 100644 --- a/src/faebryk/libs/app/erc.py +++ b/src/faebryk/libs/app/erc.py @@ -9,10 +9,6 @@ from faebryk.core.graphinterface import Graph from faebryk.core.module import Module from faebryk.core.moduleinterface import ModuleInterface -from faebryk.core.util import ( - get_all_nodes_of_type, - get_all_nodes_of_types, -) from faebryk.libs.picker.picker import has_part_picked from faebryk.libs.util import groupby, print_stack @@ -59,14 +55,14 @@ def simple_erc(G: Graph): logger.info("Checking graph for ERC violations") # power short - electricpower = get_all_nodes_of_type(G, F.ElectricPower) + electricpower = G.nodes_of_type(F.ElectricPower) logger.info(f"Checking {len(electricpower)} Power") for ep in electricpower: if ep.lv.is_connected_to(ep.hv): raise ERCFaultShort([ep], "shorted power") # shorted nets - nets = get_all_nodes_of_type(G, F.Net) + nets = G.nodes_of_type(F.Net) logger.info(f"Checking {len(nets)} nets") for net in nets: collisions = { @@ -107,7 +103,7 @@ def simple_erc(G: Graph): # checked.add(mif) # if any(mif.is_connected_to(other) for other in (mifs - checked)): # raise ERCFault([mif], "shorted symmetric footprint") - comps = get_all_nodes_of_types(G, (F.Resistor, F.Capacitor, F.Fuse)) + comps = G.nodes_of_types((F.Resistor, F.Capacitor, F.Fuse)) for comp in comps: assert isinstance(comp, (F.Resistor, F.Capacitor, F.Fuse)) # TODO make prettier diff --git a/test/core/test_performance.py b/test/core/test_performance.py index a34cc744..4ee04c9c 100644 --- a/test/core/test_performance.py +++ b/test/core/test_performance.py @@ -90,7 +90,7 @@ def _common_timings( G = app.get_graph() timings.add("graph") - core_util.node_projected_graph(G) + G.node_projection() timings.add("get_all_nodes_graph") for n in [app, app.resistors[0]]: