Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor for argocd #160

Merged
merged 5 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/edge_containers_cli/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
import polars

from ._version import __version__

__all__ = ["__version__"]

# Set formatting of polars tables
polars.Config.set_tbl_hide_column_data_types(True)
polars.Config.set_tbl_hide_dataframe_shape(True)
polars.Config.set_tbl_rows(-1)
polars.Config.set_tbl_cols(-1)
polars.Config.set_fmt_str_lengths(82)
polars.Config.set_tbl_formatting("ASCII_MARKDOWN")
95 changes: 74 additions & 21 deletions src/edge_containers_cli/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@

import typer

import edge_containers_cli.globals as globals
from edge_containers_cli.cmds.cli import cli
from edge_containers_cli.cli import cli
from edge_containers_cli.definitions import ENV, ECBackends, ECContext, ECLogLevels

from . import __version__
from .backend import backend as ec_backend
from .backend import init_backend
from .logging import init_logging
from .shell import init_shell
from .utils import init_cleanup

__all__ = ["main"]

Expand All @@ -17,6 +21,20 @@ def version_callback(value: bool):
raise typer.Exit()


def backend_callback(ctx: typer.Context, backend: ECBackends):
init_backend(backend)
# Dynamically drop any method not implemented
not_implemented = [
mthd.replace("_", "-") for mthd in ec_backend.get_notimplemented()
]
for command in not_implemented:
typer_commands = ctx.command.commands # type: ignore
if command in typer_commands:
typer_commands.pop(command)

return backend.value


@cli.callback()
def main(
ctx: typer.Context,
Expand All @@ -28,39 +46,74 @@ def main(
help="Log the version of ec and exit",
),
repo: str = typer.Option(
"",
ECContext().repo,
"-r",
"--repo",
help="service/ioc instances repository",
help="Service instances repository",
envvar=ENV.repo.value,
),
namespace: str = typer.Option(
"", "-n", "--namespace", help="kubernetes namespace to use"
target: str = typer.Option(
ECContext().target,
"-t",
"--target",
help="K8S namespace or ARGOCD <project>/<root-app>",
envvar=ENV.target.value,
),
log_level: str = typer.Option(
"WARN", help="Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)"
backend: ECBackends = typer.Option(
ECBackends.ARGOCD,
"-b",
"--backend",
callback=backend_callback,
is_eager=True,
help="Backend to use",
envvar=ENV.backend.value,
expose_value=True,
),
verbose: bool = typer.Option(
globals.EC_VERBOSE, "-v", "--verbose", help="print the commands we run"
False,
"-v",
"--verbose",
help="Print the commands we run",
envvar=ENV.verbose.value,
show_default=True,
),
dryrun: bool = typer.Option(
False,
"--dryrun",
help="Print the commands we run without execution",
envvar=ENV.dryrun.value,
show_default=True,
),
debug: bool = typer.Option(
globals.EC_DEBUG,
False,
"-d",
"--debug",
help="Enable debug logging to console and retain temporary files",
help="Enable debug logging, retain temp files",
envvar=ENV.debug.value,
show_default=True,
),
log_level: ECLogLevels = typer.Option(
ECLogLevels.WARNING,
help="Log level",
envvar=ENV.log_level.value,
),
log_url: str = typer.Option(
ECContext().log_url,
help="Log url",
envvar=ENV.log_url.value,
),
):
"""Edge Containers assistant CLI"""
init_logging(ECLogLevels.DEBUG if debug else log_level)
init_shell(verbose, dryrun)
init_cleanup(debug)

globals.EC_VERBOSE, globals.EC_DEBUG = bool(verbose), bool(debug)

init_logging(log_level.upper())

# create a context dictionary to pass to all sub commands
repo = repo or globals.EC_SERVICES_REPO
namespace = namespace or globals.EC_K8S_NAMESPACE
ctx.ensure_object(globals.Context)
context = globals.Context(namespace=namespace, beamline_repo=repo)
ctx.obj = context
context = ECContext(
repo=repo,
target=target,
log_url=log_url,
)
ec_backend.set_context(context)


# test with:
Expand Down
148 changes: 52 additions & 96 deletions src/edge_containers_cli/autocomplete.py
Original file line number Diff line number Diff line change
@@ -1,140 +1,96 @@
import json
import os
import tempfile
import time
import urllib
from pathlib import Path
from subprocess import CalledProcessError

import typer

import edge_containers_cli.globals as globals
import edge_containers_cli.shell as shell
from edge_containers_cli.cmds.k8s_commands import check_namespace
from edge_containers_cli.docker import Docker
from edge_containers_cli.git import create_version_map
from edge_containers_cli.logging import log
from edge_containers_cli.utils import cleanup_temp
from edge_containers_cli.backend import backend as ec_backend
from edge_containers_cli.cmds.commands import CommandError
from edge_containers_cli.definitions import ECContext
from edge_containers_cli.git import GitError, create_version_map
from edge_containers_cli.shell import ShellError
from edge_containers_cli.utils import cache_dict, new_workdir, read_cached_dict


def url_encode(in_string: str) -> str:
return urllib.parse.quote(in_string, safe="") # type: ignore


def cache_dict(cache_folder: str, cached_file: str, data_struc: dict) -> None:
cache_dir = os.path.join(globals.CACHE_ROOT, cache_folder)
if not os.path.exists(cache_dir):
os.makedirs(cache_dir)

cache_path = os.path.join(cache_dir, cached_file)
with open(cache_path, "w") as f:
f.write(json.dumps(data_struc, indent=4))


def read_cached_dict(cache_folder: str, cached_file: str) -> dict:
cache_path = os.path.join(globals.CACHE_ROOT, cache_folder, cached_file)
read_dict = {}

# Check cache if available
if os.path.exists(cache_path):
# Read from cache if not stale
if (time.time() - os.path.getmtime(cache_path)) < globals.CACHE_EXPIRY:
with open(cache_path) as f:
read_dict = json.load(f)

return read_dict


def fetch_service_graph(beamline_repo: str) -> dict:
version_map = read_cached_dict(url_encode(beamline_repo), globals.IOC_CACHE)
def autocomplete_backend_init(ctx: typer.Context):
params = ctx.parent.params # type: ignore
context = ECContext(

Check warning on line 21 in src/edge_containers_cli/autocomplete.py

View check run for this annotation

Codecov / codecov/patch

src/edge_containers_cli/autocomplete.py#L20-L21

Added lines #L20 - L21 were not covered by tests
repo=params["repo"],
target=params["target"],
log_url=params["log_url"],
)
ec_backend.set_context(context)

Check warning on line 26 in src/edge_containers_cli/autocomplete.py

View check run for this annotation

Codecov / codecov/patch

src/edge_containers_cli/autocomplete.py#L26

Added line #L26 was not covered by tests


def fetch_service_graph(repo: str) -> dict:
version_map = read_cached_dict(

Check warning on line 30 in src/edge_containers_cli/autocomplete.py

View check run for this annotation

Codecov / codecov/patch

src/edge_containers_cli/autocomplete.py#L30

Added line #L30 was not covered by tests
globals.CACHE_ROOT / url_encode(repo), globals.SERVICE_CACHE
)
if not version_map:
tmp_dir = Path(tempfile.mkdtemp())
version_map = create_version_map(beamline_repo, tmp_dir)
cache_dict(url_encode(beamline_repo), globals.IOC_CACHE, version_map)
cleanup_temp(tmp_dir)
with new_workdir() as path:
version_map = create_version_map(

Check warning on line 35 in src/edge_containers_cli/autocomplete.py

View check run for this annotation

Codecov / codecov/patch

src/edge_containers_cli/autocomplete.py#L34-L35

Added lines #L34 - L35 were not covered by tests
repo, Path(globals.SERVICES_DIR), path, shared=[globals.SHARED_VALUES]
)
cache_dict(

Check warning on line 38 in src/edge_containers_cli/autocomplete.py

View check run for this annotation

Codecov / codecov/patch

src/edge_containers_cli/autocomplete.py#L38

Added line #L38 was not covered by tests
globals.CACHE_ROOT / url_encode(repo),
globals.SERVICE_CACHE,
version_map,
)

return version_map


def avail_services(ctx: typer.Context) -> list[str]:
params = ctx.parent.params # type: ignore
services_repo = params["repo"] or globals.EC_SERVICES_REPO
autocomplete_backend_init(ctx)

Check warning on line 48 in src/edge_containers_cli/autocomplete.py

View check run for this annotation

Codecov / codecov/patch

src/edge_containers_cli/autocomplete.py#L48

Added line #L48 was not covered by tests

# This block prevents getting a stack trace during autocompletion
try:
services_graph = fetch_service_graph(services_repo)
services_graph = fetch_service_graph(ec_backend.commands.repo)

Check warning on line 52 in src/edge_containers_cli/autocomplete.py

View check run for this annotation

Codecov / codecov/patch

src/edge_containers_cli/autocomplete.py#L52

Added line #L52 was not covered by tests
return list(services_graph.keys())
except typer.Exit:
return [" "]
except CalledProcessError:
return [" "]
except (ShellError, CommandError) as e:
typer.echo(f"\n{e}", nl=False, err=True)
return []

Check warning on line 56 in src/edge_containers_cli/autocomplete.py

View check run for this annotation

Codecov / codecov/patch

src/edge_containers_cli/autocomplete.py#L54-L56

Added lines #L54 - L56 were not covered by tests


def avail_versions(ctx: typer.Context) -> list[str]:
params = ctx.parent.params # type: ignore
beamline_repo = params["repo"] or globals.EC_SERVICES_REPO
autocomplete_backend_init(ctx)

Check warning on line 60 in src/edge_containers_cli/autocomplete.py

View check run for this annotation

Codecov / codecov/patch

src/edge_containers_cli/autocomplete.py#L60

Added line #L60 was not covered by tests
service_name = ctx.params["service_name"]

# This block prevents getting a stack trace during autocompletion
try:
version_map = fetch_service_graph(beamline_repo)
version_map = fetch_service_graph(ec_backend.commands.repo)

Check warning on line 65 in src/edge_containers_cli/autocomplete.py

View check run for this annotation

Codecov / codecov/patch

src/edge_containers_cli/autocomplete.py#L65

Added line #L65 was not covered by tests
svc_versions = version_map[service_name]
return svc_versions
except KeyError:
log.error("IOC not found")
return [" "]
except typer.Exit:
return [" "]
except CalledProcessError:
return [" "]
typer.echo(f"\n{service_name} not found", nl=False, err=True)
return []
except (ShellError, CommandError, GitError) as e:
typer.echo(f"\n{e}", nl=False, err=True)
return []

Check warning on line 73 in src/edge_containers_cli/autocomplete.py

View check run for this annotation

Codecov / codecov/patch

src/edge_containers_cli/autocomplete.py#L69-L73

Added lines #L69 - L73 were not covered by tests


def force_plain_completion() -> list[str]:
"""Forces filepath completion"""
return []


def running_svc(ctx: typer.Context) -> list[str]:
params = ctx.parent.params # type: ignore
namespace = params["namespace"] or globals.EC_K8S_NAMESPACE

# This block prevents getting a stack trace during autocompletion
autocomplete_backend_init(ctx)

Check warning on line 82 in src/edge_containers_cli/autocomplete.py

View check run for this annotation

Codecov / codecov/patch

src/edge_containers_cli/autocomplete.py#L82

Added line #L82 was not covered by tests
try:
if namespace == globals.LOCAL_NAMESPACE:
docker = Docker().docker
format = "{{.Names}}"
command = f"{docker} ps --filter label=is_IOC=true --format {format}"
svc_list = str(shell.run_command(command, interactive=False)).split()
return svc_list
else:
check_namespace(namespace)
columns = "-o custom-columns=IOC_NAME:metadata.labels.app"
command = f"kubectl -n {namespace} get pod {columns}"
svc_list = str(shell.run_command(command, interactive=False)).split()[1:]
return svc_list
except typer.Exit:
return [" "]
except CalledProcessError:
return [" "]
return ec_backend.commands._running_services()
except (CommandError, ShellError) as e:
typer.echo(f"\n{e}", nl=False, err=True)
return []

Check warning on line 87 in src/edge_containers_cli/autocomplete.py

View check run for this annotation

Codecov / codecov/patch

src/edge_containers_cli/autocomplete.py#L84-L87

Added lines #L84 - L87 were not covered by tests


def all_svc(ctx: typer.Context) -> list[str]:
params = ctx.parent.params # type: ignore
namespace = params["namespace"] or globals.EC_K8S_NAMESPACE

# This block prevents getting a stack trace during autocompletion
autocomplete_backend_init(ctx)

Check warning on line 91 in src/edge_containers_cli/autocomplete.py

View check run for this annotation

Codecov / codecov/patch

src/edge_containers_cli/autocomplete.py#L91

Added line #L91 was not covered by tests
try:
if namespace == globals.LOCAL_NAMESPACE:
docker = Docker().docker
format = "{{.Names}}"
command = f"{docker} ps -a --filter label=is_IOC=true --format {format}"
svc_list = str(shell.run_command(command, interactive=False)).split()
return svc_list
else:
check_namespace(namespace)
command = f"helm list -qn {namespace}"
svc_list = str(shell.run_command(command, interactive=False)).split()
return svc_list
except typer.Exit:
return [" "]
except CalledProcessError:
return [" "]
return ec_backend.commands._all_services()
except (CommandError, ShellError) as e:
typer.echo(f"\n{e}", nl=False, err=True)
return []

Check warning on line 96 in src/edge_containers_cli/autocomplete.py

View check run for this annotation

Codecov / codecov/patch

src/edge_containers_cli/autocomplete.py#L93-L96

Added lines #L93 - L96 were not covered by tests
Loading
Loading