diff --git a/performance_measurement/fluent_inputs.py b/performance_measurement/fluent_inputs.py
index e2874ff25..2d11e2602 100644
--- a/performance_measurement/fluent_inputs.py
+++ b/performance_measurement/fluent_inputs.py
@@ -20,8 +20,9 @@ def serialize(self, output_dir: str):
         os.makedirs(reference_input_dir, exist_ok=True)
         print(f"Serializing to {output_dir}")
         for input in self.all_inputs:
-            if "claims" in input["input"]["spec"]:
-                del input["input"]["spec"]["claims"]
+            if "resources" in input["input"]["spec"] and "claims" in input["input"]["spec"][
+                    "resources"]:
+                del input["input"]["spec"]["resources"]["claims"]
             patch = jsonpatch.JsonPatch.from_diff(previous_input, input["input"])
             if patch:
                 print(patch)
diff --git a/performance_measurement/measure_performance.py b/performance_measurement/measure_performance.py
index 507d1f11e..20ae91c72 100644
--- a/performance_measurement/measure_performance.py
+++ b/performance_measurement/measure_performance.py
@@ -12,6 +12,7 @@
 import yaml
 from fluent_inputs import FluentInputGenerator
 from measure_runner import MeasurementRunner
+from performance_measurement.metrics_api_watcher import MetricsApiWatcher
 from rabbitmq_inputs import RabbitMQInputGenerator
 from stats_watch import WatchStats
 from zk_inputs import ZooKeeperInputGenerator
@@ -72,14 +73,21 @@ def test_normal(
         # operation sequence
         trial_dir = f"{workdir}/trial-normal"
         os.makedirs(trial_dir, exist_ok=True)
+
+        runner = MeasurementRunner(
+            namespace, crd, trial_dir, kubeconfig, context_name)
+
+        # start the stats watcher
         control_plane_stats_dir = os.path.join(trial_dir, "control_plane_stats")
         os.makedirs(control_plane_stats_dir, exist_ok=True)
         watcher = WatchStats(cluster, "anvil", control_plane_stats_dir)
-        watcher_thread = threading.Thread(target=watcher.start)
+        pods_watcher = MetricsApiWatcher(control_plane_stats_dir)
+        watcher_thread = threading.Thread(target=watcher.start, args=(cluster,))
+        pods_watcher_thread = threading.Thread(target=pods_watcher.start, args=(runner.apiclient, deploy.operator_name))
         watcher_thread.start()
+        pods_watcher_thread.start()
 
-        runner = MeasurementRunner(
-            namespace, crd, trial_dir, kubeconfig, context_name)
+        # run the workload
         gen = 0
         for workload in workloads:
             measurement_result = runner.run(workload, sts_name_f, ds_name_f, gen)
@@ -88,8 +96,12 @@
                 with open(measurement_result_file, "w") as f:
                     json.dump(dataclasses.asdict(measurement_result), f)
             gen += 1
+
+        # stop the stats watchers
         watcher.stop()
+        pods_watcher.stop()
         watcher_thread.join()
+        pods_watcher_thread.join()
 
     if "single-operation" in modes:
         # single operation
@@ -100,6 +112,10 @@
         os.makedirs(single_operation_trial_dir, exist_ok=True)
         control_plane_stats_dir = os.path.join(single_operation_trial_dir, "control_plane_stats")
         os.makedirs(control_plane_stats_dir, exist_ok=True)
+
+        watcher = WatchStats(cluster, "anvil", control_plane_stats_dir)
+        pods_watcher = MetricsApiWatcher(control_plane_stats_dir)
+
        gen = 0
         for workload in workloads:
             cluster.restart_cluster(name="anvil", kubeconfig=kubeconfig)
@@ -110,17 +126,23 @@
 
             if not deployed:
                 logging.info('Not deployed. Try again!')
-            watcher = WatchStats(cluster, "anvil", control_plane_stats_dir)
-            watcher_thread = threading.Thread(target=watcher.start)
+            watcher_thread = threading.Thread(target=watcher.start, args=(cluster,))
+            pods_watcher_thread = threading.Thread(target=pods_watcher.start, args=(runner.apiclient, deploy.operator_name))
             watcher_thread.start()
+            pods_watcher_thread.start()
+
             measurement_result = runner.run(workload, sts_name_f, ds_name_f, gen)
             if measurement_result is not None:
                 measurement_result_file = f"{single_operation_trial_dir}/measurement_result_{gen:03d}.json"
                 with open(measurement_result_file, "w") as f:
                     json.dump(dataclasses.asdict(measurement_result), f)
             gen += 1
+
+        # stop the stats watchers
         watcher.stop()
+        pods_watcher.stop()
         watcher_thread.join()
+        pods_watcher_thread.join()
 
 
 def generate_inputs(testrun_dir: str, input_generator: ChainInputs, config: OperatorConfig):
diff --git a/performance_measurement/metrics_api_watcher.py b/performance_measurement/metrics_api_watcher.py
new file mode 100644
index 000000000..2c2d65e5d
--- /dev/null
+++ b/performance_measurement/metrics_api_watcher.py
@@ -0,0 +1,62 @@
+import json
+import logging
+import os
+from typing import List
+
+import kubernetes
+from kubernetes.client import ApiClient
+
+
+class MetricsApiWatcher():
+
+    def __init__(self, output_dir: str) -> None:
+        """Initialize the MetricsApiWatcher
+
+        Args:
+            output_dir: output directory for the collected pod metrics
+
+        The api client and operator name are passed to start() instead.
+        """
+        self._output_dir = output_dir
+        self._sequence = 0
+        self._stop = False
+
+    def write_stats(self, stats_buf: List[dict], sequence: int):
+        with open(os.path.join(self._output_dir, f'pods_stats.{sequence:08d}.json'), 'w') as f:
+            json.dump(stats_buf, f, indent=4)
+
+    def start(self, apiclient: ApiClient, operator_name: str):
+        stats_buf: List[dict] = []
+        custom_api = kubernetes.client.CustomObjectsApi(apiclient)
+        watch = kubernetes.watch.Watch()
+
+        stream = watch.stream(func=custom_api.list_cluster_custom_object,
+                              group="metrics.k8s.io",
+                              version="v1beta1",
+                              plural="pods")
+        for stats in stream:
+            if self._stop:
+                break
+            pod_metrics = {}
+            for pod in stats['items']:
+                pod_name = pod['metadata']['name']
+                if pod_name == "etcd-kind-control-plane":
+                    pod_metrics = pod
+                    pod_metrics['timestamp'] = pod['timestamp']
+                elif pod_name == operator_name:
+                    pod_metrics = pod
+                    pod_metrics['timestamp'] = pod['timestamp']
+
+            stats_buf.append(pod_metrics)
+            if len(stats_buf) >= 1000:
+                self.write_stats(stats_buf, self._sequence)
+                stats_buf = []
+                self._sequence += 1
+        if len(stats_buf) > 0:
+            logging.info(f"Stopped, Writing {len(stats_buf)} stats to file")
+            self.write_stats(stats_buf, self._sequence)
+            self._sequence += 1
+        return
+
+    def stop(self):
+        self._stop = True
diff --git a/performance_measurement/rabbitmq_inputs.py b/performance_measurement/rabbitmq_inputs.py
index 0b6412b2d..3fe6de58e 100644
--- a/performance_measurement/rabbitmq_inputs.py
+++ b/performance_measurement/rabbitmq_inputs.py
@@ -25,8 +25,9 @@ def serialize(self, output_dir: str):
             print(f"{input['trial']}")
             input["input"]["spec"]["image"] = "ghcr.io/xlab-uiuc/rabbitmq:3.11.10-management"
             input["input"]["spec"]["replicas"] = 3
-            if "claims" in input["input"]["spec"]:
-                del input["input"]["spec"]["claims"]
+            if "resources" in input["input"]["spec"] and "claims" in input["input"]["spec"][
+                    "resources"]:
+                del input["input"]["spec"]["resources"]["claims"]
             if "resources" not in input["input"]["spec"] or input["input"]["spec"]["resources"] is None:
input["input"]["spec"]["resources"] = { "limits": { diff --git a/performance_measurement/stats_watch.py b/performance_measurement/stats_watch.py index 2c99f13a0..186ec495a 100644 --- a/performance_measurement/stats_watch.py +++ b/performance_measurement/stats_watch.py @@ -1,4 +1,3 @@ -import asyncio import json import logging import os @@ -7,31 +6,31 @@ class WatchStats(): - def __init__(self, cluster: KubernetesEngine, cluster_name: str, output_dir: str) -> None: - self._cluster = cluster + def __init__(self, cluster_name: str, output_dir: str) -> None: self._cluster_name = cluster_name self._output_dir = output_dir + self._sequence = 0 self._stop = False def write_stats(self, stats_buf: List[dict], sequence: int): with open(os.path.join(self._output_dir, f'{self._cluster_name}_stats.{sequence:08d}.json'), 'w') as f: json.dump(stats_buf, f, indent=4) - def start(self): + def start(self, cluster: KubernetesEngine): stats_buf = [] - sequence = 0 - stats_stream = self._cluster.stream_control_plane_stats("anvil") + stats_stream = cluster.stream_control_plane_stats("anvil") for stats in stats_stream: if self._stop: break stats_buf.append(stats) if len(stats_buf) >= 1000: - self.write_stats(stats_buf, sequence) + self.write_stats(stats_buf, self._sequence) stats_buf = [] - sequence += 1 + self._sequence += 1 if len(stats_buf) > 0: logging.info(f"Stopped, Writing {len(stats_buf)} stats to file") - self.write_stats(stats_buf, sequence) + self.write_stats(stats_buf, self._sequence) + self._sequence += 1 return def stop(self):