From be887a0230092be55894a38393cc623824160dc7 Mon Sep 17 00:00:00 2001 From: Tyler Gu Date: Wed, 25 Sep 2024 15:19:21 -0500 Subject: [PATCH] Refactor chactos Signed-off-by: Tyler Gu --- acto/__main__.py | 30 --- acto/kubernetes_engine/base.py | 5 + chactos/__main__.py | 33 ++- chactos/fault_injections.py | 426 +++++++++++---------------------- chactos/fault_injector.py | 38 +++ 5 files changed, 213 insertions(+), 319 deletions(-) create mode 100644 chactos/fault_injector.py diff --git a/acto/__main__.py b/acto/__main__.py index b7c54ecef2..d01ddd2fc4 100644 --- a/acto/__main__.py +++ b/acto/__main__.py @@ -11,9 +11,7 @@ from acto.engine import Acto, apply_testcase from acto.input.input import DeterministicInputModel from acto.lib.operator_config import OperatorConfig -from chactos.fault_injection_config import FaultInjectionConfig from acto.post_process.post_diff_test import PostDiffTest -from chactos.fault_injections import ChactosDriver from acto.utils.error_handler import handle_excepthook, thread_excepthook from acto.utils.thread_logger import get_thread_logger @@ -38,14 +36,6 @@ help="Operator porting config path", required=True, ) -parser.add_argument( - "--fault_injection_config", - "-fjc", - dest="fault_injection_config", - help="Operator porting fault injection config path", - required=False, - default="", -) parser.add_argument( "--cluster-runtime", "-r", @@ -131,15 +121,6 @@ del config["monkey_patch"] config = OperatorConfig.model_validate(config) -if args.fault_injection_config != "": - with open( - args.fault_injection_config, "r", encoding="utf-8" - ) as fault_injection_config_file: - fault_injection_config = json.load(fault_injection_config_file) - fault_injection_config = FaultInjectionConfig.model_validate( - fault_injection_config - ) - logger.info("Acto started with [%s]", sys.argv) logger.info("Operator config: %s", config) @@ -183,16 +164,5 @@ p.post_process(post_diff_test_dir, num_workers=args.num_workers) p.check(post_diff_test_dir, num_workers=args.num_workers) -# logger.info("Acto invokes Chactos for fault injection") - -# chactos = ChactosDriver( -# testrun_dir=args.workdir_path, -# operator_config=config, -# fault_injection_config=fault_injection_config, -# context_file=context_cache, -# worker_id=0, -# ) -# chactos.run() - end_time = datetime.now() logger.info("Acto end to end finished in %s", end_time - start_time) diff --git a/acto/kubernetes_engine/base.py b/acto/kubernetes_engine/base.py index 21e18814fe..c8cebe5190 100644 --- a/acto/kubernetes_engine/base.py +++ b/acto/kubernetes_engine/base.py @@ -111,3 +111,8 @@ def get_node_list(self, name: str): # no nodes can be found, returning an empty array return [] return p.stdout.strip().split("\n") + + @staticmethod + def cluster_name(acto_namespace: int, worker_id: int) -> str: + """Helper function to generate cluster name""" + return f"acto-{acto_namespace}-cluster-{worker_id}" diff --git a/chactos/__main__.py b/chactos/__main__.py index 4df83ed2b2..75ed918d96 100644 --- a/chactos/__main__.py +++ b/chactos/__main__.py @@ -3,8 +3,9 @@ import logging from datetime import datetime -from chactos.fault_injection_config import FaultInjectionConfig -from chactos.fault_injections import ExperimentDriver +from lib.operator_config import OperatorConfig + +from chactos.fault_injections import ChactosDriver parser = argparse.ArgumentParser( description="Automatic, Continuous Testing for k8s/openshift Operators" @@ -20,9 +21,23 @@ "--config", "-c", dest="config", + help="Operator config path", + required=True, +) +parser.add_argument( + "--fi-config", + "-fic", + dest="fi_config", help="Operator fault injection config path", required=True, ) +parser.add_argument( + "--testrun-dir", + "-td", + dest="testrun_dir", + help="Testrun directory to load the inputs from", + required=True, +) args = parser.parse_args() logging.basicConfig( @@ -33,9 +48,17 @@ logging.getLogger("sh").setLevel(logging.ERROR) with open(args.config, "r", encoding="utf-8") as config_file: - config = json.load(config_file) + config_data = json.load(config_file) + operator_config = OperatorConfig.model_validate(config_data) + +with open(args.fi_config, "r", encoding="utf-8") as fi_config_file: + fi_config_data = json.load(fi_config_file) + fi_config = OperatorConfig.model_validate(fi_config_data) -driver = ExperimentDriver( - operator_config=FaultInjectionConfig.model_validate(config), worker_id=0 +driver = ChactosDriver( + testrun_dir=args.testrun_dir, + work_dir=args.workdir_path, + operator_config=operator_config, + fault_injection_config=fi_config, ) driver.run() diff --git a/chactos/fault_injections.py b/chactos/fault_injections.py index d06f5130de..eb6364b462 100644 --- a/chactos/fault_injections.py +++ b/chactos/fault_injections.py @@ -1,168 +1,115 @@ -import glob import logging import multiprocessing import os import queue +import subprocess import time import kubernetes import yaml -import json -from acto import constant -from acto.common import kubernetes_client +from acto.common import kubernetes_client, print_event from acto.deploy import Deploy -from acto.kubectl_client.helm import Helm from acto.kubectl_client.kubectl import KubectlClient +from acto.kubernetes_engine import kind +from acto.lib.operator_config import OperatorConfig from acto.post_process.post_process import PostProcessor -from acto.kubernetes_engine.base import KubernetesEngine -from acto.kubernetes_engine.kind import Kind from acto.system_state.kubernetes_system_state import KubernetesSystemState +from acto.trial import Trial from acto.utils import acto_timer -from chactos.failures.file_chaos import ( - ApplicationFileDelay, - ApplicationFileFailure, -) +from chactos.failures import failure from chactos.failures.network_chaos import OperatorApplicationPartitionFailure from chactos.fault_injection_config import FaultInjectionConfig -from acto.lib.operator_config import OperatorConfig -def load_inputs_from_dir(dir_: str) -> list[object]: - """Load inputs from a directory""" - inputs = [] - files = sorted(glob.glob(f"{dir_}/input-*.yaml")) - logging.info("Loading %d inputs from %s", len(files), dir_) - for file in files: - with open(file, "r", encoding="utf-8") as f: - inputs.append(yaml.load(f, Loader=yaml.FullLoader)) +class ChactosDriver(PostProcessor): + """Fault injection driver""" + + def __init__( + self, + testrun_dir: str, + work_dir: str, + operator_config: OperatorConfig, + fault_injection_config: FaultInjectionConfig, + ): + super().__init__(testrun_dir=testrun_dir, config=operator_config) + self._operator_config = operator_config + self._fault_injection_config = fault_injection_config + self._work_dir = work_dir - return inputs + self.namespace = self.context["namespace"] + self.kubernetes_provider = kind.Kind( + acto_namespace=0, + feature_gates=operator_config.kubernetes_engine.feature_gates, + num_nodes=operator_config.num_nodes, + version=operator_config.kubernetes_version, + ) -class ExperimentDriver: - """Driver class for running fault injection experiments.""" + # Build an archive to be preloaded + container_tool = os.getenv("IMAGE_TOOL", "docker") + self._images_archive = os.path.join(work_dir, "images.tar") + if len(self.context["preload_images"]) > 0: + print_event("Preparing required images...") + # first make sure images are present locally + for image in self.context["preload_images"]: + subprocess.run( + [container_tool, "pull", image], + stdout=subprocess.DEVNULL, + check=True, + ) + subprocess.run( + [container_tool, "image", "save", "-o", self._images_archive] + + list(self.context["preload_images"]), + stdout=subprocess.DEVNULL, + check=True, + ) - def __init__(self, operator_config: FaultInjectionConfig, worker_id: int): - self._worker_id = worker_id - self._operator_config = operator_config + self._deployer = Deploy(operator_config.deploy) + + def fault_injection_trial_dir(self, trial_name: str, sequence: int): + """Return the fault injection trial directory""" + return os.path.join(self._work_dir, f"{trial_name}-fi-{sequence}") def run(self): - """Run the experiment.""" - operator_selector = self._operator_config.operator_selector - operator_selector["namespaces"] = [constant.CONST.ACTO_NAMESPACE] - app_selector = self._operator_config.application_selector - app_selector["namespaces"] = [constant.CONST.ACTO_NAMESPACE] + """Run the fault injection exp""" + operator_selector = self._fault_injection_config.operator_selector + operator_selector["namespaces"] = [self.namespace] + app_selector = self._fault_injection_config.application_selector + app_selector["namespaces"] = [self.namespace] failures = [] - failures.append( - ApplicationFileFailure( - app_selector=app_selector, - data_dir=self._operator_config.application_data_dir, - ) - ) failures.append( OperatorApplicationPartitionFailure( operator_selector=operator_selector, app_selector=app_selector, - namespace=constant.CONST.ACTO_NAMESPACE - ) - ) - failures.append( - ApplicationFileDelay( - app_selector=app_selector, - data_dir=self._operator_config.application_data_dir, + namespace=self.namespace, ) ) - for failure in failures: - logging.info("Applying failure %s", failure.name()) - k8s_cluster_engine: KubernetesEngine = Kind( - 0, num_nodes=self._operator_config.kubernetes.num_nodes - ) - cluster_name = f"acto-cluster-{self._worker_id}" - kubecontext = k8s_cluster_engine.get_context_name(cluster_name) - kubeconfig = os.path.join( - os.path.expanduser("~"), ".kube", kubecontext - ) - k8s_cluster_engine.restart_cluster(cluster_name, kubeconfig) - apiclient = kubernetes_client(kubeconfig, kubecontext) - kubectl_client = KubectlClient(kubeconfig, kubecontext) - - # Deploy dependencies and operator - helm_client = Helm(kubeconfig, kubecontext) - p = helm_client.install( - release_name="chaos-mesh", - chart="chaos-mesh", - namespace="cass-operator", - repo="https://charts.chaos-mesh.org", - args=[ - "--set", - "chaosDaemon.runtime=containerd", - "--set", - "chaosDaemon.socketPath=/run/containerd/containerd.sock", - "--version", - "2.6.3", - ], - ) - if p.returncode != 0: - raise RuntimeError("Failed to install chaos-mesh", p.stderr) - - deployer = Deploy(self._operator_config.deploy) - deployer.deploy( - kubeconfig, - kubecontext, - kubectl_client=KubectlClient( - kubeconfig=kubeconfig, context_name=kubecontext - ), - namespace=constant.CONST.ACTO_NAMESPACE, - ) - - crs = load_inputs_from_dir(self._operator_config.input_dir) - - cr = crs.pop(0) - self.apply_cr(cr, kubectl_client) - converged = wait_for_converge( - apiclient, constant.CONST.ACTO_NAMESPACE - ) - - if not converged: - logging.error("Failed to converge") - return - - while crs: - failure.apply(kubectl_client) - cr = crs.pop(0) - self.apply_cr(cr, kubectl_client) - converged = wait_for_converge( - apiclient, constant.CONST.ACTO_NAMESPACE, hard_timeout=180 - ) - - failure.cleanup(kubectl_client) - converged = wait_for_converge( - apiclient, constant.CONST.ACTO_NAMESPACE - ) - - # oracle - system_state = KubernetesSystemState.from_api_client( - api_client=apiclient, - namespace=constant.CONST.ACTO_NAMESPACE, - ) - health = system_state.check_health() - if not health.is_healthy(): - logging.error("System is not healthy %s", health) - return + for failure in failures: + for trial_name, trial in self.trials.items(): + for worker_id in range(self._operator_config.num_nodes): + self.run_trial( + trial_name=trial_name, + trial=trial, + worker_id=worker_id, + failure=failure, + ) def apply_cr( self, - cr: dict, + cr, kubectl_client: KubectlClient, ): """Apply a CR.""" + cr_file = "cr.yaml" with open(cr_file, "w", encoding="utf-8") as f: yaml.dump(cr, f) p = kubectl_client.kubectl( - ["apply", "-f", cr_file, "-n", constant.CONST.ACTO_NAMESPACE] + ["apply", "-f", cr_file, "-n", self.namespace], + capture_output=True, + text=True, ) if p.returncode != 0: logging.error( @@ -174,191 +121,102 @@ def apply_cr( return False return True - -class ChactosDriver(PostProcessor): - """Fault injection driver""" - - def __init__( + def run_trial( self, - testrun_dir: str, - operator_config: OperatorConfig, - fault_injection_config: FaultInjectionConfig, - context_file: str, + trial_name: str, + trial: Trial, worker_id: int, + failure: failure.Failure, ): - super().__init__(testrun_dir=testrun_dir, config=operator_config) - self._worker_id = worker_id - self._operator_config = operator_config - self._fault_injection_config = fault_injection_config - # with open(context_file, "r", encoding="utf-8") as context_fin: - # self.context = json.load(context_fin) - # self.context["preload_images"] = set( - # self.context["preload_images"] - # ) - self.namespace = self._context["namespace"] - - def fetch_crs_from_trials(self) -> dict[str, object]: - """ - Fetches crs from trials and put them into a list of string yamls - """ + """Run a trial, this function can be parallelized - # TODO: IO ops exists program prematurely?? - crs = {} - trials = self.trial_to_steps + This function takes a trial from the Acto's normal test run and + runs it with fault injections. - for trial_names in trials: - input_crs = [] - steps = trials[trial_names].steps - for tup in steps.items(): - step = tup[1] - input_cr = step.snapshot.input_cr - input_crs.append(input_cr) - crs[trial_names] = input_crs - # print(len(crs[trial_names])) - return crs + With a normal trial with sequence of empty -> m1 -> m2 -> m3 -> m4, + it may result in multiple fault injection trials: + empty -> m1 -> m2 -> m3 + empty -> m3 -> m4 + if m2 -> m3 fails in the fault injection trial. + """ + fault_injection_sequence = 0 - def run(self): - """Run the fault injection exp""" - operator_selector = self._fault_injection_config.operator_selector - operator_selector["namespaces"] = [self.namespace] - app_selector = self._fault_injection_config.application_selector - app_selector["namespaces"] = [self.namespace] - failures = [] - failures.append( - OperatorApplicationPartitionFailure( - operator_selector=operator_selector, - app_selector=app_selector, - namespace=self.namespace + steps = sorted(trial.steps.keys()) + while steps: + fault_injection_trial_dir = self.fault_injection_trial_dir( + trial_name=trial_name, sequence=fault_injection_sequence ) - ) + os.makedirs(fault_injection_trial_dir, exist_ok=True) - for failure in failures: - logging.info("Applying failure %s", failure.name()) - k8s_cluster_engine: KubernetesEngine = Kind( - 0, num_nodes=self._fault_injection_config.kubernetes.num_nodes + # Set up the Kubernetes cluster + kubernetes_cluster_name = self.kubernetes_provider.cluster_name( + acto_namespace=0, worker_id=worker_id + ) + kubernetes_context = self.kubernetes_provider.get_context_name( + kubernetes_cluster_name ) - cluster_name = f"acto-cluster-{self._worker_id}" - kubecontext = k8s_cluster_engine.get_context_name(cluster_name) kubeconfig = os.path.join( - os.path.expanduser("~"), ".kube", kubecontext + os.path.expanduser("~"), ".kube", kubernetes_context ) - k8s_cluster_engine.restart_cluster(cluster_name, kubeconfig) - apiclient = kubernetes_client(kubeconfig, kubecontext) - kubectl_client = KubectlClient(kubeconfig, kubecontext) - - logging.info("Installing the operator...") - deployer = Deploy(self._fault_injection_config.deploy) - deployer.deploy( - kubeconfig, - kubecontext, - kubectl_client=KubectlClient( - kubeconfig=kubeconfig, context_name=kubecontext - ), - namespace=self.namespace, + self.kubernetes_provider.restart_cluster( + kubernetes_cluster_name, kubeconfig ) - logging.debug("Done") - - # Deploy dependencies and operator - logging.info("Installing chaos-mesh into namespace [%s]...", self.namespace) - helm_client = Helm(kubeconfig, kubecontext) - p = helm_client.install( - release_name="chaos-mesh", - chart="chaos-mesh", - namespace=self.namespace, - repo="https://charts.chaos-mesh.org", - namespace_existed=True, - args=[ - "--set", - "chaosDaemon.runtime=containerd", - "--set", - "chaosDaemon.socketPath=/run/containerd/containerd.sock", - "--version", - "2.7.0", - ], + self.kubernetes_provider.load_images( + self._images_archive, kubernetes_cluster_name ) - logging.debug("Done") + kubectl_client = KubectlClient(kubeconfig, kubernetes_context) + api_client = kubernetes_client(kubeconfig, kubernetes_context) - if p.returncode != 0: - raise RuntimeError("Failed to install chaos-mesh", p.stderr) + # Set up the operator and dependencies + deployed = self._deployer.deploy_with_retry( + kubeconfig, + kubernetes_context, + kubectl_client=kubectl_client, + namespace=self.context["namespace"], + ) + if not deployed: + logging.error("Failed to deploy the operator") + return - logging.info("Fetching CRs from trials made by Acto...") - trial_crs = self.fetch_crs_from_trials() - logging.debug("Done") + step_key = steps.pop(0) + step = trial.steps[step_key] + self.apply_cr(step.snapshot.input_cr, kubectl_client) + wait_for_converge( + kubernetes_client(kubeconfig, kubernetes_context), + self.context["namespace"], + ) + while steps: + step_key = steps.pop(0) + step = trial.steps[step_key] - for trial_name, trial_crs in trial_crs.items(): - logging.info("Testing trial [%s]", trial_name) + logging.debug("Applying failure NOW %s", failure.name()) + failure.apply(kubectl_client) - crs = trial_crs - cr = crs.pop(0) + logging.debug("Applying next CR") + self.apply_cr(step.snapshot.input_cr, kubectl_client) - logging.debug("Applying first CR in the trial") - self.apply_cr(cr, kubectl_client) + logging.debug("Waiting for CR to converge") converged = wait_for_converge( - apiclient, self.namespace + api_client, self.namespace, hard_timeout=180 ) - if not converged: - logging.error("Failed to converge") - return - - logging.debug("Running fault injection on all [%i] CRs in this trial", len(crs)) - while crs: - logging.debug("Applying failure NOW %s", failure.name()) - failure.apply(kubectl_client) - - logging.debug("Applying next CR") - cr = crs.pop(0) - self.apply_cr(cr, kubectl_client) - - logging.debug("Waiting for CR to converge") - converged = wait_for_converge( - apiclient, self.namespace, hard_timeout=180 - ) - - logging.debug("Clearning up failure %s", failure.name()) - failure.cleanup(kubectl_client) - - logging.debug("Waiting for cleanup to converge") - converged = wait_for_converge( - apiclient, self.namespace - ) - - logging.debug("Acquiring oracle.") - # oracle - system_state = KubernetesSystemState.from_api_client( - api_client=apiclient, - namespace=self.namespace, - ) - health = system_state.check_health() - if not health.is_healthy(): - logging.error("System is not healthy %s", health) - return + logging.debug("Clearning up failure %s", failure.name()) + failure.cleanup(kubectl_client) - def apply_cr( - self, - cr, - kubectl_client: KubectlClient, - ): - """Apply a CR.""" + logging.debug("Waiting for cleanup to converge") + converged = wait_for_converge(api_client, self.namespace) - cr_file = "cr.yaml" - with open(cr_file, "w", encoding="utf-8") as f: - yaml.dump(cr, f) - p = kubectl_client.kubectl( - ["apply", "-f", cr_file, "-n", self.namespace], - capture_output=True, - text=True, - ) - if p.returncode != 0: - logging.error( - "Failed to apply CR due to error from kubectl" - + f" (returncode={p.returncode})" - + f" (stdout={p.stdout})" - + f" (stderr={p.stderr})" - ) - return False - return True + logging.debug("Acquiring oracle.") + # oracle + system_state = KubernetesSystemState.from_api_client( + api_client=api_client, + namespace=self.namespace, + ) + health = system_state.check_health() + if not health.is_healthy(): + logging.error("System is not healthy %s", health) + break def wait_for_converge(api_client, namespace, wait_time=60, hard_timeout=600): diff --git a/chactos/fault_injector.py b/chactos/fault_injector.py new file mode 100644 index 0000000000..9c413a4954 --- /dev/null +++ b/chactos/fault_injector.py @@ -0,0 +1,38 @@ +import abc + +from kubectl_client.helm import Helm + + +class FaultInjectorInterface(abc.ABC): + """Interface for fault injectors""" + + def __init__(self) -> None: + pass + + +class ChaosMeshFaultInjector(FaultInjectorInterface): + """Fault injector implemented using Chaos Mesh""" + + def __init__(self) -> None: + pass + + def install(self, kube_config: str, kube_context: str): + helm_client = Helm(kube_config, kube_context) + p = helm_client.install( + release_name="chaos-mesh", + chart="chaos-mesh", + namespace="chaos-mesh", + repo="https://charts.chaos-mesh.org", + namespace_existed=True, + args=[ + "--set", + "chaosDaemon.runtime=containerd", + "--set", + "chaosDaemon.socketPath=/run/containerd/containerd.sock", + "--version", + "2.7.0", + ], + ) + + if p.returncode != 0: + raise RuntimeError("Failed to install chaos-mesh", p.stderr)