From af00a8cdee802a80e2e4f0976dd9f0c6df686b3a Mon Sep 17 00:00:00 2001 From: Tobias Schaffner Date: Tue, 12 Mar 2024 16:03:55 +0100 Subject: [PATCH] feat(power): add driver for Anel ethernet power strips Anel sells several different ethernet power strips. The strips expose an UDP api that allows to switch the plugs on an off. Add a simple power driver for this UDP interface. Signed-off-by: Tobias Schaffner --- .github/wordlist.txt | 2 + Kconfig | 4 +- docs/config.rst | 30 ++++++++++- mtda.ini | 9 ++++ mtda/power/anel.py | 115 +++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 156 insertions(+), 4 deletions(-) create mode 100644 mtda/power/anel.py diff --git a/.github/wordlist.txt b/.github/wordlist.txt index 2d617c76..aa727188 100644 --- a/.github/wordlist.txt +++ b/.github/wordlist.txt @@ -12,6 +12,7 @@ ACM Allwinner amd +Anel api APIs ARCELI @@ -121,6 +122,7 @@ Tizen's TPM tpmtool ttyS +UDP UI unmuted uncomment diff --git a/Kconfig b/Kconfig index 4691a019..939667d3 100644 --- a/Kconfig +++ b/Kconfig @@ -104,8 +104,8 @@ config POWER_VARIANT string "Power variant" default "qemu" help - Select a power variant from 'aviosys_8800', 'gpio', 'pduclient', - 'qemu' and 'usbrelay'. + Select a power variant from 'aviosys_8800', 'anel', 'gpio', + 'pduclient', 'qemu' and 'usbrelay'. endmenu menu "Remote Settings" diff --git a/docs/config.rst b/docs/config.rst index 663fab43..39e04d73 100644 --- a/docs/config.rst +++ b/docs/config.rst @@ -84,8 +84,8 @@ General settings may be selected with ``variant``. * ``variant``: string [required] - Select a power variant from ``aviosys_8800``, ``gpio``, ``pduclient``, - ``qemu``, ``shellcmd`` and ``usbrelay``. + Select a power variant from ``aviosys_8800``, ``anel``, ``gpio``, + ``pduclient``, ``qemu``, ``shellcmd`` and ``usbrelay``. * ``remote``: section [optional] Specify the host and ports to connect to when using a MTDA client (such as @@ -236,6 +236,32 @@ Aviosys. The following settings are supported: * ``vid``: integer [optional] The USB vendor ID of the power outlet (defaults to ``067b``). +``anel`` driver settings +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The ``anel`` driver supports the UDP API of the Ethernet controlled power strips +from Anel. The following settings are supported: + +* ``host``: string [required] + The IP or hostname of the power strip. + +* ``plug``: integer [required] + The number of the plug used. + +* ``user``: string [optional] + The username as configured in the web interface (defaults to ``admin``). + +* ``password``: string [optional] + The password as configured in the web interface (defaults to ``amel``). + +* ``port_in``: integer [optional] + The receive port of the UDP api as configured in the web interface + (defaults to ``77``). + +* ``port_out``: integer [optional] + The send of the UDP api as configured in the web interface + (defaults to ``75``). + ``docker`` driver settings ~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/mtda.ini b/mtda.ini index 3b6e77dc..065f81d4 100644 --- a/mtda.ini +++ b/mtda.ini @@ -120,6 +120,7 @@ time-until = login: # --------------------------------------------------------------------------- # Set "variant" to specify which power control device to use. Use one of: # - aviosys_8800 +# - anel # - docker # - gpio # - pduclient @@ -131,6 +132,14 @@ time-until = login: [power] variant=aviosys_8800 +# variant=anel +# host=192.168.0.20 +# plug=5 +# user=admin +# password=anel +# port_in=77 +# port_out=75 + # variant=gpio # gpio=gpiochip0@203 # enable=high diff --git a/mtda/power/anel.py b/mtda/power/anel.py new file mode 100644 index 00000000..5c2e57e0 --- /dev/null +++ b/mtda/power/anel.py @@ -0,0 +1,115 @@ +# --------------------------------------------------------------------------- +# Anel power strip driver for MTDA +# --------------------------------------------------------------------------- +# +# This software is a part of MTDA. +# Copyright (c) Siemens AG, 2024 +# +# --------------------------------------------------------------------------- +# SPDX-License-Identifier: MIT +# --------------------------------------------------------------------------- + +# System imports +import socket +from contextlib import contextmanager + +# Local imports +from mtda.power.controller import PowerController + + +class AnelPowerController(PowerController): + + def __init__(self, mtda): + self._host = None + self._plug = None + self._user = "admin" + self._password = "anel" + self._port_in = 77 + self._port_out = 75 + self._status = self.POWER_OFF + self.mtda = mtda + + def configure(self, conf): + if 'host' in conf: + self._host = conf['host'] + if 'plug' in conf: + self._plug = int(conf['plug']) + if 'user' in conf: + self._user = conf['user'] + if 'password' in conf: + self.password = conf['password'] + if 'port_in' in conf: + self.check_on = int(conf['port_in']) + if 'port_out' in conf: + self.check_on = int(conf['port_out']) + + def probe(self): + if self._host is None: + raise ValueError("host not specified") + if self._plug is None: + raise ValueError("plug not specified") + + def command(self, args): + return False + + @contextmanager + def _in(self): + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.bind(('', self._port_in)) + sock.settimeout(0.5) + yield sock + sock.close() + + @contextmanager + def _out(self): + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + yield sock + sock.close() + + def _send(self, payload): + data = (payload + self._user + self._password).encode('latin') + with self._out() as connection: + connection.sendto(data, (self._host, self._port_out)) + + def _receive(self): + with self._in() as connection: + data, _ = connection.recvfrom(1024) + return data.decode('latin') + + def _switch(self, state): + payload = f"Sw_{'on' if state else 'off'}{self._plug}" + self._send(payload) + + try: + result = self._receive().split(':')[5+self._plug].rsplit(',', 1)[1] + except TimeoutError: + self.mtda.debug(3, "power.anel._switch(): TimeoutError") + + if result == "0": + self._status = self.POWER_OFF + elif result == "1": + self._status = self.POWER_ON + else: + self._status = self.POWER_UNSURE + + def on(self): + self._switch(True) + return self._status == self.POWER_ON + + def off(self): + self._switch(False) + return self._status == self.POWER_OFF + + def status(self): + return self._status + + def toggle(self): + if self.status() == self.POWER_OFF: + self.on() + else: + self.off() + return self.status() + + +def instantiate(mtda): + return AnelPowerController(mtda)