Automate figuring out the indeterministic properties regex
Signed-off-by: Tyler Gu <[email protected]>
tylergu committed Feb 18, 2024
1 parent a6eda7e commit fc8ee09
Showing 3 changed files with 298 additions and 13 deletions.
205 changes: 192 additions & 13 deletions acto/post_process/post_diff_test.py
@@ -1,4 +1,5 @@
import argparse
import difflib
import glob
import hashlib
import json
@@ -13,7 +14,7 @@
import threading
import time
from copy import deepcopy
from typing import Dict, List, Optional
from typing import Optional

import pandas as pd
import pydantic
@@ -33,22 +34,21 @@
from acto.serialization import ActoEncoder
from acto.snapshot import Snapshot
from acto.trial import Step
from acto.utils import add_acto_label, get_thread_logger
from acto.utils.error_handler import handle_excepthook, thread_excepthook
from acto.utils import add_acto_label, error_handler, get_thread_logger


class DiffTestResult(pydantic.BaseModel):
"""The result of a diff test
It contains the input digest, the snapshot, the original trial and generation,
and the time spent on each step
The oracle result is separated from this dataclass, so that it can easily be recomputed
after changing the oracle
The oracle result is separated from this dataclass,
so that it can easily be recomputed after changing the oracle
"""

input_digest: str
snapshot: Snapshot
originals: list[Dict]
time: Dict
originals: list[dict]
time: dict

@classmethod
def from_file(cls, file_path: str) -> "DiffTestResult":
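For readers skimming this diff, a minimal, hypothetical sketch of how this model is typically consumed; the file path and the per-field comments are illustrative assumptions, not part of the commit:

result = DiffTestResult.from_file("trial-00-0000/difftest-000.json")  # path is made up
print(result.input_digest)       # md5 digest of the canonicalized CR input
print(len(result.originals))     # references to the original trial/generation pairs
print(result.time)               # per-step timing information (a plain dict)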
@@ -332,7 +332,7 @@ class AdditionalRunner:

def __init__(
self,
context: Dict,
context: dict,
deploy: Deploy,
workdir: str,
cluster: base.KubernetesEngine,
@@ -403,7 +403,7 @@ class DeployRunner:
def __init__(
self,
workqueue: multiprocessing.Queue,
context: Dict,
context: dict,
deploy: Deploy,
workdir: str,
cluster: base.KubernetesEngine,
@@ -517,6 +517,81 @@ def run(self):
generation += 1


def compute_common_regex(paths: list[str]) -> list[str]:
"""Compute the common regex from the list of paths"""
common_regex: set[str] = set()
sorted_paths = sorted(paths)
curr_regex = None
for i in range(len(sorted_paths)):
if curr_regex is None or not re.match(curr_regex, sorted_paths[i]):
# if we do not have enough items to look ahead, then we give up
if i + 2 >= len(sorted_paths):
common_regex.add("^" + re.escape(sorted_paths[i]) + "$")
continue

path = sorted_paths[i]
next_path = sorted_paths[i + 1]
next_next_path = sorted_paths[i + 2]

# if the resource type is different, then we give up to avoid
# a wildcard regex
if (
path.startswith("root")
and next_path.startswith("root")
and next_next_path.startswith("root")
):
resource_type = re.findall(r"root\['(.*?)'\]", path)[0]
next_resource_type = re.findall(r"root\['(.*?)'\]", next_path)[
0
]
next_next_resource_type = re.findall(
r"root\['(.*?)'\]", next_next_path
)[0]
if (
resource_type != next_resource_type
or next_resource_type != next_next_resource_type
):
common_regex.add("^" + re.escape(path) + "$")
continue

# Look ahead to find a regex candidate
# Construct the regex candidate using the difflib
matched_blocks = difflib.SequenceMatcher(
None,
path,
next_path,
).get_matching_blocks()
regex_candidate = r""
for block in matched_blocks:
if block.a == 0 and block.b == 0:
regex_candidate += r"^"
elif block.size != 0:
# we may emit a duplicate wildcard here,
# but that is fine because two consecutive wildcards are
# equivalent to a single wildcard
regex_candidate += r".*"

if block.size <= 3:
# prevent accidental single-character matches in random strings
continue
regex_candidate += re.escape(
path[block.a : block.a + block.size]
)
regex_candidate += r"$"

# Check if the regex candidate also matches the third item:
# if it matches, we have a valid common regex;
# if not, we give up on finding a common regex for the
# current item
if re.match(regex_candidate, next_next_path):
common_regex.add(regex_candidate)
curr_regex = regex_candidate
else:
common_regex.add(path)

return list(common_regex)
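To make the look-ahead generalization above concrete, here is a small, hypothetical call to compute_common_regex; the Secret names and the resulting pattern are illustrative assumptions:

# Three paths that differ only in a randomly generated suffix (made-up names).
paths = [
    "root['Secret']['app-token-abcde']['data']",
    "root['Secret']['app-token-fghij']['data']",
    "root['Secret']['app-token-klmno']['data']",
]
# All three share the resource type 'Secret', so difflib finds the common
# prefix and suffix, the third path validates the candidate, and the result
# is a single anchored pattern roughly like
#   ^root\['Secret'\]\['app-token-.*'\]\['data'\]$
# (exact escaping depends on re.escape).
print(compute_common_regex(paths))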


class PostDiffTest(PostProcessor):
"""Post diff test class for Acto"""

@@ -566,7 +641,7 @@ def __init__(
],
)

self.unique_inputs: Dict[
self.unique_inputs: dict[
str, pd.DataFrame
] = {} # input digest -> group of steps
groups = self.df.groupby("input_digest")
@@ -605,7 +680,7 @@ def post_process(self, workdir: str, num_workers: int = 1):
for unique_input_group in self.unique_inputs.values():
workqueue.put(unique_input_group)

runners: List[DeployRunner] = []
runners: list[DeployRunner] = []

for i in range(num_workers):
runner = DeployRunner(
workqueue,
@@ -635,13 +710,36 @@ def check(self, workdir: str, num_workers: int = 1):
)
trial_dirs = glob.glob(os.path.join(workdir, "trial-*"))

with open(self.config.seed_custom_resource, "r", encoding="utf-8") as f:
seed_cr = json.load(f)
seed_input_digest = hashlib.md5(

json.dumps(seed_cr, sort_keys=True).encode("utf-8")
).hexdigest()

workqueue: multiprocessing.Queue = multiprocessing.Queue()
for trial_dir in trial_dirs:
for diff_test_result_path in glob.glob(
os.path.join(trial_dir, "difftest-*.json")
):
# 1. Populate the workqueue
workqueue.put(diff_test_result_path)

# 2. Find the seed test result and compute the common regex
diff_test_result = DiffTestResult.from_file(
diff_test_result_path
)
if diff_test_result.input_digest == seed_input_digest:
diff_skip_regex = self.__get_diff_paths(diff_test_result)
logger.info(
"Seed input digest: %s, diff_skip_regex: %s",
seed_input_digest,
diff_skip_regex,
)
if self.config.diff_ignore_fields is None:
self.config.diff_ignore_fields = diff_skip_regex
else:
self.config.diff_ignore_fields.extend(diff_skip_regex)
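A hedged sketch of the digest computation used above to recognize the seed input; the CR contents here are invented for illustration:

import hashlib
import json

# Illustrative seed custom resource; the real one is loaded from
# config.seed_custom_resource.
seed_cr = {"apiVersion": "v1", "kind": "Example", "metadata": {"name": "seed"}}
seed_input_digest = hashlib.md5(
    json.dumps(seed_cr, sort_keys=True).encode("utf-8")
).hexdigest()
# Diff test results whose input_digest equals this value are the seed runs,
# and the skip regexes learned from them extend config.diff_ignore_fields.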

processes = []
for i in range(num_workers):
p = multiprocessing.Process(
@@ -822,6 +920,87 @@ def check_diff_test_step(
to_state=diff_test_result.snapshot.system_state,
)

def __get_diff_paths(self, diff_test_result: DiffTestResult) -> list[str]:
"""Get the diff paths from a diff test result
Algorithm:
Iterate over the original trials; in principle they should be identical.
Where they differ, the diffs are the indeterministic fields to be
skipped when doing the comparison.
Naively, we could just collect all the indeterministic fields and return
them; however, there are cases where the field path itself is not
deterministic (e.g. the name of a secret may be randomly generated).
To handle this randomness in the name, we use the first two original
trials to compare the system state and obtain the initial regexes
for skipping.
If there are no random names, the subsequent trials should not
introduce additional indeterministic fields.
We then keep iterating over the rest of the original results and check
whether additional indeterministic fields appear. If they do, we
collect them and try to figure out the proper regex to skip them.
Args:
diff_test_result (DiffTestResult): The diff test result
Returns:
list[str]: The list of diff paths
"""

initial_regex: set[str] = set()
indeterministic_regex: set[str] = set()
first_step = True
for original in diff_test_result.originals:
trial = original["trial"]
gen = original["gen"]
trial_basename = os.path.basename(trial)
original_result = self.trial_to_steps[trial_basename].steps[
str(gen)
]
diff_result = self.check_diff_test_step(
diff_test_result, original_result, self.config
)

if diff_result is not None:
for diff_category, diff in diff_result.diff.items():
if diff_category == "dictionary_item_removed":
removed_items: list[str] = diff
for removed_item in removed_items:
removed_item = re.escape(removed_item)
if first_step:
initial_regex.add(removed_item)
else:
indeterministic_regex.add(removed_item)
elif diff_category == "dictionary_item_added":
added_items: list[str] = diff
for added_item in added_items:
added_item = re.escape(added_item)
if first_step:
initial_regex.add(added_item)
else:
indeterministic_regex.add(added_item)
elif diff_category == "values_changed":
changed_items: dict[str, dict] = diff
for changed_item in changed_items.keys():
changed_item = re.escape(changed_item)
if first_step:
initial_regex.add(changed_item)
else:
indeterministic_regex.add(changed_item)
elif diff_category == "type_changes":
type_changes: dict[str, dict] = diff
for type_change in type_changes.keys():
type_change = re.escape(type_change)
if first_step:
initial_regex.add(type_change)
else:
indeterministic_regex.add(type_change)
first_step = False

# Handle the case where the name is not deterministic
common_regex = compute_common_regex(list(indeterministic_regex))

return list(initial_regex) + common_regex
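For orientation, a rough sketch of the kind of diff dictionary the loop above consumes; the categories mirror the deepdiff-style output handled by the code, and the concrete field paths are invented:

# Hypothetical diff_result.diff content; the paths are illustrative.
diff = {
    "values_changed": {
        "root['metadata']['resourceVersion']": {
            "old_value": "1010",
            "new_value": "1024",
        },
    },
    "dictionary_item_added": ["root['status']['lastUpdateTime']"],
}
# Each reported path is re.escape()-d; paths that only appear after the first
# original trial are treated as indeterministic and later generalized by
# compute_common_regex().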


def main():
"""Main entry point."""
@@ -834,8 +1013,8 @@ def main():
args = parser.parse_args()

# Register custom exception hook
sys.excepthook = handle_excepthook
threading.excepthook = thread_excepthook
sys.excepthook = error_handler.handle_excepthook
threading.excepthook = error_handler.thread_excepthook

log_filename = "check.log" if args.checkonly else "test.log"
os.makedirs(args.workdir_path, exist_ok=True)