diff --git a/utils/measure/.env.dist b/utils/measure/.env.dist index d0acf2bfa..268f87664 100644 --- a/utils/measure/.env.dist +++ b/utils/measure/.env.dist @@ -93,4 +93,5 @@ MYSTROM_DEVICE_IP=x.x.x.x # MODEL_NAME=xx # DUMMY_LOAD=true # POWERMETER_ENTITY_ID=sensor.my_power +# VOLTAGEMETER_ENTITY_ID=sensor.my_voltage # RESUME=true diff --git a/utils/measure/measure/const.py b/utils/measure/measure/const.py index dcfa2a647..24fe0dfd7 100644 --- a/utils/measure/measure/const.py +++ b/utils/measure/measure/const.py @@ -1,5 +1,5 @@ import os -from enum import Enum +from enum import Enum, StrEnum from pathlib import Path QUESTION_GENERATE_MODEL_JSON = "generate_model_json" @@ -21,3 +21,14 @@ class MeasureType(str, Enum): RECORDER = "Recorder" AVERAGE = "Average" CHARGING = "Charging device" + + +class Trend(StrEnum): + INCREASING = "increasing" + DECREASING = "decreasing" + STEADY = "steady" + + +dummy_load_measurement_count = 20 +dummy_load_measurements_duration = 30 +RETRY_COUNT_LIMIT = 100 diff --git a/utils/measure/measure/powermeter/const.py b/utils/measure/measure/powermeter/const.py index ea17a571d..808bf9ebf 100644 --- a/utils/measure/measure/powermeter/const.py +++ b/utils/measure/measure/powermeter/const.py @@ -14,3 +14,4 @@ class PowerMeterType(str, Enum): QUESTION_POWERMETER_ENTITY_ID = "powermeter_entity_id" +QUESTION_VOLTAGEMETER_ENTITY_ID = "voltagemeter_entity_id" diff --git a/utils/measure/measure/powermeter/dummy.py b/utils/measure/measure/powermeter/dummy.py index 83da7e42a..a215e04a6 100644 --- a/utils/measure/measure/powermeter/dummy.py +++ b/utils/measure/measure/powermeter/dummy.py @@ -7,8 +7,17 @@ class DummyPowerMeter(PowerMeter): - def get_power(self) -> PowerMeasurementResult: - return PowerMeasurementResult(20.5, time.time()) + def get_power(self, include_voltage: bool = False) -> PowerMeasurementResult: + if include_voltage: + return PowerMeasurementResult( + power=20.5, + voltage=233.0, + updated=time.time(), + ) + return PowerMeasurementResult(power=20.5, updated=time.time()) + + def has_voltage_support(self) -> bool: + return True def process_answers(self, answers: dict[str, Any]) -> None: pass diff --git a/utils/measure/measure/powermeter/errors.py b/utils/measure/measure/powermeter/errors.py index a4410c173..1d5458235 100644 --- a/utils/measure/measure/powermeter/errors.py +++ b/utils/measure/measure/powermeter/errors.py @@ -12,3 +12,7 @@ class ZeroReadingError(PowerMeterError): class ApiConnectionError(PowerMeterError): pass + + +class UnsupportedFeatureError(PowerMeterError): + pass diff --git a/utils/measure/measure/powermeter/hass.py b/utils/measure/measure/powermeter/hass.py index e27360fef..cc2d7ca9a 100644 --- a/utils/measure/measure/powermeter/hass.py +++ b/utils/measure/measure/powermeter/hass.py @@ -4,10 +4,11 @@ from typing import Any import inquirer -from homeassistant_api import Client +from homeassistant_api import Client, Entity -from measure.powermeter.const import QUESTION_POWERMETER_ENTITY_ID -from measure.powermeter.errors import PowerMeterError +from measure.const import QUESTION_DUMMY_LOAD +from measure.powermeter.const import QUESTION_POWERMETER_ENTITY_ID, QUESTION_VOLTAGEMETER_ENTITY_ID +from measure.powermeter.errors import PowerMeterError, UnsupportedFeatureError from measure.powermeter.powermeter import PowerMeasurementResult, PowerMeter @@ -15,12 +16,15 @@ class HassPowerMeter(PowerMeter): def __init__(self, api_url: str, token: str, call_update_entity: bool) -> None: self._call_update_entity = call_update_entity self._entity_id: str | None = None + self._voltage_entity_id: str | None = None + self._entities: list[Entity] | None = None try: self.client = Client(api_url, token, cache_session=False) except Exception as e: raise PowerMeterError(f"Failed to connect to HA API: {e}") from e - def get_power(self) -> PowerMeasurementResult: + def get_power(self, include_voltage: bool = False) -> PowerMeasurementResult: + """Get a new power reading from Hass-API. Optionally include voltage.""" if self._call_update_entity: self.client.trigger_service( "homeassistant", @@ -33,24 +37,100 @@ def get_power(self) -> PowerMeasurementResult: if state == "unavailable": raise PowerMeterError(f"Power sensor {self._entity_id} unavailable") last_updated = state.last_updated.timestamp() - return PowerMeasurementResult(float(state.state), last_updated) + power_value = float(state.state) + + if include_voltage and not self.has_voltage_support(): + raise UnsupportedFeatureError("Voltage sensor entity not found.") + + if include_voltage: + voltage_state = self.client.get_state(entity_id=self._voltage_entity_id) + if voltage_state == "unavailable": + raise PowerMeterError(f"Voltage sensor {self._voltage_entity_id} unavailable") + + voltage_value = float(voltage_state.state) + return PowerMeasurementResult( + power=power_value, + voltage=voltage_value, + updated=last_updated, + ) + + return PowerMeasurementResult(power=power_value, updated=last_updated) + + def has_voltage_support(self) -> bool: + if not self._voltage_entity_id: + return False + + voltage_state = self.client.get_state(entity_id=self._voltage_entity_id) + if voltage_state == "unavailable": + raise PowerMeterError(f"Voltage sensor {self._voltage_entity_id} unavailable") + return True + + def autodetect_voltage_entity(self, power_entity: str) -> bool: + """Try to find a matching voltage entity for the current power entity.""" + matched_sensors = self.match_power_and_voltage_sensors() + + if not matched_sensors or power_entity not in matched_sensors: + # no match found for our power sensor + return False + + self._voltage_entity_id = matched_sensors.get(power_entity) + return True def get_questions(self) -> list[inquirer.questions.Question]: - power_sensor_list = self.get_power_sensors() + def _should_skip_voltage_sensor_question(answers: dict[str, Any]) -> bool: + """Determine if the voltage sensor question should be asked.""" + if not answers.get(QUESTION_DUMMY_LOAD, False): + return True + return self.autodetect_voltage_entity(answers.get(QUESTION_POWERMETER_ENTITY_ID)) + power_sensor_list = self.get_power_sensors() return [ inquirer.List( name=QUESTION_POWERMETER_ENTITY_ID, message="Select the powermeter", choices=power_sensor_list, ), + inquirer.List( + name=QUESTION_VOLTAGEMETER_ENTITY_ID, + message="Select the voltage sensor", + choices=lambda answers: self.get_voltage_sensors(), + ignore=_should_skip_voltage_sensor_question, + ), ] def get_power_sensors(self) -> list[str]: - entities = self.client.get_entities() - sensors = entities["sensor"].entities.values() - power_sensors = [entity.entity_id for entity in sensors if entity.state.attributes.get("unit_of_measurement") == "W"] - return sorted(power_sensors) + return self.get_entities_by_unit_of_measurement("W") + + def get_voltage_sensors(self) -> list[str]: + return self.get_entities_by_unit_of_measurement("V") + + def get_entities_by_unit_of_measurement(self, unit_of_measurement: str) -> list[str]: + return sorted( + [entity.entity_id for entity in self.get_entities() if entity.state.attributes.get("unit_of_measurement") == unit_of_measurement], + ) + + def get_entities(self) -> list[Entity]: + if not self._entities: + entities = self.client.get_entities() + self._entities = list(entities["sensor"].entities.values()) + return self._entities + + def match_power_and_voltage_sensors(self) -> dict[str, str]: + power_sensors = self.get_power_sensors() + voltage_sensors = self.get_voltage_sensors() + + # Create mappings based on base names + power_map = {sensor.rsplit("_power", 1)[0]: sensor for sensor in power_sensors} + voltage_map = {sensor.rsplit("_voltage", 1)[0]: sensor for sensor in voltage_sensors} + + matched_sensors = {} + for base_name, power_sensor in power_map.items(): + if base_name in voltage_map: + matched_sensors[power_sensor] = voltage_map[base_name] + + return matched_sensors def process_answers(self, answers: dict[str, Any]) -> None: self._entity_id = answers[QUESTION_POWERMETER_ENTITY_ID] + if QUESTION_VOLTAGEMETER_ENTITY_ID in answers: + self._voltage_entity_id = answers[QUESTION_VOLTAGEMETER_ENTITY_ID] diff --git a/utils/measure/measure/powermeter/kasa.py b/utils/measure/measure/powermeter/kasa.py index 051dab1ba..3879a3565 100644 --- a/utils/measure/measure/powermeter/kasa.py +++ b/utils/measure/measure/powermeter/kasa.py @@ -6,6 +6,7 @@ from kasa import SmartPlug +from measure.powermeter.errors import UnsupportedFeatureError from measure.powermeter.powermeter import PowerMeasurementResult, PowerMeter @@ -13,14 +14,23 @@ class KasaPowerMeter(PowerMeter): def __init__(self, device_ip: str) -> None: self._smartplug = SmartPlug(device_ip) - def get_power(self) -> PowerMeasurementResult: + def get_power(self, include_voltage: bool = False) -> PowerMeasurementResult: + """Get a new power reading from the Kasa device. Optionally include voltage (FIXME: not yet implemented).""" + if include_voltage: + # FIXME: Not yet implemented # noqa: FIX001 + raise UnsupportedFeatureError("Voltage measurement is not yet implemented for Kasa devices.") + loop = asyncio.get_event_loop() power = loop.run_until_complete(self.async_read_power_meter()) - return PowerMeasurementResult(power, time.time()) + + return PowerMeasurementResult(power=power, updated=time.time()) async def async_read_power_meter(self) -> None: await self._smartplug.update() return self._smartplug.emeter_realtime["power"] + def has_voltage_support(self) -> bool: + return False + def process_answers(self, answers: dict[str, Any]) -> None: pass diff --git a/utils/measure/measure/powermeter/manual.py b/utils/measure/measure/powermeter/manual.py index 1045437da..925085b93 100644 --- a/utils/measure/measure/powermeter/manual.py +++ b/utils/measure/measure/powermeter/manual.py @@ -7,9 +7,23 @@ class ManualPowerMeter(PowerMeter): - def get_power(self) -> PowerMeasurementResult: - power = input("Input power measurement:") - return PowerMeasurementResult(float(power), time.time()) + def get_power(self, include_voltage: bool = False) -> PowerMeasurementResult: + """Manually enter power readings. Optionally enter voltage readings as well.""" + if include_voltage: + power = input("Input power measurement: ") + voltage = input("Input voltage measurement: ") + + return PowerMeasurementResult( + power=float(power), + voltage=float(voltage), + updated=time.time(), + ) + + power = input("Input power measurement: ") + return PowerMeasurementResult(power=float(power), updated=time.time()) + + def has_voltage_support(self) -> bool: + return True def process_answers(self, answers: dict[str, Any]) -> None: pass diff --git a/utils/measure/measure/powermeter/mystrom.py b/utils/measure/measure/powermeter/mystrom.py index 3fa1f4f9b..d1d06b42a 100644 --- a/utils/measure/measure/powermeter/mystrom.py +++ b/utils/measure/measure/powermeter/mystrom.py @@ -5,7 +5,7 @@ import requests -from measure.powermeter.errors import PowerMeterError +from measure.powermeter.errors import PowerMeterError, UnsupportedFeatureError from measure.powermeter.powermeter import PowerMeasurementResult, PowerMeter @@ -13,7 +13,12 @@ class MyStromPowerMeter(PowerMeter): def __init__(self, device_ip: str) -> None: self._device_ip = device_ip - def get_power(self) -> PowerMeasurementResult: + def get_power(self, include_voltage: bool = False) -> PowerMeasurementResult: + """Get a new power reading from the MyStrom device. Optionally include voltage (FIXME: not yet implemented).""" + if include_voltage: + # FIXME: Not yet implemented # noqa: FIX001 + raise UnsupportedFeatureError("Voltage measurement is not yet implemented for MyStrom devices.") + r = requests.get( f"http://{self._device_ip}/report", timeout=10, @@ -25,7 +30,10 @@ def get_power(self) -> PowerMeasurementResult: except KeyError as error: raise PowerMeterError("Unexpected JSON response format") from error - return PowerMeasurementResult(float(power), time.time()) + return PowerMeasurementResult(power=float(power), updated=time.time()) + + def has_voltage_support(self) -> bool: + return False def process_answers(self, answers: dict[str, Any]) -> None: pass diff --git a/utils/measure/measure/powermeter/ocr.py b/utils/measure/measure/powermeter/ocr.py index 1dd4c581b..4f75c5cca 100644 --- a/utils/measure/measure/powermeter/ocr.py +++ b/utils/measure/measure/powermeter/ocr.py @@ -3,6 +3,7 @@ import os from typing import Any +from measure.powermeter.errors import UnsupportedFeatureError from measure.powermeter.powermeter import PowerMeasurementResult, PowerMeter @@ -16,12 +17,16 @@ def __init__(self) -> None: self.file = open(filepath, "rb") # noqa: SIM115 super().__init__() - def get_power(self) -> PowerMeasurementResult: + def get_power(self, include_voltage: bool = False) -> PowerMeasurementResult: + """Get a new power reading via OCR.""" + if include_voltage: + raise UnsupportedFeatureError("Voltage measurement are not supported for OCR mode.") + last_line = self.read_last_line() (timestamp, power) = last_line.strip().split(";") power = float(power) timestamp = float(timestamp) - return PowerMeasurementResult(power, timestamp) + return PowerMeasurementResult(power=power, updated=timestamp) def read_last_line(self) -> str: try: @@ -32,5 +37,8 @@ def read_last_line(self) -> str: self.file.seek(0) return self.file.readline().decode() + def has_voltage_support(self) -> bool: + return False + def process_answers(self, answers: dict[str, Any]) -> None: pass diff --git a/utils/measure/measure/powermeter/powermeter.py b/utils/measure/measure/powermeter/powermeter.py index 01e1c794c..41241bf6d 100644 --- a/utils/measure/measure/powermeter/powermeter.py +++ b/utils/measure/measure/powermeter/powermeter.py @@ -8,8 +8,12 @@ class PowerMeter(ABC): @abstractmethod - def get_power(self) -> PowerMeasurementResult: - """Get a power measurement from the meter""" + def get_power(self, include_voltage: bool = False) -> PowerMeasurementResult: + """Get a power measurement from the meter. Optionally include voltage readings.""" + + @abstractmethod + def has_voltage_support(self) -> bool: + """Returns bool depending on the powermeter capabilities to act as a voltmeter.""" def get_questions(self) -> list[Question]: """Get questions to ask for the chosen powermeter""" @@ -23,3 +27,4 @@ def process_answers(self, answers: dict[str, Any]) -> None: class PowerMeasurementResult(NamedTuple): power: float updated: float + voltage: Optional[float] = None # noqa: F821 diff --git a/utils/measure/measure/powermeter/shelly.py b/utils/measure/measure/powermeter/shelly.py index aeff0b05c..af4822dbb 100644 --- a/utils/measure/measure/powermeter/shelly.py +++ b/utils/measure/measure/powermeter/shelly.py @@ -7,7 +7,7 @@ import requests -from measure.powermeter.errors import ApiConnectionError +from measure.powermeter.errors import ApiConnectionError, UnsupportedFeatureError from measure.powermeter.powermeter import PowerMeasurementResult, PowerMeter _LOGGER = logging.getLogger("measure") @@ -29,7 +29,7 @@ def endpoint(self) -> str: def parse_json(self, json: dict) -> PowerMeasurementResult: meter = json["meters"][0] - return PowerMeasurementResult(float(meter["power"]), float(meter["timestamp"])) + return PowerMeasurementResult(power=float(meter["power"]), updated=float(meter["timestamp"])) class ShellyApiGen2Plus(ShellyApi): @@ -42,8 +42,10 @@ def __init__(self, ip_address: str, timeout: int) -> None: def endpoint(self) -> str: return self._endpoint - def parse_json(self, json: dict) -> PowerMeasurementResult: - return PowerMeasurementResult(float(json["apower"]), time.time()) + def parse_json(self, json: dict, include_voltage: bool = False) -> PowerMeasurementResult: + if include_voltage: + return PowerMeasurementResult(power=float(json["apower"]), voltage=float(json["voltage"]), updated=time.time()) + return PowerMeasurementResult(power=float(json["apower"]), updated=time.time()) def check_gen2_plus_endpoints(self) -> None: """ @@ -88,8 +90,8 @@ def __init__(self, shelly_ip: str, timeout: int = 5) -> None: self.api = ShellyApiGen2Plus(self.ip_address, self.timeout) self.api.check_gen2_plus_endpoints() - def get_power(self) -> PowerMeasurementResult: - """Get a new power reading from the Shelly device""" + def get_power(self, include_voltage: bool = False) -> PowerMeasurementResult: + """Get a new power reading from the Shelly device. Optionally include voltage.""" try: r = requests.get( f"http://{self.ip_address}{self.api.endpoint}", @@ -100,6 +102,13 @@ def get_power(self) -> PowerMeasurementResult: raise ApiConnectionError("Could not connect to Shelly Plug") from e json = r.json() + + if include_voltage: + if isinstance(self.api, ShellyApiGen1): + raise UnsupportedFeatureError("Voltage measurement is not supported on Shelly Gen1 devices") + if isinstance(self.api, ShellyApiGen2Plus): + return self.api.parse_json(json, include_voltage=True) + return self.api.parse_json(json) def _detect_api_version(self) -> int: @@ -121,5 +130,8 @@ def _detect_api_version(self) -> int: _LOGGER.debug("Shelly API version %d detected", gen) return int(gen) + def has_voltage_support(self) -> bool: + return isinstance(self.api, ShellyApiGen2Plus) + def process_answers(self, answers: dict[str, Any]) -> None: pass diff --git a/utils/measure/measure/powermeter/tasmota.py b/utils/measure/measure/powermeter/tasmota.py index ab37a2899..29fc06964 100644 --- a/utils/measure/measure/powermeter/tasmota.py +++ b/utils/measure/measure/powermeter/tasmota.py @@ -5,7 +5,7 @@ import requests -from measure.powermeter.errors import PowerMeterError +from measure.powermeter.errors import PowerMeterError, UnsupportedFeatureError from measure.powermeter.powermeter import PowerMeasurementResult, PowerMeter @@ -13,7 +13,12 @@ class TasmotaPowerMeter(PowerMeter): def __init__(self, device_ip: str) -> None: self._device_ip = device_ip - def get_power(self) -> PowerMeasurementResult: + def get_power(self, include_voltage: bool = False) -> PowerMeasurementResult: + """Get a new power reading from the Tasmota device. Optionally include voltage (FIXME: not yet implemented).""" + if include_voltage: + # FIXME: Not yet implemented # noqa: FIX001 + raise UnsupportedFeatureError("Voltage measurement is not yet implemented for Tasmota devices.") + r = requests.get( f"http://{self._device_ip}/cm?cmnd=STATUS+8", timeout=10, @@ -25,7 +30,10 @@ def get_power(self) -> PowerMeasurementResult: except KeyError as error: raise PowerMeterError("Unexpected JSON response format") from error - return PowerMeasurementResult(float(power), time.time()) + return PowerMeasurementResult(power=float(power), updated=time.time()) + + def has_voltage_support(self) -> bool: + return False def process_answers(self, answers: dict[str, Any]) -> None: pass diff --git a/utils/measure/measure/powermeter/tuya.py b/utils/measure/measure/powermeter/tuya.py index a98c2e286..dfa06edef 100644 --- a/utils/measure/measure/powermeter/tuya.py +++ b/utils/measure/measure/powermeter/tuya.py @@ -5,7 +5,7 @@ import tuyapower -from measure.powermeter.errors import PowerMeterError +from measure.powermeter.errors import PowerMeterError, UnsupportedFeatureError from measure.powermeter.powermeter import PowerMeasurementResult, PowerMeter STATUS_OK = "OK" @@ -24,7 +24,12 @@ def __init__( self._device_key = device_key self._device_version = device_version - def get_power(self) -> PowerMeasurementResult: + def get_power(self, include_voltage: bool = False) -> PowerMeasurementResult: + """Get a new power reading from the Tuya device. Optionally include voltage (FIXME: not yet implemented).""" + if include_voltage: + # FIXME: Not yet implemented # noqa: FIX001 + raise UnsupportedFeatureError("Voltage measurement is not yet implemented for Tuya devices.") + (_, w, _, _, err) = tuyapower.deviceInfo( self._device_id, self._device_ip, @@ -35,7 +40,10 @@ def get_power(self) -> PowerMeasurementResult: if err != STATUS_OK: raise PowerMeterError("Could not get a successful power reading") - return PowerMeasurementResult(w, time.time()) + return PowerMeasurementResult(power=w, updated=time.time()) + + def has_voltage_support(self) -> bool: + return False def process_answers(self, answers: dict[str, Any]) -> None: pass diff --git a/utils/measure/measure/runner/light.py b/utils/measure/measure/runner/light.py index 5ad61e2f0..5729db5d2 100644 --- a/utils/measure/measure/runner/light.py +++ b/utils/measure/measure/runner/light.py @@ -239,7 +239,7 @@ def set_light_to_maximum_brightness(self, color_mode: ColorMode) -> None: on=True, bri=255, ) - time.sleep(2) # Wait for the light to process + time.sleep(self.config.sleep_time) # Wait for the light to process def get_variations( self, diff --git a/utils/measure/measure/util/measure_util.py b/utils/measure/measure/util/measure_util.py index 27b7e1377..f3c8cd0da 100644 --- a/utils/measure/measure/util/measure_util.py +++ b/utils/measure/measure/util/measure_util.py @@ -1,10 +1,15 @@ +from __future__ import annotations + import logging import os import time from datetime import datetime as dt +from statistics import mean + +import numpy as np from measure.config import MeasureConfig -from measure.const import PROJECT_DIR +from measure.const import PROJECT_DIR, RETRY_COUNT_LIMIT, Trend, dummy_load_measurement_count, dummy_load_measurements_duration from measure.powermeter.errors import ( OutdatedMeasurementError, PowerMeterError, @@ -21,21 +26,113 @@ def __init__(self, power_meter: PowerMeter, config: MeasureConfig) -> None: self.dummy_load_value: float | None = None self.config = config - def take_average_measurement(self, duration: int) -> float: - """Measure average power consumption for a given time period in seconds""" - _LOGGER.info("Measuring average power for %d seconds", duration) + def take_average_measurement(self, duration: int, measure_resistance: bool = False) -> float: + """ + Measure the average power consumption or resistance for a given time period in seconds. + + This function calculates the average power or resistance by taking multiple readings over + the specified duration. If `measure_resistance` is True, the function computes resistance + using the formula R = V^2 / P, where V is the voltage and P is the power. If a dummy load + resistive value is set, the power consumption of the dummy load calculated for each + measurement, depending on the current voltage and is subtracted from the power + measurements. + + Args: + duration (int): The time duration (in seconds) over which to take measurements. + measure_resistance (bool): Whether to measure resistance instead of power. Defaults to False. + + Returns: + float: The average resistance (in Ω) or power (in W) over the measurement duration. + + Raises: + UnsupportedFeatureError: If `measure_resistance` is True but the power meter does not + support voltage measurements. + + Error handling: + - If voltage measurements are not supported, the program will terminate with an appropriate + error message, if measuring power consumption and a dummy load resistive value is set. + - For resistance measurements, voltage values below 1 are treated as invalid, and the program + exits to avoid incorrect calculations. + - Ignores single measurements of <= 0 W. + """ + _LOGGER.info("Measuring average %s over %s seconds", "resistance" if measure_resistance else "power", duration) start_time = time.time() readings: list[float] = [] + + first_measurement = True + while (time.time() - start_time) < duration: + if first_measurement: + first_measurement = False + else: + # sleep time exceeds duration + if not ((time.time() - start_time + self.config.sleep_time) < duration): + break + time.sleep(self.config.sleep_time) + + if measure_resistance: + result = self.power_meter.get_power(include_voltage=True) + power, voltage = result.power, result.voltage + + if voltage < 1: + _LOGGER.error("Error during measurement: Voltage measurement returned zero. Aborting measurement.") + exit(1) + + if round(power, 2) == 0: + _LOGGER.warning("Invalid measurement: power: %.2f W, voltage: %.2f", power, voltage) + continue + + resistance = round((voltage**2) / power, 4) + readings.append(resistance) + _LOGGER.debug( + "Measured resistance: %.2f Ω; measured power: %.2f W, voltage: %.2f", + resistance, + power, + voltage, + ) + continue + + if self.dummy_load_value: # measurement with dummy load + # Validate power meter is capable of measuring voltage + self._validate_voltage_support() + + result = self.power_meter.get_power(include_voltage=True) + + power, voltage = result.power, result.voltage + + if voltage < 1: + _LOGGER.error("Error during measurement: Voltage measurement returned zero. Aborting measurement.") + exit(1) + + dummy_power = (voltage**2) / self.dummy_load_value + power -= dummy_power + + if round(power, 2) <= 0: + _LOGGER.warning( + "Invalid measurement after subtracting dummy load consumption. Calculated consumption: %.2f W; ignoring", + power, + ) + continue + + readings.append(power) + _LOGGER.info("Measured power: %.2f W", power) + continue + + # measurement without dummy load power = self.power_meter.get_power().power - _LOGGER.info("Measured power: %.2f", power) + if round(power, 2) == 0: + _LOGGER.warning("Invalid measurement. Consumption: %.2f W; ignoring", power) + + continue readings.append(power) - time.sleep(self.config.sleep_time) - average = round(sum(readings) / len(readings), 2) - if self.dummy_load_value: - average -= self.dummy_load_value + _LOGGER.info("Measured power: %.2f W", power) - _LOGGER.info("Average power: %s", average) + if not readings: + _LOGGER.error("No valid readings were recorded.") + exit(1) + + average = round(mean(readings), 2) + _LOGGER.info("Average of %d measurements: %.2f %s", len(readings), average, "Ω" if measure_resistance else "W") return average def take_measurement( @@ -44,14 +141,21 @@ def take_measurement( retry_count: int = 0, ) -> float: """Get a measurement from the powermeter, take multiple samples and calculate the average""" + + if self.config.max_retries is None: + _LOGGER.error("No max_retries value was configured. Setting the default of 5 to prevent infinite loop.") + self.config.max_retries = 5 + measurements = [] # Take multiple samples to reduce noise for i in range(1, self.config.sample_count + 1): _LOGGER.debug("Taking sample %d", i) error = None measurement: PowerMeasurementResult | None = None + try: - measurement = self.power_meter.get_power() + measurement = self.power_meter.get_power(include_voltage=bool(self.dummy_load_value)) + updated_at = dt.fromtimestamp(measurement.updated).strftime( "%d-%m-%Y, %H:%M:%S", ) @@ -61,62 +165,168 @@ def take_measurement( if measurement: # Check if measurement is not outdated - if measurement.updated < start_timestamp: + if start_timestamp and measurement.updated < start_timestamp: error = OutdatedMeasurementError( "Power measurement is outdated. Aborting after %d successive retries", self.config.max_retries, ) + power = measurement.power + # Check if we not have a 0 measurement - if measurement.power == 0: + if round(power, 2) <= 0: error = ZeroReadingError("0 watt was read from the power meter") + if self.dummy_load_value: + voltage = measurement.voltage + if voltage < 1: + error = ZeroReadingError("0 Volt was read from the power meter") + else: + dummy_power = (voltage**2) / self.dummy_load_value + power -= dummy_power + + if round(power, 2) <= 0: + error = ZeroReadingError("0 watt was read from the power meter, after subtracting the dummy load") + + measurements.append(power) + if error: # Prevent endless recursion. Throw error when max retries is reached if retry_count == self.config.max_retries: raise error + if retry_count >= RETRY_COUNT_LIMIT: + _LOGGER.error( + "Retry count exceeded %d. Configured max_retries value: %d. Aborting to prevent infinite loop.", + RETRY_COUNT_LIMIT, + self.config.max_retries, + ) + raise error retry_count += 1 time.sleep(self.config.sleep_time) return self.take_measurement(start_timestamp, retry_count) - measurements.append(measurement.power) if self.config.sample_count > 1: time.sleep(self.config.sleep_time_sample) # Determine Average PM reading - average = sum(measurements) / len(measurements) - if self.dummy_load_value: - average -= self.dummy_load_value + if not measurements: + _LOGGER.error("No valid readings were recorded.") + exit(1) + + average = mean(measurements) + _LOGGER.info("Average measurement: %.3f W", average) return average def initialize_dummy_load(self) -> float: - """Get the previously measured dummy load value, or take a new measurement if it doesn't exist""" + """Get the previously measured dummy load resistance, or take a new measurement if it doesn't exist""" dummy_load_file = os.path.join( PROJECT_DIR, - ".persistent/dummy_load", + ".persistent/dummy_load_resistance", ) if os.path.exists(dummy_load_file): with open(dummy_load_file) as f: value = float(f.read()) - _LOGGER.info("Dummy load was already measured before, value: %sW", value) + _LOGGER.info("Dummy load was already measured before, value: %s Ω", value) + print("You need to preheat the dummy load, so it's consumption can stabilize.") + print("If you're unsure the dummy load is sufficiently preheated or you're using a different one, remeasure.") + print() inquirer = input("Do you want to measure the dummy load again? (y/n): ") if inquirer.lower() == "n": self.dummy_load_value = value return self.dummy_load_value + print() + print("Tip: Use a dummy load with constant power consumption. Stick to resistive loads for the best results!") + print("Important: Connect only the dummy load to your smart plug—not the device you're measuring.") + print() + print("The script will now measure the dummy load and continue once it's consumption has stablized.") + print("Depending on the dummy load this may take 2 hours.") + print() + input("Ready to begin measuring the dummy load? Hit Enter ") + self.dummy_load_value = self._measure_dummy_load(dummy_load_file) return self.dummy_load_value def _measure_dummy_load(self, file_path: str) -> float: """Measure the dummy load and persist the value for future measurement session""" - print() - print("Tip: Use a dummy load with constant power consumption. Stick to resistive loads for the best results!") - print("Important: Connect only the dummy load to your smart plug—not the device you're measuring.") - print("Preheat your dummy load until its temperature stabilizes. This usually takes about 2 hours.") - input("Ready to start? Press Enter to begin measuring the dummy load!") - average = self.take_average_measurement(1) + + print( + "Measuring and checking dummy load... this will take at least %.0f minutes." + % (dummy_load_measurement_count / 60 * dummy_load_measurements_duration), + ) + + # Validate power meter is capable of measuring voltage + self._validate_voltage_support() + + while True: + averages = [ + self.take_average_measurement(dummy_load_measurements_duration, measure_resistance=True) for _ in range(dummy_load_measurement_count) + ] + + trend = self._check_trend(averages) + + if not trend: + _LOGGER.error("Error during measurement: No trend could be calculated") + exit(1) + + if trend == Trend.STEADY: + break + + print(f"Dummy load resistance has not yet stablized and is {trend}, repeating.") + + average = round(mean(averages), 2) + + _LOGGER.info("Dummy load measurement completed. Resistance: %s Ω", average) + with open(file_path, "w") as f: f.write(str(average)) return average + + def _check_trend(self, averages: list[float]) -> Trend | None: + """ + Checks if the resistance readings of a dummy load are increasing, decreasing, or steady (fluctuating). + + Returns: + str: "increasing", "decreasing", or "steady" based on the trends. + None: if not enough samples were supplied + """ + if len(averages) < 20: + return None + + mid = len(averages) // 2 # Calculate the midpoint + + first_half = averages[:mid] + second_half = averages[mid:] + + # Helper function to calculate trend + def calc_trend(values: list[float]) -> float: + # Perform a linear regression to estimate the trend + x = np.arange(len(values)) + coeffs = np.polyfit(x, values, 1) # Linear fit: y = mx + c + return coeffs[0] # Extract the slope (m) + + first_trend = calc_trend(first_half) + second_trend = calc_trend(second_half) + + def trend_direction(slope: float, threshold: float = 0.01) -> Trend: + if slope > threshold: + return Trend.INCREASING + if slope < -threshold: + return Trend.DECREASING + return Trend.STEADY + + first_trend = trend_direction(first_trend) + second_trend = trend_direction(second_trend) + + if first_trend == second_trend and first_trend != Trend.STEADY: + return first_trend + return Trend.STEADY + + def _validate_voltage_support(self) -> None: + """Check if the power meter supports voltage readings.""" + + if not self.power_meter.has_voltage_support: + print("The selected power meter does not support voltage measurements, required to measure dummy loads.") + exit(1) diff --git a/utils/measure/tests/test_measure.py b/utils/measure/tests/test_measure.py index 7183039d4..c8458ee79 100644 --- a/utils/measure/tests/test_measure.py +++ b/utils/measure/tests/test_measure.py @@ -39,14 +39,26 @@ def test_wizard(mock_config_factory) -> None: # noqa: ANN001 measure = _create_measure_instance( mock_config, console_events=event_factory( - key.ENTER, # MEASURE_TYPE - "n", # DUMMY_LOAD - "y", # GENERATE_MODEL_JSON - key.DOWN, # COLOR_MODE + # MEASURE_TYPE + key.ENTER, + # GENERATE_MODEL_JSON + "y", + # DUMMY_LOAD + "n", + # MODEL_NAME + "a", + key.ENTER, + # MEASURE_DEVICE + "a", + key.ENTER, + # COLOR_MODE key.DOWN, + key.DOWN, + key.ENTER, + # GZIP key.ENTER, - key.ENTER, # GZIP - "n", # MULTIPLE_LIGHTS + # MULTIPLE_LIGHTS + "n", ), ) @@ -54,7 +66,7 @@ def test_wizard(mock_config_factory) -> None: # noqa: ANN001 measure.start() assert os.path.exists(os.path.join(PROJECT_DIR, "export/dummy/brightness.csv.gz")) - assert not os.path.exists(os.path.join(PROJECT_DIR, "export/dummy/model.json")) + assert os.path.exists(os.path.join(PROJECT_DIR, "export/dummy/model.json")) def test_run_light(mock_config_factory) -> None: # noqa: ANN001