Skip to content
This repository has been archived by the owner on Dec 10, 2024. It is now read-only.

Core: OO style functions #36

Merged
merged 7 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 10 additions & 16 deletions examples/iterative_design_nand.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@

import faebryk.library._F as F
from faebryk.core.module import Module
from faebryk.core.util import (
get_all_nodes_with_trait,
specialize_interface,
specialize_module,
)
from faebryk.libs.brightness import TypicalLuminousIntensity
from faebryk.libs.examples.buildutil import apply_design_to_pcb
from faebryk.libs.library import L
Expand Down Expand Up @@ -89,10 +84,10 @@ def App():
logic_in.connect_via(switch, on)

# bring logic signals into electrical domain
e_in = specialize_interface(logic_in, F.ElectricLogic())
e_out = specialize_interface(logic_out, F.ElectricLogic())
e_on = specialize_interface(on, F.ElectricLogic())
e_off = specialize_interface(off, F.ElectricLogic())
e_in = logic_in.specialize(F.ElectricLogic())
e_out = logic_out.specialize(F.ElectricLogic())
e_on = on.specialize(F.ElectricLogic())
e_off = off.specialize(F.ElectricLogic())
e_in.reference.connect(power)
e_out.reference.connect(power)
e_on.reference.connect(power)
Expand All @@ -103,13 +98,12 @@ def App():

e_out.connect(led.logic_in)

nxor = specialize_module(xor, XOR_with_NANDS())
battery = specialize_module(power_source, F.Battery())
nxor = xor.specialize(XOR_with_NANDS())
battery = power_source.specialize(F.Battery())

el_switch = specialize_module(switch, F.Switch(F.ElectricLogic)())
el_switch = switch.specialize(F.Switch(F.ElectricLogic)())
e_switch = F.Switch(F.Electrical)()
e_switch = specialize_module(
el_switch,
e_switch = el_switch.specialize(
e_switch,
matrix=[(e, el.signal) for e, el in zip(e_switch.unnamed, el_switch.unnamed)],
)
Expand All @@ -125,7 +119,7 @@ def App():
app.add(c)

# parametrizing
for _, t in get_all_nodes_with_trait(app.get_graph(), F.ElectricLogic.has_pulls):
for _, t in app.get_graph().nodes_with_trait(F.ElectricLogic.has_pulls):
for pull_resistor in (r for r in t.get_pulls() if r):
pull_resistor.resistance.merge(100 * P.kohm)
power_source.power.voltage.merge(3 * P.V)
Expand All @@ -136,7 +130,7 @@ def App():
# packages single nands as explicit IC
nand_ic = F.TI_CD4011BE()
for ic_nand, xor_nand in zip(nand_ic.gates, nxor.nands):
specialize_module(xor_nand, ic_nand)
xor_nand.specialize(ic_nand)

app.add(nand_ic)

Expand Down
3 changes: 1 addition & 2 deletions examples/signal_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import faebryk.library._F as F
from faebryk.core.module import Module
from faebryk.core.util import specialize_module
from faebryk.libs.examples.buildutil import apply_design_to_pcb
from faebryk.libs.logging import setup_basic_logging
from faebryk.libs.units import P
Expand All @@ -30,7 +29,7 @@ def __preinit__(self) -> None:
self.lowpass.response.merge(F.Filter.Response.LOWPASS)

# Specialize
special = specialize_module(self.lowpass, F.FilterElectricalLC())
special = self.lowpass.specialize(F.FilterElectricalLC())

# Construct
special.get_trait(F.has_construction_dependency).construct()
Expand Down
55 changes: 53 additions & 2 deletions src/faebryk/core/graphinterface.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# This file is part of the faebryk project
# SPDX-License-Identifier: MIT
import logging
from typing import TYPE_CHECKING, Mapping, Optional
from typing import TYPE_CHECKING, Iterable, Mapping, Optional

from typing_extensions import Self, deprecated

Expand All @@ -15,10 +15,49 @@

if TYPE_CHECKING:
from faebryk.core.node import Node
from faebryk.core.trait import Trait

logger = logging.getLogger(__name__)

Graph = GraphImpl["GraphInterface"]

class Graph(GraphImpl["GraphInterface"]):
# Make all kinds of graph filtering functions so we can optimize them in the future
# Avoid letting user query all graph nodes always because quickly very slow

def node_projection(self) -> set["Node"]:
from faebryk.core.node import Node

return Node.get_nodes_from_gifs(self.subgraph_type(GraphInterfaceSelf))

def nodes_with_trait[T: "Trait"](self, trait: type[T]) -> list[tuple["Node", T]]:
return [
(n, n.get_trait(trait))
for n in self.node_projection()
if n.has_trait(trait)
]

# Waiting for python to add support for type mapping
def nodes_with_traits[*Ts](
self, traits: tuple[*Ts]
): # -> list[tuple[Node, tuple[*Ts]]]:
return [
(n, tuple(n.get_trait(trait) for trait in traits))
for n in self.node_projection()
if all(n.has_trait(trait) for trait in traits)
]

def nodes_by_names(self, names: Iterable[str]) -> list[tuple["Node", str]]:
return [
(n, node_name)
for n in self.node_projection()
if (node_name := n.get_full_name()) in names
]

def nodes_of_type[T: "Node"](self, t: type[T]) -> set[T]:
return {n for n in self.node_projection() if isinstance(n, t)}

def nodes_of_types(self, t: tuple[type["Node"], ...]) -> set["Node"]:
return {n for n in self.node_projection() if isinstance(n, t)}


class GraphInterface(FaebrykLibObject):
Expand Down Expand Up @@ -105,6 +144,18 @@ def __repr__(self) -> str:
else "| <No None>"
)

def get_connected_nodes[T: "Node"](self, types: type[T]) -> set[T]:
from faebryk.core.link import LinkDirect
from faebryk.core.node import Node

return {
n
for n in Node.get_nodes_from_gifs(
g for g, t in self.edges.items() if isinstance(t, LinkDirect)
)
if isinstance(n, types)
}


class GraphInterfaceHierarchical(GraphInterface):
def __init__(self, is_parent: bool) -> None:
Expand Down
97 changes: 97 additions & 0 deletions src/faebryk/core/module.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
# This file is part of the faebryk project
# SPDX-License-Identifier: MIT
import logging
from typing import TYPE_CHECKING, Iterable

from faebryk.core.graphinterface import GraphInterface
from faebryk.core.node import Node
from faebryk.core.trait import Trait
from faebryk.libs.util import unique_ref

if TYPE_CHECKING:
from faebryk.core.moduleinterface import ModuleInterface

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -34,3 +38,96 @@ def get_most_special(self) -> "Module":
len(specialest_next) == 1
), f"Ambiguous specialest {specialest_next} for {self}"
return next(iter(specialest_next))

def get_children_modules(
self: Node, most_special: bool = True, direct_only: bool = False
):
out = self.get_children(direct_only=direct_only, types=Module)
if most_special:
out = {n.get_most_special() for n in out}
return out

def specialize[T: Module](
self,
special: T,
matrix: list[tuple["ModuleInterface", "ModuleInterface"]] | None = None,
attach_to: Node | None = None,
) -> T:
from faebryk.core.moduleinterface import ModuleInterface
from faebryk.core.parameter import Parameter

logger.debug(f"Specializing Module {self} with {special}" + " " + "=" * 20)

def get_node_prop_matrix[N: Node](sub_type: type[N]):
return list(self.zip_children_by_name_with(special, sub_type).values())

if matrix is None:
matrix = get_node_prop_matrix(ModuleInterface)

# TODO add warning if not all src interfaces used

param_matrix = get_node_prop_matrix(Parameter)

for src, dst in matrix:
if src is None:
continue
if dst is None:
raise Exception(f"Special module misses interface: {src.get_name()}")
src.specialize(dst)

for src, dst in param_matrix:
if src is None:
continue
if dst is None:
raise Exception(f"Special module misses parameter: {src.get_name()}")
dst.merge(src)

# TODO this cant work
# for t in self.traits:
# # TODO needed?
# if special.has_trait(t.trait):
# continue
# special.add(t)

self.specialized.connect(special.specializes)

# Attach to new parent
has_parent = special.get_parent() is not None
assert not has_parent or attach_to is None
if not has_parent:
if attach_to:
attach_to.add(special, container=attach_to.specialized)
else:
gen_parent = self.get_parent()
if gen_parent:
gen_parent[0].add(special, name=f"{gen_parent[1]}_specialized")

return special

@staticmethod
def connect_all_interfaces_by_name(
src: Iterable["Module"] | "Module",
dst: Iterable["Module"] | "Module",
allow_partial: bool = False,
):
from faebryk.core.moduleinterface import ModuleInterface

if isinstance(src, Module):
src = [src]
if isinstance(dst, Module):
dst = [dst]

for src_, dst_ in zip(src, dst):
for k, (src_m, dst_m) in src_.zip_children_by_name_with(
dst_, ModuleInterface
).items():
if src_m is None or dst_m is None:
if not allow_partial:
raise Exception(f"Node with name {k} not present in both")
continue
src_m.connect(dst_m)

def connect_interfaces_by_name(self, *dst: "Module", allow_partial: bool = False):
type(self).connect_all_interfaces_by_name(
self, dst, allow_partial=allow_partial
)
Loading