Skip to content

Commit

Permalink
Work on rapp
Browse files Browse the repository at this point in the history
  • Loading branch information
joente committed Dec 26, 2024
1 parent 8fd221d commit 548d82a
Show file tree
Hide file tree
Showing 7 changed files with 238 additions and 86 deletions.
75 changes: 75 additions & 0 deletions lib/docker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import re
import asyncio
import logging
from typing import Optional, Tuple, List
from .envvars import COMPOSE_PATH


class DockerException(Exception):
"""Raised when reading the docker version. If this succeeds, other errors
will be captured and stored."""
pass


class Docker:

lock = asyncio.Lock()

MIN_DOCKER_VERSION = 24
_RE_DOCKER_VERSION = \
re.compile(r'Docker version ([0-9]+)\.([0-9]+)\.([0-9]+).*')

@classmethod
def _read_docker_version(cls, output) -> Optional[Tuple[int, int, int]]:
m = cls._RE_DOCKER_VERSION.match(output)
if not m:
return
try:
major, minor, patch = int(m.group(1)), int(m.group(2)), int(m.group(3))
except Exception:
return
return major, minor, patch

@classmethod
def _run(cls, cmd: str) -> Tuple[str, str]:
try:
proc = await asyncio.create_subprocess_shell(
cmd,
stderr=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
cwd=COMPOSE_PATH,
)
stdout, stderr = await proc.communicate()
out = stdout.decode()
err = stderr.decode().strip()
if err:
logging.error(err)
except Exception as e:
err = str(e) or type(e).__name__
logging.error(f'cmd `{cmd}` failed (err)')
return '', err
else:
return out, err

@classmethod
async def version(cls) -> str:
async with cls.lock:
out, err = await cls._run('docker -v')
if 'not found' in err or 'not found' in out:
raise DockerException('not found')
if err:
raise Exception(err)
docker_version = cls._read_docker_version(out)
if not docker_version:
raise DockerException('missing docker version')
if docker_version[0] < cls.MIN_DOCKER_VERSION:
vstr = '.'.join([str(i) for i in docker_version])
raise DockerException(f'docker too old: v{vstr}')

return docker_version

@classmethod
async def pull_and_update(cls):
async with cls.lock:
await cls._run('docker compose pull')
await cls._run('docker compose up -d --remove-orphans')
8 changes: 8 additions & 0 deletions lib/envvars.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import os

AGENTCORE_HOST = os.getenv('AGENTCORE_HOST', '127.0.0.1')
AGENTCORE_PORT = int(os.getenv('AGENTCORE_PORT', 8770))
COMPOSE_FILE = os.getenv('COMPOSE_FILE', '/docker/docker-compose.yml')
ENV_FILE = os.getenv('COMPOSE_FILE', '/docker/.env')
CONFIG_FILE = os.getenv('CONFIG_FILE', '/config/infrasonar.yaml')
COMPOSE_PATH = os.path.dirname(COMPOSE_FILE)
48 changes: 0 additions & 48 deletions lib/helpers.py

This file was deleted.

91 changes: 91 additions & 0 deletions lib/logview.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import asyncio
import time
from typing import List, Optional
from .envvars import COMPOSE_PATH


class LogView:

MAX_UNUSED_TIME = 30.0 # kill after 30 seconds unused

def __init__(self, name: str, on_stop: callable):
self.name = name
self._lines: List[str] = []
self._process: Optional[asyncio.Process] = None
self._reader: Optional[asyncio.Future] = None
self._watcher: Optional[asyncio.Future] = None
self._on_stop = on_stop
self._accessed: float = 0.0

async def start(self, n: Optional[int] = None):
tail = f' -n {n}' if n is not None else ''
cmd = f'docker logs {self.name} -f{tail}'
self._process = await asyncio.create_subprocess_shell(
cmd,
stderr=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
cwd=COMPOSE_PATH)
self._reader = asyncio.ensure_future(self._read())
self._watcher = asyncio.ensure_future(self._watch())
await asyncio.sleep(0.5) # give a little time to read some lines

async def _read(self):
try:
while True:
line = await self.process.stderr.readline()
if line:
try:
line = line.decode()
except Exception as e:
self.lines.append(f'Decoding error: {e}')
else:
self.lines.append(line)
else:
break
except asyncio.CancelledError:
pass
except Exception:
self.stop()

async def _watch(self):
while True:
now = time.time()
if now - self._accessed > self.MAX_UNUSED_TIME:
break
await asyncio.sleep(1.0)
self.stop()

def get_lines(self, start: int = 0) -> dict:
self._accessed = time.time()
n = len(self._lines)
if start > n:
start = 0
return {
'lines': self._lines[start:],
'next': n
}

def stop(self):
try:
self._reader.cancel()
except Exception:
pass
try:
self._watcher.cancel()
except Exception:
pass
try:
self._process.kill()

# below is a fix for Python 3.12 (for some reason close is not
# reached on the transport after calling kill or terminatre)
self._process._transport.close()
except Exception:
pass

self._reader = None
self._watcher = None
self._process = None

self._on_stop(self.name)
self._on_stop = lambda _: None
65 changes: 40 additions & 25 deletions lib/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from .net.package import Package
from .net.protocol import Protocol
from .state import State
from .docker import Docker



Expand All @@ -26,30 +27,36 @@ class RappProtocol(Protocol):
def __init__(self):
super().__init__()

def _on_ping(self, pkg: Package):
def _empty_ok(self, pkg: Package) -> Package:
return Package.make(self.PROTO_RAPP_RES, pid=pkg.pid, is_binary=True)

def _on_ping(self, pkg: Package) -> Package:
logging.debug("Ping")
pkg = Package.make(self.PROTO_RAPP_RES, pid=pkg.pid, is_binary=True)
self.write(pkg)
return self._empty_ok(pkg)

def _on_read(self, pkg: Package) -> Package:
logging.debug("Read")
data = State.get()
pkg = Package.make(self.PROTO_RAPP_RES, data=data, pid=pkg.pid)
self.write(pkg)
return Package.make(self.PROTO_RAPP_RES, data=data, pid=pkg.pid)

def _on_push(self, pkg: Package) -> Package:
def _on_push(self, pkg: Package):
logging.debug("Push")
try:
State.set(pkg.data)
except Exception as e:
data = {
'reason': str(e) or f'unknown error: {type(e).__name__}'
}
pkg = Package.make(self.PROTO_RAPP_ERR, data=data, pid=pkg.pid)
else:
pkg = \
Package.make(self.PROTO_RAPP_RES, pid=pkg.pid, is_binary=True)
self.write(pkg)
State.set(pkg.data)
return self._empty_ok(pkg)

def _on_update(self, pkg: Package):
logging.debug("Pull & Update")
asyncio.ensure_future(Docker.pull_and_update())
return self._empty_ok(pkg)

def _on_log(self, pkg: Package):
assert isinstance(pkg.data, dict), 'log request must be a dict'
name = pkg.data.get('name')
assert name and isinstance(name, str), 'missing or invalid name'
start = pkg.data.get('start', 0)
assert isinstance(start, int) and start >= 0, 'invalid start'
data = State.get_log(name, start)
return Package.make(self.PROTO_RAPP_RES, data=data, pid=pkg.pid)

def on_package_received(self, pkg: Package, _map={
PROTO_RAPP_PING: _on_ping,
Expand All @@ -61,13 +68,21 @@ def on_package_received(self, pkg: Package, _map={
handle = _map.get(pkg.tp)
if handle is None:
logging.error(f'unhandled package type: {pkg.tp}')
return

if Docker.lock.locked():
logging.debug(f'Busy ({pkg.tp})')
pkg = Package.make(
self.PROTO_RAPP_BUSY,
pid=pkg.pid,
is_binary=True)
else:
if State.lock.locked():
pkg = Package.make(
self.PROTO_RAPP_BUSY,
pid=pkg.pid,
is_binary=True)
self.write(pkg)
else:
handle(self, pkg)
try:
pkg = handle(self, pkg)
except Exception as e:
reason = str(e) or f'unknown error: {type(e).__name__}'
logging.error(reason)
data = {'reason': reason}
pkg = Package.make(self.PROTO_RAPP_ERR, data=data, pid=pkg.pid)
self.write(pkg)

5 changes: 1 addition & 4 deletions lib/rapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
from typing import Optional
from .protocol import RappProtocol
from .state import State


AGENTCORE_HOST = os.getenv('AGENTCORE_HOST', '127.0.0.1')
AGENTCORE_PORT = int(os.getenv('AGENTCORE_PORT', 8770))
from .envvars import AGENTCORE_HOST, AGENTCORE_PORT


class Rapp:
Expand Down
32 changes: 23 additions & 9 deletions lib/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
import asyncio
import yaml
import logging
from typing import Set, Optional
from typing import Set, List, Dict
from configobj import ConfigObj
from .helpers import read_docker_version
from .docker import Docker
from .envvars import COMPOSE_FILE, CONFIG_FILE, ENV_FILE
from .logview import LogView


COMPOSE_FILE = os.getenv('COMPOSE_FILE', '/docker/docker-compose.yml')
ENV_FILE = os.getenv('COMPOSE_FILE', '/docker/.env')
CONFIG_FILE = os.getenv('CONFIG_FILE', '/config/infrasonar.yaml')
TL = (tuple, list)
COMPOSE_KEYS = set(('environment', 'image'))
PROBE_KEYS = set(('key', 'compose', 'config', 'use'))
Expand All @@ -29,7 +29,6 @@ class StateException(Exception):

class State:
loop = asyncio.new_event_loop()
lock: Optional[asyncio.Lock] = None
compose_data: dict = {}
env_data: dict = {}
config_data: dict = {}
Expand All @@ -38,13 +37,28 @@ class State:
'LOG_LEVEL',
'LOG_COLORIZED'
])
loggers: Dict[str, LogView] = {}

@classmethod
async def _init(cls):
def _init(cls):
cls.lock = asyncio.Lock()
cls.loggers['rapp'] = Docker

@classmethod
def get_log(cls, name: str, start: int = 0):
name = f'infrasonar-{name}-1'
if name not in cls.loggers:
start = 0
cls.loggers[name] = LogView(name, cls.rm_logger)
cls.loop.run_until_complete(cls.loggers[name].start())
return cls.loggers[name].get_lines(start)

@classmethod
def rm_logger(cls, name: str):
del cls.loggers[name]

@classmethod
async def _read(cls):
def _read(cls):
with open(COMPOSE_FILE, 'r') as fp:
cls.compose_data = yaml.safe_load(fp)
with open(CONFIG_FILE, 'r') as fp:
Expand Down Expand Up @@ -331,5 +345,5 @@ def init(cls):
cls.get()

# Test docker version
docker_version = cls.loop.run_until_complete(read_docker_version())
docker_version = cls.loop.run_until_complete(Docker.version())
logging.info(f'docker version: {docker_version}')

0 comments on commit 548d82a

Please sign in to comment.