From 8d12b33914ebdc1094561b07a56017a249d1f7d2 Mon Sep 17 00:00:00 2001 From: Sam Mellor Date: Sat, 26 Oct 2024 07:41:47 -0700 Subject: [PATCH] Picker: Add API picker (#92) * add api pickers * add LED test case * add file license headers * fix imports * add missing find by lcsc_id/mfr methods to api picker lib * fix for Component.PARAMs removal * fix for extra diode filters * more Component.PARAMs * update api tests to match jlcpcb test changes * fix for recent changes * fix capacitor e-series * add api picker perf test * fix manufacturer_name sometimes available * dedup picker tests * run gen_F.py * add lru cache on api queries * surface in examples/buildutil * fix typing issues * run gen_F.py * refactor */pickers.py * switch to fastapi backend * embed default API url * Minor improvements - Fix types - Improve ConfigFlags - Change Exceptions * api client init simplification * revert assertIsInstance * api picker test conditions cleanup * singleton api client * fix TestPickerPerformanceJlcpcb casing * bump default api timeout 10s -> 30s * custom domain * 404 -> PickError --------- Co-authored-by: iopapamanoglou --- .../has_descriptive_properties_defined.py | 11 +- src/faebryk/libs/examples/buildutil.py | 24 +- src/faebryk/libs/picker/api/api.py | 249 +++++++++++ src/faebryk/libs/picker/api/picker_lib.py | 318 ++++++++++++++ src/faebryk/libs/picker/api/pickers.py | 52 +++ src/faebryk/libs/picker/common.py | 70 ++++ src/faebryk/libs/picker/jlcpcb/jlcpcb.py | 58 +-- src/faebryk/libs/picker/jlcpcb/picker_lib.py | 392 ++++++++++-------- src/faebryk/libs/picker/jlcpcb/pickers.py | 49 +-- src/faebryk/libs/picker/util.py | 39 ++ src/faebryk/libs/util.py | 176 ++++---- .../{test_jlcpcb.py => test_pickers.py} | 104 ++++- 12 files changed, 1164 insertions(+), 378 deletions(-) create mode 100644 src/faebryk/libs/picker/api/api.py create mode 100644 src/faebryk/libs/picker/api/picker_lib.py create mode 100644 src/faebryk/libs/picker/api/pickers.py create mode 100644 src/faebryk/libs/picker/common.py create mode 100644 src/faebryk/libs/picker/util.py rename test/libs/picker/{test_jlcpcb.py => test_pickers.py} (86%) diff --git a/src/faebryk/library/has_descriptive_properties_defined.py b/src/faebryk/library/has_descriptive_properties_defined.py index 794c0fc2..3f4ffbfc 100644 --- a/src/faebryk/library/has_descriptive_properties_defined.py +++ b/src/faebryk/library/has_descriptive_properties_defined.py @@ -7,18 +7,25 @@ import faebryk.library._F as F from faebryk.core.node import Node from faebryk.core.trait import TraitImpl +from faebryk.libs.picker.picker import DescriptiveProperties class has_descriptive_properties_defined(F.has_descriptive_properties.impl()): - def __init__(self, properties: Mapping[str, str]) -> None: + def __init__( + self, + properties: Mapping[str, str] + | Mapping[DescriptiveProperties, str] + | Mapping[str | DescriptiveProperties, str], + ) -> None: super().__init__() - self.properties = dict(properties) + self.properties: dict[str, str] = dict(properties) def get_properties(self) -> dict[str, str]: return self.properties def handle_duplicate(self, other: TraitImpl, node: Node) -> bool: if not isinstance(other, has_descriptive_properties_defined): + assert isinstance(other, F.has_descriptive_properties) self.properties.update(other.get_properties()) return super().handle_duplicate(other, node) diff --git a/src/faebryk/libs/examples/buildutil.py b/src/faebryk/libs/examples/buildutil.py index c9fd6ab2..20a28d56 100644 --- a/src/faebryk/libs/examples/buildutil.py +++ b/src/faebryk/libs/examples/buildutil.py @@ -13,6 +13,9 @@ from faebryk.libs.app.parameters import replace_tbd_with_any from faebryk.libs.app.pcb import apply_design from faebryk.libs.examples.pickers import add_example_pickers +from faebryk.libs.picker.api.api import ApiNotConfiguredError +from faebryk.libs.picker.api.pickers import add_api_pickers +from faebryk.libs.picker.common import DB_PICKER_BACKEND, PickerType from faebryk.libs.picker.jlcpcb.jlcpcb import JLCPCB_DB from faebryk.libs.picker.jlcpcb.pickers import add_jlcpcb_pickers from faebryk.libs.picker.picker import pick_part_recursively @@ -59,12 +62,21 @@ def apply_design_to_pcb( # TODO this can be prettier # picking ---------------------------------------------------------------- modules = m.get_children_modules(types=Module) - try: - JLCPCB_DB() - for n in modules: - add_jlcpcb_pickers(n, base_prio=-10) - except FileNotFoundError: - logger.warning("JLCPCB database not found. Skipping JLCPCB pickers.") + + match DB_PICKER_BACKEND: + case PickerType.JLCPCB: + try: + JLCPCB_DB() + for n in modules: + add_jlcpcb_pickers(n, base_prio=-10) + except FileNotFoundError: + logger.warning("JLCPCB database not found. Skipping JLCPCB pickers.") + case PickerType.API: + try: + for n in modules: + add_api_pickers(n) + except ApiNotConfiguredError: + logger.warning("API not configured. Skipping API pickers.") for n in modules: add_example_pickers(n) diff --git a/src/faebryk/libs/picker/api/api.py b/src/faebryk/libs/picker/api/api.py new file mode 100644 index 00000000..59f27a74 --- /dev/null +++ b/src/faebryk/libs/picker/api/api.py @@ -0,0 +1,249 @@ +# This file is part of the faebryk project +# SPDX-License-Identifier: MIT + +import functools +import logging +import textwrap +from dataclasses import dataclass + +import requests +from dataclasses_json import dataclass_json +from pint import DimensionalityError + +from faebryk.core.module import Module + +# TODO: replace with API-specific data model +from faebryk.libs.picker.jlcpcb.jlcpcb import Component, MappingParameterDB +from faebryk.libs.picker.lcsc import LCSC_NoDataException, LCSC_PinmapException +from faebryk.libs.picker.picker import PickError +from faebryk.libs.util import ConfigFlagString, try_or + +logger = logging.getLogger(__name__) + +DEFAULT_API_URL = "https://components.atopileapi.com" +DEFAULT_API_TIMEOUT_SECONDS = 30 +API_URL = ConfigFlagString("PICKER_API_URL", DEFAULT_API_URL, "API URL") +API_KEY = ConfigFlagString("PICKER_API_KEY", "", "API key") + + +class ApiError(Exception): ... + + +class ApiNotConfiguredError(ApiError): ... + + +class ApiHTTPError(ApiError): + def __init__(self, error: requests.exceptions.HTTPError): + super().__init__() + self.response = error.response + + def __str__(self) -> str: + status_code = self.response.status_code + detail = self.response.json()["detail"] + return f"{super().__str__()}: {status_code} {detail}" + + +def check_compatible_parameters( + module: Module, component: Component, mapping: list[MappingParameterDB] +) -> bool: + """ + Check if the parameters of a component are compatible with the module + """ + # TODO: serialize the module and check compatibility in the backend + + params = component.get_params(mapping) + param_matches = [ + try_or( + lambda: p.is_subset_of(getattr(module, m.param_name)), + default=False, + catch=DimensionalityError, + ) + for p, m in zip(params, mapping) + ] + + if not (is_compatible := all(param_matches)): + logger.debug( + f"Component {component.lcsc} doesn't match: " + f"{[p for p, v in zip(params, param_matches) if not v]}" + ) + + return is_compatible + + +def try_attach( + cmp: Module, parts: list[Component], mapping: list[MappingParameterDB], qty: int +) -> bool: + failures = [] + for part in parts: + if not check_compatible_parameters(cmp, part, mapping): + continue + + try: + part.attach(cmp, mapping, qty, allow_TBD=False) + return True + except (ValueError, Component.ParseError) as e: + failures.append((part, e)) + except LCSC_NoDataException as e: + failures.append((part, e)) + except LCSC_PinmapException as e: + failures.append((part, e)) + + if failures: + fail_str = textwrap.indent( + "\n" + f"{'\n'.join(f'{c}: {e}' for c, e in failures)}", " " * 4 + ) + + raise PickError( + f"Failed to attach any components to module {cmp}: {len(failures)}" + f" {fail_str}", + cmp, + ) + + return False + + +type SIvalue = str + + +@dataclass_json +@dataclass(frozen=True) +class FootprintCandidate: + footprint: str + pin_count: int + + +@dataclass_json +@dataclass(frozen=True) +class BaseParams: + footprint_candidates: list[FootprintCandidate] + qty: int + + def convert_to_dict(self) -> dict: + return self.to_dict() # type: ignore + + +@dataclass(frozen=True) +class ResistorParams(BaseParams): + resistances: list[SIvalue] + + +@dataclass(frozen=True) +class CapacitorParams(BaseParams): + capacitances: list[SIvalue] + + +@dataclass(frozen=True) +class InductorParams(BaseParams): + inductances: list[SIvalue] + + +@dataclass(frozen=True) +class TVSParams(BaseParams): ... + + +@dataclass(frozen=True) +class DiodeParams(BaseParams): + max_currents: list[SIvalue] + reverse_working_voltages: list[SIvalue] + + +@dataclass(frozen=True) +class LEDParams(BaseParams): ... + + +@dataclass(frozen=True) +class MOSFETParams(BaseParams): ... + + +@dataclass(frozen=True) +class LDOParams(BaseParams): ... + + +@dataclass(frozen=True) +class LCSCParams: + lcsc: int + + +@dataclass(frozen=True) +class ManufacturerPartParams: + manufacturer_name: str + mfr: str + qty: int + + +class ApiClient: + @dataclass + class Config: + api_url: str = API_URL.get() + api_key: str = API_KEY.get() + + config = Config() + + def __init__(self): + self._client = requests.Session() + self._client.headers["Authorization"] = f"Bearer {self.config.api_key}" + + def _get(self, url: str, timeout: float = 10) -> requests.Response: + try: + response = self._client.get(f"{self.config.api_url}{url}", timeout=timeout) + response.raise_for_status() + except requests.exceptions.HTTPError as e: + raise ApiHTTPError(e) from e + + return response + + def _post( + self, url: str, data: dict, timeout: float = DEFAULT_API_TIMEOUT_SECONDS + ) -> requests.Response: + try: + response = self._client.post( + f"{self.config.api_url}{url}", json=data, timeout=timeout + ) + response.raise_for_status() + except requests.exceptions.HTTPError as e: + raise ApiHTTPError(e) from e + + return response + + @functools.lru_cache(maxsize=None) + def fetch_part_by_lcsc(self, lcsc: int) -> list[Component]: + response = self._get(f"/v0/component/lcsc/{lcsc}") + return [Component(**part) for part in response.json()["components"]] + + @functools.lru_cache(maxsize=None) + def fetch_part_by_mfr(self, mfr: str, mfr_pn: str) -> list[Component]: + response = self._get(f"/v0/component/mfr/{mfr}/{mfr_pn}") + return [Component(**part) for part in response.json()["components"]] + + def query_parts(self, method: str, params: BaseParams) -> list[Component]: + response = self._post(f"/v0/query/{method}", params.convert_to_dict()) + return [Component(**part) for part in response.json()["components"]] + + def fetch_resistors(self, params: ResistorParams) -> list[Component]: + return self.query_parts("resistors", params) + + def fetch_capacitors(self, params: CapacitorParams) -> list[Component]: + return self.query_parts("capacitors", params) + + def fetch_inductors(self, params: InductorParams) -> list[Component]: + return self.query_parts("inductors", params) + + def fetch_tvs(self, params: TVSParams) -> list[Component]: + return self.query_parts("tvs", params) + + def fetch_diodes(self, params: DiodeParams) -> list[Component]: + return self.query_parts("diodes", params) + + def fetch_leds(self, params: LEDParams) -> list[Component]: + return self.query_parts("leds", params) + + def fetch_mosfets(self, params: MOSFETParams) -> list[Component]: + return self.query_parts("mosfets", params) + + def fetch_ldos(self, params: LDOParams) -> list[Component]: + return self.query_parts("ldos", params) + + +@functools.lru_cache +def get_api_client() -> ApiClient: + return ApiClient() diff --git a/src/faebryk/libs/picker/api/picker_lib.py b/src/faebryk/libs/picker/api/picker_lib.py new file mode 100644 index 00000000..27328c4e --- /dev/null +++ b/src/faebryk/libs/picker/api/picker_lib.py @@ -0,0 +1,318 @@ +# This file is part of the faebryk project +# SPDX-License-Identifier: MIT + +import logging +import re +from typing import Callable, Type + +import faebryk.library._F as F +from faebryk.core.module import Module +from faebryk.libs.e_series import E_SERIES_VALUES +from faebryk.libs.picker.api.api import ( + CapacitorParams, + DiodeParams, + FootprintCandidate, + InductorParams, + LDOParams, + LEDParams, + MOSFETParams, + ResistorParams, + TVSParams, + check_compatible_parameters, + get_api_client, + try_attach, +) + +# re-use the existing model for components from the jlcparts dataset, but as the data +# schema diverges over time we'll migrate this to separate models +from faebryk.libs.picker.jlcpcb.jlcpcb import Component +from faebryk.libs.picker.jlcpcb.picker_lib import _MAPPINGS_BY_TYPE +from faebryk.libs.picker.picker import DescriptiveProperties, PickError +from faebryk.libs.picker.util import generate_si_values +from faebryk.libs.util import KeyErrorAmbiguous, KeyErrorNotFound + +logger = logging.getLogger(__name__) +client = get_api_client() + + +# TODO add trait to module that specifies the quantity of the part +qty: int = 1 + + +def find_component_by_lcsc_id(lcsc_id: str) -> Component: + def extract_numeric_id(lcsc_id: str) -> int: + match = re.match(r"C(\d+)", lcsc_id) + if match is None: + raise ValueError(f"Invalid LCSC part number {lcsc_id}") + return int(match[1]) + + parts = client.fetch_part_by_lcsc(extract_numeric_id(lcsc_id)) + + if len(parts) < 1: + raise KeyErrorNotFound(f"Could not find part with LCSC part number {lcsc_id}") + + if len(parts) > 1: + raise KeyErrorAmbiguous( + parts, f"Found multiple parts with LCSC part number {lcsc_id}" + ) + + return next(iter(parts)) + + +def find_and_attach_by_lcsc_id(module: Module): + """ + Find a part by LCSC part number + """ + if not module.has_trait(F.has_descriptive_properties): + raise PickError("Module does not have any descriptive properties", module) + if "LCSC" not in module.get_trait(F.has_descriptive_properties).get_properties(): + raise PickError("Module does not have an LCSC part number", module) + + lcsc_pn = module.get_trait(F.has_descriptive_properties).get_properties()["LCSC"] + + # TODO: pass through errors from API + try: + part = find_component_by_lcsc_id(lcsc_pn) + except KeyErrorNotFound as e: + raise PickError( + f"Could not find part with LCSC part number {lcsc_pn}", module + ) from e + except KeyErrorAmbiguous as e: + raise PickError( + f"Found no exact match for LCSC part number {lcsc_pn}", module + ) from e + + if part.stock < qty: + raise PickError( + f"Part with LCSC part number {lcsc_pn} has insufficient stock", + module, + ) + part.attach(module, []) + + +def find_component_by_mfr(mfr: str, mfr_pn: str) -> Component: + parts = client.fetch_part_by_mfr(mfr, mfr_pn) + + if len(parts) < 1: + raise KeyErrorNotFound( + f"Could not find part with manufacturer part number {mfr_pn}" + ) + + if len(parts) > 1: + raise KeyErrorAmbiguous( + parts, f"Found multiple parts with manufacturer part number {mfr_pn}" + ) + + return next(iter(parts)) + + +def find_and_attach_by_mfr(module: Module): + """ + Find a part by manufacturer and manufacturer part number + """ + if not module.has_trait(F.has_descriptive_properties): + raise PickError("Module does not have any descriptive properties", module) + + properties = module.get_trait(F.has_descriptive_properties).get_properties() + + if DescriptiveProperties.manufacturer not in properties: + raise PickError("Module does not have a manufacturer", module) + + if DescriptiveProperties.partno not in properties: + raise PickError("Module does not have a manufacturer part number", module) + + mfr = properties[DescriptiveProperties.manufacturer] + mfr_pn = properties[DescriptiveProperties.partno] + + try: + parts = [find_component_by_mfr(mfr, mfr_pn)] + except KeyErrorNotFound as e: + raise PickError( + f"Could not find part with manufacturer part number {mfr_pn}", module + ) from e + except KeyErrorAmbiguous as e: + parts = e.duplicates + + for part in parts: + try: + part.attach(module, []) + return + except ValueError as e: + logger.warning(f"Failed to attach component: {e}") + continue + + raise PickError( + f"Could not attach any part with manufacturer part number {mfr_pn}", module + ) + + +def _filter_by_module_params_and_attach( + cmp: Module, component_type: Type[Module], parts: list[Component] +): + """ + Find a component with matching parameters + """ + mapping = _MAPPINGS_BY_TYPE[component_type] + parts = [part for part in parts if check_compatible_parameters(cmp, part, mapping)] + + if not try_attach(cmp, parts, mapping, qty): + raise PickError( + "No components found that match the parameters and that can be attached", + cmp, + ) + + +def _get_footprint_candidates(module: Module) -> list[FootprintCandidate]: + if module.has_trait(F.has_footprint_requirement): + return [ + FootprintCandidate(footprint, pin_count) + for footprint, pin_count in module.get_trait( + F.has_footprint_requirement + ).get_footprint_requirement() + ] + return [] + + +def find_resistor(cmp: Module): + """ + Find a resistor with matching parameters + """ + if not isinstance(cmp, F.Resistor): + raise PickError("Module is not a resistor", cmp) + + parts = client.fetch_resistors( + ResistorParams( + resistances=generate_si_values(cmp.resistance, "Ω", E_SERIES_VALUES.E96), + footprint_candidates=_get_footprint_candidates(cmp), + qty=qty, + ), + ) + + _filter_by_module_params_and_attach(cmp, F.Resistor, parts) + + +def find_capacitor(cmp: Module): + """ + Find a capacitor with matching parameters + """ + if not isinstance(cmp, F.Capacitor): + raise PickError("Module is not a capacitor", cmp) + + parts = client.fetch_capacitors( + CapacitorParams( + capacitances=generate_si_values(cmp.capacitance, "F", E_SERIES_VALUES.E24), + footprint_candidates=_get_footprint_candidates(cmp), + qty=qty, + ), + ) + + _filter_by_module_params_and_attach(cmp, F.Capacitor, parts) + + +def find_inductor(cmp: Module): + """ + Find an inductor with matching parameters + """ + if not isinstance(cmp, F.Inductor): + raise PickError("Module is not an inductor", cmp) + + parts = client.fetch_inductors( + InductorParams( + inductances=generate_si_values(cmp.inductance, "H", E_SERIES_VALUES.E24), + footprint_candidates=_get_footprint_candidates(cmp), + qty=qty, + ), + ) + + _filter_by_module_params_and_attach(cmp, F.Inductor, parts) + + +def find_tvs(cmp: Module): + """ + Find a TVS diode with matching parameters + """ + if not isinstance(cmp, F.TVS): + raise PickError("Module is not a TVS diode", cmp) + + parts = client.fetch_tvs( + TVSParams(footprint_candidates=_get_footprint_candidates(cmp), qty=qty), + ) + + _filter_by_module_params_and_attach(cmp, F.TVS, parts) + + +def find_diode(cmp: Module): + """ + Find a diode with matching parameters + """ + if not isinstance(cmp, F.Diode): + raise PickError("Module is not a diode", cmp) + + parts = client.fetch_diodes( + DiodeParams( + max_currents=generate_si_values(cmp.max_current, "A", E_SERIES_VALUES.E3), + reverse_working_voltages=generate_si_values( + cmp.reverse_working_voltage, "V", E_SERIES_VALUES.E3 + ), + footprint_candidates=_get_footprint_candidates(cmp), + qty=qty, + ), + ) + + _filter_by_module_params_and_attach(cmp, F.Diode, parts) + + +def find_led(cmp: Module): + """ + Find an LED with matching parameters + """ + if not isinstance(cmp, F.LED): + raise PickError("Module is not an LED", cmp) + + parts = client.fetch_leds( + LEDParams(footprint_candidates=_get_footprint_candidates(cmp), qty=qty) + ) + + _filter_by_module_params_and_attach(cmp, F.LED, parts) + + +def find_mosfet(cmp: Module): + """ + Find a MOSFET with matching parameters + """ + + if not isinstance(cmp, F.MOSFET): + raise PickError("Module is not a MOSFET", cmp) + + parts = client.fetch_mosfets( + MOSFETParams(footprint_candidates=_get_footprint_candidates(cmp), qty=qty) + ) + + _filter_by_module_params_and_attach(cmp, F.MOSFET, parts) + + +def find_ldo(cmp: Module): + """ + Find an LDO with matching parameters + """ + + if not isinstance(cmp, F.LDO): + raise PickError("Module is not a LDO", cmp) + + parts = client.fetch_ldos( + LDOParams(footprint_candidates=_get_footprint_candidates(cmp), qty=qty) + ) + + _filter_by_module_params_and_attach(cmp, F.LDO, parts) + + +TYPE_SPECIFIC_LOOKUP: dict[type[Module], Callable[[Module], None]] = { + F.Resistor: find_resistor, + F.Capacitor: find_capacitor, + F.Inductor: find_inductor, + F.TVS: find_tvs, + F.LED: find_led, + F.Diode: find_diode, + F.MOSFET: find_mosfet, + F.LDO: find_ldo, +} diff --git a/src/faebryk/libs/picker/api/pickers.py b/src/faebryk/libs/picker/api/pickers.py new file mode 100644 index 00000000..40506ce7 --- /dev/null +++ b/src/faebryk/libs/picker/api/pickers.py @@ -0,0 +1,52 @@ +# This file is part of the faebryk project +# SPDX-License-Identifier: MIT + +import faebryk.library._F as F +import faebryk.libs.picker.api.picker_lib as picker_lib +from faebryk.core.module import Module +from faebryk.libs.picker.api.api import ApiHTTPError +from faebryk.libs.picker.common import StaticPartPicker +from faebryk.libs.picker.jlcpcb.jlcpcb import Component +from faebryk.libs.picker.picker import PickError + + +class ApiPicker(F.has_multi_picker.FunctionPicker): + def pick(self, module: Module): + try: + super().pick(module) + except ApiHTTPError as e: + if e.response.status_code == 404: + raise PickError(str(e), module) from e + else: + raise + + +class StaticApiPartPicker(StaticPartPicker): + """ + Picks a specific part by ID or manufacturer / part number + """ + + def _find_parts(self, module: Module) -> list[Component]: + match self.mfr, self.mfr_pn, self.lcsc_pn: + case (mfr, mfr_pn, None) if mfr is not None and mfr_pn is not None: + return [picker_lib.find_component_by_mfr(mfr, mfr_pn)] + case (None, None, lcsc_pn) if lcsc_pn is not None: + return [picker_lib.find_component_by_lcsc_id(lcsc_pn)] + case (None, None, None): + raise PickError("No parameters provided", module) + return [] + + +def add_api_pickers(module: Module, base_prio: int = 0) -> None: + # Generic pickers + module.add( + F.has_multi_picker(base_prio, ApiPicker(picker_lib.find_and_attach_by_lcsc_id)) + ) + module.add( + F.has_multi_picker(base_prio, ApiPicker(picker_lib.find_and_attach_by_mfr)) + ) + + # Type-specific pickers + F.has_multi_picker.add_pickers_by_type( + module, picker_lib.TYPE_SPECIFIC_LOOKUP, ApiPicker, base_prio + 1 + ) diff --git a/src/faebryk/libs/picker/common.py b/src/faebryk/libs/picker/common.py new file mode 100644 index 00000000..76f8aa81 --- /dev/null +++ b/src/faebryk/libs/picker/common.py @@ -0,0 +1,70 @@ +# This file is part of the faebryk project +# SPDX-License-Identifier: MIT + +from abc import ABC, abstractmethod +from enum import StrEnum + +import faebryk.library._F as F +from faebryk.core.module import Module +from faebryk.libs.picker.jlcpcb.jlcpcb import Component +from faebryk.libs.picker.picker import PickError +from faebryk.libs.util import ConfigFlagEnum + + +class PickerType(StrEnum): + JLCPCB = "jlcpcb" + API = "api" + + +DB_PICKER_BACKEND = ConfigFlagEnum( + PickerType, "PICKER", PickerType.JLCPCB, "Picker backend to use" +) + + +class StaticPartPicker(F.has_multi_picker.Picker, ABC): + def __init__( + self, + *, + mfr: str | None = None, + mfr_pn: str | None = None, + lcsc_pn: str | None = None, + ) -> None: + super().__init__() + self.mfr = mfr + self.mfr_pn = mfr_pn + self.lcsc_pn = lcsc_pn + + def _friendly_description(self) -> str: + desc = [] + if self.mfr: + desc.append(f"mfr={self.mfr}") + if self.mfr_pn: + desc.append(f"mfr_pn={self.mfr_pn}") + if self.lcsc_pn: + desc.append(f"lcsc_pn={self.lcsc_pn}") + return ", ".join(desc) or "" + + @abstractmethod + def _find_parts(self, module: Module) -> list[Component]: + pass + + def pick(self, module: Module): + parts = self._find_parts(module) + + if len(parts) > 1: + raise PickError( + f"Multiple parts found for {self._friendly_description()}", module + ) + + if len(parts) < 1: + raise PickError( + f"Could not find part for {self._friendly_description()}", module + ) + + (part,) = parts + try: + part.attach(module, []) + except ValueError as e: + raise PickError( + f"Could not attach part for {self._friendly_description()}", module + ) from e diff --git a/src/faebryk/libs/picker/jlcpcb/jlcpcb.py b/src/faebryk/libs/picker/jlcpcb/jlcpcb.py index 721754a8..80325be0 100644 --- a/src/faebryk/libs/picker/jlcpcb/jlcpcb.py +++ b/src/faebryk/libs/picker/jlcpcb/jlcpcb.py @@ -18,17 +18,12 @@ from rich.progress import track from tortoise import Tortoise from tortoise.expressions import Q -from tortoise.fields import CharField, IntField, JSONField +from tortoise.fields import CharField, DatetimeField, IntField, JSONField from tortoise.models import Model import faebryk.library._F as F from faebryk.core.module import Module from faebryk.core.parameter import Parameter -from faebryk.libs.e_series import ( - E_SERIES_VALUES, - ParamNotResolvedError, - e_series_intersect, -) from faebryk.libs.picker.lcsc import ( LCSC_NoDataException, LCSC_Part, @@ -40,8 +35,8 @@ PickError, has_part_picked_defined, ) -from faebryk.libs.units import P, Quantity, UndefinedUnitError, to_si_str -from faebryk.libs.util import at_exit, cast_assert, try_or +from faebryk.libs.units import P, UndefinedUnitError +from faebryk.libs.util import at_exit, try_or logger = logging.getLogger(__name__) @@ -140,19 +135,22 @@ async def get_from_id(self, manufacturer_id: int) -> str: class Component(Model): lcsc = IntField(primary_key=True) category_id = IntField() + category = CharField(max_length=255) + subcategory = CharField(max_length=255, optional=True) mfr = CharField(max_length=255) package = CharField(max_length=255) joints = IntField() manufacturer_id = IntField() + manufacturer_name = CharField(max_length=255, optional=True) basic = IntField() description = CharField(max_length=255) datasheet = CharField(max_length=255) stock = IntField() price = JSONField() - last_update = IntField() + last_update = DatetimeField() extra = JSONField() flag = IntField() - last_on_stock = IntField() + last_on_stock = DatetimeField() preferred = IntField() class Meta: @@ -216,6 +214,11 @@ def attribute_to_parameter( if ignore_at: value_field = value_field.split("@")[0] + # parse fields like "110mA;130mA" + # TODO: better data model so we can choose the appropriate value + if ";" in value_field: + value_field = value_field.split(";")[0] + value_field = value_field.replace("cd", "candela") # parse fields like "1.5V~2.5V" @@ -343,9 +346,7 @@ def attach( F.has_descriptive_properties_defined( { DescriptiveProperties.partno: self.mfr, - DescriptiveProperties.manufacturer: asyncio.run( - Manufacturers().get_from_id(self.manufacturer_id) - ), + DescriptiveProperties.manufacturer: self.mfr_name, DescriptiveProperties.datasheet: self.datasheet, "JLCPCB stock": str(self.stock), "JLCPCB price": f"{self.get_price(qty):.4f}", @@ -366,8 +367,11 @@ def attach( ) @property - def mfr_name(self) -> str: - return asyncio.run(Manufacturers().get_from_id(self.manufacturer_id)) + def mfr_name(self): + try: + return self.manufacturer_name + except AttributeError: + return asyncio.run(Manufacturers().get_from_id(self.manufacturer_id)) class ComponentQuery: @@ -414,37 +418,19 @@ def filter_by_description(self, *keywords: str) -> Self: return self - def filter_by_value( - self, - value: Parameter[Quantity], - si_unit: str, - e_series: set[float] | None = None, - ) -> Self: + def filter_by_si_values(self, value: Parameter, si_vals: list[str]) -> Self: assert self.Q - value = value.get_most_narrow() if logger.isEnabledFor(logging.DEBUG): logger.debug( f"Filtering by value:\n{indent(value.get_tree_param().pretty(), ' '*4)}" ) + logger.debug(f"Possible values: {si_vals}") if isinstance(value, F.ANY): return self assert not self.results - try: - intersection = F.Set( - [e_series_intersect(value, e_series or E_SERIES_VALUES.E_ALL)] - ).params - except ParamNotResolvedError as e: - raise ComponentQuery.ParamError( - value, f"Could not run e_series_intersect: {e}" - ) from e - si_vals = [ - to_si_str(cast_assert(F.Constant, r).value, si_unit) - .replace("µ", "u") - .replace("inf", "∞") - for r in intersection - ] + return self.filter_by_description(*si_vals) def filter_by_category(self, category: str, subcategory: str) -> Self: diff --git a/src/faebryk/libs/picker/jlcpcb/picker_lib.py b/src/faebryk/libs/picker/jlcpcb/picker_lib.py index 49a421b4..a4b2fdcf 100644 --- a/src/faebryk/libs/picker/jlcpcb/picker_lib.py +++ b/src/faebryk/libs/picker/jlcpcb/picker_lib.py @@ -14,6 +14,7 @@ DescriptiveProperties, PickError, ) +from faebryk.libs.picker.util import generate_si_values from faebryk.libs.util import KeyErrorAmbiguous, KeyErrorNotFound logger = logging.getLogger(__name__) @@ -29,9 +30,6 @@ # - should be classes instead of functions -# Generic pickers ---------------------------------------------------------------------- - - def str_to_enum[T: Enum](enum: type[T], x: str) -> F.Constant[T]: name = x.replace(" ", "_").replace("-", "_").upper() if name not in [e.name for e in enum]: @@ -46,6 +44,181 @@ def f(x: str) -> F.Constant[T]: return f +_MAPPINGS_BY_TYPE: dict[type[Module], list[MappingParameterDB]] = { + F.Resistor: [ + MappingParameterDB( + "resistance", + ["Resistance"], + "Tolerance", + ), + MappingParameterDB( + "rated_power", + ["Power(Watts)"], + ), + MappingParameterDB( + "rated_voltage", + ["Overload Voltage (Max)"], + ), + ], + F.Capacitor: [ + MappingParameterDB("capacitance", ["Capacitance"], "Tolerance"), + MappingParameterDB( + "rated_voltage", + ["Voltage Rated"], + ), + MappingParameterDB( + "temperature_coefficient", + ["Temperature Coefficient"], + transform_fn=lambda x: str_to_enum( + F.Capacitor.TemperatureCoefficient, x.replace("NP0", "C0G") + ), + ), + ], + F.Inductor: [ + MappingParameterDB( + "inductance", + ["Inductance"], + "Tolerance", + ), + MappingParameterDB( + "rated_current", + ["Rated Current"], + ), + MappingParameterDB( + "dc_resistance", + ["DC Resistance (DCR)", "DC Resistance"], + ), + MappingParameterDB( + "self_resonant_frequency", + ["Frequency - Self Resonant"], + ), + ], + F.TVS: [ + MappingParameterDB( + "forward_voltage", + ["Breakdown Voltage"], + ), + # TODO: think about the difference of meaning for max_current between Diode + # and TVS + MappingParameterDB( + "max_current", + ["Peak Pulse Current (Ipp)@10/1000us"], + ), + MappingParameterDB( + "reverse_working_voltage", + ["Reverse Voltage (Vr)", "Reverse Stand-Off Voltage (Vrwm)"], + ), + MappingParameterDB( + "reverse_leakage_current", + ["Reverse Leakage Current", "Reverse Leakage Current (Ir)"], + ), + MappingParameterDB( + "reverse_breakdown_voltage", + ["Breakdown Voltage"], + ), + ], + F.Diode: [ + MappingParameterDB( + "forward_voltage", + ["Forward Voltage", "Forward Voltage (Vf@If)"], + ), + MappingParameterDB( + "max_current", + ["Average Rectified Current (Io)"], + ), + MappingParameterDB( + "reverse_working_voltage", + ["Reverse Voltage (Vr)", "Reverse Stand-Off Voltage (Vrwm)"], + ), + MappingParameterDB( + "reverse_leakage_current", + ["Reverse Leakage Current", "Reverse Leakage Current (Ir)"], + ), + ], + F.LED: [ + MappingParameterDB( + "color", + ["Emitted Color"], + transform_fn=str_to_enum_func(F.LED.Color), + ), + MappingParameterDB( + "max_brightness", + ["Luminous Intensity"], + ), + MappingParameterDB( + "max_current", + ["Forward Current"], + ), + MappingParameterDB( + "forward_voltage", + ["Forward Voltage", "Forward Voltage (VF)"], + ), + ], + F.MOSFET: [ + MappingParameterDB( + "max_drain_source_voltage", + ["Drain Source Voltage (Vdss)"], + ), + MappingParameterDB( + "max_continuous_drain_current", + ["Continuous Drain Current (Id)"], + ), + MappingParameterDB( + "channel_type", + ["Type"], + transform_fn=str_to_enum_func(F.MOSFET.ChannelType), + ), + MappingParameterDB( + "gate_source_threshold_voltage", + ["Gate Threshold Voltage (Vgs(th)@Id)"], + ), + MappingParameterDB( + "on_resistance", + ["Drain Source On Resistance (RDS(on)@Vgs,Id)"], + ), + ], + F.LDO: [ + MappingParameterDB( + "output_polarity", + ["Output Polarity"], + transform_fn=str_to_enum_func(F.LDO.OutputPolarity), + ), + MappingParameterDB( + "max_input_voltage", + ["Maximum Input Voltage"], + ), + MappingParameterDB( + "output_type", + ["Output Type"], + transform_fn=str_to_enum_func(F.LDO.OutputType), + ), + MappingParameterDB( + "output_current", + ["Output Current"], + ), + MappingParameterDB( + "dropout_voltage", + ["Dropout Voltage"], + ), + MappingParameterDB( + "output_voltage", + ["Output Voltage"], + ), + MappingParameterDB( + "quiescent_current", + [ + "Quiescent Current", + "standby current", + "Quiescent Current (Ground Current)", + ], + ), + ], +} + + +# Generic pickers ---------------------------------------------------------------------- + + def find_component_by_lcsc_id(lcsc_id: str) -> Component: parts = ComponentQuery().filter_by_lcsc_pn(lcsc_id).get() @@ -78,8 +251,10 @@ def find_and_attach_by_lcsc_id(module: Module): raise PickError( f"Could not find part with LCSC part number {lcsc_pn}", module ) from e - except KeyErrorAmbiguous: - raise PickError(f"Found no exact match for LCSC part number {lcsc_pn}", module) + except KeyErrorAmbiguous as e: + raise PickError( + f"Found no exact match for LCSC part number {lcsc_pn}", module + ) from e if part.stock < qty: raise PickError( @@ -168,28 +343,16 @@ def find_resistor(cmp: Module): provided resistor """ assert isinstance(cmp, F.Resistor) - - mapping = [ - MappingParameterDB( - "resistance", - ["Resistance"], - "Tolerance", - ), - MappingParameterDB( - "rated_power", - ["Power(Watts)"], - ), - MappingParameterDB( - "rated_voltage", - ["Overload Voltage (Max)"], - ), - ] + mapping = _MAPPINGS_BY_TYPE[F.Resistor] ( ComponentQuery() .filter_by_category("Resistors", "Chip Resistor - Surface Mount") .filter_by_stock(qty) - .filter_by_value(cmp.resistance, "Ω", E_SERIES_VALUES.E96) + .filter_by_si_values( + cmp.resistance, + generate_si_values(cmp.resistance, "Ω", E_SERIES_VALUES.E96), + ) .filter_by_traits(cmp) .sort_by_price(qty) .filter_by_module_params_and_attach(cmp, mapping, qty) @@ -203,21 +366,7 @@ def find_capacitor(cmp: Module): """ assert isinstance(cmp, F.Capacitor) - - mapping = [ - MappingParameterDB("capacitance", ["Capacitance"], "Tolerance"), - MappingParameterDB( - "rated_voltage", - ["Voltage Rated"], - ), - MappingParameterDB( - "temperature_coefficient", - ["Temperature Coefficient"], - transform_fn=lambda x: str_to_enum( - F.Capacitor.TemperatureCoefficient, x.replace("NP0", "C0G") - ), - ), - ] + mapping = _MAPPINGS_BY_TYPE[F.Capacitor] # TODO: add support for electrolytic capacitors. ( @@ -227,7 +376,10 @@ def find_capacitor(cmp: Module): ) .filter_by_stock(qty) .filter_by_traits(cmp) - .filter_by_value(cmp.capacitance, "F", E_SERIES_VALUES.E24) + .filter_by_si_values( + cmp.capacitance, + generate_si_values(cmp.capacitance, "F", E_SERIES_VALUES.E24), + ) .sort_by_price(qty) .filter_by_module_params_and_attach(cmp, mapping, qty) ) @@ -243,26 +395,7 @@ def find_inductor(cmp: Module): """ assert isinstance(cmp, F.Inductor) - - mapping = [ - MappingParameterDB( - "inductance", - ["Inductance"], - "Tolerance", - ), - MappingParameterDB( - "rated_current", - ["Rated Current"], - ), - MappingParameterDB( - "dc_resistance", - ["DC Resistance (DCR)", "DC Resistance"], - ), - MappingParameterDB( - "self_resonant_frequency", - ["Frequency - Self Resonant"], - ), - ] + mapping = _MAPPINGS_BY_TYPE[F.Inductor] ( ComponentQuery() @@ -271,7 +404,10 @@ def find_inductor(cmp: Module): .filter_by_category("Inductors", "Inductors") .filter_by_stock(qty) .filter_by_traits(cmp) - .filter_by_value(cmp.inductance, "H", E_SERIES_VALUES.E24) + .filter_by_si_values( + cmp.inductance, + generate_si_values(cmp.inductance, "H", E_SERIES_VALUES.E24), + ) .sort_by_price(qty) .filter_by_module_params_and_attach(cmp, mapping, qty) ) @@ -288,30 +424,7 @@ def find_tvs(cmp: Module): # TODO: handle bidirectional TVS diodes # "Bidirectional Channels": "1" in extra['attributes'] - mapping = [ - MappingParameterDB( - "forward_voltage", - ["Breakdown Voltage"], - ), - # TODO: think about the difference of meaning for max_current between Diode - # and TVS - MappingParameterDB( - "max_current", - ["Peak Pulse Current (Ipp)@10/1000us"], - ), - MappingParameterDB( - "reverse_working_voltage", - ["Reverse Voltage (Vr)", "Reverse Stand-Off Voltage (Vrwm)"], - ), - MappingParameterDB( - "reverse_leakage_current", - ["Reverse Leakage Current", "Reverse Leakage Current (Ir)"], - ), - MappingParameterDB( - "reverse_breakdown_voltage", - ["Breakdown Voltage"], - ), - ] + mapping = _MAPPINGS_BY_TYPE[F.TVS] ( ComponentQuery() @@ -331,31 +444,20 @@ def find_diode(cmp: Module): assert isinstance(cmp, F.Diode) - mapping = [ - MappingParameterDB( - "forward_voltage", - ["Forward Voltage", "Forward Voltage (Vf@If)"], - ), - MappingParameterDB( - "max_current", - ["Average Rectified Current (Io)"], - ), - MappingParameterDB( - "reverse_working_voltage", - ["Reverse Voltage (Vr)", "Reverse Stand-Off Voltage (Vrwm)"], - ), - MappingParameterDB( - "reverse_leakage_current", - ["Reverse Leakage Current", "Reverse Leakage Current (Ir)"], - ), - ] + mapping = _MAPPINGS_BY_TYPE[F.Diode] ( ComponentQuery() .filter_by_category("Diodes", "") .filter_by_stock(qty) - .filter_by_value(cmp.max_current, "A", E_SERIES_VALUES.E3) - .filter_by_value(cmp.reverse_working_voltage, "V", E_SERIES_VALUES.E3) + .filter_by_si_values( + cmp.max_current, + generate_si_values(cmp.max_current, "A", E_SERIES_VALUES.E3), + ) + .filter_by_si_values( + cmp.reverse_working_voltage, + generate_si_values(cmp.reverse_working_voltage, "V", E_SERIES_VALUES.E3), + ) .filter_by_traits(cmp) .sort_by_price(qty) .filter_by_module_params_and_attach(cmp, mapping, qty) @@ -369,26 +471,7 @@ def find_led(cmp: Module): """ assert isinstance(cmp, F.LED) - - mapping = [ - MappingParameterDB( - "color", - ["Emitted Color"], - transform_fn=str_to_enum_func(F.LED.Color), - ), - MappingParameterDB( - "max_brightness", - ["Luminous Intensity"], - ), - MappingParameterDB( - "max_current", - ["Forward Current"], - ), - MappingParameterDB( - "forward_voltage", - ["Forward Voltage", "Forward Voltage (VF)"], - ), - ] + mapping = _MAPPINGS_BY_TYPE[F.LED] ( ComponentQuery() @@ -407,30 +490,7 @@ def find_mosfet(cmp: Module): """ assert isinstance(cmp, F.MOSFET) - - mapping = [ - MappingParameterDB( - "max_drain_source_voltage", - ["Drain Source Voltage (Vdss)"], - ), - MappingParameterDB( - "max_continuous_drain_current", - ["Continuous Drain Current (Id)"], - ), - MappingParameterDB( - "channel_type", - ["Type"], - transform_fn=str_to_enum_func(F.MOSFET.ChannelType), - ), - MappingParameterDB( - "gate_source_threshold_voltage", - ["Gate Threshold Voltage (Vgs(th)@Id)"], - ), - MappingParameterDB( - "on_resistance", - ["Drain Source On Resistance (RDS(on)@Vgs,Id)"], - ), - ] + mapping = _MAPPINGS_BY_TYPE[F.MOSFET] ( ComponentQuery() @@ -449,43 +509,7 @@ def find_ldo(cmp: Module): """ assert isinstance(cmp, F.LDO) - - mapping = [ - MappingParameterDB( - "output_polarity", - ["Output Polarity"], - transform_fn=str_to_enum_func(F.LDO.OutputPolarity), - ), - MappingParameterDB( - "max_input_voltage", - ["Maximum Input Voltage"], - ), - MappingParameterDB( - "output_type", - ["Output Type"], - transform_fn=str_to_enum_func(F.LDO.OutputType), - ), - MappingParameterDB( - "output_current", - ["Output Current"], - ), - MappingParameterDB( - "dropout_voltage", - ["Dropout Voltage"], - ), - MappingParameterDB( - "output_voltage", - ["Output Voltage"], - ), - MappingParameterDB( - "quiescent_current", - [ - "Quiescent Current", - "standby current", - "Quiescent Current (Ground Current)", - ], - ), - ] + mapping = _MAPPINGS_BY_TYPE[F.LDO] ( ComponentQuery() diff --git a/src/faebryk/libs/picker/jlcpcb/pickers.py b/src/faebryk/libs/picker/jlcpcb/pickers.py index 376f4d3c..86a5a9c7 100644 --- a/src/faebryk/libs/picker/jlcpcb/pickers.py +++ b/src/faebryk/libs/picker/jlcpcb/pickers.py @@ -3,6 +3,7 @@ import faebryk.library._F as F import faebryk.libs.picker.jlcpcb.picker_lib as P from faebryk.core.module import Module +from faebryk.libs.picker.common import StaticPartPicker from faebryk.libs.picker.jlcpcb.jlcpcb import JLCPCB_DB, ComponentQuery from faebryk.libs.picker.picker import PickError @@ -19,37 +20,13 @@ def pick(self, module: Module): raise PickError(e.args[0], module) from e -class StaticJLCPCBPartPicker(F.has_multi_picker.Picker): +class StaticJLCPCBPartPicker(StaticPartPicker): """ Use this if you want to specify a specific part to the multi-picker eg. a default switch """ - def __init__( - self, - *, - mfr: str | None = None, - mfr_pn: str | None = None, - lcsc_pn: str | None = None, - ) -> None: - super().__init__() - self.mfr = mfr - self.mfr_pn = mfr_pn - self.lcsc_pn = lcsc_pn - - def _friendly_description(self) -> str: - desc = "" - if self.mfr: - desc += f"mfr={self.mfr}" - if self.mfr_pn: - desc += f"mfr_pn={self.mfr_pn}" - if self.lcsc_pn: - desc += f"lcsc_pn={self.lcsc_pn}" - if not desc: - return "" - return desc - - def pick(self, module: Module): + def _find_parts(self, module: Module): q = ComponentQuery() if self.mfr: @@ -59,25 +36,7 @@ def pick(self, module: Module): if self.lcsc_pn: q.filter_by_lcsc_pn(self.lcsc_pn) - parts = q.get() - - if len(parts) > 1: - raise PickError( - f"Multiple parts found for {self._friendly_description()}", module - ) - - if len(parts) < 1: - raise PickError( - f"Could not find part for {self._friendly_description()}", module - ) - - try: - parts[0].attach(module, []) - return - except ValueError as e: - raise PickError( - f"Could not attach part for {self._friendly_description()}", module - ) from e + return q.get() def add_jlcpcb_pickers(module: Module, base_prio: int = 0) -> None: diff --git a/src/faebryk/libs/picker/util.py b/src/faebryk/libs/picker/util.py new file mode 100644 index 00000000..1ca8e3fb --- /dev/null +++ b/src/faebryk/libs/picker/util.py @@ -0,0 +1,39 @@ +# This file is part of the faebryk project +# SPDX-License-Identifier: MIT + +import faebryk.library._F as F +from faebryk.core.parameter import Parameter +from faebryk.library.Set import Set +from faebryk.libs.e_series import ( + E_SERIES_VALUES, + ParamNotResolvedError, + e_series_intersect, +) +from faebryk.libs.picker.picker import PickError +from faebryk.libs.units import Quantity, to_si_str +from faebryk.libs.util import cast_assert + + +def generate_si_values( + value: Parameter[Quantity], si_unit: str, e_series: set[float] | None = None +): + """ + Generate a list of permissible SI values for the given parameter from an + E-series + """ + + value = value.get_most_narrow() + + try: + intersection = Set( + [e_series_intersect(value, e_series or E_SERIES_VALUES.E_ALL)] + ).params + except ParamNotResolvedError as e: + raise PickError(value, f"Could not run e_series_intersect: {e}") from e + + return [ + to_si_str(cast_assert(F.Constant, r).value, si_unit) + .replace("µ", "u") + .replace("inf", "∞") + for r in intersection + ] diff --git a/src/faebryk/libs/util.py b/src/faebryk/libs/util.py index dabb8b1f..cc10d422 100644 --- a/src/faebryk/libs/util.py +++ b/src/faebryk/libs/util.py @@ -5,13 +5,13 @@ import collections.abc import inspect import logging +import os import sys from abc import abstractmethod from collections import defaultdict from contextlib import contextmanager from dataclasses import dataclass, fields from enum import StrEnum -from functools import cache from itertools import chain from textwrap import indent from typing import ( @@ -711,68 +711,121 @@ def __init_subclass__(cls) -> None: lazy_construct(cls) -class ConfigFlag: - def __init__(self, name: str, default: bool = False, descr: str = "") -> None: - self.name = name - self.default = default - self.descr = descr +def once[T, **P](f: Callable[P, T]) -> Callable[P, T]: + def wrapper(*args: P.args, **kwargs: P.kwargs) -> Any: + lookup = (args, tuple(kwargs.items())) + if lookup in wrapper.cache: + return wrapper.cache[lookup] - @cache - def __bool__(self): - import os + result = f(*args, **kwargs) + wrapper.cache[lookup] = result + return result - key = f"FBRK_{self.name}" + wrapper.cache = {} + wrapper._is_once_wrapper = True + return wrapper - if key not in os.environ: - return self.default - matches = [ - (True, ["1", "true", "yes", "y"]), - (False, ["0", "false", "no", "n"]), - ] - val = os.environ[key].lower() +def assert_once[T, O, **P]( + f: Callable[Concatenate[O, P], T], +) -> Callable[Concatenate[O, P], T]: + def wrapper(obj: O, *args: P.args, **kwargs: P.kwargs) -> T: + if not hasattr(obj, "_assert_once_called"): + setattr(obj, "_assert_once_called", set()) - res = find(matches, lambda x: val in x[1])[0] + wrapper_set = getattr(obj, "_assert_once_called") - if res != self.default: - logger.warning(f"Config flag |{self.name}={res}|") + if wrapper not in wrapper_set: + wrapper_set.add(wrapper) + return f(obj, *args, **kwargs) + else: + raise AssertionError(f"{f.__name__} called on {obj} more than once") - return res + return wrapper -class ConfigFlagEnum[E: StrEnum]: - def __init__(self, enum: type[E], name: str, default: E, descr: str = "") -> None: - self.enum = enum +def assert_once_global[T, **P](f: Callable[P, T]) -> Callable[P, T]: + def wrapper(*args: P.args, **kwargs: P.kwargs) -> T: + if not wrapper.called: + wrapper.called = True + return f(*args, **kwargs) + else: + raise AssertionError("Function called more than once") + + wrapper.called = False + return wrapper + + +class _ConfigFlagBase[T]: + def __init__(self, name: str, default: T, descr: str = ""): self._name = name self.default = default self.descr = descr + self._type: type[T] = type(default) - self._resolved = None - - def get(self): - if self._resolved is not None: - return self._resolved - - import os + @property + def name(self) -> str: + return f"FBRK_{self._name}" - key = f"FBRK_{self._name}" + @property + def raw_value(self) -> str | None: + return os.getenv(self.name, None) - if key not in os.environ: - return self.default + @once + def get(self) -> T: + raw_val = self.raw_value - val = os.environ[key].upper() - res = self.enum[val] + if raw_val is None: + res = self.default + else: + res = self._convert(raw_val) if res != self.default: - logger.warning(f"Config flag |{self._name}={res}|") + logger.warning(f"Config flag |{self.name}={res}|") - self._resolved = res return res - def __eq__(self, other) -> Any: + def __hash__(self) -> int: + return hash(self.name) + + @abstractmethod + def _convert(self, raw_val: str) -> T: ... + + def __eq__(self, other) -> bool: return self.get() == other +class ConfigFlag(_ConfigFlagBase[bool]): + def __init__(self, name: str, default: bool = False, descr: str = "") -> None: + super().__init__(name, default, descr) + + def _convert(self, raw_val: str) -> bool: + matches = [ + (True, ["1", "true", "yes", "y"]), + (False, ["0", "false", "no", "n"]), + ] + val = raw_val.lower() + + return find(matches, lambda x: val in x[1])[0] + + def __bool__(self): + return self.get() + + +class ConfigFlagEnum[E: StrEnum](_ConfigFlagBase[E]): + def __init__(self, enum: type[E], name: str, default: E, descr: str = "") -> None: + super().__init__(name, default, descr) + self.enum = enum + + def _convert(self, raw_val: str) -> E: + return self.enum[raw_val.upper()] + + +class ConfigFlagString(_ConfigFlagBase[str]): + def _convert(self, raw_val: str) -> str: + return raw_val + + def zip_dicts_by_key(*dicts): keys = {k for d in dicts for k in d} return {k: tuple(d.get(k) for d in dicts) for k in keys} @@ -807,51 +860,6 @@ def __() -> T: return _ -def once[T, **P](f: Callable[P, T]) -> Callable[P, T]: - def wrapper(*args: P.args, **kwargs: P.kwargs) -> Any: - lookup = (args, tuple(kwargs.items())) - if lookup in wrapper.cache: - return wrapper.cache[lookup] - - result = f(*args, **kwargs) - wrapper.cache[lookup] = result - return result - - wrapper.cache = {} - wrapper._is_once_wrapper = True - return wrapper - - -def assert_once[T, O, **P]( - f: Callable[Concatenate[O, P], T], -) -> Callable[Concatenate[O, P], T]: - def wrapper(obj: O, *args: P.args, **kwargs: P.kwargs) -> T: - if not hasattr(obj, "_assert_once_called"): - setattr(obj, "_assert_once_called", set()) - - wrapper_set = getattr(obj, "_assert_once_called") - - if wrapper not in wrapper_set: - wrapper_set.add(wrapper) - return f(obj, *args, **kwargs) - else: - raise AssertionError(f"{f.__name__} called on {obj} more than once") - - return wrapper - - -def assert_once_global[T, **P](f: Callable[P, T]) -> Callable[P, T]: - def wrapper(*args: P.args, **kwargs: P.kwargs) -> T: - if not wrapper.called: - wrapper.called = True - return f(*args, **kwargs) - else: - raise AssertionError("Function called more than once") - - wrapper.called = False - return wrapper - - class PostInitCaller(type): def __call__(cls, *args, **kwargs): obj = type.__call__(cls, *args, **kwargs) diff --git a/test/libs/picker/test_jlcpcb.py b/test/libs/picker/test_pickers.py similarity index 86% rename from test/libs/picker/test_jlcpcb.py rename to test/libs/picker/test_pickers.py index 7997221b..6b996761 100644 --- a/test/libs/picker/test_jlcpcb.py +++ b/test/libs/picker/test_pickers.py @@ -1,7 +1,9 @@ # This file is part of the faebryk project # SPDX-License-Identifier: MIT + import logging import unittest +from abc import ABC, abstractmethod from pathlib import Path from tempfile import mkdtemp @@ -9,7 +11,9 @@ import faebryk.libs.picker.lcsc as lcsc from faebryk.core.module import Module from faebryk.core.parameter import Parameter +from faebryk.libs.brightness import TypicalLuminousIntensity from faebryk.libs.logging import setup_basic_logging +from faebryk.libs.picker.api.pickers import add_api_pickers from faebryk.libs.picker.jlcpcb.jlcpcb import JLCPCB_DB from faebryk.libs.picker.jlcpcb.pickers import add_jlcpcb_pickers from faebryk.libs.picker.picker import DescriptiveProperties, has_part_picked @@ -18,23 +22,27 @@ logger = logging.getLogger(__name__) - lcsc.LIB_FOLDER = Path(mkdtemp()) -@unittest.skipIf(not JLCPCB_DB.config.db_path.exists(), reason="Requires large db") -class TestPickerJlcpcb(unittest.TestCase): +class TestPickerBase(unittest.TestCase, ABC): + @abstractmethod + def add_pickers(self, module: Module): + pass + class TestRequirements: def __init__( self, test_case: unittest.TestCase, requirement: Module, footprint: list[tuple[str, int]], + add_pickers_func, ): self.test_case = test_case self.result = requirement self.requirement = requirement self.footprint = footprint + self.add_pickers_func = add_pickers_func self.req_lcsc_pn = None if self.requirement.has_trait(F.has_descriptive_properties) and "LCSC" in ( @@ -116,7 +124,7 @@ def satisfies_requirements(self): ) def test(self): - add_jlcpcb_pickers(self.result) + self.add_pickers_func(self.result) self.result.get_trait(F.has_picker).pick() self.test_case.assertTrue(self.result.has_trait(has_part_picked)) @@ -183,6 +191,7 @@ def test_find_manufacturer_partnumber(self): self, requirement=requirement, footprint=[("SOT-23-5", 5)], + add_pickers_func=self.add_pickers, ) def test_find_lcsc_partnumber(self): @@ -199,17 +208,12 @@ def test_find_lcsc_partnumber(self): r.slew_rate.merge(F.Range.upper_bound(1 * P.MV / P.us)), ) ) - requirement.add( - F.has_descriptive_properties_defined( - { - "LCSC": "C7972", - } - ) - ) + requirement.add(F.has_descriptive_properties_defined({"LCSC": "C7972"})) self.TestRequirements( self, requirement=requirement, footprint=[("SOT-23-5", 5)], + add_pickers_func=self.add_pickers, ) def test_find_resistor(self): @@ -223,6 +227,7 @@ def test_find_resistor(self): ) ), footprint=[("0402", 2)], + add_pickers_func=self.add_pickers, ) self.TestRequirements( @@ -235,6 +240,7 @@ def test_find_resistor(self): ) ), footprint=[("0603", 2)], + add_pickers_func=self.add_pickers, ) def test_find_capacitor(self): @@ -250,6 +256,7 @@ def test_find_capacitor(self): ) ), footprint=[("0603", 2)], + add_pickers_func=self.add_pickers, ) self.TestRequirements( @@ -264,6 +271,7 @@ def test_find_capacitor(self): ) ), footprint=[("0402", 2)], + add_pickers_func=self.add_pickers, ) def test_find_inductor(self): @@ -280,6 +288,7 @@ def test_find_inductor(self): ) ), footprint=[("0603", 2)], + add_pickers_func=self.add_pickers, ) def test_find_mosfet(self): @@ -298,6 +307,7 @@ def test_find_mosfet(self): ) ), footprint=[("SOT-23", 3)], + add_pickers_func=self.add_pickers, ) def test_find_diode(self): @@ -313,6 +323,28 @@ def test_find_diode(self): ) ), footprint=[("SOD-123", 2)], + add_pickers_func=self.add_pickers, + ) + + def test_find_led(self): + self.TestRequirements( + self, + requirement=F.LED().builder( + lambda led: ( + led.color.merge(F.LED.Color.RED), + led.brightness.merge( + TypicalLuminousIntensity.APPLICATION_LED_INDICATOR_INSIDE.value.value + ), + # TODO: check semantics of F.ANY vs F.TBD + led.reverse_leakage_current.merge(F.ANY()), + led.reverse_working_voltage.merge(F.ANY()), + led.max_brightness.merge(F.Range.lower_bound(100 * P.millicandela)), + led.forward_voltage.merge(F.Range.upper_bound(2.5 * P.V)), + led.max_current.merge(F.Range.upper_bound(20 * P.mA)), + ) + ), + footprint=[("0805", 2)], + add_pickers_func=self.add_pickers, ) def test_find_tvs(self): @@ -331,6 +363,7 @@ def test_find_tvs(self): ) ), footprint=[("SMB(DO-214AA)", 2)], + add_pickers_func=self.add_pickers, ) def test_find_ldo(self): @@ -354,19 +387,15 @@ def test_find_ldo(self): ("SOT-23-3", 3), ("SOT-23-3L", 3), ], + add_pickers_func=self.add_pickers, ) - def tearDown(self): - # in test atexit not triggered, thus need to close DB manually - JLCPCB_DB.get().close() - - -def is_db_available(): - return JLCPCB_DB.config.db_path.exists() +class TestPickerPerformanceBase(unittest.TestCase, ABC): + @abstractmethod + def add_pickers(self, module: Module): + pass -@unittest.skipIf(not is_db_available(), reason="Requires large db") -class TestPickerPerformanceJLCPCB(unittest.TestCase): def test_simple_full(self): # conclusions # - first pick overall is slow, need to load sqlite into buffer cache @@ -415,7 +444,7 @@ def c_builder(capacitance_pf: float): mods = resistors + caps + resistors_10k for mod in mods: - add_jlcpcb_pickers(mod) + self.add_pickers(mod) with timings.context("resistors"): for i, r in enumerate(resistors): @@ -445,6 +474,39 @@ def c_builder(capacitance_pf: float): print(timings) +def is_db_available(): + return JLCPCB_DB.config.db_path.exists() + + +@unittest.skipIf(not is_db_available(), reason="Requires large db") +class TestPickerJlcpcb(TestPickerBase): + def add_pickers(self, module): + add_jlcpcb_pickers(module) + + def tearDown(self): + # in test atexit not triggered, thus need to close DB manually + JLCPCB_DB.get().close() + + +@unittest.skipIf(not is_db_available(), reason="Requires large db") +class TestPickerPerformanceJlcpcb(TestPickerPerformanceBase): + def add_pickers(self, module): + add_jlcpcb_pickers(module) + + def tearDown(self): + JLCPCB_DB.get().close() + + +class TestPickerApi(TestPickerBase): + def add_pickers(self, module): + add_api_pickers(module) + + +class TestPickerPerformanceApi(TestPickerPerformanceBase): + def add_pickers(self, module): + add_api_pickers(module) + + if __name__ == "__main__": setup_basic_logging() logger.setLevel(logging.DEBUG)