From 548d82a59835a5c1bc309d364fda9aeedc76e090 Mon Sep 17 00:00:00 2001 From: Jeroen van der Heijden Date: Thu, 26 Dec 2024 12:12:47 +0100 Subject: [PATCH] Work on rapp --- lib/docker.py | 75 ++++++++++++++++++++++++++++++++++++++++ lib/envvars.py | 8 +++++ lib/helpers.py | 48 -------------------------- lib/logview.py | 91 +++++++++++++++++++++++++++++++++++++++++++++++++ lib/protocol.py | 65 +++++++++++++++++++++-------------- lib/rapp.py | 5 +-- lib/state.py | 32 ++++++++++++----- 7 files changed, 238 insertions(+), 86 deletions(-) create mode 100644 lib/docker.py create mode 100644 lib/envvars.py delete mode 100644 lib/helpers.py create mode 100644 lib/logview.py diff --git a/lib/docker.py b/lib/docker.py new file mode 100644 index 0000000..b842b83 --- /dev/null +++ b/lib/docker.py @@ -0,0 +1,75 @@ +import re +import asyncio +import logging +from typing import Optional, Tuple, List +from .envvars import COMPOSE_PATH + + +class DockerException(Exception): + """Raised when reading the docker version. If this succeeds, other errors + will be captured and stored.""" + pass + + +class Docker: + + lock = asyncio.Lock() + + MIN_DOCKER_VERSION = 24 + _RE_DOCKER_VERSION = \ + re.compile(r'Docker version ([0-9]+)\.([0-9]+)\.([0-9]+).*') + + @classmethod + def _read_docker_version(cls, output) -> Optional[Tuple[int, int, int]]: + m = cls._RE_DOCKER_VERSION.match(output) + if not m: + return + try: + major, minor, patch = int(m.group(1)), int(m.group(2)), int(m.group(3)) + except Exception: + return + return major, minor, patch + + @classmethod + def _run(cls, cmd: str) -> Tuple[str, str]: + try: + proc = await asyncio.create_subprocess_shell( + cmd, + stderr=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + cwd=COMPOSE_PATH, + ) + stdout, stderr = await proc.communicate() + out = stdout.decode() + err = stderr.decode().strip() + if err: + logging.error(err) + except Exception as e: + err = str(e) or type(e).__name__ + logging.error(f'cmd `{cmd}` failed (err)') + return '', err + else: + return out, err + + @classmethod + async def version(cls) -> str: + async with cls.lock: + out, err = await cls._run('docker -v') + if 'not found' in err or 'not found' in out: + raise DockerException('not found') + if err: + raise Exception(err) + docker_version = cls._read_docker_version(out) + if not docker_version: + raise DockerException('missing docker version') + if docker_version[0] < cls.MIN_DOCKER_VERSION: + vstr = '.'.join([str(i) for i in docker_version]) + raise DockerException(f'docker too old: v{vstr}') + + return docker_version + + @classmethod + async def pull_and_update(cls): + async with cls.lock: + await cls._run('docker compose pull') + await cls._run('docker compose up -d --remove-orphans') diff --git a/lib/envvars.py b/lib/envvars.py new file mode 100644 index 0000000..d5dcc3a --- /dev/null +++ b/lib/envvars.py @@ -0,0 +1,8 @@ +import os + +AGENTCORE_HOST = os.getenv('AGENTCORE_HOST', '127.0.0.1') +AGENTCORE_PORT = int(os.getenv('AGENTCORE_PORT', 8770)) +COMPOSE_FILE = os.getenv('COMPOSE_FILE', '/docker/docker-compose.yml') +ENV_FILE = os.getenv('COMPOSE_FILE', '/docker/.env') +CONFIG_FILE = os.getenv('CONFIG_FILE', '/config/infrasonar.yaml') +COMPOSE_PATH = os.path.dirname(COMPOSE_FILE) \ No newline at end of file diff --git a/lib/helpers.py b/lib/helpers.py deleted file mode 100644 index 64517d3..0000000 --- a/lib/helpers.py +++ /dev/null @@ -1,48 +0,0 @@ -import re -import asyncio -from typing import Optional, Tuple - - -class DockerException(Exception): - pass - - -_MIN_DOCKER_VERSION = 24 -_RE_DOCKER_VERSION = \ - re.compile(r'Docker version ([0-9]+)\.([0-9]+)\.([0-9]+).*') - - -def _read_docker_version(output) -> Optional[Tuple[int, int, int]]: - m = _RE_DOCKER_VERSION.match(output) - if not m: - return - try: - major, minor, patch = int(m.group(1)), int(m.group(2)), int(m.group(3)) - except Exception: - return - return major, minor, patch - - -async def read_docker_version() -> str: - cmd = 'docker -v' - proc = await asyncio.create_subprocess_shell( - cmd, - stderr=asyncio.subprocess.PIPE, - stdout=asyncio.subprocess.PIPE, - ) - - stdout, stderr = await proc.communicate() - out = stdout.decode() - err = stderr.decode() - if 'not found' in err or 'not found' in out: - raise DockerException('not found') - if err: - raise Exception(err) - docker_version = read_docker_version(out) - if not docker_version: - raise DockerException('missing docker version') - if docker_version[0] < _MIN_DOCKER_VERSION: - vstr = '.'.join([str(i) for i in docker_version]) - raise DockerException(f'docker too old: v{vstr}') - - return docker_version diff --git a/lib/logview.py b/lib/logview.py new file mode 100644 index 0000000..99226d1 --- /dev/null +++ b/lib/logview.py @@ -0,0 +1,91 @@ +import asyncio +import time +from typing import List, Optional +from .envvars import COMPOSE_PATH + + +class LogView: + + MAX_UNUSED_TIME = 30.0 # kill after 30 seconds unused + + def __init__(self, name: str, on_stop: callable): + self.name = name + self._lines: List[str] = [] + self._process: Optional[asyncio.Process] = None + self._reader: Optional[asyncio.Future] = None + self._watcher: Optional[asyncio.Future] = None + self._on_stop = on_stop + self._accessed: float = 0.0 + + async def start(self, n: Optional[int] = None): + tail = f' -n {n}' if n is not None else '' + cmd = f'docker logs {self.name} -f{tail}' + self._process = await asyncio.create_subprocess_shell( + cmd, + stderr=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + cwd=COMPOSE_PATH) + self._reader = asyncio.ensure_future(self._read()) + self._watcher = asyncio.ensure_future(self._watch()) + await asyncio.sleep(0.5) # give a little time to read some lines + + async def _read(self): + try: + while True: + line = await self.process.stderr.readline() + if line: + try: + line = line.decode() + except Exception as e: + self.lines.append(f'Decoding error: {e}') + else: + self.lines.append(line) + else: + break + except asyncio.CancelledError: + pass + except Exception: + self.stop() + + async def _watch(self): + while True: + now = time.time() + if now - self._accessed > self.MAX_UNUSED_TIME: + break + await asyncio.sleep(1.0) + self.stop() + + def get_lines(self, start: int = 0) -> dict: + self._accessed = time.time() + n = len(self._lines) + if start > n: + start = 0 + return { + 'lines': self._lines[start:], + 'next': n + } + + def stop(self): + try: + self._reader.cancel() + except Exception: + pass + try: + self._watcher.cancel() + except Exception: + pass + try: + self._process.kill() + + # below is a fix for Python 3.12 (for some reason close is not + # reached on the transport after calling kill or terminatre) + self._process._transport.close() + except Exception: + pass + + self._reader = None + self._watcher = None + self._process = None + + self._on_stop(self.name) + self._on_stop = lambda _: None diff --git a/lib/protocol.py b/lib/protocol.py index 4fd420d..5ec850d 100644 --- a/lib/protocol.py +++ b/lib/protocol.py @@ -5,6 +5,7 @@ from .net.package import Package from .net.protocol import Protocol from .state import State +from .docker import Docker @@ -26,30 +27,36 @@ class RappProtocol(Protocol): def __init__(self): super().__init__() - def _on_ping(self, pkg: Package): + def _empty_ok(self, pkg: Package) -> Package: + return Package.make(self.PROTO_RAPP_RES, pid=pkg.pid, is_binary=True) + + def _on_ping(self, pkg: Package) -> Package: logging.debug("Ping") - pkg = Package.make(self.PROTO_RAPP_RES, pid=pkg.pid, is_binary=True) - self.write(pkg) + return self._empty_ok(pkg) def _on_read(self, pkg: Package) -> Package: logging.debug("Read") data = State.get() - pkg = Package.make(self.PROTO_RAPP_RES, data=data, pid=pkg.pid) - self.write(pkg) + return Package.make(self.PROTO_RAPP_RES, data=data, pid=pkg.pid) - def _on_push(self, pkg: Package) -> Package: + def _on_push(self, pkg: Package): logging.debug("Push") - try: - State.set(pkg.data) - except Exception as e: - data = { - 'reason': str(e) or f'unknown error: {type(e).__name__}' - } - pkg = Package.make(self.PROTO_RAPP_ERR, data=data, pid=pkg.pid) - else: - pkg = \ - Package.make(self.PROTO_RAPP_RES, pid=pkg.pid, is_binary=True) - self.write(pkg) + State.set(pkg.data) + return self._empty_ok(pkg) + + def _on_update(self, pkg: Package): + logging.debug("Pull & Update") + asyncio.ensure_future(Docker.pull_and_update()) + return self._empty_ok(pkg) + + def _on_log(self, pkg: Package): + assert isinstance(pkg.data, dict), 'log request must be a dict' + name = pkg.data.get('name') + assert name and isinstance(name, str), 'missing or invalid name' + start = pkg.data.get('start', 0) + assert isinstance(start, int) and start >= 0, 'invalid start' + data = State.get_log(name, start) + return Package.make(self.PROTO_RAPP_RES, data=data, pid=pkg.pid) def on_package_received(self, pkg: Package, _map={ PROTO_RAPP_PING: _on_ping, @@ -61,13 +68,21 @@ def on_package_received(self, pkg: Package, _map={ handle = _map.get(pkg.tp) if handle is None: logging.error(f'unhandled package type: {pkg.tp}') + return + + if Docker.lock.locked(): + logging.debug(f'Busy ({pkg.tp})') + pkg = Package.make( + self.PROTO_RAPP_BUSY, + pid=pkg.pid, + is_binary=True) else: - if State.lock.locked(): - pkg = Package.make( - self.PROTO_RAPP_BUSY, - pid=pkg.pid, - is_binary=True) - self.write(pkg) - else: - handle(self, pkg) + try: + pkg = handle(self, pkg) + except Exception as e: + reason = str(e) or f'unknown error: {type(e).__name__}' + logging.error(reason) + data = {'reason': reason} + pkg = Package.make(self.PROTO_RAPP_ERR, data=data, pid=pkg.pid) + self.write(pkg) diff --git a/lib/rapp.py b/lib/rapp.py index 325e93e..5a17934 100644 --- a/lib/rapp.py +++ b/lib/rapp.py @@ -5,10 +5,7 @@ from typing import Optional from .protocol import RappProtocol from .state import State - - -AGENTCORE_HOST = os.getenv('AGENTCORE_HOST', '127.0.0.1') -AGENTCORE_PORT = int(os.getenv('AGENTCORE_PORT', 8770)) +from .envvars import AGENTCORE_HOST, AGENTCORE_PORT class Rapp: diff --git a/lib/state.py b/lib/state.py index d7f88c5..1cc713a 100644 --- a/lib/state.py +++ b/lib/state.py @@ -2,13 +2,13 @@ import asyncio import yaml import logging -from typing import Set, Optional +from typing import Set, List, Dict from configobj import ConfigObj -from .helpers import read_docker_version +from .docker import Docker +from .envvars import COMPOSE_FILE, CONFIG_FILE, ENV_FILE +from .logview import LogView + -COMPOSE_FILE = os.getenv('COMPOSE_FILE', '/docker/docker-compose.yml') -ENV_FILE = os.getenv('COMPOSE_FILE', '/docker/.env') -CONFIG_FILE = os.getenv('CONFIG_FILE', '/config/infrasonar.yaml') TL = (tuple, list) COMPOSE_KEYS = set(('environment', 'image')) PROBE_KEYS = set(('key', 'compose', 'config', 'use')) @@ -29,7 +29,6 @@ class StateException(Exception): class State: loop = asyncio.new_event_loop() - lock: Optional[asyncio.Lock] = None compose_data: dict = {} env_data: dict = {} config_data: dict = {} @@ -38,13 +37,28 @@ class State: 'LOG_LEVEL', 'LOG_COLORIZED' ]) + loggers: Dict[str, LogView] = {} @classmethod - async def _init(cls): + def _init(cls): cls.lock = asyncio.Lock() + cls.loggers['rapp'] = Docker + + @classmethod + def get_log(cls, name: str, start: int = 0): + name = f'infrasonar-{name}-1' + if name not in cls.loggers: + start = 0 + cls.loggers[name] = LogView(name, cls.rm_logger) + cls.loop.run_until_complete(cls.loggers[name].start()) + return cls.loggers[name].get_lines(start) + + @classmethod + def rm_logger(cls, name: str): + del cls.loggers[name] @classmethod - async def _read(cls): + def _read(cls): with open(COMPOSE_FILE, 'r') as fp: cls.compose_data = yaml.safe_load(fp) with open(CONFIG_FILE, 'r') as fp: @@ -331,5 +345,5 @@ def init(cls): cls.get() # Test docker version - docker_version = cls.loop.run_until_complete(read_docker_version()) + docker_version = cls.loop.run_until_complete(Docker.version()) logging.info(f'docker version: {docker_version}')