Commit d80784e

Fix
Signed-off-by: Tyler Gu <[email protected]>
tylergu committed Nov 14, 2023
1 parent 7fbcae6 commit d80784e
Showing 8 changed files with 5,327 additions and 30 deletions.
27 changes: 27 additions & 0 deletions data/rabbitmq-operator/v2.5.0/config.json
@@ -0,0 +1,27 @@
{
"deploy": {
"method": "YAML",
"file": "data/rabbitmq-operator/v2.5.0/operator.yaml",
"init": null
},
"crd_name": null,
"custom_fields": "data.rabbitmq-operator.prune",
"blackbox_custom_fields": "data.rabbitmq-operator.prune_blackbox",
"k8s_fields": "data.rabbitmq-operator.k8s_mapping",
"seed_custom_resource": "data/rabbitmq-operator/cr.yaml",
"example_dir": "data/rabbitmq-operator/examples",
"analysis": {
"github_link": "https://github.com/rabbitmq/cluster-operator.git",
"commit": "f2ab5cecca7fa4bbba62ba084bfa4ae1b25d15ff",
"entrypoint": null,
"type": "RabbitmqCluster",
"package": "github.com/rabbitmq/cluster-operator/api/v1beta1"
},
"diff_ignore_fields": [
"\\['metadata'\\]\\['annotations'\\]\\['rabbitmq.com",
"\\['secret'\\]\\['default\\-token\\-.*'\\]",
"\\['secret'\\]\\['rabbitmq\\-cluster\\-operator\\-token\\-.*'\\]",
"\\['secret'\\]\\['test\\-cluster\\-server\\-token\\-.*'\\]",
"\\['service'\\]\\['test\\-cluster'\\]\\['spec'\\]\\['ports'\\]"
]
}
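For reference, the `diff_ignore_fields` entries are regular expressions matched against serialized field paths in the state diff, so volatile fields (auto-generated token Secrets, operator-managed annotations, node ports) do not surface as false alarms. A minimal sketch of how such patterns could be applied; the path rendering and the `is_ignored` helper are illustrative, not Acto's actual API:

```python
import re

# Hypothetical path strings in the style the patterns above target,
# e.g. a state-diff key rendered as "['secret']['default-token-x7k2q']".
IGNORE_PATTERNS = [
    r"\['metadata'\]\['annotations'\]\['rabbitmq.com",
    r"\['secret'\]\['default\-token\-.*'\]",
    r"\['service'\]\['test\-cluster'\]\['spec'\]\['ports'\]",
]

def is_ignored(field_path: str) -> bool:
    """Return True if a serialized field path matches any ignore regex."""
    return any(re.search(pattern, field_path) for pattern in IGNORE_PATTERNS)

assert is_ignored("['secret']['default-token-x7k2q']")
assert not is_ignored("['secret']['my-app-credentials']")
```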
5,144 changes: 5,144 additions & 0 deletions data/rabbitmq-operator/v2.5.0/operator.yaml

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion data/zookeeper-operator/v0.2.15/operator.yaml
@@ -6077,7 +6077,7 @@ spec:
       containers:
         - name: zookeeper-operator
           # Replace this with the built image name
-          image: pravega/zookeeper-operator:0.2.15
+          image: ghcr.io/xlab-uiuc/zookeeper-operator:0.2.15
           ports:
           - containerPort: 60000
             name: metrics
37 changes: 35 additions & 2 deletions performance_measurement/check_utils.py
@@ -1,5 +1,6 @@
 from typing import List
 
+import kubernetes
 import kubernetes.client.models as k8s_models


@@ -100,12 +101,44 @@ def check_persistent_volume_claim_retention_policy(
     # check if persistent volume claim retention policy match
     current_policy = sts_object["spec"]["persistent_volume_claim_retention_policy"]
     if desired_policy is not None:
-        if "whenDeleted" in desired_policy and desired_policy["whenDeleted"] and sts_object[
+        if "whenDeleted" in desired_policy and desired_policy["whenDeleted"] and current_policy[
                 "when_deleted"] != desired_policy["whenDeleted"]:
             return False
 
-        if "whenScaled" in desired_policy and desired_policy["whenScaled"] and sts_object[
+        if "whenScaled" in desired_policy and desired_policy["whenScaled"] and current_policy[
                 "when_scaled"] != desired_policy["whenScaled"]:
             return False
 
     return True


+def check_pods_ready(input: dict, apiclient: kubernetes.client.ApiClient, namespace: str):
+    coreV1Api = kubernetes.client.CoreV1Api(apiclient)
+    appV1Api = kubernetes.client.AppsV1Api(apiclient)
+    statefulsets = appV1Api.list_namespaced_stateful_set(namespace)
+
+    for statefulset in statefulsets.items:
+        sts_object = statefulset.to_dict()
+
+        if sts_object["status"]["current_revision"] != sts_object["status"]["update_revision"]:
+            return False
+
+        if sts_object["status"]["ready_replicas"] != input["spec"]["replicas"]:
+            return False
+
+    pods = coreV1Api.list_namespaced_pod(namespace)
+    for pod in pods.items:
+        pod_object = pod.to_dict()
+
+        if pod_object["status"]["phase"] != "Running":
+            return False
+
+        containers_ready = True
+        for container_status in pod_object["status"]["container_statuses"]:
+            if container_status["ready"] != True:
+                containers_ready = False
+                break
+        if not containers_ready:
+            return False
+
+    return True
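The new `check_pods_ready` helper gives the watch loop (added below in `measure_runner.py`) a fallback convergence check when no StatefulSet events arrive. A hedged usage sketch; the namespace and the CR dict are placeholders:

```python
import time

import kubernetes

from check_utils import check_pods_ready

# Placeholder custom resource; only spec.replicas is consulted by the check.
cr = {"spec": {"replicas": 3}}
apiclient = kubernetes.config.new_client_from_config()

# Poll until every StatefulSet is on its latest revision and all pods are Ready.
while not check_pods_ready(cr, apiclient, namespace="test-namespace"):
    time.sleep(5)
```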
31 changes: 18 additions & 13 deletions performance_measurement/measure_performance.py
@@ -6,6 +6,7 @@
 import os
 import sys
 from datetime import datetime
+from typing import Callable
 
 import yaml
 from measure_runner import ConditionFuncType, MeasurementRunner
@@ -15,15 +16,16 @@
 sys.path.insert(0, os.path.abspath(
     os.path.join(os.path.dirname(__file__), '..')))
 
-from acto.utils.preprocess import process_crd
-from acto.post_process.post_chain_inputs import ChainInputs
-from acto.lib.operator_config import OperatorConfig
-from acto.kubernetes_engine import kind
-from acto.kubectl_client.kubectl import KubectlClient
-from acto.deploy import Deploy
-from acto.constant import CONST
-from acto.common import kubernetes_client
 from acto import utils
+from acto.common import kubernetes_client
+from acto.constant import CONST
+from acto.deploy import Deploy
+from acto.kubectl_client.kubectl import KubectlClient
+from acto.kubernetes_engine import kind
+from acto.lib.operator_config import OperatorConfig
+from acto.post_process.post_chain_inputs import ChainInputs
+from acto.utils.preprocess import process_crd
 
 
 def load_inputs_from_dir(dir: str) -> list:
     inputs = []
Expand All @@ -38,7 +40,7 @@ def load_inputs_from_dir(dir: str) -> list:

 def test_normal(
         workdir: str, input_dir: str, config: OperatorConfig, condition_1_func: ConditionFuncType,
-        condition_2_func: ConditionFuncType):
+        condition_2_func: ConditionFuncType, sts_name_f: Callable[[dict], str]):
     # prepare workloads
     workloads = load_inputs_from_dir(dir=input_dir)
 
@@ -70,7 +72,7 @@ def test_normal(
     gen = 0
     for workload in workloads:
         measurement_result = runner.run(
-            workload, condition_1_func, condition_2_func, gen)
+            workload, condition_1_func, condition_2_func, sts_name_f, gen)
         if measurement_result is not None:
             measurement_result_file = f"{trial_dir}/measurement_result_{gen:03d}.json"
             with open(measurement_result_file, "w") as f:
@@ -92,7 +94,7 @@
                 logging.info('Not deployed. Try again!')
 
             measurement_result = runner.run(
-                workload, condition_1_func, condition_2_func, gen)
+                workload, condition_1_func, condition_2_func, sts_name_f, gen)
             if measurement_result is not None:
                 measurement_result_file = f"{single_operation_trial_dir}/measurement_result_{gen:03d}.json"
                 with open(measurement_result_file, "w") as f:
@@ -105,16 +107,19 @@ def main(args):
     condition_1_func: ConditionFuncType = None
     reference_condition_1_func: ConditionFuncType = None
     condition_2_func: ConditionFuncType = None
+    sts_name_f = None
     if args.project == "rabbitmq-operator":
         input_generator = RabbitMQInputGenerator
         condition_1_func = MeasurementRunner.wait_for_rabbitmq_spec
         reference_condition_1_func = MeasurementRunner.wait_for_reference_rabbitmq_spec
         condition_2_func = MeasurementRunner.wait_for_pod_ready
+        sts_name_f = MeasurementRunner.rabbitmq_sts_name
     elif args.project == "zookeeper-operator":
         input_generator = ZooKeeperInputGenerator
         condition_1_func = MeasurementRunner.wait_for_zk_spec
         reference_condition_1_func = MeasurementRunner.wait_for_reference_zk_spec
         condition_2_func = MeasurementRunner.wait_for_pod_ready
+        sts_name_f = MeasurementRunner.zk_sts_name
 
     # parse the inputs
     with open(args.anvil_config, "r") as config_file:
@@ -134,7 +139,7 @@
     test_normal(
         anvil_workdir,
         f"{args.workdir_path}/inputs/anvil_inputs", config, condition_1_func,
-        condition_2_func)
+        condition_2_func, sts_name_f)

     # Run the reference performance test
     # reference_config = "data/zookeeper-operator/v0.2.15/config.json"
@@ -147,7 +152,7 @@

     reference_dir = f"{args.workdir_path}/reference"
     test_normal(reference_dir, f"{args.workdir_path}/inputs/reference", config,
-                reference_condition_1_func, condition_2_func)
+                reference_condition_1_func, condition_2_func, sts_name_f)
 
 
 if __name__ == "__main__":
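`sts_name_f` is threaded through `test_normal` because the two operators name their StatefulSets differently; the callables added at the bottom of `measure_runner.py` capture each convention. Illustrative values, mirroring `zk_sts_name` and `rabbitmq_sts_name` from this commit:

```python
cr = {"metadata": {"name": "test-cluster"}}

zk_sts_name = cr["metadata"]["name"]                    # -> "test-cluster"
rabbitmq_sts_name = f"{cr['metadata']['name']}-server"  # -> "test-cluster-server"
```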
111 changes: 98 additions & 13 deletions performance_measurement/measure_runner.py
@@ -1,26 +1,29 @@
 import json
 import logging
 import os
+import queue
 import sys
 import time
 from dataclasses import dataclass
+from multiprocessing import Process, Queue
 from typing import Callable
 
 import kubernetes
 import kubernetes.client.models as k8s_models
 import yaml
 from check_utils import (check_affinity,
                          check_persistent_volume_claim_retention_policy,
-                         check_resources, check_tolerations)
+                         check_pods_ready, check_resources, check_tolerations)
 
 sys.path.insert(0, os.path.abspath(
     os.path.join(os.path.dirname(__file__), '..')))
 
-from acto.utils.thread_logger import get_thread_logger
-from acto.serialization import ActoEncoder
-from acto.runner import Runner
-from acto.kubectl_client.kubectl import KubectlClient
 from acto.common import kubernetes_client
+from acto.kubectl_client.kubectl import KubectlClient
+from acto.runner import Runner
+from acto.serialization import ActoEncoder
+from acto.utils import acto_timer
+from acto.utils.thread_logger import get_thread_logger
 
 ConditionFuncType = Callable[[dict, kubernetes.client.ApiClient, str], bool]

@@ -183,6 +186,7 @@ def run(self,
             input: dict,
             condition_1: ConditionFuncType,
             condition_2: ConditionFuncType,
+            sts_name_f: Callable[[dict], str],
             generation: int) -> MeasurementResult:
         '''
         Run the input CRD and check if the condition is satisfied
@@ -212,22 +216,52 @@ def run(self,
             logger.error('STDOUT: ' + cli_result.stdout)
             logger.error('STDERR: ' + cli_result.stderr)
             return None
 
         start_time = time.time()
-        condition_1(input, self.apiclient, self.namespace)
-        timestamp_1 = time.time()
-        condition_2(input, self.apiclient, self.namespace)
-        timestamp_2 = time.time()
+        sts_event_list = MeasurementRunner.wait_for_converge(
+            input=input, apiclient=self.apiclient, namespace=self.namespace, sts_name=sts_name_f(input))
+
+        condition_1 = None
+        condition_2 = None
+        last_revision = sts_event_list[-1][0]["object"].status.update_revision
+        for sts_event, timestamp in sts_event_list:
+            logger.info(f"{sts_event['type']} {sts_event['object'].metadata.name} at {timestamp}")
+
+            if condition_1 is None:
+                if sts_event["object"].status.update_revision == last_revision:
+                    condition_1 = timestamp
+                else:
+                    continue
+
+            if condition_2 is None:
+                if (sts_event["object"].status.ready_replicas == input["spec"]["replicas"]
+                        and sts_event["object"].status.current_revision == last_revision):
+                    condition_2 = timestamp
+                    break
+                else:
+                    continue
 
-        duration_1 = timestamp_1 - start_time
+        # condition_1(input, self.apiclient, self.namespace)
+        # timestamp_1 = time.time()
+        # condition_2(input, self.apiclient, self.namespace)
+        # timestamp_2 = time.time()
+
+        duration_1 = condition_1 - start_time
         logging.info('Condition 1 took %f seconds' % duration_1)
 
-        duration_2 = timestamp_2 - start_time
+        duration_2 = condition_2 - start_time
         logging.info('Condition 2 took %f seconds' % duration_2)
 
+        with open("%s/sts-events-%03d.json" % (self.trial_dir, generation), 'w') as system_state_file:
+            json.dump([{"ts": ts, "statefulset": sts["object"].to_dict()} for sts, ts in sts_event_list],
+                      system_state_file,
+                      cls=ActoEncoder,
+                      indent=4)
+
         self.collect_system_state()
 
-        return MeasurementResult(start_time, timestamp_1, timestamp_2)
+        return MeasurementResult(start_time, condition_1, condition_2)

     def wait_for_reference_zk_spec(
             input: dict, apiclient: kubernetes.client.ApiClient, namespace: str) -> bool:
@@ -712,3 +746,54 @@ def wait_for_pod_ready(
                 break
 
         return True
+
+    def wait_for_converge(input: dict, apiclient: kubernetes.client.ApiClient, namespace: str, sts_name: str) -> list:
+        appV1Api = kubernetes.client.AppsV1Api(apiclient)
+        watch = kubernetes.watch.Watch()
+
+        statefulset_updates = []
+        statefulset_updates_queue = Queue(maxsize=0)
+        statefulset_stream = watch.stream(func=appV1Api.list_namespaced_stateful_set,
+                                          namespace=namespace,
+                                          field_selector="metadata.name=%s" % sts_name)
+        timer_hard_timeout = acto_timer.ActoTimer(600, statefulset_updates_queue, "timeout")
+        watch_process = Process(target=MeasurementRunner.watch_system_events,
+                                args=(statefulset_stream, statefulset_updates_queue))
+
+        timer_hard_timeout.start()
+        watch_process.start()
+
+        while True:
+            try:
+                statefulset_event = statefulset_updates_queue.get(timeout=120)
+                if isinstance(statefulset_event, str) and statefulset_event == "timeout":
+                    break
+                else:
+                    statefulset_updates.append(statefulset_event)
+            except queue.Empty:
+                if check_pods_ready(input, apiclient, namespace):
+                    break
+                else:
+                    continue
+
+        statefulset_stream.close()
+        timer_hard_timeout.cancel()
+        watch_process.terminate()
+
+        return statefulset_updates
+
+    def watch_system_events(event_stream, queue: Queue):
+        '''A process that watches namespaced events
+        '''
+        for object in event_stream:
+            try:
+                ts = time.time()
+                queue.put((object, ts))
+            except (ValueError, AssertionError):
+                pass
+
+    def zk_sts_name(input: dict):
+        return input["metadata"]["name"]
+
+    def rabbitmq_sts_name(input: dict):
+        return f"{input['metadata']['name']}-server"
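For context on the `run()` change above: instead of blocking on the two condition callables, `wait_for_converge` now records every StatefulSet watch event as an `(event, timestamp)` pair, and `run()` replays the timeline afterwards. `condition_1` is the first event carrying the final `update_revision` (the operator has pushed the new spec); `condition_2` is the first subsequent event where all desired replicas are ready on that revision (the rollout has converged). A self-contained sketch of the replay, using `SimpleNamespace` stand-ins for the Kubernetes client models and made-up revisions and timestamps:

```python
from types import SimpleNamespace

def sts_event(update_rev, current_rev, ready):
    # Stand-in for a watch event carrying a V1StatefulSet object.
    status = SimpleNamespace(update_revision=update_rev,
                             current_revision=current_rev,
                             ready_replicas=ready)
    return {"type": "MODIFIED", "object": SimpleNamespace(status=status)}

events = [
    (sts_event("rev1", "rev1", 3), 0.0),  # still on the old revision
    (sts_event("rev2", "rev1", 2), 1.5),  # new spec observed -> condition_1
    (sts_event("rev2", "rev2", 3), 9.0),  # rollout converged -> condition_2
]
desired_replicas = 3

last_revision = events[-1][0]["object"].status.update_revision
condition_1 = condition_2 = None
for event, ts in events:
    status = event["object"].status
    if condition_1 is None:
        if status.update_revision == last_revision:
            condition_1 = ts  # operator pushed the new revision
        else:
            continue
    if (status.ready_replicas == desired_replicas
            and status.current_revision == last_revision):
        condition_2 = ts      # all replicas ready on the new revision
        break

print(condition_1, condition_2)  # 1.5 9.0
```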
4 changes: 3 additions & 1 deletion performance_measurement/rabbitmq_inputs.py
@@ -1,4 +1,3 @@
-from acto.post_process.post_chain_inputs import ChainInputs
 import os
 import sys

@@ -8,6 +7,8 @@
 sys.path.insert(0, os.path.abspath(
     os.path.join(os.path.dirname(__file__), '..')))
 
+from acto.post_process.post_chain_inputs import ChainInputs
+
 
 class RabbitMQInputGenerator(ChainInputs):

@@ -20,6 +21,7 @@ def serialize(self, output_dir: str):
         os.makedirs(reference_input_dir, exist_ok=True)
         for input in self.all_inputs:
             print(f"{input['trial']}")
+            input["input"]["spec"]["image"] = "ghcr.io/xlab-uiuc/rabbitmq:3.11.10-management"
             patch = jsonpatch.JsonPatch.from_diff(previous_input, input["input"])
             if patch:
                 with open(os.path.join(anvil_input_dir, f'input-{index:03d}.yaml'), 'w') as f:
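The image pin is applied before the jsonpatch diff, so it only materializes in a patch when it actually differs between consecutive inputs. A brief sketch of the `jsonpatch` behavior this relies on, with illustrative dicts:

```python
import jsonpatch

previous_input = {"spec": {"replicas": 3}}
current_input = {"spec": {"replicas": 3,
                          "image": "ghcr.io/xlab-uiuc/rabbitmq:3.11.10-management"}}

patch = jsonpatch.JsonPatch.from_diff(previous_input, current_input)
print(patch)  # one op: {"op": "add", "path": "/spec/image", "value": "ghcr.io/..."}

# Once the image is identical on both sides, it drops out of the patch,
# and an empty patch is falsy (which is what the `if patch:` guard uses).
assert not jsonpatch.JsonPatch.from_diff(current_input, current_input)
```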
1 change: 1 addition & 0 deletions performance_measurement/zk_inputs.py
@@ -21,6 +21,7 @@ def serialize(self, output_dir: str):
         os.makedirs(reference_input_dir, exist_ok=True)
         for input in self.all_inputs:
             print(f"{input['trial']}")
+            input["input"]["spec"]["image"] = "ghcr.io/xlab-uiuc/zookeeper:0.2.14"
             patch = jsonpatch.JsonPatch.from_diff(previous_input, input["input"])
             if patch:
                 skip_input = False
