Commit

Merge pull request #177 from CITCOM-project/json-cate
Json cate
christopher-wild authored Apr 26, 2023
2 parents 72c3997 + 1203490 commit 8fafe9d
Showing 25 changed files with 448 additions and 174 deletions.
5 changes: 3 additions & 2 deletions causal_testing/data_collection/data_collector.py
@@ -61,7 +61,7 @@ def filter_valid_data(self, data: pd.DataFrame, check_pos: bool = True) -> pd.Da
self.scenario.variables[var].z3
== self.scenario.variables[var].z3_val(self.scenario.variables[var].z3, row[var])
for var in self.scenario.variables
if var in row
if var in row and not pd.isnull(row[var])
]
for c in model:
solver.assert_and_track(c, f"model: {c}")
@@ -147,7 +147,8 @@ def collect_data(self, **kwargs) -> pd.DataFrame:

execution_data_df = self.data
for meta in self.scenario.metas():
meta.populate(execution_data_df)
if meta.name not in self.data:
meta.populate(execution_data_df)
scenario_execution_data_df = self.filter_valid_data(execution_data_df)
for var_name, var in self.scenario.variables.items():
if issubclass(var.datatype, Enum):
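Note on the change above: the added not pd.isnull(row[var]) guard keeps missing and NaN values out of the z3 constraint construction, so such variables simply contribute no constraint for that row. A minimal sketch of the filtering logic, using hypothetical variable names:

import pandas as pd

# Hypothetical row: "y" is NaN and "z" is absent, so only "x" would be
# constrained in the z3 model for this row.
row = {"x": 1.0, "y": float("nan")}
variables = ["x", "y", "z"]
constrained = [v for v in variables if v in row and not pd.isnull(row[v])]
print(constrained)  # ['x']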
2 changes: 1 addition & 1 deletion causal_testing/generation/abstract_causal_test_case.py
@@ -131,7 +131,7 @@ def _generate_concrete_tests(
)

for v in self.scenario.inputs():
if row[v.name] != v.cast(model[v.z3]):
if v.name in row and row[v.name] != v.cast(model[v.z3]):
constraints = "\n ".join([str(c) for c in self.scenario.constraints if v.name in str(c)])
logger.warning(
f"Unable to set variable {v.name} to {row[v.name]} because of constraints\n"
44 changes: 44 additions & 0 deletions causal_testing/generation/enum_gen.py
@@ -0,0 +1,44 @@
"""This module contains the class EnumGen, which allows us to easily create
generating uniform distributions from enums."""

from enum import Enum
from scipy.stats import rv_discrete
import numpy as np


class EnumGen(rv_discrete):
"""This class allows us to easily create generating uniform distributions
from enums. This is helpful for generating concrete test inputs from
abstract test cases."""

def __init__(self, datatype: Enum):
super().__init__()
self.datatype = dict(enumerate(datatype, 1))
self.inverse_dt = {v: k for k, v in self.datatype.items()}

def ppf(self, q):
"""Percent point function (inverse of `cdf`) at q of the given RV.
Parameters
----------
q : array_like
Lower tail probability.
Returns
-------
k : array_like
Quantile corresponding to the lower tail probability, q.
"""
return np.vectorize(self.datatype.get)(np.ceil(len(self.datatype) * q))

def cdf(self, k):
"""
Cumulative distribution function of the given RV.
Parameters
----------
k : array_like
quantiles
Returns
-------
cdf : ndarray
Cumulative distribution function evaluated at `k`
"""
return np.vectorize(self.inverse_dt.get)(k) / len(self.datatype)
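A minimal usage sketch of EnumGen as defined above, with a hypothetical Color enum (not part of this commit): ppf partitions (0, 1] into equal-width bins, one per enum member, and cdf maps a member back to the upper edge of its bin.

from enum import Enum
import numpy as np
from causal_testing.generation.enum_gen import EnumGen

class Color(Enum):  # hypothetical enum, for illustration only
    RED = 1
    GREEN = 2
    BLUE = 3

gen = EnumGen(Color)
print(gen.ppf(np.array([0.2, 0.5, 0.9])))  # one member per bin: RED, GREEN, BLUE
print(gen.cdf(np.array([Color.GREEN])))    # [0.66666667], i.e. 2/3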
127 changes: 86 additions & 41 deletions causal_testing/json_front/json_class.py
@@ -20,10 +20,10 @@
from causal_testing.specification.causal_specification import CausalSpecification
from causal_testing.specification.scenario import Scenario
from causal_testing.specification.variable import Input, Meta, Output
from causal_testing.testing.base_test_case import BaseTestCase
from causal_testing.testing.causal_test_case import CausalTestCase
from causal_testing.testing.causal_test_engine import CausalTestEngine
from causal_testing.testing.estimators import Estimator
from causal_testing.testing.base_test_case import BaseTestCase

logger = logging.getLogger(__name__)

@@ -41,7 +41,7 @@ class JsonUtility:
:attr {Meta} metas: Causal variables representing metavariables.
:attr {pd.DataFrame}: Pandas DataFrame containing runtime data.
:attr {dict} test_plan: Dictionary containing the key value pairs from the loaded json test plan.
:attr {Scenario} modelling_scenario:
:attr {Scenario} scenario:
:attr {CausalSpecification} causal_specification:
"""

@@ -75,6 +75,32 @@ def setup(self, scenario: Scenario):
self._json_parse()
self._populate_metas()

def _create_abstract_test_case(self, test, mutates, effects):
assert len(test["mutations"]) == 1
treatment_var = next(self.scenario.variables[v] for v in test["mutations"])
if not treatment_var.distribution:
fitter = Fitter(self.data[treatment_var.name], distributions=get_common_distributions())
fitter.fit()
(dist, params) = list(fitter.get_best(method="sumsquare_error").items())[0]
treatment_var.distribution = getattr(scipy.stats, dist)(**params)
self._append_to_file(treatment_var.name + f" {dist}({params})", logging.INFO)

abstract_test = AbstractCausalTestCase(
scenario=self.scenario,
intervention_constraints=[mutates[v](k) for k, v in test["mutations"].items()],
treatment_variable=treatment_var,
expected_causal_effect={
self.scenario.variables[variable]: effects[effect]
for variable, effect in test["expected_effect"].items()
},
effect_modifiers={self.scenario.variables[v] for v in test["effect_modifiers"]}
if "effect_modifiers" in test
else {},
estimate_type=test["estimate_type"],
effect=test.get("effect", "total"),
)
return abstract_test

def run_json_tests(self, effects: dict, estimators: dict, f_flag: bool = False, mutates: dict = None):
"""Runs and evaluates each test case specified in the JSON input
@@ -84,23 +110,51 @@ def run_json_tests(self, effects: dict, estimators: dict, f_flag: bool = False,
:param f_flag: Failure flag that if True the script will stop executing when a test fails.
"""
failures = 0
msg = ""
for test in self.test_plan["tests"]:
if "skip" in test and test["skip"]:
continue
test["estimator"] = estimators[test["estimator"]]
if "mutations" in test:
abstract_test = self._create_abstract_test_case(test, mutates, effects)

concrete_tests, dummy = abstract_test.generate_concrete_tests(5, 0.05)
failures = self._execute_tests(concrete_tests, test, f_flag)
msg = (
f"Executing test: {test['name']}\n"
+ "abstract_test\n"
+ f"{abstract_test}\n"
+ f"{abstract_test.treatment_variable.name},{abstract_test.treatment_variable.distribution}\n"
+ f"Number of concrete tests for test case: {str(len(concrete_tests))}\n"
+ f"{failures}/{len(concrete_tests)} failed for {test['name']}"
)
if test["estimate_type"] == "coefficient":
base_test_case = BaseTestCase(
treatment_variable=next(self.scenario.variables[v] for v in test["mutations"]),
outcome_variable=next(self.scenario.variables[v] for v in test["expected_effect"]),
effect=test.get("effect", "direct"),
)
assert len(test["expected_effect"]) == 1, "Can only have one expected effect."
concrete_tests = [
CausalTestCase(
base_test_case=base_test_case,
expected_causal_effect=next(
effects[effect] for variable, effect in test["expected_effect"].items()
),
estimate_type="coefficient",
effect_modifier_configuration={
self.scenario.variables[v] for v in test.get("effect_modifiers", [])
},
)
]
failures = self._execute_tests(concrete_tests, test, f_flag)
msg = (
f"Executing test: {test['name']} \n"
+ f" {concrete_tests[0]} \n"
+ f" {failures}/{len(concrete_tests)} failed for {test['name']}"
)
else:
abstract_test = self._create_abstract_test_case(test, mutates, effects)
concrete_tests, dummy = abstract_test.generate_concrete_tests(5, 0.05)
failures = self._execute_tests(concrete_tests, test, f_flag)

msg = (
f"Executing test: {test['name']} \n"
+ " abstract_test \n"
+ f" {abstract_test} \n"
+ f" {abstract_test.treatment_variable.name},"
+ f" {abstract_test.treatment_variable.distribution} \n"
+ f" Number of concrete tests for test case: {str(len(concrete_tests))} \n"
+ f" {failures}/{len(concrete_tests)} failed for {test['name']}"
)
self._append_to_file(msg, logging.INFO)
else:
outcome_variable = next(
@@ -132,24 +186,6 @@ def run_json_tests(self, effects: dict, estimators: dict, f_flag: bool = False,
)
self._append_to_file(msg, logging.INFO)

def _create_abstract_test_case(self, test, mutates, effects):
assert len(test["mutations"]) == 1
abstract_test = AbstractCausalTestCase(
scenario=self.scenario,
intervention_constraints=[mutates[v](k) for k, v in test["mutations"].items()],
treatment_variable=next(self.scenario.variables[v] for v in test["mutations"]),
expected_causal_effect={
self.scenario.variables[variable]: effects[effect]
for variable, effect in test["expected_effect"].items()
},
effect_modifiers={self.scenario.variables[v] for v in test["effect_modifiers"]}
if "effect_modifiers" in test
else {},
estimate_type=test["estimate_type"],
effect=test.get("effect", "total"),
)
return abstract_test

def _execute_tests(self, concrete_tests, test, f_flag):
failures = 0
if "formula" in test:
@@ -175,13 +211,6 @@ def _populate_metas(self):
"""
for meta in self.scenario.variables_of_type(Meta):
meta.populate(self.data)
for var in self.scenario.variables_of_type(Meta).union(self.scenario.variables_of_type(Output)):
if not var.distribution:
fitter = Fitter(self.data[var.name], distributions=get_common_distributions())
fitter.fit()
(dist, params) = list(fitter.get_best(method="sumsquare_error").items())[0]
var.distribution = getattr(scipy.stats, dist)(**params)
self._append_to_file(var.name + f" {dist}({params})", logging.INFO)

def _execute_test_case(self, causal_test_case: CausalTestCase, test: Iterable[Mapping], f_flag: bool) -> bool:
"""Executes a singular test case, prints the results and returns the test case result
@@ -193,6 +222,15 @@ def _execute_test_case(self, causal_test_case: CausalTestCase, test: Iterable[Ma
:rtype: bool
"""
failed = False

for var in self.scenario.variables_of_type(Meta).union(self.scenario.variables_of_type(Output)):
if not var.distribution:
fitter = Fitter(self.data[var.name], distributions=get_common_distributions())
fitter.fit()
(dist, params) = list(fitter.get_best(method="sumsquare_error").items())[0]
var.distribution = getattr(scipy.stats, dist)(**params)
self._append_to_file(var.name + f" {dist}({params})", logging.INFO)

causal_test_engine, estimation_model = self._setup_test(causal_test_case, test)
causal_test_result = causal_test_engine.execute_test(
estimation_model, causal_test_case, estimate_type=causal_test_case.estimate_type
@@ -218,16 +256,23 @@ def _execute_test_case(self, causal_test_case: CausalTestCase, test: Iterable[Ma
logger.warning(" FAILED- expected %s, got %s", causal_test_case.expected_causal_effect, result_string)
return failed

def _setup_test(self, causal_test_case: CausalTestCase, test: Mapping) -> tuple[CausalTestEngine, Estimator]:
def _setup_test(
self, causal_test_case: CausalTestCase, test: Mapping, conditions: list[str] = None
) -> tuple[CausalTestEngine, Estimator]:
"""Create the necessary inputs for a single test case
:param causal_test_case: The concrete test case to be executed
:param test: Single JSON test definition stored in a mapping (dict)
:param conditions: A list of conditions which should be applied to the
data. Conditions should be in the query format detailed at
https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.query.html
:returns:
- causal_test_engine - Test Engine instance for the test being run
- estimation_model - Estimator instance for the test being run
"""

data_collector = ObservationalDataCollector(self.scenario, self.data)
data_collector = ObservationalDataCollector(
self.scenario, self.data.query(" & ".join(conditions)) if conditions else self.data
)
causal_test_engine = CausalTestEngine(self.causal_specification, data_collector, index_col=0)

minimal_adjustment_set = self.causal_specification.causal_dag.identification(causal_test_case.base_test_case)
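To illustrate the new coefficient path in run_json_tests, here is a sketch of a test-plan entry (shown as a Python dict rather than raw JSON) that would take that branch. The variable names dose and response are hypothetical; the keys, the estimator name, and the expected-effect name mirror those read by the code above and emitted by to_json_stub in metamorphic_relation.py ("SomeEffect" must name an entry in the effects dict passed to run_json_tests).

coefficient_test = {
    "name": "dose --> response",
    "estimator": "LinearRegressionEstimator",       # looked up in the estimators dict
    "estimate_type": "coefficient",                  # routes to the coefficient branch
    "effect": "direct",
    "mutations": ["dose"],                           # exactly one treatment variable
    "expected_effect": {"response": "SomeEffect"},   # exactly one expected effect
    "skip": False,
}

Relatedly, the new conditions argument to _setup_test is a list of pandas query strings (for example ["dose > 0"], again hypothetical) that are joined with " & " and applied to the data before the ObservationalDataCollector is built.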
9 changes: 9 additions & 0 deletions causal_testing/specification/causal_dag.py
@@ -521,5 +521,14 @@ def identification(self, base_test_case: BaseTestCase):
minimal_adjustment_set = min(minimal_adjustment_sets, key=len)
return minimal_adjustment_set

def to_dot_string(self) -> str:
"""Return a string of the DOT representation of the causal DAG.
:return DOT string of the DAG.
"""
dotstring = "digraph G {\n"
dotstring += "".join([f"{a} -> {b};\n" for a, b in self.graph.edges])
dotstring += "}"
return dotstring

def __str__(self):
return f"Nodes: {self.graph.nodes}\nEdges: {self.graph.edges}"
28 changes: 28 additions & 0 deletions causal_testing/specification/metamorphic_relation.py
@@ -102,6 +102,10 @@ def execute_tests(self, data_collector: ExperimentalDataCollector):
def assertion(self, source_output, follow_up_output):
"""An assertion that should be applied to an individual metamorphic test run."""

@abstractmethod
def to_json_stub(self, skip=True) -> dict:
"""Convert to a JSON frontend stub string for user customisation"""

@abstractmethod
def test_oracle(self, test_results):
"""A test oracle that assert whether the MR holds or not based on ALL test results.
@@ -129,6 +133,18 @@ def test_oracle(self, test_results):
self.tests
), f"{str(self)}: {len(test_results['fail'])}/{len(self.tests)} tests failed."

def to_json_stub(self, skip=True) -> dict:
"""Convert to a JSON frontend stub string for user customisation"""
return {
"name": str(self),
"estimator": "LinearRegressionEstimator",
"estimate_type": "coefficient",
"effect": "direct",
"mutations": [self.treatment_var],
"expected_effect": {self.output_var: "SomeEffect"},
"skip": skip,
}

def __str__(self):
formatted_str = f"{self.treatment_var} --> {self.output_var}"
if self.adjustment_vars:
@@ -149,6 +165,18 @@ def test_oracle(self, test_results):
len(test_results["fail"]) == 0
), f"{str(self)}: {len(test_results['fail'])}/{len(self.tests)} tests failed."

def to_json_stub(self, skip=True) -> dict:
"""Convert to a JSON frontend stub string for user customisation"""
return {
"name": str(self),
"estimator": "LinearRegressionEstimator",
"estimate_type": "coefficient",
"effect": "direct",
"mutations": [self.treatment_var],
"expected_effect": {self.output_var: "NoEffect"},
"skip": skip,
}

def __str__(self):
formatted_str = f"{self.treatment_var} _||_ {self.output_var}"
if self.adjustment_vars:
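For a hypothetical independence relation between variables width and height (with no adjustment set), the stub produced by the second to_json_stub above would be the dict below; skip defaults to True, so the user has to opt in before the generated test runs.

stub = {
    "name": "width _||_ height",
    "estimator": "LinearRegressionEstimator",
    "estimate_type": "coefficient",
    "effect": "direct",
    "mutations": ["width"],
    "expected_effect": {"height": "NoEffect"},
    "skip": True,
}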
2 changes: 1 addition & 1 deletion causal_testing/testing/causal_test_case.py
@@ -27,7 +27,7 @@ def __init__(
self,
base_test_case: BaseTestCase,
expected_causal_effect: CausalTestOutcome,
control_value: Any,
control_value: Any = None,
treatment_value: Any = None,
estimate_type: str = "ate",
effect_modifier_configuration: dict[Variable:Any] = None,
9 changes: 9 additions & 0 deletions causal_testing/testing/causal_test_engine.py
@@ -174,6 +174,15 @@ def _return_causal_test_results(self, estimate_type, estimator, causal_test_case
effect_modifier_configuration=causal_test_case.effect_modifier_configuration,
confidence_intervals=confidence_intervals,
)
elif estimate_type == "coefficient":
logger.debug("calculating coefficient")
coefficient, confidence_intervals = estimator.estimate_unit_ate()
causal_test_result = CausalTestResult(
estimator=estimator,
test_value=TestValue("coefficient", coefficient),
effect_modifier_configuration=causal_test_case.effect_modifier_configuration,
confidence_intervals=confidence_intervals,
)
elif estimate_type == "ate":
logger.debug("calculating ate")
ate, confidence_intervals = estimator.estimate_ate()