From aaf9773abbcc1b89f3ba44942d4351f769db6ff1 Mon Sep 17 00:00:00 2001
From: jrzkaminski <86363785+jrzkaminski@users.noreply.github.com>
Date: Fri, 4 Aug 2023 15:26:31 +0300
Subject: [PATCH] working version

---
 bamt/networks/composite_bn.py           |  19 ++--
 bamt/nodes/base.py                      |  19 ----
 bamt/nodes/composite_continuous_node.py | 112 +-----------------------
 bamt/nodes/composite_discrete_node.py   | 106 +---------------------
 bamt/nodes/gaussian_node.py             |  10 ++-
 bamt/nodes/logit_node.py                |   8 +-
 6 files changed, 31 insertions(+), 243 deletions(-)

diff --git a/bamt/networks/composite_bn.py b/bamt/networks/composite_bn.py
index ab569de..55fd86b 100644
--- a/bamt/networks/composite_bn.py
+++ b/bamt/networks/composite_bn.py
@@ -207,9 +207,14 @@ def wrapper():
             seq = Parallel(n_jobs=parall_count)(delayed(wrapper)() for _ in range(n))
         seq_df = pd.DataFrame.from_dict(seq, orient="columns")
         seq_df.dropna(inplace=True)
+        print("Descriptor \n", self.descriptor)
         cont_nodes = [
-            c.name for c in self.nodes if c.type != "Discrete" and "Logit" not in c.type
+            c.name
+            for c in self.nodes
+            if type(c).__name__
+            not in ("DiscreteNode", "LogitNode", "CompositeDiscreteNode")
         ]
+        print("Cont_nodes \n", cont_nodes)
         positive_columns = [
             c for c in cont_nodes if self.descriptor["signs"][c] == "pos"
         ]
@@ -219,8 +224,7 @@ def wrapper():
 
         sample_output = pd.DataFrame.from_dict(seq, orient="columns")
         if as_df:
-            sample_output = self.decode_categorical_data(sample_output)
-
+            sample_output = self._decode_categorical_data(sample_output)
             return sample_output
         else:
             return seq
@@ -243,7 +247,7 @@ def fit_parameters(self, data: pd.DataFrame, dropna: bool = True, n_jobs: int =
             index = sorted([int(id) for id in os.listdir(STORAGE)])[-1] + 1
             os.makedirs(os.path.join(STORAGE, str(index)))
 
-        data = self.encode_categorical_data(data)
+        data = self._encode_categorical_data(data)
 
         # Turn all discrete values to str for learning algorithm
         if "disc_num" in self.descriptor["types"].values():
@@ -264,14 +268,15 @@ def worker(node):
         for result, node in zip(results, self.nodes):
             self.distributions[node.name] = result
 
-    def encode_categorical_data(self, data):
-        for column in data.select_dtypes(include=['object', 'string']).columns:
+    def _encode_categorical_data(self, data):
+        for column in data.select_dtypes(include=["object", "string"]).columns:
             encoder = LabelEncoder()
             data[column] = encoder.fit_transform(data[column])
             self.encoders[column] = encoder
         return data
 
-    def decode_categorical_data(self, data):
+    def _decode_categorical_data(self, data):
+        data = data.apply(lambda col: pd.to_numeric(col).astype(int) if col.dtype == 'object' else col)
         for column, encoder in self.encoders.items():
             data[column] = encoder.inverse_transform(data[column])
         return data
diff --git a/bamt/nodes/base.py b/bamt/nodes/base.py
index 37972f9..74029e1 100644
--- a/bamt/nodes/base.py
+++ b/bamt/nodes/base.py
@@ -1,10 +1,6 @@
 from bamt.config import config
 
-import numpy as np
 from typing import Union
-from sklearn.preprocessing import LabelEncoder
-from functools import wraps
-from bamt.log import logger_nodes
 
 import pickle
 import os
@@ -91,18 +87,3 @@ def get_path_joblib(node_name: str, specific: str = "") -> str:
             os.path.join(path_to_check, f"{specific}.joblib.compressed")
         )
         return path
-
-    def encode_categorical_data_if_any(self, data):
-        for column in self.disc_parents + [self.name]:
-            if data[column].dtype in ("object", "str", "string"):
-                encoder = LabelEncoder()
-                data[column] = encoder.fit_transform(data[column])
-                self.encoders[column] = encoder
-            elif np.issubdtype(data[column].dtype, np.number):
-                continue
-            else:
-                logger_nodes.warning(
-                    msg="Wrong datatype passed to categorical data encoder"
-                )
-        return data
-
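Note: with this change, label encoding moves from the individual nodes (the removed encode_categorical_data_if_any above) to the network itself. A minimal sketch of the LabelEncoder round-trip that _encode_categorical_data and _decode_categorical_data rely on; the toy DataFrame and column names are invented for illustration only:

    import pandas as pd
    from sklearn.preprocessing import LabelEncoder

    df = pd.DataFrame({"color": ["red", "blue", "red"], "size": [1.0, 2.5, 3.1]})

    encoders = {}
    for column in df.select_dtypes(include=["object", "string"]).columns:
        enc = LabelEncoder()
        df[column] = enc.fit_transform(df[column])  # strings -> integer codes
        encoders[column] = enc

    # Sampling can hand the codes back with object dtype, which is why
    # _decode_categorical_data coerces them to int before inverse_transform.
    df["color"] = encoders["color"].inverse_transform(
        pd.to_numeric(df["color"]).astype(int)
    )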
diff --git a/bamt/nodes/composite_continuous_node.py b/bamt/nodes/composite_continuous_node.py
index 522b2b4..92ba502 100644
--- a/bamt/nodes/composite_continuous_node.py
+++ b/bamt/nodes/composite_continuous_node.py
@@ -1,124 +1,16 @@
-import pickle
-import numpy as np
-import joblib
-import random
-import math
-
-from numpy import array
-from .base import BaseNode
 from .gaussian_node import GaussianNode
 from .schema import GaussianParams, HybcprobParams
 
 from sklearn import linear_model
-from pandas import DataFrame
-from typing import Optional, List, Union, Dict
-
-from ..log import logger_nodes
-from sklearn.metrics import mean_squared_error as mse
+from typing import Optional, Union
 
 NodeInfo = Union[GaussianParams, HybcprobParams]
 
 
-class CompositeContinuousNode(BaseNode):
+class CompositeContinuousNode(GaussianNode):
     def __init__(self, name, regressor: Optional[object] = None):
         super(CompositeContinuousNode, self).__init__(name)
         if regressor is None:
             regressor = linear_model.LinearRegression()
         self.regressor = regressor
         self.type = "CompositeContinuous" + f" ({type(self.regressor).__name__})"
-
-    def fit_parameters(self, data: DataFrame, **kwargs) -> GaussianParams:
-        parents = self.cont_parents + self.disc_parents
-        if parents:
-            self.regressor.fit(data[parents].values, data[self.name].values, **kwargs)
-            predicted_value = self.regressor.predict(data[parents].values)
-            variance = mse(data[self.name].values, predicted_value, squared=False)
-            serialization = self.choose_serialization(self.regressor)
-
-            if serialization == "pickle":
-                ex_b = pickle.dumps(self.regressor, protocol=4)
-                # model_ser = ex_b.decode('latin1').replace('\'', '\"')
-                model_ser = ex_b.decode("latin1")
-                return {
-                    "mean": np.nan,
-                    "regressor_obj": model_ser,
-                    "regressor": type(self.regressor).__name__,
-                    "variance": variance,
-                    "serialization": "pickle",
-                }
-            else:
-                logger_nodes.warning(
-                    f"{self.name}::Pickle failed. BAMT will use Joblib. | "
-                    + str(serialization.args[0])
-                )
-
-                path = self.get_path_joblib(
-                    node_name=self.name.replace(" ", "_"),
-                    specific=f"{self.name.replace(' ', '_')}",
-                )
-                joblib.dump(self.regressor, path, compress=True, protocol=4)
-                return {
-                    "mean": np.nan,
-                    "regressor_obj": path,
-                    "regressor": type(self.regressor).__name__,
-                    "variance": variance,
-                    "serialization": "joblib",
-                }
-        else:
-            mean_base = np.mean(data[self.name].values)
-            variance = np.var(data[self.name].values)
-            return {
-                "mean": mean_base,
-                "regressor_obj": None,
-                "regressor": None,
-                "variance": variance,
-                "serialization": None,
-            }
-
-    @staticmethod
-    def choose(node_info: GaussianParams, pvals: List[float]) -> float:
-        """
-        Return value from Logit node
-        params:
-            node_info: nodes info from distributions
-            pvals: parent values
-        """
-        if pvals:
-            for el in pvals:
-                if str(el) == "nan":
-                    return np.nan
-            if node_info["serialization"] == "joblib":
-                model = joblib.load(node_info["regressor_obj"])
-            else:
-                a = node_info["regressor_obj"].encode("latin1")
-                model = pickle.loads(a)
-
-            cond_mean = model.predict(np.array(pvals).reshape(1, -1))[0]
-            var = node_info["variance"]
-            return random.gauss(cond_mean, var)
-        else:
-            return random.gauss(node_info["mean"], math.sqrt(node_info["variance"]))
-
-    @staticmethod
-    def predict(node_info: GaussianParams, pvals: List[float]) -> float:
-        """
-        Return prediction from Logit node
-        params:
-            node_info: nodes info from distributions
-            pvals: parent values
-        """
-
-        if pvals:
-            for el in pvals:
-                if str(el) == "nan":
-                    return np.nan
-            if node_info["serialization"] == "joblib":
-                model = joblib.load(node_info["regressor_obj"])
-            else:
-                a = node_info["regressor_obj"].encode("latin1")
-                model = pickle.loads(a)
-
-            pred = model.predict(np.array(pvals).reshape(1, -1))[0]
-            return pred
-        else:
-            return node_info["mean"]
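Note: CompositeContinuousNode is now a thin subclass of GaussianNode; fit_parameters, choose and predict come from the parent class, and only the constructor (the default regressor and self.type) differs. A small usage sketch, where the node name and the regressor choice are illustrative only:

    from sklearn.ensemble import RandomForestRegressor
    from bamt.nodes.composite_continuous_node import CompositeContinuousNode

    node = CompositeContinuousNode("Income", regressor=RandomForestRegressor())
    print(node.type)  # CompositeContinuous (RandomForestRegressor)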
| " - + str(serialization.args[0]) - ) - - path = self.get_path_joblib( - node_name=self.name.replace(" ", "_"), - specific=f"{self.name.replace(' ', '_')}", - ) - joblib.dump(self.regressor, path, compress=True, protocol=4) - return { - "mean": np.nan, - "regressor_obj": path, - "regressor": type(self.regressor).__name__, - "variance": variance, - "serialization": "joblib", - } - else: - mean_base = np.mean(data[self.name].values) - variance = np.var(data[self.name].values) - return { - "mean": mean_base, - "regressor_obj": None, - "regressor": None, - "variance": variance, - "serialization": None, - } - - @staticmethod - def choose(node_info: GaussianParams, pvals: List[float]) -> float: - """ - Return value from Logit node - params: - node_info: nodes info from distributions - pvals: parent values - """ - if pvals: - for el in pvals: - if str(el) == "nan": - return np.nan - if node_info["serialization"] == "joblib": - model = joblib.load(node_info["regressor_obj"]) - else: - a = node_info["regressor_obj"].encode("latin1") - model = pickle.loads(a) - - cond_mean = model.predict(np.array(pvals).reshape(1, -1))[0] - var = node_info["variance"] - return random.gauss(cond_mean, var) - else: - return random.gauss(node_info["mean"], math.sqrt(node_info["variance"])) - - @staticmethod - def predict(node_info: GaussianParams, pvals: List[float]) -> float: - """ - Return prediction from Logit node - params: - node_info: nodes info from distributions - pvals: parent values - """ - - if pvals: - for el in pvals: - if str(el) == "nan": - return np.nan - if node_info["serialization"] == "joblib": - model = joblib.load(node_info["regressor_obj"]) - else: - a = node_info["regressor_obj"].encode("latin1") - model = pickle.loads(a) - - pred = model.predict(np.array(pvals).reshape(1, -1))[0] - return pred - else: - return node_info["mean"] diff --git a/bamt/nodes/composite_discrete_node.py b/bamt/nodes/composite_discrete_node.py index 5ad6764..e3d9463 100644 --- a/bamt/nodes/composite_discrete_node.py +++ b/bamt/nodes/composite_discrete_node.py @@ -1,19 +1,10 @@ -import numpy as np - -import pickle -import joblib -import random - -from .base import BaseNode -from .schema import LogitParams -from bamt.log import logger_nodes +from .logit_node import LogitNode from sklearn import linear_model -from pandas import DataFrame -from typing import Optional, List, Union, Dict +from typing import Optional -class CompositeDiscreteNode(BaseNode): +class CompositeDiscreteNode(LogitNode): """ Class for composite discrete node. """ @@ -26,94 +17,3 @@ def __init__(self, name, classifier: Optional[object] = None): ) self.classifier = classifier self.type = "CompositeDiscrete" + f" ({type(self.classifier).__name__})" - - def fit_parameters(self, data: DataFrame, **kwargs) -> LogitParams: - model_ser = None - path = None - - parents = self.disc_parents + self.cont_parents - self.classifier.fit(X=data[parents].values, y=data[self.name].values, **kwargs) - serialization = self.choose_serialization(self.classifier) - - if serialization == "pickle": - ex_b = pickle.dumps(self.classifier, protocol=4) - # model_ser = ex_b.decode('latin1').replace('\'', '\"') - model_ser = ex_b.decode("latin1") - serialization_name = "pickle" - else: - logger_nodes.warning( - f"{self.name}::Pickle failed. BAMT will use Joblib. 
| " - + str(serialization.args[0]) - ) - - path = self.get_path_joblib(self.name, specific=self.name.replace(" ", "_")) - - joblib.dump(self.classifier, path, compress=True, protocol=4) - serialization_name = "joblib" - return { - "classes": list(self.classifier.classes_), - "classifier_obj": path or model_ser, - "classifier": type(self.classifier).__name__, - "serialization": serialization_name, - } - - @staticmethod - def choose(node_info: LogitParams, pvals: List[Union[float]]) -> str: - """ - Return value from Logit node - params: - node_info: nodes info from distributions - pvals: parent values - """ - - rindex = 0 - - if len(node_info["classes"]) > 1: - if node_info["serialization"] == "joblib": - model = joblib.load(node_info["classifier_obj"]) - else: - # str_model = node_info["classifier_obj"].decode('latin1').replace('\'', '\"') - a = node_info["classifier_obj"].encode("latin1") - model = pickle.loads(a) - distribution = model.predict_proba(np.array(pvals).reshape(1, -1))[0] - - # choose - rand = random.random() - lbound = 0 - ubound = 0 - for interval in range(len(node_info["classes"])): - ubound += distribution[interval] - if lbound <= rand < ubound: - rindex = interval - break - else: - lbound = ubound - - return str(node_info["classes"][rindex]) - - else: - return str(node_info["classes"][0]) - - @staticmethod - def predict(node_info: LogitParams, pvals: List[Union[float]]) -> str: - """ - Return prediction from Logit node - params: - node_info: nodes info from distributions - pvals: parent values - """ - - if len(node_info["classes"]) > 1: - if node_info["serialization"] == "joblib": - model = joblib.load(node_info["classifier_obj"]) - else: - # str_model = node_info["classifier_obj"].decode('latin1').replace('\'', '\"') - a = node_info["classifier_obj"].encode("latin1") - model = pickle.loads(a) - - pred = model.predict(np.array(pvals).reshape(1, -1))[0] - - return str(pred) - - else: - return str(node_info["classes"][0]) diff --git a/bamt/nodes/gaussian_node.py b/bamt/nodes/gaussian_node.py index 97a898c..019e28f 100644 --- a/bamt/nodes/gaussian_node.py +++ b/bamt/nodes/gaussian_node.py @@ -31,6 +31,8 @@ def __init__(self, name, regressor: Optional[object] = None): def fit_parameters(self, data: DataFrame, **kwargs) -> GaussianParams: parents = self.cont_parents + if type(self).__name__ == "CompositeContinuousNode": + parents = parents + self.disc_parents if parents: self.regressor.fit(data[parents].values, data[self.name].values, **kwargs) predicted_value = self.regressor.predict(data[parents].values) @@ -77,8 +79,7 @@ def fit_parameters(self, data: DataFrame, **kwargs) -> GaussianParams: "serialization": None, } - @staticmethod - def choose(node_info: GaussianParams, pvals: List[float]) -> float: + def choose(self, node_info: GaussianParams, pvals: List[float]) -> float: """ Return value from Logit node params: @@ -95,6 +96,11 @@ def choose(node_info: GaussianParams, pvals: List[float]) -> float: a = node_info["regressor_obj"].encode("latin1") model = pickle.loads(a) + if type(self).__name__ == "CompositeContinuousNode": + pvals = [ + int(item) if isinstance(item, str) else item for item in pvals + ] + cond_mean = model.predict(np.array(pvals).reshape(1, -1))[0] var = node_info["variance"] return random.gauss(cond_mean, var) diff --git a/bamt/nodes/logit_node.py b/bamt/nodes/logit_node.py index 48a1288..ba938cb 100644 --- a/bamt/nodes/logit_node.py +++ b/bamt/nodes/logit_node.py @@ -57,8 +57,7 @@ def fit_parameters(self, data: DataFrame, **kwargs) -> LogitParams: 
"serialization": serialization_name, } - @staticmethod - def choose(node_info: LogitParams, pvals: List[Union[float]]) -> str: + def choose(self, node_info: LogitParams, pvals: List[Union[float]]) -> str: """ Return value from Logit node params: @@ -75,6 +74,11 @@ def choose(node_info: LogitParams, pvals: List[Union[float]]) -> str: # str_model = node_info["classifier_obj"].decode('latin1').replace('\'', '\"') a = node_info["classifier_obj"].encode("latin1") model = pickle.loads(a) + + if type(self).__name__ == "CompositeDiscreteNode": + pvals = [ + int(item) if isinstance(item, str) else item for item in pvals + ] distribution = model.predict_proba(np.array(pvals).reshape(1, -1))[0] # choose