From 7eea3dad52218fc32a719b32dcbbbc2f53c25f19 Mon Sep 17 00:00:00 2001 From: jrzkaminski <86363785+jrzkaminski@users.noreply.github.com> Date: Thu, 3 Aug 2023 14:18:25 +0300 Subject: [PATCH] middle state commit --- bamt/networks/base.py | 16 ++- bamt/networks/composite_bn.py | 123 +++++++++++++++++++++++- bamt/nodes/base.py | 2 + bamt/nodes/composite_continuous_node.py | 102 ++++++++++++++++++-- bamt/nodes/composite_discrete_node.py | 108 ++++++++++++++++++++- bamt/nodes/conditional_logit_node.py | 3 +- bamt/nodes/logit_node.py | 7 +- 7 files changed, 341 insertions(+), 20 deletions(-) diff --git a/bamt/networks/base.py b/bamt/networks/base.py index 9d46054..f57f269 100644 --- a/bamt/networks/base.py +++ b/bamt/networks/base.py @@ -517,7 +517,9 @@ def fit_parameters(self, data: pd.DataFrame, dropna: bool = True, n_jobs: int = def worker(node): return node.fit_parameters(data) - results = Parallel(n_jobs=n_jobs)(delayed(worker)(node) for node in self.nodes) + # results = Parallel(n_jobs=n_jobs)(delayed(worker)(node) for node in self.nodes) + + results = [worker(node) for node in self.nodes] for result, node in zip(results, self.nodes): self.distributions[node.name] = result @@ -605,7 +607,7 @@ def wrapper(): if any(pd.isnull(pvalue) for pvalue in pvals): output[node.name] = np.nan continue - + print("PVALS \n:", pvals) node_data = self.distributions[node.name] if models_dir and ("hybcprob" in node_data.keys()): for obj, obj_data in node_data["hybcprob"].items(): @@ -633,9 +635,13 @@ def wrapper(): return output if progress_bar: - seq = Parallel(n_jobs=parall_count)( - delayed(wrapper)() for _ in tqdm(range(n), position=0, leave=True) - ) + # seq = Parallel(n_jobs=parall_count)( + # delayed(wrapper)() for _ in tqdm(range(n), position=0, leave=True) + # ) + seq = [] + for _ in tqdm(range(n), position=0, leave=True): + result = wrapper() + seq.append(result) else: seq = Parallel(n_jobs=parall_count)(delayed(wrapper)() for _ in range(n)) seq_df = pd.DataFrame.from_dict(seq, orient="columns") diff --git a/bamt/networks/composite_bn.py b/bamt/networks/composite_bn.py index d5cd464..4a31b45 100644 --- a/bamt/networks/composite_bn.py +++ b/bamt/networks/composite_bn.py @@ -1,7 +1,13 @@ import re +import random + +import numpy as np + +from tqdm import tqdm +from bamt.log import logger_network from bamt.networks.base import BaseNetwork import pandas as pd -from typing import Optional, Dict +from typing import Optional, Dict, Union, List from bamt.builders.composite_builder import CompositeStructureBuilder, CompositeDefiner from bamt.utils.composite_utils.MLUtils import MlModels @@ -109,3 +115,118 @@ def set_regressor(self, regressors: Dict[str, object]): ) else: continue + + def sample( + self, + n: int, + models_dir: Optional[str] = None, + progress_bar: bool = True, + evidence: Optional[Dict[str, Union[str, int, float]]] = None, + as_df: bool = True, + predict: bool = False, + parall_count: int = 1, + ) -> Union[None, pd.DataFrame, List[Dict[str, Union[str, int, float]]]]: + """ + Sampling from Bayesian Network + n: int number of samples + evidence: values for nodes from user + parall_count: number of threads. Defaults to 1. + """ + from joblib import Parallel, delayed + + random.seed() + if not self.distributions.items(): + logger_network.error( + "Parameter learning wasn't done. Call fit_parameters method" + ) + return None + if evidence: + for node in self.nodes: + if (node.type == "Discrete") & (node.name in evidence.keys()): + if not (isinstance(evidence[node.name], str)): + evidence[node.name] = str(int(evidence[node.name])) + + def wrapper(): + output = {} + for node in self.nodes: + parents = node.cont_parents + node.disc_parents + if evidence and node.name in evidence.keys(): + output[node.name] = evidence[node.name] + else: + if not parents: + pvals = None + else: + if self.type == "Discrete": + pvals = [str(output[t]) for t in parents] + elif type(node).__name__ in ("CompositeDiscreteNode", "CompositeContinuousNode"): + pvals = [str(output[t]) for t in parents] + else: + pvals = [output[t] for t in parents] + + # If any nan from parents, sampling from node blocked. + if any(pd.isnull(pvalue) for pvalue in pvals): + output[node.name] = np.nan + continue + print("NODE \n:", node.name) + print("PVALS \n:", pvals) + node_data = self.distributions[node.name] + if models_dir and ("hybcprob" in node_data.keys()): + for obj, obj_data in node_data["hybcprob"].items(): + if "serialization" in obj_data.keys(): + if "gaussian" in node.type.lower(): + model_type = "regressor" + else: + model_type = "classifier" + if ( + obj_data["serialization"] == "joblib" + and obj_data[f"{model_type}_obj"] + ): + new_path = ( + models_dir + + f"\\{node.name.replace(' ', '_')}\\{obj}.joblib.compressed" + ) + node_data["hybcprob"][obj][ + f"{model_type}_obj" + ] = new_path + + if predict: + output[node.name] = node.predict(node_data, pvals=pvals) + else: + output[node.name] = node.choose(node_data, pvals=pvals) + return output + + if progress_bar: + # seq = Parallel(n_jobs=parall_count)( + # delayed(wrapper)() for _ in tqdm(range(n), position=0, leave=True) + # ) + seq = [] + for _ in tqdm(range(n), position=0, leave=True): + result = wrapper() + seq.append(result) + else: + seq = Parallel(n_jobs=parall_count)(delayed(wrapper)() for _ in range(n)) + seq_df = pd.DataFrame.from_dict(seq, orient="columns") + seq_df.dropna(inplace=True) + cont_nodes = [ + c.name for c in self.nodes if c.type != "Discrete" and "Logit" not in c.type + ] + positive_columns = [ + c for c in cont_nodes if self.descriptor["signs"][c] == "pos" + ] + seq_df = seq_df[(seq_df[positive_columns] >= 0).all(axis=1)] + seq_df.reset_index(inplace=True, drop=True) + seq = seq_df.to_dict("records") + sample_output = pd.DataFrame.from_dict(seq, orient="columns") + + if as_df: + if self.has_logit or self.type == "Composite": + for node in self.nodes: + for feature_key, encoder in node.encoders: + sample_output[feature_key] = encoder[ + feature_key + ].inverse_transform(sample_output[feature_key]) + pass + + return sample_output + else: + return seq diff --git a/bamt/nodes/base.py b/bamt/nodes/base.py index 13e2757..47e3478 100644 --- a/bamt/nodes/base.py +++ b/bamt/nodes/base.py @@ -100,6 +100,8 @@ def wrapper(self, data, *args, **kwargs): encoder = LabelEncoder() data[column] = encoder.fit_transform(data[column]) self.encoders[column] = encoder + elif data[column].dtype in ("float64", "int64"): + continue else: logger_nodes.warning( msg="Wrong datatype passed to categorical data encoder" diff --git a/bamt/nodes/composite_continuous_node.py b/bamt/nodes/composite_continuous_node.py index 650991e..95b62b0 100644 --- a/bamt/nodes/composite_continuous_node.py +++ b/bamt/nodes/composite_continuous_node.py @@ -1,3 +1,9 @@ +import pickle +import numpy as np +import joblib +import random +import math + from .base import BaseNode from .gaussian_node import GaussianNode from .schema import GaussianParams, HybcprobParams @@ -6,6 +12,8 @@ from pandas import DataFrame from typing import Optional, List, Union +from ..log import logger_nodes +from sklearn.metrics import mean_squared_error as mse NodeInfo = Union[GaussianParams, HybcprobParams] @@ -18,11 +26,93 @@ def __init__(self, name, regressor: Optional[object] = None): self.regressor = regressor self.type = "CompositeContinuous" + f" ({type(self.regressor).__name__})" - def fit_parameters(self, data: DataFrame) -> NodeInfo: - return GaussianNode(self.name, self.regressor).fit_parameters(data) + @BaseNode.encode_categorical_data_if_any + def fit_parameters(self, data: DataFrame, **kwargs) -> GaussianParams: + parents = self.cont_parents + self.disc_parents + if parents: + self.regressor.fit(data[parents].values, data[self.name].values, **kwargs) + predicted_value = self.regressor.predict(data[parents].values) + variance = mse(data[self.name].values, predicted_value, squared=False) + serialization = self.choose_serialization(self.regressor) + + if serialization == "pickle": + ex_b = pickle.dumps(self.regressor, protocol=4) + # model_ser = ex_b.decode('latin1').replace('\'', '\"') + model_ser = ex_b.decode("latin1") + return { + "mean": np.nan, + "regressor_obj": model_ser, + "regressor": type(self.regressor).__name__, + "variance": variance, + "serialization": "pickle", + } + else: + logger_nodes.warning( + f"{self.name}::Pickle failed. BAMT will use Joblib. | " + + str(serialization.args[0]) + ) + + path = self.get_path_joblib( + node_name=self.name.replace(" ", "_"), + specific=f"{self.name.replace(' ', '_')}", + ) + joblib.dump(self.regressor, path, compress=True, protocol=4) + return { + "mean": np.nan, + "regressor_obj": path, + "regressor": type(self.regressor).__name__, + "variance": variance, + "serialization": "joblib", + } + else: + logger_nodes.warning( + msg="Composite Continuous Node should always have a parent" + ) + + @staticmethod + def choose(node_info: GaussianParams, pvals: List[float]) -> float: + """ + Return value from Logit node + params: + node_info: nodes info from distributions + pvals: parent values + """ + if pvals: + for el in pvals: + if str(el) == "nan": + return np.nan + if node_info["serialization"] == "joblib": + model = joblib.load(node_info["regressor_obj"]) + else: + a = node_info["regressor_obj"].encode("latin1") + model = pickle.loads(a) + + cond_mean = model.predict(np.array(pvals).reshape(1, -1))[0] + var = node_info["variance"] + return random.gauss(cond_mean, var) + else: + return random.gauss(node_info["mean"], math.sqrt(node_info["variance"])) + + @staticmethod + def predict(node_info: GaussianParams, pvals: List[float]) -> float: + """ + Return prediction from Logit node + params: + node_info: nodes info from distributions + pvals: parent values + """ - def choose(self, node_info: NodeInfo, pvals: List[Union[str, float]]) -> float: - return GaussianNode(self.name, self.regressor).choose(node_info, pvals) + if pvals: + for el in pvals: + if str(el) == "nan": + return np.nan + if node_info["serialization"] == "joblib": + model = joblib.load(node_info["regressor_obj"]) + else: + a = node_info["regressor_obj"].encode("latin1") + model = pickle.loads(a) - def predict(self, node_info: NodeInfo, pvals: List[Union[str, float]]) -> float: - return GaussianNode(self.name, self.regressor).predict(node_info, pvals) + pred = model.predict(np.array(pvals).reshape(1, -1))[0] + return pred + else: + return node_info["mean"] diff --git a/bamt/nodes/composite_discrete_node.py b/bamt/nodes/composite_discrete_node.py index 326c5dd..d28152b 100644 --- a/bamt/nodes/composite_discrete_node.py +++ b/bamt/nodes/composite_discrete_node.py @@ -1,18 +1,118 @@ -from typing import Optional +import numpy as np + +import pickle +import joblib +import random + +from .base import BaseNode +from .schema import LogitParams +from bamt.log import logger_nodes + from sklearn import linear_model -from .logit_node import LogitNode +from pandas import DataFrame +from typing import Optional, List, Union -class CompositeDiscreteNode(LogitNode): +class CompositeDiscreteNode(BaseNode): """ Class for composite discrete node. """ def __init__(self, name, classifier: Optional[object] = None): - super().__init__(name, classifier) + super(CompositeDiscreteNode, self).__init__(name) if classifier is None: classifier = linear_model.LogisticRegression( multi_class="multinomial", solver="newton-cg", max_iter=100 ) self.classifier = classifier self.type = "CompositeDiscrete" + f" ({type(self.classifier).__name__})" + + @BaseNode.encode_categorical_data_if_any + def fit_parameters(self, data: DataFrame, **kwargs) -> LogitParams: + model_ser = None + path = None + + parents = self.disc_parents + self.cont_parents + self.classifier.fit(X=data[parents].values, y=data[self.name].values, **kwargs) + serialization = self.choose_serialization(self.classifier) + + if serialization == "pickle": + ex_b = pickle.dumps(self.classifier, protocol=4) + # model_ser = ex_b.decode('latin1').replace('\'', '\"') + model_ser = ex_b.decode("latin1") + serialization_name = "pickle" + else: + logger_nodes.warning( + f"{self.name}::Pickle failed. BAMT will use Joblib. | " + + str(serialization.args[0]) + ) + + path = self.get_path_joblib(self.name, specific=self.name.replace(" ", "_")) + + joblib.dump(self.classifier, path, compress=True, protocol=4) + serialization_name = "joblib" + return { + "classes": list(self.classifier.classes_), + "classifier_obj": path or model_ser, + "classifier": type(self.classifier).__name__, + "serialization": serialization_name, + } + + @staticmethod + def choose(node_info: LogitParams, pvals: List[Union[float]]) -> str: + """ + Return value from Logit node + params: + node_info: nodes info from distributions + pvals: parent values + """ + + rindex = 0 + + if len(node_info["classes"]) > 1: + if node_info["serialization"] == "joblib": + model = joblib.load(node_info["classifier_obj"]) + else: + a = node_info["classifier_obj"].encode("latin1") + model = pickle.loads(a) + distribution = model.predict_proba(np.array(pvals).reshape(1, -1))[0] + + rand = random.random() + lbound = 0 + ubound = 0 + for interval in range(len(node_info["classes"])): + ubound += distribution[interval] + if lbound <= rand < ubound: + rindex = interval + break + else: + lbound = ubound + + return str(node_info["classes"][rindex]) + + else: + return str(node_info["classes"][0]) + + @staticmethod + def predict(node_info: LogitParams, pvals: List[Union[float]]) -> str: + """ + Return prediction from Logit node + params: + node_info: nodes info from distributions + pvals: parent values + """ + + if len(node_info["classes"]) > 1: + if node_info["serialization"] == "joblib": + model = joblib.load(node_info["classifier_obj"]) + else: + # str_model = node_info["classifier_obj"].decode('latin1').replace('\'', '\"') + a = node_info["classifier_obj"].encode("latin1") + model = pickle.loads(a) + + pred = model.predict(np.array(pvals).reshape(1, -1))[0] + + return str(pred) + + else: + return str(node_info["classes"][0]) diff --git a/bamt/nodes/conditional_logit_node.py b/bamt/nodes/conditional_logit_node.py index 3a1b1a0..a12f8c7 100644 --- a/bamt/nodes/conditional_logit_node.py +++ b/bamt/nodes/conditional_logit_node.py @@ -54,7 +54,8 @@ def fit_parameters(self, data: DataFrame) -> Dict[str, Dict[str, LogitParams]]: values = set(new_data[self.name]) if len(values) > 1: model.fit( - new_data[self.cont_parents].values, new_data[self.name].values + X=new_data[self.cont_parents].values, + y=new_data[self.name].values, ) classes = list(model.classes_) serialization = self.choose_serialization(model) diff --git a/bamt/nodes/logit_node.py b/bamt/nodes/logit_node.py index 21558a5..48a1288 100644 --- a/bamt/nodes/logit_node.py +++ b/bamt/nodes/logit_node.py @@ -27,7 +27,6 @@ def __init__(self, name, classifier: Optional[object] = None): self.classifier = classifier self.type = "Logit" + f" ({type(self.classifier).__name__})" - @BaseNode.encode_categorical_data_if_any def fit_parameters(self, data: DataFrame, **kwargs) -> LogitParams: model_ser = None path = None @@ -58,7 +57,8 @@ def fit_parameters(self, data: DataFrame, **kwargs) -> LogitParams: "serialization": serialization_name, } - def choose(self, node_info: LogitParams, pvals: List[Union[float]]) -> str: + @staticmethod + def choose(node_info: LogitParams, pvals: List[Union[float]]) -> str: """ Return value from Logit node params: @@ -94,7 +94,8 @@ def choose(self, node_info: LogitParams, pvals: List[Union[float]]) -> str: else: return str(node_info["classes"][0]) - def predict(self, node_info: LogitParams, pvals: List[Union[float]]) -> str: + @staticmethod + def predict(node_info: LogitParams, pvals: List[Union[float]]) -> str: """ Return prediction from Logit node params: