Skip to content
This repository has been archived by the owner on Dec 10, 2024. It is now read-only.

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
iopapamanoglou committed Sep 4, 2024
1 parent 170bc6b commit 5f5bf1c
Show file tree
Hide file tree
Showing 38 changed files with 493 additions and 575 deletions.
25 changes: 10 additions & 15 deletions examples/iterative_design_nand.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@

import faebryk.library._F as F
from faebryk.core.module import Module
from faebryk.core.util import (
get_all_nodes_with_trait,
specialize_interface,
specialize_module,
)
from faebryk.core.util import get_all_nodes_with_trait
from faebryk.libs.brightness import TypicalLuminousIntensity
from faebryk.libs.examples.buildutil import apply_design_to_pcb
from faebryk.libs.library import L
Expand Down Expand Up @@ -89,10 +85,10 @@ def App():
logic_in.connect_via(switch, on)

# bring logic signals into electrical domain
e_in = specialize_interface(logic_in, F.ElectricLogic())
e_out = specialize_interface(logic_out, F.ElectricLogic())
e_on = specialize_interface(on, F.ElectricLogic())
e_off = specialize_interface(off, F.ElectricLogic())
e_in = logic_in.specialize(F.ElectricLogic())
e_out = logic_out.specialize(F.ElectricLogic())
e_on = on.specialize(F.ElectricLogic())
e_off = off.specialize(F.ElectricLogic())
e_in.reference.connect(power)
e_out.reference.connect(power)
e_on.reference.connect(power)
Expand All @@ -103,13 +99,12 @@ def App():

e_out.connect(led.logic_in)

nxor = specialize_module(xor, XOR_with_NANDS())
battery = specialize_module(power_source, F.Battery())
nxor = xor.specialize(XOR_with_NANDS())
battery = power_source.specialize(F.Battery())

el_switch = specialize_module(switch, F.Switch(F.ElectricLogic)())
el_switch = switch.specialize(F.Switch(F.ElectricLogic)())
e_switch = F.Switch(F.Electrical)()
e_switch = specialize_module(
el_switch,
e_switch = el_switch.specialize(
e_switch,
matrix=[(e, el.signal) for e, el in zip(e_switch.unnamed, el_switch.unnamed)],
)
Expand All @@ -136,7 +131,7 @@ def App():
# packages single nands as explicit IC
nand_ic = F.TI_CD4011BE()
for ic_nand, xor_nand in zip(nand_ic.gates, nxor.nands):
specialize_module(xor_nand, ic_nand)
xor_nand.specialize(ic_nand)

app.add(nand_ic)

Expand Down
3 changes: 1 addition & 2 deletions examples/signal_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import faebryk.library._F as F
from faebryk.core.module import Module
from faebryk.core.util import specialize_module
from faebryk.libs.examples.buildutil import apply_design_to_pcb
from faebryk.libs.logging import setup_basic_logging
from faebryk.libs.units import P
Expand All @@ -30,7 +29,7 @@ def __preinit__(self) -> None:
self.lowpass.response.merge(F.Filter.Response.LOWPASS)

# Specialize
special = specialize_module(self.lowpass, F.FilterElectricalLC())
special = self.lowpass.specialize(F.FilterElectricalLC())

# Construct
special.get_trait(F.has_construction_dependency).construct()
Expand Down
13 changes: 12 additions & 1 deletion src/faebryk/core/graphinterface.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# This file is part of the faebryk project
# SPDX-License-Identifier: MIT
import logging
from typing import TYPE_CHECKING, Mapping, Optional
from typing import TYPE_CHECKING, Mapping, Optional, cast

Check failure on line 4 in src/faebryk/core/graphinterface.py

View workflow job for this annotation

GitHub Actions / test

Ruff (F401)

src/faebryk/core/graphinterface.py:4:54: F401 `typing.cast` imported but unused

from typing_extensions import Self, deprecated

Expand Down Expand Up @@ -105,6 +105,17 @@ def __repr__(self) -> str:
else "| <No None>"
)

def get_connected_nodes[T: Node](self, types: type[T]) -> set[T]:
from faebryk.core.link import LinkDirect

return {
n
for n in Node.get_nodes_from_gifs(
g for g, t in self.edges.items() if isinstance(t, LinkDirect)
)
if isinstance(n, types)
}


class GraphInterfaceHierarchical(GraphInterface):
def __init__(self, is_parent: bool) -> None:
Expand Down
94 changes: 94 additions & 0 deletions src/faebryk/core/module.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
# This file is part of the faebryk project
# SPDX-License-Identifier: MIT
import logging
from typing import TYPE_CHECKING, Iterable

from faebryk.core.graphinterface import GraphInterface
from faebryk.core.node import Node
from faebryk.core.trait import Trait
from faebryk.libs.util import unique_ref

if TYPE_CHECKING:
from faebryk.core.moduleinterface import ModuleInterface

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -34,3 +38,93 @@ def get_most_special(self) -> "Module":
len(specialest_next) == 1
), f"Ambiguous specialest {specialest_next} for {self}"
return next(iter(specialest_next))

def get_children_modules(
self: Node, most_special: bool = True, direct_only: bool = False
):
out = self.get_children(direct_only=direct_only, types=Module)
if most_special:
out = {n.get_most_special() for n in out}
return out

def specialize[T: Module](
self,
special: T,
matrix: list[tuple["ModuleInterface", "ModuleInterface"]] | None = None,
attach_to: Node | None = None,
) -> T:
from faebryk.core.parameter import Parameter

logger.debug(f"Specializing Module {self} with {special}" + " " + "=" * 20)

def get_node_prop_matrix[N: Node](sub_type: type[N]):
return list(self.zip_children_by_name_with(special, sub_type).values())

if matrix is None:
matrix = get_node_prop_matrix(ModuleInterface)

# TODO add warning if not all src interfaces used

param_matrix = get_node_prop_matrix(Parameter)

for src, dst in matrix:
if src is None:
continue
if dst is None:
raise Exception(f"Special module misses interface: {src.get_name()}")
src.specialize(dst)

for src, dst in param_matrix:
if src is None:
continue
if dst is None:
raise Exception(f"Special module misses parameter: {src.get_name()}")
dst.merge(src)

# TODO this cant work
# for t in self.traits:
# # TODO needed?
# if special.has_trait(t.trait):
# continue
# special.add(t)

self.specialized.connect(special.specializes)

# Attach to new parent
has_parent = special.get_parent() is not None
assert not has_parent or attach_to is None
if not has_parent:
if attach_to:
attach_to.add(special, container=attach_to.specialized)
else:
gen_parent = self.get_parent()
if gen_parent:
gen_parent[0].add(special, name=f"{gen_parent[1]}_specialized")

return special

@staticmethod
def connect_all_interfaces_by_name(
src: Iterable["Module"] | "Module",
dst: Iterable["Module"] | "Module",
allow_partial: bool = False,
):
if isinstance(src, Module):
src = [src]
if isinstance(dst, Module):
dst = [dst]

for src_, dst_ in zip(src, dst):
for k, (src_m, dst_m) in src_.zip_children_by_name_with(
dst_, ModuleInterface
).items():
if src_m is None or dst_m is None:
if not allow_partial:
raise Exception(f"Node with name {k} not present in both")
continue
src_m.connect(dst_m)

def connect_interfaces_by_name(self, *dst: "Module", allow_partial: bool = False):
type(self).connect_all_interfaces_by_name(
self, dst, allow_partial=allow_partial
)
86 changes: 57 additions & 29 deletions src/faebryk/core/moduleinterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
)
from faebryk.core.node import Node
from faebryk.core.trait import Trait
from faebryk.libs.util import once, print_stack
from faebryk.libs.util import cast_assert, once, print_stack

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -137,11 +137,32 @@ class _LinkDirectShallowMif(

def __preinit__(self) -> None: ...

@staticmethod
def _get_connected(gif: GraphInterface):
assert isinstance(gif.node, ModuleInterface)
connections = gif.edges.items()

# check if ambiguous links between mifs
assert len(connections) == len({c[0] for c in connections})

return {
cast_assert(ModuleInterface, s.node): link
for s, link in connections
if s.node is not gif.node
}

def get_connected(self):
return self._get_connected(self.connected)

def get_specialized(self):
return self._get_connected(self.specialized)

def get_specializes(self):
return self._get_connected(self.specializes)

def _connect_siblings_and_connections(
self, other: "ModuleInterface", linkcls: type[Link]
) -> Self:
from faebryk.core.util import get_connected_mifs_with_link

if other is self:
return self

Expand All @@ -158,41 +179,34 @@ def _connect_siblings_and_connections(
logger.debug(f"MIF connection: {self} to {other}")

def cross_connect(
s_group: dict[ModuleInterface, type[Link]],
d_group: dict[ModuleInterface, type[Link]],
s_group: dict[ModuleInterface, type[Link] | Link],
d_group: dict[ModuleInterface, type[Link] | Link],
hint=None,
):
if logger.isEnabledFor(logging.DEBUG) and hint is not None:
logger.debug(f"Connect {hint} {s_group} -> {d_group}")

for s, slink in s_group.items():
if isinstance(slink, Link):
slink = type(slink)
for d, dlink in d_group.items():
if isinstance(dlink, Link):
dlink = type(dlink)
# can happen while connection trees are resolving
if s is d:
continue
link = _resolve_link_transitive([slink, dlink, linkcls])

s._connect_across_hierarchies(d, linkcls=link)

def _get_connected_mifs(gif: GraphInterface):
return {k: type(v) for k, v in get_connected_mifs_with_link(gif).items()}

# Connect to all connections
s_con = _get_connected_mifs(self.connected) | {self: linkcls}
d_con = _get_connected_mifs(other.connected) | {other: linkcls}
s_con = self.get_connected() | {self: linkcls}
d_con = other.get_connected() | {other: linkcls}
cross_connect(s_con, d_con, "connections")

# Connect to all siblings
s_sib = (
_get_connected_mifs(self.specialized)
| _get_connected_mifs(self.specializes)
| {self: linkcls}
)
d_sib = (
_get_connected_mifs(other.specialized)
| _get_connected_mifs(other.specializes)
| {other: linkcls}
)
s_sib = self.get_specialized() | self.get_specializes() | {self: linkcls}
d_sib = other.get_specialized() | other.get_specializes() | {other: linkcls}
cross_connect(s_sib, d_sib, "siblings")

return self
Expand All @@ -202,12 +216,12 @@ def _on_connect(self, other: "ModuleInterface"):
...

def _try_connect_down(self, other: "ModuleInterface", linkcls: type[Link]) -> None:
from faebryk.core.util import zip_children_by_name

if not isinstance(other, type(self)):
return

for _, (src, dst) in zip_children_by_name(self, other, ModuleInterface).items():
for _, (src, dst) in self.zip_children_by_name_with(
other, ModuleInterface
).items():
if src is None or dst is None:
continue
src.connect(dst, linkcls=linkcls)
Expand All @@ -233,12 +247,10 @@ def _is_connected(a, b):
assert isinstance(b, ModuleInterface)
return a.is_connected_to(b)

from faebryk.core.util import zip_children_by_name

connection_map = [
(src_i, dst_i, _is_connected(src_i, dst_i))
for src_i, dst_i in zip_children_by_name(
src_m, dst_m, sub_type=ModuleInterface
for src_i, dst_i in src_m.zip_children_by_name_with(
dst_m, sub_type=ModuleInterface
).values()
]

Expand Down Expand Up @@ -311,12 +323,15 @@ def get_direct_connections(self) -> set["ModuleInterface"]:
if isinstance(gif.node, ModuleInterface) and gif.node is not self
}

def connect(self, other: Self, linkcls=None) -> Self:
def connect(self: Self, *other: Self, linkcls=None) -> Self:
# TODO consider some type of check at the end within the graph instead
# assert type(other) is type(self)
if linkcls is None:
linkcls = LinkDirect
return self._connect_siblings_and_connections(other, linkcls=linkcls)

for o in other:
self._connect_siblings_and_connections(o, linkcls=linkcls)
return other[-1] if other else self

def connect_via(
self, bridge: Node | Sequence[Node], other: Self | None = None, linkcls=None
Expand All @@ -338,3 +353,16 @@ def connect_shallow(self, other: Self) -> Self:

def is_connected_to(self, other: "ModuleInterface"):
return self.connected.is_connected(other.connected)

def specialize[T: ModuleInterface](self, special: T) -> T:
logger.debug(f"Specializing MIF {self} with {special}")

assert isinstance(special, type(self))

# This is doing the heavy lifting
self.connect(special)

# Establish sibling relationship
self.specialized.connect(special.specializes)

return special
Loading

0 comments on commit 5f5bf1c

Please sign in to comment.