From 8e658ac0cccefdac1520f5cc75b7cba906f0a1c8 Mon Sep 17 00:00:00 2001 From: Saelyos Date: Fri, 17 Nov 2023 15:28:20 +0100 Subject: [PATCH 1/3] Add custom pint wrapper --- roseau/load_flow/models/branches.py | 8 +- roseau/load_flow/models/buses.py | 14 +- roseau/load_flow/models/grounds.py | 2 +- roseau/load_flow/models/lines/lines.py | 18 +-- roseau/load_flow/models/lines/parameters.py | 18 +-- .../models/loads/flexible_parameters.py | 48 +++--- roseau/load_flow/models/loads/loads.py | 22 +-- roseau/load_flow/models/potential_refs.py | 2 +- roseau/load_flow/models/sources.py | 10 +- .../models/transformers/parameters.py | 24 +-- roseau/load_flow/tests/test_wrapper.py | 105 ++++++++++++ roseau/load_flow/units.py | 6 +- roseau/load_flow/wrapper.py | 152 ++++++++++++++++++ 13 files changed, 342 insertions(+), 87 deletions(-) create mode 100644 roseau/load_flow/tests/test_wrapper.py create mode 100644 roseau/load_flow/wrapper.py diff --git a/roseau/load_flow/models/branches.py b/roseau/load_flow/models/branches.py index 6e515fee..ea5fb888 100644 --- a/roseau/load_flow/models/branches.py +++ b/roseau/load_flow/models/branches.py @@ -80,7 +80,7 @@ def _res_currents_getter(self, warning: bool) -> tuple[ComplexArray, ComplexArra return self._res_getter(value=self._res_currents, warning=warning) @property - @ureg_wraps(("A", "A"), (None,), strict=False) + @ureg_wraps(("A", "A"), (None,)) def res_currents(self) -> tuple[Q_[ComplexArray], Q_[ComplexArray]]: """The load flow result of the branch currents (A).""" return self._res_currents_getter(warning=True) @@ -93,7 +93,7 @@ def _res_powers_getter(self, warning: bool) -> tuple[ComplexArray, ComplexArray] return powers1, powers2 @property - @ureg_wraps(("VA", "VA"), (None,), strict=False) + @ureg_wraps(("VA", "VA"), (None,)) def res_powers(self) -> tuple[Q_[ComplexArray], Q_[ComplexArray]]: """The load flow result of the branch powers (VA).""" return self._res_powers_getter(warning=True) @@ -104,7 +104,7 @@ def _res_potentials_getter(self, warning: bool) -> tuple[ComplexArray, ComplexAr return pot1, pot2 @property - @ureg_wraps(("V", "V"), (None,), strict=False) + @ureg_wraps(("V", "V"), (None,)) def res_potentials(self) -> tuple[Q_[ComplexArray], Q_[ComplexArray]]: """The load flow result of the branch potentials (V).""" return self._res_potentials_getter(warning=True) @@ -114,7 +114,7 @@ def _res_voltages_getter(self, warning: bool) -> tuple[ComplexArray, ComplexArra return calculate_voltages(pot1, self.phases1), calculate_voltages(pot2, self.phases2) @property - @ureg_wraps(("V", "V"), (None,), strict=False) + @ureg_wraps(("V", "V"), (None,)) def res_voltages(self) -> tuple[Q_[ComplexArray], Q_[ComplexArray]]: """The load flow result of the branch voltages (V).""" return self._res_voltages_getter(warning=True) diff --git a/roseau/load_flow/models/buses.py b/roseau/load_flow/models/buses.py index e2347db7..b84b256b 100644 --- a/roseau/load_flow/models/buses.py +++ b/roseau/load_flow/models/buses.py @@ -91,13 +91,13 @@ def __repr__(self) -> str: return f"{type(self).__name__}(id={self.id!r}, phases={self.phases!r})" @property - @ureg_wraps("V", (None,), strict=False) + @ureg_wraps("V", (None,)) def potentials(self) -> Q_[ComplexArray]: """An array of initial potentials of the bus (V).""" return self._potentials @potentials.setter - @ureg_wraps(None, (None, "V"), strict=False) + @ureg_wraps(None, (None, "V")) def potentials(self, value: ComplexArrayLike1D) -> None: if len(value) != len(self.phases): msg = f"Incorrect number of potentials: {len(value)} instead of {len(self.phases)}" @@ -110,7 +110,7 @@ def _res_potentials_getter(self, warning: bool) -> ComplexArray: return self._res_getter(value=self._res_potentials, warning=warning) @property - @ureg_wraps("V", (None,), strict=False) + @ureg_wraps("V", (None,)) def res_potentials(self) -> Q_[ComplexArray]: """The load flow result of the bus potentials (V).""" return self._res_potentials_getter(warning=True) @@ -120,7 +120,7 @@ def _res_voltages_getter(self, warning: bool) -> ComplexArray: return calculate_voltages(potentials, self.phases) @property - @ureg_wraps("V", (None,), strict=False) + @ureg_wraps("V", (None,)) def res_voltages(self) -> Q_[ComplexArray]: """The load flow result of the bus voltages (V). @@ -146,7 +146,7 @@ def min_voltage(self) -> Optional[Q_[float]]: return None if self._min_voltage is None else Q_(self._min_voltage, "V") @min_voltage.setter - @ureg_wraps(None, (None, "V"), strict=False) + @ureg_wraps(None, (None, "V")) def min_voltage(self, value: Optional[Union[float, Q_[float]]]) -> None: if value is not None and self._max_voltage is not None and value > self._max_voltage: msg = ( @@ -165,7 +165,7 @@ def max_voltage(self) -> Optional[Q_[float]]: return None if self._max_voltage is None else Q_(self._max_voltage, "V") @max_voltage.setter - @ureg_wraps(None, (None, "V"), strict=False) + @ureg_wraps(None, (None, "V")) def max_voltage(self, value: Optional[Union[float, Q_[float]]]) -> None: if value is not None and self._min_voltage is not None and value < self._min_voltage: msg = ( @@ -284,7 +284,7 @@ def get_connected_buses(self) -> Iterator[Id]: to_add = set(element._connected_elements).difference(visited) remaining.update(to_add) - @ureg_wraps("percent", (None,), strict=False) + @ureg_wraps("percent", (None,)) def res_voltage_unbalance(self) -> Q_[float]: """Calculate the voltage unbalance on this bus according to the IEC definition. diff --git a/roseau/load_flow/models/grounds.py b/roseau/load_flow/models/grounds.py index ab1f12c4..083534af 100644 --- a/roseau/load_flow/models/grounds.py +++ b/roseau/load_flow/models/grounds.py @@ -53,7 +53,7 @@ def _res_potential_getter(self, warning: bool) -> complex: return self._res_getter(self._res_potential, warning) @property - @ureg_wraps("V", (None,), strict=False) + @ureg_wraps("V", (None,)) def res_potential(self) -> Q_[complex]: """The load flow result of the ground potential (V).""" return self._res_potential_getter(warning=True) diff --git a/roseau/load_flow/models/lines/lines.py b/roseau/load_flow/models/lines/lines.py index 3fe052af..6a0a4d3e 100644 --- a/roseau/load_flow/models/lines/lines.py +++ b/roseau/load_flow/models/lines/lines.py @@ -225,12 +225,12 @@ def __init__( self._connect(self.ground) @property - @ureg_wraps("km", (None,), strict=False) + @ureg_wraps("km", (None,)) def length(self) -> Q_[float]: return self._length @length.setter - @ureg_wraps(None, (None, "km"), strict=False) + @ureg_wraps(None, (None, "km")) def length(self, value: Union[float, Q_[float]]) -> None: if value <= 0: msg = f"A line length must be greater than 0. {value:.2f} km provided." @@ -274,13 +274,13 @@ def parameters(self, value: LineParameters) -> None: self._invalidate_network_results() @property - @ureg_wraps("ohm", (None,), strict=False) + @ureg_wraps("ohm", (None,)) def z_line(self) -> Q_[ComplexArray]: """Impedance of the line in Ohm""" return self.parameters._z_line * self._length @property - @ureg_wraps("S", (None,), strict=False) + @ureg_wraps("S", (None,)) def y_shunt(self) -> Q_[ComplexArray]: """Shunt admittance of the line in Siemens""" return self.parameters._y_shunt * self._length @@ -307,7 +307,7 @@ def _res_series_currents_getter(self, warning: bool) -> ComplexArray: return i_line @property - @ureg_wraps("A", (None,), strict=False) + @ureg_wraps("A", (None,)) def res_series_currents(self) -> Q_[ComplexArray]: """Get the current in the series elements of the line (A).""" return self._res_series_currents_getter(warning=True) @@ -317,7 +317,7 @@ def _res_series_power_losses_getter(self, warning: bool) -> ComplexArray: return du_line * i_line.conj() # Sₗ = ΔU.Iₗ* @property - @ureg_wraps("VA", (None,), strict=False) + @ureg_wraps("VA", (None,)) def res_series_power_losses(self) -> Q_[ComplexArray]: """Get the power losses in the series elements of the line (VA).""" return self._res_series_power_losses_getter(warning=True) @@ -341,7 +341,7 @@ def _res_shunt_currents_getter(self, warning: bool) -> tuple[ComplexArray, Compl return cur1, cur2 @property - @ureg_wraps(("A", "A"), (None,), strict=False) + @ureg_wraps(("A", "A"), (None,)) def res_shunt_currents(self) -> tuple[Q_[ComplexArray], Q_[ComplexArray]]: """Get the currents in the shunt elements of the line (A).""" return self._res_shunt_currents_getter(warning=True) @@ -353,7 +353,7 @@ def _res_shunt_power_losses_getter(self, warning: bool) -> ComplexArray: return pot1 * cur1.conj() + pot2 * cur2.conj() @property - @ureg_wraps("VA", (None,), strict=False) + @ureg_wraps("VA", (None,)) def res_shunt_power_losses(self) -> Q_[ComplexArray]: """Get the power losses in the shunt elements of the line (VA).""" return self._res_shunt_power_losses_getter(warning=True) @@ -364,7 +364,7 @@ def _res_power_losses_getter(self, warning: bool) -> ComplexArray: return series_losses + shunt_losses @property - @ureg_wraps("VA", (None,), strict=False) + @ureg_wraps("VA", (None,)) def res_power_losses(self) -> Q_[ComplexArray]: """Get the power losses in the line (VA).""" return self._res_power_losses_getter(warning=True) diff --git a/roseau/load_flow/models/lines/parameters.py b/roseau/load_flow/models/lines/parameters.py index ca44b702..a2776fe9 100644 --- a/roseau/load_flow/models/lines/parameters.py +++ b/roseau/load_flow/models/lines/parameters.py @@ -39,7 +39,7 @@ class LineParameters(Identifiable, JsonMixin): rf"^({_type_re})_({_material_re})_{_section_re}$", flags=re.IGNORECASE ) - @ureg_wraps(None, (None, None, "ohm/km", "S/km", "A"), strict=False) + @ureg_wraps(None, (None, None, "ohm/km", "S/km", "A")) def __init__( self, id: Id, @@ -92,12 +92,12 @@ def __eq__(self, other: object) -> bool: ) @property - @ureg_wraps("ohm/km", (None,), strict=False) + @ureg_wraps("ohm/km", (None,)) def z_line(self) -> Q_[ComplexArray]: return self._z_line @property - @ureg_wraps("S/km", (None,), strict=False) + @ureg_wraps("S/km", (None,)) def y_shunt(self) -> Q_[ComplexArray]: return self._y_shunt @@ -111,14 +111,12 @@ def max_current(self) -> Optional[Q_[float]]: return None if self._max_current is None else Q_(self._max_current, "A") @max_current.setter - @ureg_wraps(None, (None, "A"), strict=False) + @ureg_wraps(None, (None, "A")) def max_current(self, value: Optional[Union[float, Q_[float]]]) -> None: self._max_current = value @classmethod - @ureg_wraps( - None, (None, None, "ohm/km", "ohm/km", "S/km", "S/km", "ohm/km", "ohm/km", "S/km", "S/km", "A"), strict=False - ) + @ureg_wraps(None, (None, None, "ohm/km", "ohm/km", "S/km", "S/km", "ohm/km", "ohm/km", "S/km", "S/km", "A")) def from_sym( cls, id: Id, @@ -297,7 +295,7 @@ def _sym_to_zy( return z_line, y_shunt @classmethod - @ureg_wraps(None, (None, None, None, None, None, "mm**2", "mm**2", "m", "m", "A"), strict=False) + @ureg_wraps(None, (None, None, None, None, None, "mm**2", "mm**2", "m", "m", "A")) def from_geometry( cls, id: Id, @@ -498,7 +496,7 @@ def _geometry_to_zy( "version. Use LineParameters.from_geometry() instead.", category=FutureWarning, ) - @ureg_wraps(None, (None, None, "mm²", "m", "mm", "A"), strict=False) + @ureg_wraps(None, (None, None, "mm²", "m", "mm", "A")) def from_name_lv( cls, name: str, @@ -567,7 +565,7 @@ def from_name_lv( ) @classmethod - @ureg_wraps(None, (None, None, "A"), strict=False) + @ureg_wraps(None, (None, None, "A")) def from_name_mv(cls, name: str, max_current: Optional[Union[float, Q_[float]]] = None) -> Self: """Method to get the electrical parameters of a MV line from its canonical name. diff --git a/roseau/load_flow/models/loads/flexible_parameters.py b/roseau/load_flow/models/loads/flexible_parameters.py index 44579c16..d687fbae 100644 --- a/roseau/load_flow/models/loads/flexible_parameters.py +++ b/roseau/load_flow/models/loads/flexible_parameters.py @@ -44,7 +44,7 @@ class Control(JsonMixin): _DEFAULT_ALPHA: float = 1000.0 - @ureg_wraps(None, (None, None, "V", "V", "V", "V", None), strict=False) + @ureg_wraps(None, (None, None, "V", "V", "V", "V", None)) def __init__( self, type: ControlType, @@ -152,25 +152,25 @@ def _check_values(self) -> None: raise RoseauLoadFlowException(msg=msg, code=RoseauLoadFlowExceptionCode.BAD_CONTROL_VALUE) @property - @ureg_wraps("V", (None,), strict=False) + @ureg_wraps("V", (None,)) def u_min(self) -> Q_[float]: """The minimum voltage i.e. the one the control reached the maximum action.""" return self._u_min @property - @ureg_wraps("V", (None,), strict=False) + @ureg_wraps("V", (None,)) def u_down(self) -> Q_[float]: """The voltage which starts to trigger the control (lower value).""" return self._u_down @property - @ureg_wraps("V", (None,), strict=False) + @ureg_wraps("V", (None,)) def u_up(self) -> Q_[float]: """TThe voltage which starts to trigger the control (upper value).""" return self._u_up @property - @ureg_wraps("V", (None,), strict=False) + @ureg_wraps("V", (None,)) def u_max(self) -> Q_[float]: """The maximum voltage i.e. the one the control reached its maximum action.""" return self._u_max @@ -187,7 +187,7 @@ def constant(cls) -> Self: return cls(type="constant", u_min=0.0, u_down=0.0, u_up=0.0, u_max=0.0) @classmethod - @ureg_wraps(None, (None, "V", "V", None), strict=False) + @ureg_wraps(None, (None, "V", "V", None)) def p_max_u_production( cls, u_up: Union[float, Q_[float]], u_max: Union[float, Q_[float]], alpha: float = _DEFAULT_ALPHA ) -> Self: @@ -218,7 +218,7 @@ def p_max_u_production( return cls(type="p_max_u_production", u_min=0.0, u_down=0.0, u_up=u_up, u_max=u_max, alpha=alpha) @classmethod - @ureg_wraps(None, (None, "V", "V", None), strict=False) + @ureg_wraps(None, (None, "V", "V", None)) def p_max_u_consumption( cls, u_min: Union[float, Q_[float]], u_down: Union[float, Q_[float]], alpha: float = _DEFAULT_ALPHA ) -> Self: @@ -249,7 +249,7 @@ def p_max_u_consumption( return cls(type="p_max_u_consumption", u_min=u_min, u_down=u_down, u_up=0.0, u_max=0.0, alpha=alpha) @classmethod - @ureg_wraps(None, (None, "V", "V", "V", "V", None), strict=False) + @ureg_wraps(None, (None, "V", "V", "V", "V", None)) def q_u( cls, u_min: Union[float, Q_[float]], @@ -458,7 +458,7 @@ class FlexibleParameter(JsonMixin): _control_class: type[Control] = Control _projection_class: type[Projection] = Projection - @ureg_wraps(None, (None, None, None, None, "VA", "VAr", "VAr"), strict=False) + @ureg_wraps(None, (None, None, None, None, "VA", "VAr", "VAr")) def __init__( self, control_p: Control, @@ -501,13 +501,13 @@ def __init__( self.q_max = q_max @property - @ureg_wraps("VA", (None,), strict=False) + @ureg_wraps("VA", (None,)) def s_max(self) -> Q_[float]: """The apparent power of the flexible load (VA). It is the radius of the feasible circle.""" return self._s_max @s_max.setter - @ureg_wraps(None, (None, "VA"), strict=False) + @ureg_wraps(None, (None, "VA")) def s_max(self, value: Union[float, Q_[float]]) -> None: if value <= 0: s_max = Q_(value, "VA") @@ -523,13 +523,13 @@ def s_max(self, value: Union[float, Q_[float]]) -> None: self._q_min = -self._s_max @property - @ureg_wraps("VAr", (None,), strict=False) + @ureg_wraps("VAr", (None,)) def q_min(self) -> Q_[float]: """The minimum reactive power of the flexible load (VAr).""" return self._q_min if self._q_min is not None else -self._s_max @q_min.setter - @ureg_wraps(None, (None, "VAr"), strict=False) + @ureg_wraps(None, (None, "VAr")) def q_min(self, value: Optional[Union[float, Q_[float]]]) -> None: if value is not None and value < -self._s_max: q_min = Q_(value, "VAr") @@ -544,13 +544,13 @@ def q_min(self, value: Optional[Union[float, Q_[float]]]) -> None: self._q_min = value @property - @ureg_wraps("VAr", (None,), strict=False) + @ureg_wraps("VAr", (None,)) def q_max(self) -> Q_[float]: """The maximum reactive power of the flexible load (VAr).""" return self._q_max if self._q_max is not None else self._s_max @q_max.setter - @ureg_wraps(None, (None, "VAr"), strict=False) + @ureg_wraps(None, (None, "VAr")) def q_max(self, value: Optional[Union[float, Q_[float]]]) -> None: if value is not None and value > self._s_max: q_max = Q_(value, "VAr") @@ -579,7 +579,7 @@ def constant(cls) -> Self: ) @classmethod - @ureg_wraps(None, (None, "V", "V", "VA", None, None, None, None), strict=False) + @ureg_wraps(None, (None, "V", "V", "VA", None, None, None, None)) def p_max_u_production( cls, u_up: Union[float, Q_[float]], @@ -635,7 +635,7 @@ def p_max_u_production( ) @classmethod - @ureg_wraps(None, (None, "V", "V", "VA", None, None, None, None), strict=False) + @ureg_wraps(None, (None, "V", "V", "VA", None, None, None, None)) def p_max_u_consumption( cls, u_min: Union[float, Q_[float]], @@ -688,7 +688,7 @@ def p_max_u_consumption( ) @classmethod - @ureg_wraps(None, (None, "V", "V", "V", "V", "VA", "Var", "Var", None, None, None, None), strict=False) + @ureg_wraps(None, (None, "V", "V", "V", "V", "VA", "Var", "Var", None, None, None, None)) def q_u( cls, u_min: Union[float, Q_[float]], @@ -762,7 +762,7 @@ def q_u( ) @classmethod - @ureg_wraps(None, (None, "V", "V", "V", "V", "V", "V", "VA", "VAr", "VAr", None, None, None, None), strict=False) + @ureg_wraps(None, (None, "V", "V", "V", "V", "V", "V", "VA", "VAr", "VAr", None, None, None, None)) def pq_u_production( cls, up_up: Union[float, Q_[float]], @@ -849,7 +849,7 @@ def pq_u_production( ) @classmethod - @ureg_wraps(None, (None, "V", "V", "V", "V", "V", "V", "VA", "VAr", "VAr", None, None, None, None), strict=False) + @ureg_wraps(None, (None, "V", "V", "V", "V", "V", "V", "VA", "VAr", "VAr", None, None, None, None)) def pq_u_consumption( cls, up_min: Union[float, Q_[float]], @@ -980,7 +980,7 @@ def results_from_dict(self, data: JsonDict) -> NoReturn: # # Equivalent Python method # - @ureg_wraps("VA", (None, None, "V", "VA", None), strict=False) + @ureg_wraps("VA", (None, None, "V", "VA", None)) def compute_powers( self, auth: Authentication, @@ -1035,7 +1035,7 @@ def _compute_powers( return np.array(res_flexible_powers, dtype=np.complex128) - @ureg_wraps((None, "VA"), (None, None, "V", "VA", None, None, None, "VA"), strict=False) + @ureg_wraps((None, "VA"), (None, None, "V", "VA", None, None, None, "VA")) def plot_pq( self, auth: Authentication, @@ -1148,7 +1148,7 @@ def plot_pq( return ax, res_flexible_powers - @ureg_wraps((None, "VA"), (None, None, "V", "VA", None, None, "VA"), strict=False) + @ureg_wraps((None, "VA"), (None, None, "V", "VA", None, None, "VA")) def plot_control_p( self, auth: Authentication, @@ -1214,7 +1214,7 @@ def plot_control_p( return ax, res_flexible_powers - @ureg_wraps((None, "VA"), (None, None, "V", "VA", None, None, "VA"), strict=False) + @ureg_wraps((None, "VA"), (None, None, "V", "VA", None, None, "VA")) def plot_control_q( self, auth: Authentication, diff --git a/roseau/load_flow/models/loads/loads.py b/roseau/load_flow/models/loads/loads.py index b178b767..dd92dc4d 100644 --- a/roseau/load_flow/models/loads/loads.py +++ b/roseau/load_flow/models/loads/loads.py @@ -98,7 +98,7 @@ def _res_currents_getter(self, warning: bool) -> ComplexArray: return self._res_getter(value=self._res_currents, warning=warning) @property - @ureg_wraps("A", (None,), strict=False) + @ureg_wraps("A", (None,)) def res_currents(self) -> Q_[ComplexArray]: """The load flow result of the load currents (A).""" return self._res_currents_getter(warning=True) @@ -122,7 +122,7 @@ def _res_potentials_getter(self, warning: bool) -> ComplexArray: return self.bus._get_potentials_of(self.phases, warning) @property - @ureg_wraps("V", (None,), strict=False) + @ureg_wraps("V", (None,)) def res_potentials(self) -> Q_[ComplexArray]: """The load flow result of the load potentials (V).""" return self._res_potentials_getter(warning=True) @@ -132,7 +132,7 @@ def _res_voltages_getter(self, warning: bool) -> ComplexArray: return calculate_voltages(potentials, self.phases) @property - @ureg_wraps("V", (None,), strict=False) + @ureg_wraps("V", (None,)) def res_voltages(self) -> Q_[ComplexArray]: """The load flow result of the load voltages (V).""" return self._res_voltages_getter(warning=True) @@ -143,7 +143,7 @@ def _res_powers_getter(self, warning: bool) -> ComplexArray: return pots * curs.conj() @property - @ureg_wraps("VA", (None,), strict=False) + @ureg_wraps("VA", (None,)) def res_powers(self) -> Q_[ComplexArray]: """The load flow result of the load powers (VA).""" return self._res_powers_getter(warning=True) @@ -265,13 +265,13 @@ def is_flexible(self) -> bool: return self._flexible_params is not None @property - @ureg_wraps("VA", (None,), strict=False) + @ureg_wraps("VA", (None,)) def powers(self) -> Q_[ComplexArray]: """The powers of the load (VA).""" return self._powers @powers.setter - @ureg_wraps(None, (None, "VA"), strict=False) + @ureg_wraps(None, (None, "VA")) def powers(self, value: ComplexArrayLike1D) -> None: value = self._validate_value(value) if self.is_flexible: @@ -309,7 +309,7 @@ def _res_flexible_powers_getter(self, warning: bool) -> ComplexArray: return self._res_getter(value=self._res_flexible_powers, warning=warning) @property - @ureg_wraps("VA", (None,), strict=False) + @ureg_wraps("VA", (None,)) def res_flexible_powers(self) -> Q_[ComplexArray]: """The load flow result of the load flexible powers (VA).""" return self._res_flexible_powers_getter(warning=True) @@ -375,13 +375,13 @@ def __init__( self.currents = currents # handles size checks and unit conversion @property - @ureg_wraps("A", (None,), strict=False) + @ureg_wraps("A", (None,)) def currents(self) -> Q_[ComplexArray]: """The currents of the load (Amps).""" return self._currents @currents.setter - @ureg_wraps(None, (None, "A"), strict=False) + @ureg_wraps(None, (None, "A")) def currents(self, value: ComplexArrayLike1D) -> None: self._currents = self._validate_value(value) self._invalidate_network_results() @@ -427,13 +427,13 @@ def __init__( self.impedances = impedances @property - @ureg_wraps("ohm", (None,), strict=False) + @ureg_wraps("ohm", (None,)) def impedances(self) -> Q_[ComplexArray]: """The impedances of the load (Ohms).""" return self._impedances @impedances.setter - @ureg_wraps(None, (None, "ohm"), strict=False) + @ureg_wraps(None, (None, "ohm")) def impedances(self, impedances: ComplexArrayLike1D) -> None: self._impedances = self._validate_value(impedances) self._invalidate_network_results() diff --git a/roseau/load_flow/models/potential_refs.py b/roseau/load_flow/models/potential_refs.py index cdca1565..daba1a7f 100644 --- a/roseau/load_flow/models/potential_refs.py +++ b/roseau/load_flow/models/potential_refs.py @@ -68,7 +68,7 @@ def _res_current_getter(self, warning: bool) -> complex: return self._res_getter(self._res_current, warning) @property - @ureg_wraps("A", (None,), strict=False) + @ureg_wraps("A", (None,)) def res_current(self) -> Q_[complex]: """The sum of the currents (A) of the connection associated to the potential reference. diff --git a/roseau/load_flow/models/sources.py b/roseau/load_flow/models/sources.py index 819a78b7..49a5e7bc 100644 --- a/roseau/load_flow/models/sources.py +++ b/roseau/load_flow/models/sources.py @@ -84,13 +84,13 @@ def __repr__(self) -> str: ) @property - @ureg_wraps("V", (None,), strict=False) + @ureg_wraps("V", (None,)) def voltages(self) -> Q_[ComplexArray]: """The voltages of the source (V).""" return self._voltages @voltages.setter - @ureg_wraps(None, (None, "V"), strict=False) + @ureg_wraps(None, (None, "V")) def voltages(self, voltages: ComplexArrayLike1D) -> None: if len(voltages) != self._size: msg = f"Incorrect number of voltages: {len(voltages)} instead of {self._size}" @@ -108,7 +108,7 @@ def _res_currents_getter(self, warning: bool) -> ComplexArray: return self._res_getter(value=self._res_currents, warning=warning) @property - @ureg_wraps("A", (None,), strict=False) + @ureg_wraps("A", (None,)) def res_currents(self) -> Q_[ComplexArray]: """The load flow result of the source currents (A).""" return self._res_currents_getter(warning=True) @@ -118,7 +118,7 @@ def _res_potentials_getter(self, warning: bool) -> ComplexArray: return self.bus._get_potentials_of(self.phases, warning) @property - @ureg_wraps("V", (None,), strict=False) + @ureg_wraps("V", (None,)) def res_potentials(self) -> Q_[ComplexArray]: """The load flow result of the source potentials (V).""" return self._res_potentials_getter(warning=True) @@ -129,7 +129,7 @@ def _res_powers_getter(self, warning: bool) -> ComplexArray: return pots * curs.conj() @property - @ureg_wraps("VA", (None,), strict=False) + @ureg_wraps("VA", (None,)) def res_powers(self) -> Q_[ComplexArray]: """The load flow result of the source powers (VA).""" return self._res_powers_getter(warning=True) diff --git a/roseau/load_flow/models/transformers/parameters.py b/roseau/load_flow/models/transformers/parameters.py index 2e389a0a..31ec8d07 100644 --- a/roseau/load_flow/models/transformers/parameters.py +++ b/roseau/load_flow/models/transformers/parameters.py @@ -37,7 +37,7 @@ class TransformerParameters(Identifiable, JsonMixin, CatalogueMixin[pd.DataFrame ) """The pattern to extract the winding of the primary and of the secondary of the transformer.""" - @ureg_wraps(None, (None, None, None, "V", "V", "VA", "W", "", "W", "", "VA"), strict=False) + @ureg_wraps(None, (None, None, None, "V", "V", "VA", "W", "", "W", "", "VA")) def __init__( self, id: Id, @@ -156,43 +156,43 @@ def __eq__(self, other: object) -> bool: ) @property - @ureg_wraps("V", (None,), strict=False) + @ureg_wraps("V", (None,)) def uhv(self) -> Q_[float]: """Phase-to-phase nominal voltages of the high voltages side (V)""" return self._uhv @property - @ureg_wraps("V", (None,), strict=False) + @ureg_wraps("V", (None,)) def ulv(self) -> Q_[float]: """Phase-to-phase nominal voltages of the low voltages side (V)""" return self._ulv @property - @ureg_wraps("VA", (None,), strict=False) + @ureg_wraps("VA", (None,)) def sn(self) -> Q_[float]: """The nominal power of the transformer (VA)""" return self._sn @property - @ureg_wraps("W", (None,), strict=False) + @ureg_wraps("W", (None,)) def p0(self) -> Q_[float]: """Losses during off-load test (W)""" return self._p0 @property - @ureg_wraps("", (None,), strict=False) + @ureg_wraps("", (None,)) def i0(self) -> Q_[float]: """Current during off-load test (%)""" return self._i0 @property - @ureg_wraps("W", (None,), strict=False) + @ureg_wraps("W", (None,)) def psc(self) -> Q_[float]: """Losses during short-circuit test (W)""" return self._psc @property - @ureg_wraps("", (None,), strict=False) + @ureg_wraps("", (None,)) def vsc(self) -> Q_[float]: """Voltages on LV side during short-circuit test (%)""" return self._vsc @@ -203,11 +203,11 @@ def max_power(self) -> Optional[Q_[float]]: return None if self._max_power is None else Q_(self._max_power, "VA") @max_power.setter - @ureg_wraps(None, (None, "VA"), strict=False) + @ureg_wraps(None, (None, "VA")) def max_power(self, value: Optional[Union[float, Q_[float]]]) -> None: self._max_power = value - @ureg_wraps(("ohm", "S", "", None), (None,), strict=False) + @ureg_wraps(("ohm", "S", "", None), (None,)) def to_zyk(self) -> tuple[Q_[complex], Q_[complex], Q_[float], float]: """Compute the transformer parameters ``z2``, ``ym``, ``k`` and ``orientation`` mandatory for some models. @@ -315,7 +315,7 @@ def catalogue_data(cls) -> pd.DataFrame: return pd.read_csv(cls.catalogue_path() / "Catalogue.csv") @classmethod - @ureg_wraps(None, (None, None, None, None, None, None, "VA", "V", "V"), strict=False) + @ureg_wraps(None, (None, None, None, None, None, None, "VA", "V", "V")) def from_catalogue( cls, id: Optional[Union[str, re.Pattern[str]]] = None, @@ -460,7 +460,7 @@ def from_catalogue( return cls.from_json(path=path) @classmethod - @ureg_wraps(None, (None, None, None, None, None, None, "VA", "V", "V"), strict=False) + @ureg_wraps(None, (None, None, None, None, None, None, "VA", "V", "V")) def print_catalogue( cls, id: Optional[Union[str, re.Pattern[str]]] = None, diff --git a/roseau/load_flow/tests/test_wrapper.py b/roseau/load_flow/tests/test_wrapper.py new file mode 100644 index 00000000..89f5c9d2 --- /dev/null +++ b/roseau/load_flow/tests/test_wrapper.py @@ -0,0 +1,105 @@ +import operator + +import pytest +from pint import DimensionalityError + +from roseau.load_flow import ureg +from roseau.load_flow.units import ureg_wraps + + +def test_wraps(): + def func(x): + return x + + with pytest.raises(TypeError): + ureg_wraps((3 * ureg.meter, [None])) + with pytest.raises(TypeError): + ureg_wraps((None, [3 * ureg.meter])) + + f0 = ureg_wraps(None, [None])(func) + assert f0(3.0) == 3.0 + + f0 = ureg_wraps(None, None)(func) + assert f0(3.0) == 3.0 + + f1 = ureg_wraps(None, ["meter"])(func) + assert f1(3.0 * ureg.centimeter) == 0.03 + assert f1(3.0 * ureg.meter) == 3.0 + with pytest.raises(DimensionalityError): + f1(3 * ureg.second) + + f1b = ureg_wraps(None, [ureg.meter])(func) + assert f1b(3.0 * ureg.centimeter) == 0.03 + assert f1b(3.0 * ureg.meter) == 3.0 + with pytest.raises(DimensionalityError): + f1b(3 * ureg.second) + + f1c = ureg_wraps("meter", [ureg.meter])(func) + assert f1c(3.0 * ureg.centimeter) == 0.03 * ureg.meter + assert f1c(3.0 * ureg.meter) == 3.0 * ureg.meter + with pytest.raises(DimensionalityError): + f1c(3 * ureg.second) + + f1d = ureg_wraps(ureg.meter, [ureg.meter])(func) + assert f1d(3.0 * ureg.centimeter) == 0.03 * ureg.meter + assert f1d(3.0 * ureg.meter) == 3.0 * ureg.meter + with pytest.raises(DimensionalityError): + f1d(3 * ureg.second) + + f1 = ureg_wraps(None, "meter")(func) + assert f1(3.0 * ureg.centimeter) == 0.03 + assert f1(3.0 * ureg.meter) == 3.0 + with pytest.raises(DimensionalityError): + f1(3 * ureg.second) + + f2 = ureg_wraps("centimeter", ["meter"])(func) + assert f2(3.0 * ureg.centimeter) == 0.03 * ureg.centimeter + assert f2(3.0 * ureg.meter) == 3 * ureg.centimeter + assert f2(3) == 3 * ureg.centimeter + + gfunc = operator.add + + g0 = ureg_wraps(None, [None, None])(gfunc) + assert g0(3, 1) == 4 + + g1 = ureg_wraps(None, ["meter", "centimeter"])(gfunc) + assert g1(3 * ureg.meter, 1 * ureg.centimeter) == 4 + assert g1(3 * ureg.meter, 1 * ureg.meter) == 3 + 100 + + def hfunc(x, y): + return x, y + + h0 = ureg_wraps(None, [None, None])(hfunc) + assert h0(3, 1) == (3, 1) + + h1 = ureg_wraps(["meter", "centimeter"], [None, None])(hfunc) + assert h1(3, 1) == [3 * ureg.meter, 1 * ureg.cm] + + h2 = ureg_wraps(("meter", "centimeter"), [None, None])(hfunc) + assert h2(3, 1) == (3 * ureg.meter, 1 * ureg.cm) + + h3 = ureg_wraps((None,), (None, None))(hfunc) + assert h3(3, 1) == (3, 1) + + def kfunc(a, /, b, c=5, *, d=6): + return a, b, c, d + + k1 = ureg_wraps((None,), (None, None, None, None))(kfunc) + assert k1(1, 2, 3, d=4) == (1, 2, 3, 4) + assert k1(1, 2, c=3, d=4) == (1, 2, 3, 4) + assert k1(1, b=2, c=3, d=4) == (1, 2, 3, 4) + assert k1(1, d=4, b=2, c=3) == (1, 2, 3, 4) + assert k1(1, 2, c=3) == (1, 2, 3, 6) + assert k1(1, 2, d=4) == (1, 2, 5, 4) + assert k1(1, 2) == (1, 2, 5, 6) + + k2 = ureg_wraps((None,), ("meter", "centimeter", "meter", "centimeter"))(kfunc) + assert k2(1 * ureg.meter, 2 * ureg.centimeter, 3 * ureg.meter, d=4 * ureg.centimeter) == (1, 2, 3, 4) + + def lfunc(a): + return a[0] + + l1 = ureg_wraps("centimeter", ("meter",))(lfunc) + assert l1([1, 2]) == 1 * ureg.centimeter + assert l1([1, 2] * ureg.meter) == 1 * ureg.centimeter + assert l1([1 * ureg.meter, 2 * ureg.meter]) == 1 * ureg.centimeter diff --git a/roseau/load_flow/units.py b/roseau/load_flow/units.py index a1e5ec51..c05896ff 100644 --- a/roseau/load_flow/units.py +++ b/roseau/load_flow/units.py @@ -28,6 +28,8 @@ from pint.facets.plain import PlainQuantity from typing_extensions import TypeAlias +from roseau.load_flow.wrapper import wraps + T = TypeVar("T") FuncT = TypeVar("FuncT", bound=Callable) @@ -53,8 +55,6 @@ def ureg_wraps( """Wraps a function to become pint-aware. Args: - ureg: - a UnitRegistry instance. ret: Units of each of the return values. Use `None` to skip argument conversion. args: @@ -62,4 +62,4 @@ def ureg_wraps( strict: Indicates that only quantities are accepted. (Default value = True) """ - return ureg.wraps(ret, args, strict) + return wraps(ureg, ret, args) diff --git a/roseau/load_flow/wrapper.py b/roseau/load_flow/wrapper.py new file mode 100644 index 00000000..98f76252 --- /dev/null +++ b/roseau/load_flow/wrapper.py @@ -0,0 +1,152 @@ +import collections +import functools +from collections.abc import Iterable +from inspect import Parameter, Signature, signature +from itertools import zip_longest +from typing import Any, Callable, Optional, TypeVar, Union + +from pint import Quantity, Unit +from pint.registry import UnitRegistry +from pint.util import to_units_container + +T = TypeVar("T") +FuncT = TypeVar("FuncT", bound=Callable) + + +def _parse_wrap_args(args: Iterable[Optional[Union[str, Unit]]]) -> Callable: + """Create a converter function for the wrapper""" + # Arguments which have units. + unit_args_ndx = set() + + # _to_units_container + args_as_uc = [to_units_container(arg) for arg in args] + + # Check for references in args, remove None values + for ndx_, arg in enumerate(args_as_uc): + if arg is not None: + unit_args_ndx.add(ndx_) + + def _converter(ureg: UnitRegistry, sig: Signature, values: list[Any], kw: dict[Any]): + len_initial_values = len(values) + + # pack kwargs + for i, param_name in enumerate(sig.parameters): + if i >= len_initial_values: + values.append(kw[param_name]) + + # convert arguments + for ndx in unit_args_ndx: + if isinstance(values[ndx], ureg.Quantity): + values[ndx] = ureg.convert(values[ndx].magnitude, values[ndx].units, args_as_uc[ndx]) + elif isinstance(values[ndx], collections.abc.MutableSequence): + for i, val in enumerate(values[ndx]): + if isinstance(val, ureg.Quantity): + values[ndx][i] = ureg.convert(val.magnitude, val.units, args_as_uc[ndx]) + + # unpack kwargs + for i, param_name in enumerate(sig.parameters): + if i >= len_initial_values: + kw[param_name] = values[i] + + return values[:len_initial_values], kw + + return _converter + + +def _apply_defaults(sig: Signature, args: tuple[Any], kwargs: dict[Any]) -> tuple[list[Any], dict[Any]]: + """Apply default keyword arguments. + + Named keywords may have been left blank. This function applies the default + values so that every argument is defined. + """ + for i, param in enumerate(sig.parameters.values()): + if i >= len(args) and param.default != Parameter.empty and param.name not in kwargs: + kwargs[param.name] = param.default + return list(args), kwargs + + +def wraps( + ureg: UnitRegistry, + ret: Optional[Union[str, Unit, Iterable[Optional[Union[str, Unit]]]]], + args: Optional[Union[str, Unit, Iterable[Optional[Union[str, Unit]]]]], +) -> Callable[[FuncT], FuncT]: + """Wraps a function to become pint-aware. + + Use it when a function requires a numerical value but in some specific + units. The wrapper function will take a pint quantity, convert to the units + specified in `args` and then call the wrapped function with the resulting + magnitude. + + The value returned by the wrapped function will be converted to the units + specified in `ret`. + + Args: + ureg: + A UnitRegistry instance. + + ret: + Units of each of the return values. Use `None` to skip argument conversion. + + args: + Units of each of the input arguments. Use `None` to skip argument conversion. + + Returns: + The wrapper function. + + Raises: + TypeError + if the number of given arguments does not match the number of function parameters. + if any of the provided arguments is not a unit a string or Quantity + """ + if not isinstance(args, (list, tuple)): + args = (args,) + + for arg in args: + if arg is not None and not isinstance(arg, (ureg.Unit, str)): + raise TypeError(f"wraps arguments must by of type str or Unit, not {type(arg)} ({arg})") + + converter = _parse_wrap_args(args) + + is_ret_container = isinstance(ret, (list, tuple)) + if is_ret_container: + for arg in ret: + if arg is not None and not isinstance(arg, (ureg.Unit, str)): + raise TypeError(f"wraps 'ret' argument must by of type str or Unit, not {type(arg)} ({arg})") + ret = ret.__class__([to_units_container(arg, ureg) for arg in ret]) + else: + if ret is not None and not isinstance(ret, (ureg.Unit, str)): + raise TypeError(f"wraps 'ret' argument must by of type str or Unit, not {type(ret)} ({ret})") + ret = to_units_container(ret, ureg) + + def decorator(func: Callable[..., Any]) -> Callable[..., Quantity]: + sig = signature(func) + count_params = len(sig.parameters) + if len(args) != count_params: + raise TypeError(f"{func.__name__} takes {count_params} parameters, but {len(args)} units were passed") + + assigned = tuple(attr for attr in functools.WRAPPER_ASSIGNMENTS if hasattr(func, attr)) + updated = tuple(attr for attr in functools.WRAPPER_UPDATES if hasattr(func, attr)) + + @functools.wraps(func, assigned=assigned, updated=updated) + def wrapper(*values, **kw) -> Quantity: + values, kw = _apply_defaults(sig, values, kw) + + # In principle, the values are used as is + # When then extract the magnitudes when needed. + new_values, new_kw = converter(ureg, sig, values, kw) + + result = func(*new_values, **new_kw) + + if is_ret_container: + return ret.__class__( + res if unit is None else ureg.Quantity(res, unit) for unit, res in zip_longest(ret, result) + ) + + if ret is None: + return result + + return ureg.Quantity(result, ret) + + return wrapper + + return decorator From e10880d6c167d134ae2bdde9dbfa9e4135755caf Mon Sep 17 00:00:00 2001 From: Saelyos Date: Fri, 17 Nov 2023 16:04:45 +0100 Subject: [PATCH 2/3] Update changelog --- doc/Changelog.md | 1 + 1 file changed, 1 insertion(+) diff --git a/doc/Changelog.md b/doc/Changelog.md index 74a08479..75870b35 100644 --- a/doc/Changelog.md +++ b/doc/Changelog.md @@ -4,6 +4,7 @@ **In development** +- {gh-pr}`149` {gh-issue}`145` Add custom pint wrapper for better handling of pint arrays. - {gh-pr}`148` {gh-issue}`122` deprecate `LineParameters.from_name_lv()` in favor of the more generic `LineParameters.from_geometry()`. The method will be removed in a future release. - {gh-pr}`142` {gh-issue}`136` Add `Bus.res_voltage_unbalance()` method to get the Voltage Unbalance From 94073bdbb4337e47a8f5b68cf7de7fb12d7b8fec Mon Sep 17 00:00:00 2001 From: Saelyos Date: Fri, 17 Nov 2023 16:11:35 +0100 Subject: [PATCH 3/3] Make wrapper private, minor changes --- roseau/load_flow/{wrapper.py => _wrapper.py} | 26 +++++++++----------- roseau/load_flow/units.py | 2 +- 2 files changed, 12 insertions(+), 16 deletions(-) rename roseau/load_flow/{wrapper.py => _wrapper.py} (86%) diff --git a/roseau/load_flow/wrapper.py b/roseau/load_flow/_wrapper.py similarity index 86% rename from roseau/load_flow/wrapper.py rename to roseau/load_flow/_wrapper.py index 98f76252..3f0153f7 100644 --- a/roseau/load_flow/wrapper.py +++ b/roseau/load_flow/_wrapper.py @@ -1,6 +1,5 @@ -import collections import functools -from collections.abc import Iterable +from collections.abc import Iterable, MutableSequence from inspect import Parameter, Signature, signature from itertools import zip_longest from typing import Any, Callable, Optional, TypeVar, Union @@ -15,16 +14,11 @@ def _parse_wrap_args(args: Iterable[Optional[Union[str, Unit]]]) -> Callable: """Create a converter function for the wrapper""" - # Arguments which have units. - unit_args_ndx = set() - # _to_units_container args_as_uc = [to_units_container(arg) for arg in args] # Check for references in args, remove None values - for ndx_, arg in enumerate(args_as_uc): - if arg is not None: - unit_args_ndx.add(ndx_) + unit_args_ndx = {ndx for ndx, arg in enumerate(args_as_uc) if arg is not None} def _converter(ureg: UnitRegistry, sig: Signature, values: list[Any], kw: dict[Any]): len_initial_values = len(values) @@ -36,12 +30,13 @@ def _converter(ureg: UnitRegistry, sig: Signature, values: list[Any], kw: dict[A # convert arguments for ndx in unit_args_ndx: - if isinstance(values[ndx], ureg.Quantity): - values[ndx] = ureg.convert(values[ndx].magnitude, values[ndx].units, args_as_uc[ndx]) - elif isinstance(values[ndx], collections.abc.MutableSequence): - for i, val in enumerate(values[ndx]): + value = values[ndx] + if isinstance(value, ureg.Quantity): + values[ndx] = ureg.convert(value.magnitude, value.units, args_as_uc[ndx]) + elif isinstance(value, MutableSequence): + for i, val in enumerate(value): if isinstance(val, ureg.Quantity): - values[ndx][i] = ureg.convert(val.magnitude, val.units, args_as_uc[ndx]) + value[i] = ureg.convert(val.magnitude, val.units, args_as_uc[ndx]) # unpack kwargs for i, param_name in enumerate(sig.parameters): @@ -53,14 +48,15 @@ def _converter(ureg: UnitRegistry, sig: Signature, values: list[Any], kw: dict[A return _converter -def _apply_defaults(sig: Signature, args: tuple[Any], kwargs: dict[Any]) -> tuple[list[Any], dict[Any]]: +def _apply_defaults(sig: Signature, args: tuple[Any], kwargs: dict[str, Any]) -> tuple[list[Any], dict[str, Any]]: """Apply default keyword arguments. Named keywords may have been left blank. This function applies the default values so that every argument is defined. """ + n = len(args) for i, param in enumerate(sig.parameters.values()): - if i >= len(args) and param.default != Parameter.empty and param.name not in kwargs: + if i >= n and param.default != Parameter.empty and param.name not in kwargs: kwargs[param.name] = param.default return list(args), kwargs diff --git a/roseau/load_flow/units.py b/roseau/load_flow/units.py index c05896ff..f580f0e9 100644 --- a/roseau/load_flow/units.py +++ b/roseau/load_flow/units.py @@ -28,7 +28,7 @@ from pint.facets.plain import PlainQuantity from typing_extensions import TypeAlias -from roseau.load_flow.wrapper import wraps +from roseau.load_flow._wrapper import wraps T = TypeVar("T") FuncT = TypeVar("FuncT", bound=Callable)