
Commit

Add post processors for simple crash test and chaining inputs
Signed-off-by: Tyler Gu <[email protected]>
tylergu committed Dec 13, 2023
1 parent 898e44f commit 48eaa35
Showing 2 changed files with 390 additions and 0 deletions.
65 changes: 65 additions & 0 deletions acto/post_process/post_chain_inputs.py
@@ -0,0 +1,65 @@
import os

import jsonpatch
import yaml

from acto.lib.operator_config import OperatorConfig
from acto.post_process.post_process import PostProcessor


class ChainInputs(PostProcessor):
"""Post processor for extracting inputs from a test run"""

def __init__(
self,
testrun_dir: str,
config: OperatorConfig,
ignore_invalid: bool = False,
acto_namespace: int = 0):
self.acto_namespace = acto_namespace
super().__init__(testrun_dir, config)

self.all_inputs = []
for trial in sorted(self.trial_to_steps.keys()):
steps = self.trial_to_steps[trial]
            for i in sorted(steps.keys(), key=int):
step = steps[i]
invalid, _ = step.runtime_result.is_invalid()
if invalid and not ignore_invalid:
continue
if not step.runtime_result.is_pass():
continue
self.all_inputs.append({
'trial': trial,
'gen': step.gen,
'input': step.input,
'input_digest': step.input_digest,
'operator_log': step.operator_log,
'system_state': step.system_state,
'cli_output': step.cli_output,
'runtime_result': step.runtime_result
})

def serialize(self, output_dir: str):
previous_input = {}
index = 0
for input in self.all_inputs:
print(f"{input['trial']}")
patch = jsonpatch.JsonPatch.from_diff(
previous_input, input["input"])
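            # `patch` is an RFC 6902 JSON Patch describing the diff between the
            # previous input and this one, e.g.
            #   [{"op": "replace", "path": "/spec/replicas", "value": 3}]
            # (the path and value above are illustrative, not from a real run)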
if patch:
skip_input = False
for ops in patch:
if "/spec/conf" in ops["path"]:
print(ops)
skip_input = True
break

if skip_input:
continue
with open(os.path.join(output_dir, f'input-{index}.yaml'), 'w') as f:
yaml.dump(input["input"], f)
with open(os.path.join(output_dir, f'input-{index}.patch'), 'w') as f:
f.write(str(patch))
previous_input = input["input"]
index += 1
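

# Usage sketch (hypothetical paths, assuming an existing Acto test-run directory
# and an operator config file):
#
#     import json
#
#     with open("operator_config.json") as f:
#         config = OperatorConfig(**json.load(f))
#     chain = ChainInputs(testrun_dir="testrun-2023-12-13", config=config)
#     chain.serialize(output_dir="chained-inputs")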
325 changes: 325 additions & 0 deletions acto/post_process/simple_crash_test.py
@@ -0,0 +1,325 @@
import glob
import json
import logging
import math
import multiprocessing
import os
import queue
import re
import subprocess
import sys
import threading
import time
from functools import partial
from typing import Dict, List

import kubernetes
import kubernetes.client.models as k8s_models

from acto.common import kubernetes_client
from acto.deploy import Deploy
from acto.kubectl_client.kubectl import KubectlClient
from acto.kubernetes_engine import kind
from acto.lib.operator_config import OperatorConfig
from acto.post_process.post_diff_test import DeployRunner, DiffTestResult, PostDiffTest
from acto.post_process.post_process import Step
from acto.runner.runner import Runner
from acto.serialization import ActoEncoder
from acto.utils.error_handler import handle_excepthook, thread_excepthook


def get_crash_config_map(
apiclient: kubernetes.client.ApiClient,
trial_dir: str,
generation: int) -> dict:
    logging.info(
        "Getting the fault injection configmap along with the system state")
core_v1_api = kubernetes.client.CoreV1Api(apiclient)
config_map = core_v1_api.read_namespaced_config_map(
name="fault-injection-config",
namespace="default",
)
with open(os.path.join(trial_dir, f"crash-config-{generation}.json"), "w") as f:
json.dump(config_map.to_dict(), f, cls=ActoEncoder, indent=6)
return config_map.to_dict()


def create_crash_config_map(
apiclient: kubernetes.client.ApiClient,
cr_kind: str,
namespace: str,
cr_name: str):
core_v1_api = kubernetes.client.CoreV1Api(apiclient)
config_map = k8s_models.V1ConfigMap(
api_version="v1",
kind="ConfigMap",
metadata=k8s_models.V1ObjectMeta(
name="fault-injection-config",
),
data={
"cr_key": f"{cr_kind}/{namespace}/{cr_name}",
"current": "0",
"expected": "1",
},
)
core_v1_api.create_namespaced_config_map(
namespace="default",
body=config_map,
)
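
    # Note: the ConfigMap is created with expected="1" at cluster creation time;
    # CrashTrialRunner later overwrites it with a per-step target via
    # replace_crash_config_map.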


def replace_crash_config_map(
apiclient: kubernetes.client.ApiClient,
cr_kind: str,
namespace: str,
cr_name: str,
        operator_log: List[str]):

    # Count how many reconcile (Create/Update) requests appear in the
    # operator log for this step
    count = 0
    for line in operator_log:
        if re.match(r"^Reconciling.*(Create|Update).*", line):
            count += 1

target_count = math.floor(count * 0.7)
logging.info(
f"Setting the target count to {target_count} out of {count} requests")

core_v1_api = kubernetes.client.CoreV1Api(apiclient)
config_map = k8s_models.V1ConfigMap(
api_version="v1",
kind="ConfigMap",
metadata=k8s_models.V1ObjectMeta(
name="fault-injection-config",
),
data={
"cr_key": f"{cr_kind}/{namespace}/{cr_name}",
"current": "0",
"expected": str(target_count),
},
)
core_v1_api.replace_namespaced_config_map(
name="fault-injection-config",
namespace="default",
body=config_map,
)


class CrashTrialRunner(DeployRunner):

def __init__(
self,
workqueue: multiprocessing.Queue,
context: dict,
deploy: Deploy,
workdir: str,
cluster: kind.Kind,
worker_id: int,
acto_namespace: int):
super().__init__(workqueue, context, deploy,
workdir, cluster, worker_id, acto_namespace)

        # Prepare the hook to update the fault injection configmap for each step
cr_kind = self._context["crd"]["body"]["spec"]["names"]["kind"]
namespace = self._context["namespace"]
cr_name = "test-cluster"
self._hook = partial(replace_crash_config_map, cr_kind=cr_kind,
namespace=namespace, cr_name=cr_name)

def run(self):
while True:
try:
trial, steps = self._workqueue.get(block=False)
except queue.Empty:
break

trial_dir = os.path.join(self._workdir, trial)
os.makedirs(trial_dir, exist_ok=True)
before_k8s_bootstrap_time = time.time()
self._cluster.restart_cluster(self._cluster_name, self._kubeconfig)
self._cluster.load_images(self._images_archive, self._cluster_name)
apiclient = kubernetes_client(self._kubeconfig, self._context_name)
kubectl_client = KubectlClient(
self._kubeconfig, self._context_name)
after_k8s_bootstrap_time = time.time()
deployed = self._deploy.deploy_with_retry(
self._kubeconfig,
self._context_name,
kubectl_client=kubectl_client,
namespace=self._context["namespace"])
after_operator_deploy_time = time.time()

runner = Runner(
self._context, trial_dir, self._kubeconfig, self._context_name,
custom_system_state_f=get_crash_config_map)

steps: Dict[str, Step]
for key in sorted(steps, key=lambda x: int(x)):
step = steps[key]
logging.info(f"Running trial {trial} gen {step.gen}")
hook = partial(self._hook, operator_log=step.operator_log)
snapshot, err = runner.run(step.input, step.gen, [hook])
after_run_time = time.time()
difftest_result = DiffTestResult(
input_digest=step.input_digest, snapshot=snapshot.to_dict(),
originals=[{"trial": trial, "gen": step.gen}],
time={"k8s_bootstrap": after_k8s_bootstrap_time -
before_k8s_bootstrap_time,
"operator_deploy": after_operator_deploy_time -
after_k8s_bootstrap_time, "run": after_run_time -
after_operator_deploy_time, },)
difftest_result_path = os.path.join(
trial_dir, "difftest-%03d.json" % step.gen)
difftest_result.to_file(difftest_result_path)


class SimpleCrashTest(PostDiffTest):
"""Crash injection test for the operator using the existing testrun
This currently is still a prototype, where it depends on the operators' cooperation
to inject the crash. The operator needs to implement a configmap that can be used
to inject the crash. The configmap should have the following format:
{
"cr_key": "<cr_kind>/<cr_namespace>/<cr_name>",
"current": "<current_count>",
"expected": "<expected_count>",
}
The operator should also implement a hook to update the configmap when the
operator is running. The hook should be called after the operator has processed
a certain number of requests. The hook should update the "current" field of the
configmap to the number of requests that the operator has processed. The hook
should also update the "expected" field of the configmap to the number of requests
that the operator is expected to process.
"""

def __init__(
self,
testrun_dir: str,
config: OperatorConfig,
ignore_invalid: bool = False,
acto_namespace: int = 0):
super().__init__(testrun_dir, config, ignore_invalid, acto_namespace)

compare_results_files = glob.glob(os.path.join(
testrun_dir, "post_diff_test", "compare-results-*.json"))
for compare_results_file in compare_results_files:
            digest = re.search(r"compare-results-(\w+)\.json",
                               compare_results_file).group(1)
del self.unique_inputs[digest]

        logging.info(
            f"Unique inputs to run after excluding erroneous ones: {len(self.unique_inputs)}")

def post_process(self, workdir: str, num_workers: int = 1):
if not os.path.exists(workdir):
os.mkdir(workdir)

# Prepare the hook to create the configmap for the fault injection
cr_kind = self._context["crd"]["body"]["spec"]["names"]["kind"]
namespace = self._context["namespace"]
cr_name = "test-cluster"
posthook = partial(create_crash_config_map, cr_kind=cr_kind,
namespace=namespace, cr_name=cr_name)

cluster = kind.Kind(
acto_namespace=self.acto_namespace,
posthooks=[posthook],
feature_gates=self.config.kubernetes_engine.feature_gates)
cluster.configure_cluster(
self.config.num_nodes, self.config.kubernetes_version)
deploy = Deploy(self.config.deploy)

# Build an archive to be preloaded
images_archive = os.path.join(workdir, "images.tar")
if len(self.context["preload_images"]) > 0:
# first make sure images are present locally
for image in self.context["preload_images"]:
subprocess.run(["docker", "pull", image])
subprocess.run(["docker", "image", "save", "-o", images_archive] +
list(self.context["preload_images"]))

################## Operation sequence crash test ######################
num_ops = 0
workqueue = multiprocessing.Queue()
for trial, steps in self._trial_to_steps.items():
new_steps = {}
for step_key in list(steps.keys()):
if not steps[step_key].runtime_result.is_error():
new_steps[step_key] = steps[step_key]
num_ops += 1
workqueue.put((trial, new_steps))
logging.info(f"Running {num_ops} trials")

runners: List[CrashTrialRunner] = []
for i in range(num_workers):
runner = CrashTrialRunner(workqueue, self.context, deploy,
workdir, cluster, i, self.acto_namespace)
runners.append(runner)

processes = []
for runner in runners:
p = multiprocessing.Process(target=runner.run)
p.start()
processes.append(p)

for p in processes:
p.join()

################### Single operation crash test #######################
workqueue = multiprocessing.Queue()
for unique_input_group in self.unique_inputs.values():
workqueue.put(unique_input_group)

runners: List[DeployRunner] = []
for i in range(num_workers):
runner = DeployRunner(workqueue, self.context, deploy,
workdir, cluster, i, self.acto_namespace)
runners.append(runner)

processes = []
for runner in runners:
p = multiprocessing.Process(target=runner.run)
p.start()
processes.append(p)

for p in processes:
p.join()


if __name__ == "__main__":
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--config", type=str, required=True)
parser.add_argument("--testrun-dir", type=str, required=True)
parser.add_argument("--workdir-path", type=str, required=True)
parser.add_argument("--num-workers", type=int, default=1)
parser.add_argument("--checkonly", action="store_true")
args = parser.parse_args()
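
    # Example invocation (paths are placeholders; assumes the acto package is
    # importable):
    #
    #   python -m acto.post_process.simple_crash_test \
    #       --config operator_config.json \
    #       --testrun-dir testrun-2023-12-13 \
    #       --workdir-path workdir-crash-test \
    #       --num-workers 4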

# Register custom exception hook
sys.excepthook = handle_excepthook
threading.excepthook = thread_excepthook
global notify_crash_
notify_crash_ = True

log_filename = "check.log" if args.checkonly else "test.log"
os.makedirs(args.workdir_path, exist_ok=True)
# Setting up log infra
logging.basicConfig(
filename=os.path.join(args.workdir_path, log_filename),
level=logging.DEBUG, filemode="w",
format="%(asctime)s %(levelname)-7s, %(name)s, %(filename)-9s:%(lineno)d, %(message)s")
logging.getLogger("kubernetes").setLevel(logging.ERROR)
logging.getLogger("sh").setLevel(logging.ERROR)

start = time.time()

with open(args.config, "r") as config_file:
config = OperatorConfig(**json.load(config_file))
p = SimpleCrashTest(testrun_dir=args.testrun_dir, config=config)
if not args.checkonly:
p.post_process(args.workdir_path, num_workers=args.num_workers)
p.check(args.workdir_path, num_workers=args.num_workers)

logging.info(f"Total time: {time.time() - start} seconds")

1 comment on commit 48eaa35

@github-actions

Coverage

Coverage Report
File | Stmts | Miss | Cover | Missing
acto
   __main__.py | 88 | 88 | 0% | 1–175
   common.py | 354 | 99 | 72% | 97–98, 102–116, 119–126, 129, 132, 136, 138, 140, 142, 144, 148, 153–162, 203, 236, 249, 291, 315–316, 325, 327, 330, 334–343, 363–366, 415, 418, 432, 460–461, 498–503, 505–507, 514–517, 521, 537–538, 544–555, 614–623, 634–639
   deploy.py | 88 | 71 | 19% | 15–41, 47–55, 59, 66–116, 124–130, 133–138
   engine.py | 537 | 491 | 9% | 40–57, 64–80, 94–197, 209–235, 238–296, 312–435, 439–540, 550–575, 579–587, 590–603, 628–746, 749–838, 842–933
   exception.py | 2 | 2 | 0% | 1–2
   oracle_handle.py | 24 | 13 | 46% | 15–18, 26–27, 39–46, 54
   reproduce.py | 122 | 81 | 34% | 30–40, 48–52, 58–59, 72–93, 96, 99, 102, 106–108, 111, 114, 122–123, 126, 130, 134, 138, 142–176, 179–189, 192–225
   serialization.py | 30 | 16 | 47% | 19–27, 33–39
   snapshot.py | 62 | 26 | 58% | 32, 48, 58–79, 82–86, 89
acto/checker
   checker.py | 13 | 2 | 85% | 12, 19
   checker_set.py | 57 | 12 | 79% | 41–42, 67–78
   test_checker.py | 162 | 152 | 6% | 13–246
acto/checker/impl
   crash.py | 30 | 2 | 93% | 14, 16
   health.py | 52 | 5 | 90% | 37, 58–61, 76, 92
   kubectl_cli.py | 29 | 4 | 86% | 32–33, 44–46
   operator_log.py | 24 | 2 | 92% | 24, 32
   state.py | 225 | 51 | 77% | 55, 63–64, 69–93, 104–105, 133, 140, 147–149, 166, 265, 288, 317, 319–323, 329–336
   state_compare.py | 76 | 3 | 96% | 62, 73, 80
   state_condition.py | 39 | 13 | 67% | 18, 24, 29–37, 47, 53
acto/input
   get_matched_schemas.py | 54 | 22 | 59% | 12, 47–51, 55–74
   input.py | 609 | 489 | 20% | 30–37, 74, 92, 112, 123–126, 129, 159–171, 174–185, 189, 193, 200, 206, 210–389, 410–424, 432–437, 441–454, 463, 479, 482–489, 501, 503, 505–507, 510–523, 528, 536–547, 550–556, 564–591, 595–899, 922–935, 939–986
   testcase.py | 55 | 20 | 64% | 40–50, 53, 56, 59, 62–66, 95–96, 100, 103, 109, 116, 119
   testplan.py | 183 | 137 | 25% | 14–24, 27–29, 32, 35–42, 45, 48–67, 70–74, 78, 82, 85–91, 99–107, 110–121, 124, 127, 130, 133, 136–138, 141–151, 154–164, 167, 174–185, 188–194, 200, 203–219, 222, 225, 231, 239–244, 247, 250, 253, 259–260, 263–269, 272, 275, 278
   value_with_schema.py | 337 | 218 | 35% | 16, 20, 24, 28, 32, 36, 40, 54, 58, 61–67, 70–76, 86–114, 117–124, 128–137, 141–149, 152–156, 159, 162, 166, 180, 183, 186–192, 195–201, 211–230, 233–240, 243, 247–256, 260–272, 275–279, 282, 285, 289, 301, 307, 312, 314, 316, 320, 324, 329–333, 337–340, 349–359, 362–369, 373–376, 380–385, 388–391, 405, 408–412, 416, 421–428, 431–434, 437–439, 442–445, 448–451, 462, 465, 468, 485–498
   valuegenerator.py | 620 | 389 | 37% | 20, 24, 28, 32, 43, 47, 51, 55, 76–86, 90–104, 107, 110, 114, 117, 121–130, 133–139, 142, 145, 148, 151, 154, 157–167, 194–199, 202–213, 216, 219, 223, 226–231, 234–237, 240, 243–248, 251–254, 257, 260, 263, 266, 269, 273–282, 285, 288, 291, 294–304, 331–343, 346–347, 350, 353, 357, 360–365, 368–371, 374, 377–382, 385–388, 391, 394, 397, 400, 403, 406, 409–419, 450–482, 485–496, 499–502, 505–508, 512–520, 523, 526, 529, 532, 535, 538–548, 573–589, 592–609, 612, 615, 619–621, 624–628, 631–632, 635–644, 647–653, 656–657, 660–670, 673, 676, 679, 682, 685, 688–698, 710–711, 714–724, 727–730, 733–736, 740, 748, 751–752, 755–765, 768–771, 774–777, 781, 794–797, 800–814, 817, 820, 824, 827–832, 835, 838, 841–846, 849, 852, 855, 858, 861–871, 881, 884, 887, 890, 894, 913, 919, 931, 933, 936–940, 943–946, 951, 955, 957, 961–962
acto/input/known_schemas
   base.py | 53 | 13 | 75% | 17–18, 28, 37, 46–47, 56–57, 66, 75, 84–85, 93
   cronjob_schemas.py | 76 | 33 | 57% | 13, 16–19, 22, 25, 36–39, 42–47, 50, 53, 59, 62–65, 68, 71, 82, 85–90, 93, 96, 113, 117–119, 131, 137, 140
   deployment_schemas.py | 59 | 25 | 58% | 16, 22–27, 30–32, 35, 38, 54–57, 65–67, 70, 78–81, 91, 94
   known_schema.py | 75 | 39 | 48% | 28, 31–34, 37, 43, 46–48, 51, 54, 81–84, 102–113, 117–135
   pod_disruption_budget_schemas.py | 56 | 22 | 61% | 14–17, 21, 25–27, 30, 41–44, 48, 54, 57, 68–71, 81, 84
   pod_schemas.py | 797 | 271 | 66% | 16–19, 23, 28, 32, 40–43, 47, 51–53, 61–64, 68, 73, 83, 92, 151, 156, 160, 167, 171, 178, 182, 238, 242, 247, 251, 294, 298, 303, 307, 335, 338, 341, 344, 347, 350, 353, 356, 359, 362, 365, 368, 393, 397, 400–405, 408, 414, 417, 420, 428, 431, 434, 437, 445, 453, 481, 488–494, 503, 507, 539, 543, 572, 575, 578, 581, 584, 587, 590, 619, 623, 626–631, 634, 642, 645, 648, 664–667, 670–672, 675, 684, 690, 693–696, 699, 702, 713–714, 717–719, 722, 725, 736, 739, 742, 745–748, 752, 756–758, 761–765, 768, 774, 777, 780, 783, 786, 789, 792, 795, 798, 801, 804, 807, 810, 813, 816, 840, 845, 849–857, 860, 866, 869, 872, 875, 878, 881, 884, 887, 890, 893, 896, 899, 902, 905, 908, 929, 933–935, 938–943, 946, 959, 964, 974, 980, 986, 989, 992, 1032, 1036–1038, 1041, 1054, 1059, 1065, 1068–1071, 1074, 1077, 1088–1091, 1094–1096, 1102, 1108, 1111–1114, 1117, 1120, 1133–1136, 1139–1141, 1147, 1153, 1156–1159, 1162, 1165, 1176–1179, 1182–1184, 1190, 1196, 1199–1202, 1205, 1212, 1215–1217, 1223, 1238, 1243, 1247, 1274, 1278, 1291, 1296, 1302, 1305, 1308, 1314, 1317, 1320–1322, 1325, 1342, 1345, 1348, 1374, 1378–1381, 1384, 1402, 1408, 1411, 1414, 1421, 1427, 1469, 1473
   resource_schemas.py | 149 | 50 | 66% | 15–19, 22, 25, 28–32, 35, 38, 45, 49, 54, 57–59, 62, 76, 78, 82, 96, 102–105, 108, 111, 121, 134, 147, 154, 159–162, 165, 173–176, 179, 187, 194, 199, 213, 231, 236
   service_schemas.py | 178 | 67 | 62% | 13, 16, 19, 25–30, 33, 36, 42, 45–48, 51, 54, 64–67, 70–73, 79, 85, 88–91, 94, 101–102, 105–108, 111, 114, 127, 142, 180, 184, 208, 214, 217, 220, 228–231, 235, 240, 244–246, 249, 257–258, 261, 264, 277–280, 284, 288–290, 293
   statefulset_schemas.py | 186 | 61 | 67% | 15–18, 21, 31–36, 42, 49–53, 56–58, 61–63, 66–70, 73–75, 78–80, 83, 90–93, 99–102, 105, 132, 149, 154, 158, 164, 167–170, 173, 176, 190–193, 196–201, 207, 225, 230, 234, 262, 266, 290
   storage_schemas.py | 179 | 79 | 56% | 13, 16, 25–30, 33, 36, 42, 48–53, 59, 67–70, 74, 79–80, 83, 89, 92–95, 98, 104–105, 108–111, 114–116, 122, 130–131, 135, 140, 145–148, 154–155, 158, 164, 181–184, 193, 197, 203–205, 211, 214, 228–231, 244, 250–252, 255, 258, 266–269, 273, 277–279, 282
acto/kubectl_client
   kubectl.py | 23 | 18 | 22% | 8–14, 23–29, 37–44
acto/kubernetes_engine
   base.py | 53 | 22 | 58% | 30, 34, 38, 42, 46, 49–67, 79
   k3d.py | 85 | 68 | 20% | 17, 21–39, 45, 54–78, 81–97, 100–110, 117–133, 137–139
   kind.py | 99 | 32 | 68% | 51–53, 57–58, 84, 90, 99–102, 108–112, 115–116, 119–131, 139, 144, 147, 158
acto/monkey_patch
   monkey_patch.py | 79 | 24 | 70% | 9, 18, 36, 39, 41, 45–56, 76, 90–95, 106
acto/parse_log
   parse_log.py | 78 | 19 | 76% | 71–74, 84–86, 89–91, 96–98, 116–124
acto/post_process
   post_chain_inputs.py | 41 | 41 | 0% | 1–65
   post_diff_test.py | 407 | 235 | 42% | 64–65, 68, 78, 93–94, 133–134, 205, 215, 219, 224–233, 268, 270, 272–276, 281–298, 306–314, 317–342, 349–358, 361–423, 437–441, 461, 469–501, 504–521, 525–574, 586, 590, 596–597, 599–620, 630–666
   post_process.py | 104 | 22 | 79% | 51, 55, 67, 77–88, 100, 103, 138–142, 146, 150, 158, 162
   simple_crash_test.py | 162 | 126 | 22% | 35–44, 52–65, 79–101, 119–126, 131–173, 201–210, 214–286, 290–325
   test_post_process.py | 28 | 1 | 96% | 53
acto/runner
   runner.py | 322 | 290 | 10% | 29–73, 76–78, 92–147, 150–159, 162–193, 200–228, 235–258, 261–267, 270–292, 296–301, 312–317, 331–336, 349–441, 446–450, 456–466, 477–507, 512–545
acto/schema
   anyof.py | 44 | 20 | 55% | 26, 30–38, 41, 44, 47–49, 52, 55–60
   array.py | 70 | 29 | 59% | 31, 43–52, 57, 59–62, 71–78, 81–83, 86–88, 91, 94, 97, 103
   base.py | 102 | 57 | 44% | 13–15, 18–20, 23, 26–37, 40, 43, 46–48, 51–61, 64–74, 77, 80–86, 95, 100, 105, 110, 114, 118, 142, 145–149
   boolean.py | 27 | 10 | 63% | 16, 20, 23, 26, 29–32, 35, 38
   integer.py | 26 | 9 | 65% | 17, 21–23, 26, 29, 32, 35, 38
   number.py | 30 | 11 | 63% | 30–32, 35–37, 40, 43, 46, 49, 52
   object.py | 117 | 48 | 59% | 44, 46, 51, 66–75, 80, 83, 94–109, 112–120, 123–126, 129, 132, 148, 151–158, 168
   oneof.py | 44 | 30 | 32% | 13–19, 22, 25–27, 30–38, 41, 44, 47–49, 52, 55–60
   opaque.py | 17 | 6 | 65% | 13, 16, 19, 22, 25, 28
   schema.py | 42 | 8 | 81% | 21, 25, 31–34, 49–50
   string.py | 27 | 9 | 67% | 25, 29–31, 34, 37, 40, 43, 46
acto/utils
   __init__.py | 13 | 1 | 92% | 11
   acto_timer.py | 31 | 22 | 29% | 10–15, 19, 22–33, 38–40, 44–47
   error_handler.py | 43 | 33 | 23% | 13–30, 38–52, 56–74
   k8s_helper.py | 64 | 51 | 20% | 21–27, 39–45, 57–61, 73–79, 83–91, 95–102, 106–127
   preprocess.py | 69 | 58 | 16% | 17–73, 93–141, 147–192
   process_with_except.py | 9 | 9 | 0% | 1–13
   thread_logger.py | 15 | 3 | 80% | 9, 18, 28
TOTAL | 8345 | 4380 | 48% |

Tests: 139 | Skipped: 0 💤 | Failures: 0 ❌ | Errors: 0 🔥 | Time: 1m 39s ⏱️
