Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
Signed-off-by: Tyler Gu <[email protected]>
  • Loading branch information
tylergu committed Nov 19, 2023
1 parent 99beed1 commit 883b65d
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 1 deletion.
15 changes: 15 additions & 0 deletions performance_measurement/measure_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
import os
import sys
from datetime import datetime
import threading
from typing import Callable

import yaml
from measure_runner import ConditionFuncType, MeasurementRunner
from fluent_inputs import FluentInputGenerator
from stats_watch import WatchStats
from rabbitmq_inputs import RabbitMQInputGenerator
from zk_inputs import ZooKeeperInputGenerator

Expand Down Expand Up @@ -70,6 +72,11 @@ def test_normal(
# operation sequence
trial_dir = f"{workdir}/trial-normal"
os.makedirs(trial_dir, exist_ok=True)
control_plane_stats_dir = os.path.join(trial_dir, "control_plane_stats")
os.makedirs(control_plane_stats_dir, exist_ok=True)
watcher = WatchStats(cluster, "anvil", control_plane_stats_dir)
watcher_thread = threading.Thread(target=watcher.start)

runner = MeasurementRunner(
namespace, crd, trial_dir, kubeconfig, context_name)
gen = 0
Expand All @@ -80,6 +87,8 @@ def test_normal(
with open(measurement_result_file, "w") as f:
json.dump(dataclasses.asdict(measurement_result), f)
gen += 1
watcher.stop()
watcher_thread.join()

if "single-operation" in modes:
# single operation
Expand All @@ -88,6 +97,8 @@ def test_normal(

single_operation_trial_dir = f"{workdir}/trial-single-operation"
os.makedirs(single_operation_trial_dir, exist_ok=True)
control_plane_stats_dir = os.path.join(trial_dir, "control_plane_stats")
os.makedirs(control_plane_stats_dir, exist_ok=True)
gen = 0
for workload in workloads:
cluster.restart_cluster(name="anvil", kubeconfig=kubeconfig)
Expand All @@ -98,12 +109,16 @@ def test_normal(
if not deployed:
logging.info('Not deployed. Try again!')

watcher = WatchStats(cluster, "anvil", control_plane_stats_dir)
watcher_thread = threading.Thread(target=watcher.start)
measurement_result = runner.run(workload, sts_name_f, ds_name_f, gen)
if measurement_result is not None:
measurement_result_file = f"{single_operation_trial_dir}/measurement_result_{gen:03d}.json"
with open(measurement_result_file, "w") as f:
json.dump(dataclasses.asdict(measurement_result), f)
gen += 1
watcher.stop()
watcher_thread.join()


def generate_inputs(testrun_dir: str, input_generator: ChainInputs, config: OperatorConfig):
Expand Down
3 changes: 2 additions & 1 deletion performance_measurement/measure_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ def run(self,
condition_1 = None
condition_2 = None
if sts_name_f:
last_revision = event_list[-1][0]["object"].status.update_revision
last_spec = event_list[-1][0]['object'].to_dict()["spec"]
last_spec_hash = hashlib.sha256(json.dumps(
last_spec, sort_keys=True).encode()).hexdigest()
Expand All @@ -253,7 +254,7 @@ def run(self,

if condition_2 is None:
if (event["object"].status.ready_replicas == input["spec"]["replicas"]
and current_spec_hash == last_spec_hash):
and event["object"].status.current_revision == last_revision):
condition_2 = timestamp
break
else:
Expand Down
38 changes: 38 additions & 0 deletions performance_measurement/stats_watch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import asyncio
import json
import logging
import os
from typing import List
from acto.kubernetes_engine.base import KubernetesEngine

class WatchStats():

def __init__(self, cluster: KubernetesEngine, cluster_name: str, output_dir: str) -> None:
self._cluster = cluster
self._cluster_name = cluster_name
self._output_dir = output_dir
self._stop = False

def write_stats(self, stats_buf: List[dict], sequence: int):
with open(os.path.join(self._output_dir, f'{self._cluster_name}_stats.{sequence:08d}.json'), 'w') as f:
json.dump(stats_buf, f, indent=4)

def start(self):
stats_buf = []
sequence = 0
stats_stream = self._cluster.stream_control_plane_stats("anvil")
for stats in stats_stream:
if self._stop:
break
stats_buf.append(stats)
if len(stats_buf) >= 1000:
self.write_stats(stats_buf, sequence)
stats_buf = []
sequence += 1
if len(stats_buf) > 0:
logging.info(f"Stopped, Writing {len(stats_buf)} stats to file")
self.write_stats(stats_buf, sequence)
return

def stop(self):
self._stop = True

0 comments on commit 883b65d

Please sign in to comment.