Skip to content

Commit

Permalink
Touch up front end:
Browse files Browse the repository at this point in the history
- Use typer to handle env variables (Show in --help and do type casting/sanitising)
- Add .config to override defaults + management commands
- Dynamic command hiding based on chosen backend and commands implemented
- Flow of options for commands through backend to support validation
- Seperate frontend (typer) from backend (particular: error handling)

Some backend work:
- Remove docker
- Remove handling of symlink charts (git and helm commands)
- Tempfile context manager for debug retension
* Still needs more work
  • Loading branch information
marcelldls committed Jul 24, 2024
1 parent 302c4fd commit d9d794b
Show file tree
Hide file tree
Showing 20 changed files with 972 additions and 1,310 deletions.
97 changes: 72 additions & 25 deletions src/edge_containers_cli/__main__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
import os
from typing import Optional

import typer

import edge_containers_cli.globals as globals
from edge_containers_cli.cmds.cli import cli
from edge_containers_cli.cli import cli
from edge_containers_cli.definitions import ENV, ECBackends, ECContext, ECLogLevels
from edge_containers_cli.utils import ConfigController

from . import __version__
from .backend import backend as ec_backend
from .backend import init_backend
from .logging import init_logging
from .shell import init_shell
from .utils import init_cleanup

__all__ = ["main"]

Expand All @@ -17,6 +24,24 @@ def version_callback(value: bool):
raise typer.Exit()


def backend_callback(ctx: typer.Context, backend: ECBackends):
init_backend(backend)
# Dynamically drop any method not implemented
not_implemented = [
mthd.replace("_", "-") for mthd in ec_backend.get_notimplemented()
]
for command in not_implemented:
typer_commands = ctx.command.commands # type: ignore
if command in typer_commands:
typer_commands.pop(command)

return backend.value


args = ConfigController(globals.CONFIG_ROOT / globals.ENV_CONFIG)
args.read_config()


@cli.callback()
def main(
ctx: typer.Context,
Expand All @@ -28,42 +53,64 @@ def main(
help="Log the version of ec and exit",
),
repo: str = typer.Option(
"",
args.get_var(ENV.repo, ECContext().repo),
"-r",
"--repo",
help="service/ioc instances repository",
help="Service instances repository",
envvar=ENV.repo.value,
),
namespace: str = typer.Option(
"", "-n", "--namespace", help="kubernetes namespace to use"
args.get_var(ENV.namespace, ECContext().namespace),
"-n",
"--namespace",
help="Kubernetes namespace to use",
envvar=ENV.namespace.value,
),
log_level: str = typer.Option(
"WARN", help="Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)"
backend: ECBackends = typer.Option(
args.get_var(ENV.backend, ECBackends.K8S),
"-b",
"--backend",
callback=backend_callback,
is_eager=True,
help="Backend to use",
envvar=ENV.backend.value,
expose_value=True,
),
verbose: bool = typer.Option(
globals.EC_VERBOSE, "-v", "--verbose", help="print the commands we run"
args.get_var(ENV.verbose, False),
"-v",
"--verbose",
help="Print the commands we run",
envvar=ENV.verbose.value,
show_default=True,
),
debug: bool = typer.Option(
globals.EC_DEBUG,
args.get_var(ENV.debug, False),
"-d",
"--debug",
help="Enable debug logging to console and retain temporary files",
help="Enable debug logging, retain temp files",
envvar=ENV.debug.value,
show_default=True,
),
log_level: ECLogLevels = typer.Option(
args.get_var(ENV.log_level, ECLogLevels.WARNING),
help="Log level",
envvar=ENV.log_level.value,
),
log_url: str = typer.Option(
args.get_var(ENV.log_url, ECContext().log_url),
help="Log url",
envvar=ENV.log_url.value,
),
):
"""Edge Containers assistant CLI"""
init_logging(ECLogLevels.DEBUG if debug else log_level)
init_shell(verbose)
init_cleanup(debug)

globals.EC_VERBOSE, globals.EC_DEBUG = bool(verbose), bool(debug)

init_logging(log_level.upper())

# create a context dictionary to pass to all sub commands
repo = repo or globals.EC_SERVICES_REPO
namespace = namespace or globals.EC_K8S_NAMESPACE
ctx.ensure_object(globals.Context)
context = globals.Context(namespace=namespace, beamline_repo=repo)
ctx.obj = context


# test with:
# python -m edge_containers_cli
if __name__ == "__main__":
cli()
context = ECContext(
repo=repo,
namespace=namespace,
log_url=log_url,
)
ec_backend.set_context(context)
148 changes: 55 additions & 93 deletions src/edge_containers_cli/autocomplete.py
Original file line number Diff line number Diff line change
@@ -1,140 +1,102 @@
import json
import os
import tempfile
import time
import urllib
from pathlib import Path
from subprocess import CalledProcessError

import typer

import edge_containers_cli.globals as globals
import edge_containers_cli.shell as shell
from edge_containers_cli.cmds.k8s_commands import check_namespace
from edge_containers_cli.docker import Docker
from edge_containers_cli.backend import backend as ec_backend
from edge_containers_cli.cmds.commands import CommandError
from edge_containers_cli.definitions import ECContext

# from edge_containers_cli.cmds.k8s_commands import check_namespace
from edge_containers_cli.git import create_version_map
from edge_containers_cli.logging import log
from edge_containers_cli.utils import cleanup_temp
from edge_containers_cli.shell import ShellError
from edge_containers_cli.utils import cache_dict, read_cached_dict, tmpdir, ConfigController


def url_encode(in_string: str) -> str:
return urllib.parse.quote(in_string, safe="") # type: ignore


def cache_dict(cache_folder: str, cached_file: str, data_struc: dict) -> None:
cache_dir = os.path.join(globals.CACHE_ROOT, cache_folder)
if not os.path.exists(cache_dir):
os.makedirs(cache_dir)

cache_path = os.path.join(cache_dir, cached_file)
with open(cache_path, "w") as f:
f.write(json.dumps(data_struc, indent=4))


def read_cached_dict(cache_folder: str, cached_file: str) -> dict:
cache_path = os.path.join(globals.CACHE_ROOT, cache_folder, cached_file)
read_dict = {}

# Check cache if available
if os.path.exists(cache_path):
# Read from cache if not stale
if (time.time() - os.path.getmtime(cache_path)) < globals.CACHE_EXPIRY:
with open(cache_path) as f:
read_dict = json.load(f)

return read_dict


def fetch_service_graph(beamline_repo: str) -> dict:
version_map = read_cached_dict(url_encode(beamline_repo), globals.IOC_CACHE)
def autocomplete_backend_init(ctx: typer.Context):
params = ctx.parent.params # type: ignore
context = ECContext(
repo=params["repo"],
namespace=params["namespace"],
log_url=params["log_url"],
)
ec_backend.set_context(context)


def fetch_service_graph(repo: str) -> dict:
version_map = read_cached_dict(
globals.CACHE_ROOT / url_encode(repo), globals.SERVICE_CACHE
)
if not version_map:
tmp_dir = Path(tempfile.mkdtemp())
version_map = create_version_map(beamline_repo, tmp_dir)
cache_dict(url_encode(beamline_repo), globals.IOC_CACHE, version_map)
cleanup_temp(tmp_dir)
with tmpdir as path:
version_map = create_version_map(repo, path)
cache_dict(
globals.CACHE_ROOT / url_encode(repo),
globals.SERVICE_CACHE,
version_map,
)

return version_map


def avail_services(ctx: typer.Context) -> list[str]:
params = ctx.parent.params # type: ignore
services_repo = params["repo"] or globals.EC_SERVICES_REPO
services_repo = params["repo"]

# This block prevents getting a stack trace during autocompletion
try:
services_graph = fetch_service_graph(services_repo)
return list(services_graph.keys())
except typer.Exit:
return [" "]
except CalledProcessError:
return [" "]
except ShellError as e:
typer.echo(f"\n{e}", nl=False, err=True)
return []


def avail_versions(ctx: typer.Context) -> list[str]:
params = ctx.parent.params # type: ignore
beamline_repo = params["repo"] or globals.EC_SERVICES_REPO
repo = params["repo"]
service_name = ctx.params["service_name"]

# This block prevents getting a stack trace during autocompletion
try:
version_map = fetch_service_graph(beamline_repo)
version_map = fetch_service_graph(repo)
svc_versions = version_map[service_name]
return svc_versions
except KeyError:
log.error("IOC not found")
return [" "]
except typer.Exit:
return [" "]
except CalledProcessError:
return [" "]
typer.echo(f"\n{service_name} not found", nl=False, err=True)
return []
except ShellError as e:
typer.echo(f"\n{e}", nl=False, err=True)
return []


def force_plain_completion() -> list[str]:
"""Forces filepath completion"""
return []


def running_svc(ctx: typer.Context) -> list[str]:
params = ctx.parent.params # type: ignore
namespace = params["namespace"] or globals.EC_K8S_NAMESPACE

# This block prevents getting a stack trace during autocompletion
autocomplete_backend_init(ctx)
try:
if namespace == globals.LOCAL_NAMESPACE:
docker = Docker().docker
format = "{{.Names}}"
command = f"{docker} ps --filter label=is_IOC=true --format {format}"
svc_list = str(shell.run_command(command, interactive=False)).split()
return svc_list
else:
check_namespace(namespace)
columns = "-o custom-columns=IOC_NAME:metadata.labels.app"
command = f"kubectl -n {namespace} get pod {columns}"
svc_list = str(shell.run_command(command, interactive=False)).split()[1:]
return svc_list
except typer.Exit:
return [" "]
except CalledProcessError:
return [" "]
return ec_backend.commands._running_services()
except CommandError as e:
typer.echo(f"\n{e}", nl=False, err=True)
return []


def all_svc(ctx: typer.Context) -> list[str]:
params = ctx.parent.params # type: ignore
namespace = params["namespace"] or globals.EC_K8S_NAMESPACE

# This block prevents getting a stack trace during autocompletion
autocomplete_backend_init(ctx)
try:
if namespace == globals.LOCAL_NAMESPACE:
docker = Docker().docker
format = "{{.Names}}"
command = f"{docker} ps -a --filter label=is_IOC=true --format {format}"
svc_list = str(shell.run_command(command, interactive=False)).split()
return svc_list
else:
check_namespace(namespace)
command = f"helm list -qn {namespace}"
svc_list = str(shell.run_command(command, interactive=False)).split()
return svc_list
except typer.Exit:
return [" "]
except CalledProcessError:
return [" "]
return ec_backend.commands._all_services()
except CommandError as e:
typer.echo(f"\n{e}", nl=False, err=True)
return []


def all_contexts(ctx: typer.Context) -> list[str]:
args = ConfigController(globals.CONFIG_ROOT / globals.ENV_CONFIG)
return args.get_contexts()
62 changes: 62 additions & 0 deletions src/edge_containers_cli/backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""
Manage a backend for the project
"""

from edge_containers_cli.cmds.argo_commands import ArgoCommands
from edge_containers_cli.cmds.commands import Commands
from edge_containers_cli.cmds.k8s_commands import K8sCommands
from edge_containers_cli.definitions import ECBackends, ECContext
from edge_containers_cli.utils import public_methods


class BackendError(Exception):
pass


class Backend:
def __init__(self) -> None:
self._value: ECBackends | None = None
self._cxt: ECContext | None = None
self._Commands: type | None = None
self._commands: Commands | None = None

@property
def commands(self):
if self._commands is None:
raise BackendError("Backend commands not constructed")
else:
return self._commands

def set_backend(self, backend: ECBackends):
self._value = backend
if backend == ECBackends.K8S:
self._Commands = K8sCommands
elif backend == ECBackends.ARGOCD:
self._Commands = ArgoCommands

def set_context(self, context: ECContext):
"""
Construct the appropriate Commands class
"""
if self._Commands is None:
pass
else:
self._cxt = context
self._commands = self._Commands(context)

def get_notimplemented(self) -> list[str]:
notimplemented = []
if self._Commands is None:
return []
else:
for command in public_methods(self._Commands):
if getattr(self._Commands, command) is getattr(Commands, command):
notimplemented.append(command)
return notimplemented


backend = Backend()


def init_backend(set_backend: ECBackends):
backend.set_backend(set_backend)
Loading

0 comments on commit d9d794b

Please sign in to comment.