Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add better namespace logic to warnet logs #659

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 52 additions & 9 deletions src/warnet/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,33 +360,33 @@ def filter(path):
@click.command()
@click.argument("pod_name", type=str, default="")
@click.option("--follow", "-f", is_flag=True, default=False, help="Follow logs")
@click.option("--namespace", type=str, default="default", show_default=True)
@click.option("--namespace", type=str, default="", show_default=True)
def logs(pod_name: str, follow: bool, namespace: str):
"""Show the logs of a pod"""
return _logs(pod_name, follow, namespace)


def _logs(pod_name: str, follow: bool, namespace: Optional[str] = None):
namespace = get_default_namespace_or(namespace)

def format_pods(pods: list[V1Pod]) -> list[str]:
def format_pods(pods: list[V1Pod], namespace: Optional[str]) -> list[str]:
if namespace:
pods = [pod for pod in pods if pod.metadata.namespace == namespace]
sorted_pods = sorted(pods, key=lambda pod: pod.metadata.creation_timestamp, reverse=True)
return [f"{pod.metadata.name}: {pod.metadata.namespace}" for pod in sorted_pods]

if pod_name == "":
try:
pod_list = []
formatted_commanders = format_pods(get_mission(COMMANDER_MISSION))
formatted_tanks = format_pods(get_mission(TANK_MISSION))
formatted_commanders = format_pods(get_mission(COMMANDER_MISSION), namespace)
formatted_tanks = format_pods(get_mission(TANK_MISSION), namespace)
pod_list.extend(formatted_commanders)
pod_list.extend(formatted_tanks)

except Exception as e:
print(f"Could not fetch any pods in namespace ({namespace}): {e}")
click.secho(f"Could not fetch any pods: {e}")
return

if not pod_list:
print(f"Could not fetch any pods in namespace ({namespace})")
click.secho("Could not fetch any pods.")
return

q = [
Expand All @@ -402,8 +402,51 @@ def format_pods(pods: list[V1Pod]) -> list[str]:
else:
return # cancelled by user

try:
pod = get_pod(pod_name, namespace=namespace)
except Exception as e:
click.secho(e)
click.secho(f"Could not get pod: {pod_name}: {namespace}")
return

else:
pod = None
if not namespace:
namespaces = []
for v1namespace in get_namespaces():
namespace_candidate = v1namespace.metadata.name
try:
pod = get_pod(pod_name, namespace=namespace_candidate)
namespaces.append(namespace_candidate)
except Exception:
pass

if len(namespaces) > 1:
click.secho(f"The pod '{pod.metadata.name}' is found in these namespaces:")
for ns in namespaces:
click.secho(f" - {ns}")
click.secho("Please limit your search to one of those namespaces.")
return

if not namespaces:
click.echo(f"Could not find pod in any namespaces: {pod_name}")
return

namespace = namespaces[0]

else:
try:
pod = get_pod(pod_name, namespace=namespace)
except Exception as e:
click.secho(e)
click.secho(f"Could not get pod: {pod_name}: {namespace}")
return

if not pod:
click.secho(f"Could not find: {pod_name}", fg="yellow")
sys.exit(1)

try:
pod = get_pod(pod_name, namespace=namespace)
eligible_container_names = [BITCOINCORE_CONTAINER, COMMANDER_CONTAINER]
available_container_names = [container.name for container in pod.spec.containers]
container_name = next(
Expand Down
92 changes: 92 additions & 0 deletions test/namespace_admin_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from pathlib import Path
from typing import Callable, Optional

import pexpect
from scenarios_test import ScenariosTest
from test_base import TestBase

Expand Down Expand Up @@ -41,6 +42,8 @@ def __init__(self):
self.blue_users = ["carol-warnettest", "default", "mallory-warnettest"]
self.red_users = ["alice-warnettest", self.bob_user, "default"]

self.bitcoin_version_slug = "Bitcoin Core version v27.0.0"

def run_test(self):
try:
os.chdir(self.tmpdir)
Expand All @@ -49,7 +52,9 @@ def run_test(self):
self.setup_namespaces()
self.setup_service_accounts()
self.setup_network()
self.admin_checks_logs()
self.authenticate_and_become_bob()
self.bob_checks_logs()
self.bob_runs_scenario_tests()
finally:
self.return_to_initial_context()
Expand Down Expand Up @@ -176,6 +181,72 @@ def bob_runs_scenario_tests(self):
super().run_test()
assert self.this_is_the_current_context(self.bob_context)

def bob_checks_logs(self):
assert self.this_is_the_current_context(self.bob_context)
self.log.info("Bob will check the logs")

sut = pexpect.spawn("warnet logs", maxread=4096 * 10)
assert expect_without_traceback("Please choose a pod", sut)
sut.sendline("")
assert expect_without_traceback(self.bitcoin_version_slug, sut)
sut.close()

sut = pexpect.spawn(f"warnet logs --namespace {self.red_namespace}", maxread=4096 * 10)
assert expect_without_traceback("Please choose a pod", sut)
sut.sendline("")
assert expect_without_traceback(self.bitcoin_version_slug, sut)
sut.close()

sut = pexpect.spawn("warnet logs tank-0008", maxread=4096 * 10)
assert expect_without_traceback(self.bitcoin_version_slug, sut)
sut.close()

sut = pexpect.spawn(
f"warnet logs tank-0008 --namespace {self.red_namespace}", maxread=4096 * 10
)
assert expect_without_traceback(self.bitcoin_version_slug, sut)
sut.close()

sut = pexpect.spawn("warnet logs this_does_not_exist", maxread=4096 * 10)
assert expect_without_traceback("Could not find pod in any namespaces", sut)
sut.close()

self.log.info("Bob has checked the logs")
assert self.this_is_the_current_context(self.bob_context)

def admin_checks_logs(self):
assert self.this_is_the_current_context(self.initial_context)
self.log.info("The admin will check the logs")

sut = pexpect.spawn("warnet logs", maxread=4096 * 10)
assert expect_without_traceback("Please choose a pod", sut)
sut.sendline("")
assert expect_without_traceback(self.bitcoin_version_slug, sut)
sut.close()

sut = pexpect.spawn(f"warnet logs --namespace {self.red_namespace}", maxread=4096 * 10)
assert expect_without_traceback("Please choose a pod", sut)
sut.sendline("")
assert expect_without_traceback(self.bitcoin_version_slug, sut)
sut.close()

sut = pexpect.spawn("warnet logs tank-0008", maxread=4096 * 10)
assert expect_without_traceback("The pod 'tank-0008' is found in these namespaces", sut)
sut.close()

sut = pexpect.spawn(
f"warnet logs tank-0008 --namespace {self.red_namespace}", maxread=4096 * 10
)
assert expect_without_traceback(self.bitcoin_version_slug, sut)
sut.close()

sut = pexpect.spawn("warnet logs this_does_not_exist", maxread=4096 * 10)
assert expect_without_traceback("Could not find pod in any namespaces", sut)
sut.close()

self.log.info("The admin has checked the logs")
assert self.this_is_the_current_context(self.initial_context)


def remove_user(kubeconfig_data: dict, username: str) -> dict:
kubeconfig_data["users"] = [
Expand All @@ -191,6 +262,27 @@ def remove_context(kubeconfig_data: dict, context_name: str) -> dict:
return kubeconfig_data


class StackTraceFoundException(Exception):
"""Custom exception raised when a stack trace is found in the output."""

pass


def expect_without_traceback(expectation: str, sut: pexpect.spawn, timeout: int = 2) -> bool:
expectation_found = False
while True:
try:
sut.expect(["\r", "\n"], timeout=timeout) # inquirer uses \r
line = sut.before.decode("utf-8")
if "Traceback (" in line:
raise StackTraceFoundException
if expectation in line:
expectation_found = True
except (pexpect.exceptions.EOF, pexpect.exceptions.TIMEOUT):
break
return expectation_found


if __name__ == "__main__":
test = NamespaceAdminTest()
test.run_test()