Skip to content
This repository has been archived by the owner on Dec 10, 2024. It is now read-only.

Commit

Permalink
Core: OO style functions (#36)
Browse files Browse the repository at this point in the history
Move core/util.py functions into respective classes (Node, Module, ...)
Improves ergonomics by a lot.
  • Loading branch information
iopapamanoglou authored Sep 4, 2024
1 parent 170bc6b commit 207420d
Show file tree
Hide file tree
Showing 59 changed files with 778 additions and 1,026 deletions.
26 changes: 10 additions & 16 deletions examples/iterative_design_nand.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@

import faebryk.library._F as F
from faebryk.core.module import Module
from faebryk.core.util import (
get_all_nodes_with_trait,
specialize_interface,
specialize_module,
)
from faebryk.libs.brightness import TypicalLuminousIntensity
from faebryk.libs.examples.buildutil import apply_design_to_pcb
from faebryk.libs.library import L
Expand Down Expand Up @@ -89,10 +84,10 @@ def App():
logic_in.connect_via(switch, on)

# bring logic signals into electrical domain
e_in = specialize_interface(logic_in, F.ElectricLogic())
e_out = specialize_interface(logic_out, F.ElectricLogic())
e_on = specialize_interface(on, F.ElectricLogic())
e_off = specialize_interface(off, F.ElectricLogic())
e_in = logic_in.specialize(F.ElectricLogic())
e_out = logic_out.specialize(F.ElectricLogic())
e_on = on.specialize(F.ElectricLogic())
e_off = off.specialize(F.ElectricLogic())
e_in.reference.connect(power)
e_out.reference.connect(power)
e_on.reference.connect(power)
Expand All @@ -103,13 +98,12 @@ def App():

e_out.connect(led.logic_in)

nxor = specialize_module(xor, XOR_with_NANDS())
battery = specialize_module(power_source, F.Battery())
nxor = xor.specialize(XOR_with_NANDS())
battery = power_source.specialize(F.Battery())

el_switch = specialize_module(switch, F.Switch(F.ElectricLogic)())
el_switch = switch.specialize(F.Switch(F.ElectricLogic)())
e_switch = F.Switch(F.Electrical)()
e_switch = specialize_module(
el_switch,
e_switch = el_switch.specialize(
e_switch,
matrix=[(e, el.signal) for e, el in zip(e_switch.unnamed, el_switch.unnamed)],
)
Expand All @@ -125,7 +119,7 @@ def App():
app.add(c)

# parametrizing
for _, t in get_all_nodes_with_trait(app.get_graph(), F.ElectricLogic.has_pulls):
for _, t in app.get_graph().nodes_with_trait(F.ElectricLogic.has_pulls):
for pull_resistor in (r for r in t.get_pulls() if r):
pull_resistor.resistance.merge(100 * P.kohm)
power_source.power.voltage.merge(3 * P.V)
Expand All @@ -136,7 +130,7 @@ def App():
# packages single nands as explicit IC
nand_ic = F.TI_CD4011BE()
for ic_nand, xor_nand in zip(nand_ic.gates, nxor.nands):
specialize_module(xor_nand, ic_nand)
xor_nand.specialize(ic_nand)

app.add(nand_ic)

Expand Down
3 changes: 1 addition & 2 deletions examples/signal_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import faebryk.library._F as F
from faebryk.core.module import Module
from faebryk.core.util import specialize_module
from faebryk.libs.examples.buildutil import apply_design_to_pcb
from faebryk.libs.logging import setup_basic_logging
from faebryk.libs.units import P
Expand All @@ -30,7 +29,7 @@ def __preinit__(self) -> None:
self.lowpass.response.merge(F.Filter.Response.LOWPASS)

# Specialize
special = specialize_module(self.lowpass, F.FilterElectricalLC())
special = self.lowpass.specialize(F.FilterElectricalLC())

# Construct
special.get_trait(F.has_construction_dependency).construct()
Expand Down
55 changes: 53 additions & 2 deletions src/faebryk/core/graphinterface.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# This file is part of the faebryk project
# SPDX-License-Identifier: MIT
import logging
from typing import TYPE_CHECKING, Mapping, Optional
from typing import TYPE_CHECKING, Iterable, Mapping, Optional

from typing_extensions import Self, deprecated

Expand All @@ -15,10 +15,49 @@

if TYPE_CHECKING:
from faebryk.core.node import Node
from faebryk.core.trait import Trait

logger = logging.getLogger(__name__)

Graph = GraphImpl["GraphInterface"]

class Graph(GraphImpl["GraphInterface"]):
# Make all kinds of graph filtering functions so we can optimize them in the future
# Avoid letting user query all graph nodes always because quickly very slow

def node_projection(self) -> set["Node"]:
from faebryk.core.node import Node

return Node.get_nodes_from_gifs(self.subgraph_type(GraphInterfaceSelf))

def nodes_with_trait[T: "Trait"](self, trait: type[T]) -> list[tuple["Node", T]]:
return [
(n, n.get_trait(trait))
for n in self.node_projection()
if n.has_trait(trait)
]

# Waiting for python to add support for type mapping
def nodes_with_traits[*Ts](
self, traits: tuple[*Ts]
): # -> list[tuple[Node, tuple[*Ts]]]:
return [
(n, tuple(n.get_trait(trait) for trait in traits))
for n in self.node_projection()
if all(n.has_trait(trait) for trait in traits)
]

def nodes_by_names(self, names: Iterable[str]) -> list[tuple["Node", str]]:
return [
(n, node_name)
for n in self.node_projection()
if (node_name := n.get_full_name()) in names
]

def nodes_of_type[T: "Node"](self, t: type[T]) -> set[T]:
return {n for n in self.node_projection() if isinstance(n, t)}

def nodes_of_types(self, t: tuple[type["Node"], ...]) -> set["Node"]:
return {n for n in self.node_projection() if isinstance(n, t)}


class GraphInterface(FaebrykLibObject):
Expand Down Expand Up @@ -105,6 +144,18 @@ def __repr__(self) -> str:
else "| <No None>"
)

def get_connected_nodes[T: "Node"](self, types: type[T]) -> set[T]:
from faebryk.core.link import LinkDirect
from faebryk.core.node import Node

return {
n
for n in Node.get_nodes_from_gifs(
g for g, t in self.edges.items() if isinstance(t, LinkDirect)
)
if isinstance(n, types)
}


class GraphInterfaceHierarchical(GraphInterface):
def __init__(self, is_parent: bool) -> None:
Expand Down
97 changes: 97 additions & 0 deletions src/faebryk/core/module.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
# This file is part of the faebryk project
# SPDX-License-Identifier: MIT
import logging
from typing import TYPE_CHECKING, Iterable

from faebryk.core.graphinterface import GraphInterface
from faebryk.core.node import Node
from faebryk.core.trait import Trait
from faebryk.libs.util import unique_ref

if TYPE_CHECKING:
from faebryk.core.moduleinterface import ModuleInterface

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -34,3 +38,96 @@ def get_most_special(self) -> "Module":
len(specialest_next) == 1
), f"Ambiguous specialest {specialest_next} for {self}"
return next(iter(specialest_next))

def get_children_modules(
self: Node, most_special: bool = True, direct_only: bool = False
):
out = self.get_children(direct_only=direct_only, types=Module)
if most_special:
out = {n.get_most_special() for n in out}
return out

def specialize[T: Module](
self,
special: T,
matrix: list[tuple["ModuleInterface", "ModuleInterface"]] | None = None,
attach_to: Node | None = None,
) -> T:
from faebryk.core.moduleinterface import ModuleInterface
from faebryk.core.parameter import Parameter

logger.debug(f"Specializing Module {self} with {special}" + " " + "=" * 20)

def get_node_prop_matrix[N: Node](sub_type: type[N]):
return list(self.zip_children_by_name_with(special, sub_type).values())

if matrix is None:
matrix = get_node_prop_matrix(ModuleInterface)

# TODO add warning if not all src interfaces used

param_matrix = get_node_prop_matrix(Parameter)

for src, dst in matrix:
if src is None:
continue
if dst is None:
raise Exception(f"Special module misses interface: {src.get_name()}")
src.specialize(dst)

for src, dst in param_matrix:
if src is None:
continue
if dst is None:
raise Exception(f"Special module misses parameter: {src.get_name()}")
dst.merge(src)

# TODO this cant work
# for t in self.traits:
# # TODO needed?
# if special.has_trait(t.trait):
# continue
# special.add(t)

self.specialized.connect(special.specializes)

# Attach to new parent
has_parent = special.get_parent() is not None
assert not has_parent or attach_to is None
if not has_parent:
if attach_to:
attach_to.add(special, container=attach_to.specialized)
else:
gen_parent = self.get_parent()
if gen_parent:
gen_parent[0].add(special, name=f"{gen_parent[1]}_specialized")

return special

@staticmethod
def connect_all_interfaces_by_name(
src: Iterable["Module"] | "Module",
dst: Iterable["Module"] | "Module",
allow_partial: bool = False,
):
from faebryk.core.moduleinterface import ModuleInterface

if isinstance(src, Module):
src = [src]
if isinstance(dst, Module):
dst = [dst]

for src_, dst_ in zip(src, dst):
for k, (src_m, dst_m) in src_.zip_children_by_name_with(
dst_, ModuleInterface
).items():
if src_m is None or dst_m is None:
if not allow_partial:
raise Exception(f"Node with name {k} not present in both")
continue
src_m.connect(dst_m)

def connect_interfaces_by_name(self, *dst: "Module", allow_partial: bool = False):
type(self).connect_all_interfaces_by_name(
self, dst, allow_partial=allow_partial
)
Loading

0 comments on commit 207420d

Please sign in to comment.