Skip to content

Commit

Permalink
middle state commit
Browse files Browse the repository at this point in the history
  • Loading branch information
jrzkaminski committed Aug 3, 2023
1 parent f909764 commit 7eea3da
Show file tree
Hide file tree
Showing 7 changed files with 341 additions and 20 deletions.
16 changes: 11 additions & 5 deletions bamt/networks/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,9 @@ def fit_parameters(self, data: pd.DataFrame, dropna: bool = True, n_jobs: int =
def worker(node):
return node.fit_parameters(data)

results = Parallel(n_jobs=n_jobs)(delayed(worker)(node) for node in self.nodes)
# results = Parallel(n_jobs=n_jobs)(delayed(worker)(node) for node in self.nodes)

results = [worker(node) for node in self.nodes]

for result, node in zip(results, self.nodes):
self.distributions[node.name] = result
Expand Down Expand Up @@ -605,7 +607,7 @@ def wrapper():
if any(pd.isnull(pvalue) for pvalue in pvals):
output[node.name] = np.nan
continue

print("PVALS \n:", pvals)
node_data = self.distributions[node.name]
if models_dir and ("hybcprob" in node_data.keys()):
for obj, obj_data in node_data["hybcprob"].items():
Expand Down Expand Up @@ -633,9 +635,13 @@ def wrapper():
return output

if progress_bar:
seq = Parallel(n_jobs=parall_count)(
delayed(wrapper)() for _ in tqdm(range(n), position=0, leave=True)
)
# seq = Parallel(n_jobs=parall_count)(
# delayed(wrapper)() for _ in tqdm(range(n), position=0, leave=True)
# )
seq = []
for _ in tqdm(range(n), position=0, leave=True):
result = wrapper()
seq.append(result)
else:
seq = Parallel(n_jobs=parall_count)(delayed(wrapper)() for _ in range(n))
seq_df = pd.DataFrame.from_dict(seq, orient="columns")
Expand Down
123 changes: 122 additions & 1 deletion bamt/networks/composite_bn.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import re
import random

import numpy as np

from tqdm import tqdm
from bamt.log import logger_network
from bamt.networks.base import BaseNetwork
import pandas as pd
from typing import Optional, Dict
from typing import Optional, Dict, Union, List
from bamt.builders.composite_builder import CompositeStructureBuilder, CompositeDefiner
from bamt.utils.composite_utils.MLUtils import MlModels

Expand Down Expand Up @@ -109,3 +115,118 @@ def set_regressor(self, regressors: Dict[str, object]):
)
else:
continue

def sample(
    self,
    n: int,
    models_dir: Optional[str] = None,
    progress_bar: bool = True,
    evidence: Optional[Dict[str, Union[str, int, float]]] = None,
    as_df: bool = True,
    predict: bool = False,
    parall_count: int = 1,
) -> Union[None, pd.DataFrame, List[Dict[str, Union[str, int, float]]]]:
    """
    Generate ``n`` samples from the composite Bayesian network.

    Args:
        n: number of samples to draw.
        models_dir: if given, joblib-serialized local models inside
            "hybcprob" distributions are re-pointed into this directory
            before sampling.
        progress_bar: show a tqdm bar; sampling then runs sequentially.
        evidence: fixed values for selected nodes, keyed by node name.
        as_df: return a pandas DataFrame instead of a list of records.
        predict: use each node's deterministic ``predict`` instead of the
            stochastic ``choose``.
        parall_count: number of joblib workers when ``progress_bar`` is False.

    Returns:
        DataFrame (``as_df=True``) or list of record dicts, or ``None`` if
        parameters have not been fitted yet.
    """
    from joblib import Parallel, delayed

    random.seed()
    if not self.distributions.items():
        logger_network.error(
            "Parameter learning wasn't done. Call fit_parameters method"
        )
        return None

    if evidence:
        # Discrete evidence values are handled internally as strings.
        for node in self.nodes:
            if (node.type == "Discrete") & (node.name in evidence.keys()):
                if not (isinstance(evidence[node.name], str)):
                    evidence[node.name] = str(int(evidence[node.name]))

    def wrapper():
        # Draw one joint sample: visit nodes in self.nodes order
        # (assumed topological — parents are sampled before children).
        output = {}
        for node in self.nodes:
            parents = node.cont_parents + node.disc_parents
            if evidence and node.name in evidence.keys():
                output[node.name] = evidence[node.name]
                continue
            if not parents:
                pvals = None
            else:
                if self.type == "Discrete":
                    pvals = [str(output[t]) for t in parents]
                elif type(node).__name__ in (
                    "CompositeDiscreteNode",
                    "CompositeContinuousNode",
                ):
                    # Composite nodes expect stringified parent values.
                    pvals = [str(output[t]) for t in parents]
                else:
                    pvals = [output[t] for t in parents]

                # If any parent came out NaN, sampling this node is blocked.
                if any(pd.isnull(pvalue) for pvalue in pvals):
                    output[node.name] = np.nan
                    continue

            node_data = self.distributions[node.name]
            if models_dir and ("hybcprob" in node_data.keys()):
                # Redirect serialized local models to the user-supplied dir.
                for obj, obj_data in node_data["hybcprob"].items():
                    if "serialization" in obj_data.keys():
                        if "gaussian" in node.type.lower():
                            model_type = "regressor"
                        else:
                            model_type = "classifier"
                        if (
                            obj_data["serialization"] == "joblib"
                            and obj_data[f"{model_type}_obj"]
                        ):
                            new_path = (
                                models_dir
                                + f"\\{node.name.replace(' ', '_')}\\{obj}.joblib.compressed"
                            )
                            node_data["hybcprob"][obj][
                                f"{model_type}_obj"
                            ] = new_path

            if predict:
                output[node.name] = node.predict(node_data, pvals=pvals)
            else:
                output[node.name] = node.choose(node_data, pvals=pvals)
        return output

    if progress_bar:
        # Sequential sampling so tqdm can report real progress.
        seq = [wrapper() for _ in tqdm(range(n), position=0, leave=True)]
    else:
        seq = Parallel(n_jobs=parall_count)(delayed(wrapper)() for _ in range(n))

    seq_df = pd.DataFrame.from_dict(seq, orient="columns")
    seq_df.dropna(inplace=True)
    cont_nodes = [
        c.name for c in self.nodes if c.type != "Discrete" and "Logit" not in c.type
    ]
    # Drop rows violating the sign constraints declared in the descriptor.
    positive_columns = [
        c for c in cont_nodes if self.descriptor["signs"][c] == "pos"
    ]
    seq_df = seq_df[(seq_df[positive_columns] >= 0).all(axis=1)]
    seq_df.reset_index(inplace=True, drop=True)
    seq = seq_df.to_dict("records")
    sample_output = pd.DataFrame.from_dict(seq, orient="columns")

    if as_df:
        if self.has_logit or self.type == "Composite":
            # Decode label-encoded categorical columns back to their
            # original values. node.encoders maps column -> fitted encoder
            # (see base node's categorical-data decorator), so iterate
            # items() and call inverse_transform on the encoder itself.
            for node in self.nodes:
                for feature_key, encoder in node.encoders.items():
                    sample_output[feature_key] = encoder.inverse_transform(
                        sample_output[feature_key]
                    )
        return sample_output
    else:
        return seq
2 changes: 2 additions & 0 deletions bamt/nodes/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ def wrapper(self, data, *args, **kwargs):
encoder = LabelEncoder()
data[column] = encoder.fit_transform(data[column])
self.encoders[column] = encoder
elif data[column].dtype in ("float64", "int64"):
continue
else:
logger_nodes.warning(
msg="Wrong datatype passed to categorical data encoder"
Expand Down
102 changes: 96 additions & 6 deletions bamt/nodes/composite_continuous_node.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
import pickle
import numpy as np
import joblib
import random
import math

from .base import BaseNode
from .gaussian_node import GaussianNode
from .schema import GaussianParams, HybcprobParams
Expand All @@ -6,6 +12,8 @@
from pandas import DataFrame
from typing import Optional, List, Union

from ..log import logger_nodes
from sklearn.metrics import mean_squared_error as mse

NodeInfo = Union[GaussianParams, HybcprobParams]

Expand All @@ -18,11 +26,93 @@ def __init__(self, name, regressor: Optional[object] = None):
self.regressor = regressor
self.type = "CompositeContinuous" + f" ({type(self.regressor).__name__})"

@BaseNode.encode_categorical_data_if_any
def fit_parameters(self, data: DataFrame, **kwargs) -> GaussianParams:
    """
    Fit this node's regressor on its parent columns and package the result.

    Args:
        data: training DataFrame containing this node's column and all
            parent columns.
        **kwargs: forwarded to ``self.regressor.fit``.

    Returns:
        GaussianParams dict with the serialized regressor, its class name,
        the residual spread, and the serialization method used; ``None``
        (with a warning) when the node has no parents.
    """
    parents = self.cont_parents + self.disc_parents
    if not parents:
        # A composite continuous node is only meaningful with parents;
        # nothing is fitted in that case.
        logger_nodes.warning(
            msg="Composite Continuous Node should always have a parent"
        )
        return

    self.regressor.fit(data[parents].values, data[self.name].values, **kwargs)
    predicted_value = self.regressor.predict(data[parents].values)
    # NOTE(review): squared=False makes this the RMSE (a standard
    # deviation), despite the "variance" key name — choose() feeds it
    # straight to random.gauss as sigma. Confirm naming is intentional.
    variance = mse(data[self.name].values, predicted_value, squared=False)
    serialization = self.choose_serialization(self.regressor)

    if serialization == "pickle":
        # latin1 round-trips arbitrary bytes, so the pickled model can be
        # stored as a plain string inside the distributions dict.
        model_ser = pickle.dumps(self.regressor, protocol=4).decode("latin1")
        return {
            "mean": np.nan,
            "regressor_obj": model_ser,
            "regressor": type(self.regressor).__name__,
            "variance": variance,
            "serialization": "pickle",
        }

    # Pickling failed; choose_serialization returned the exception —
    # fall back to dumping the model to disk with joblib.
    logger_nodes.warning(
        f"{self.name}::Pickle failed. BAMT will use Joblib. | "
        + str(serialization.args[0])
    )
    path = self.get_path_joblib(
        node_name=self.name.replace(" ", "_"),
        specific=f"{self.name.replace(' ', '_')}",
    )
    joblib.dump(self.regressor, path, compress=True, protocol=4)
    return {
        "mean": np.nan,
        "regressor_obj": path,
        "regressor": type(self.regressor).__name__,
        "variance": variance,
        "serialization": "joblib",
    }

@staticmethod
def choose(node_info: GaussianParams, pvals: List[float]) -> float:
"""
Return value from Logit node
params:
node_info: nodes info from distributions
pvals: parent values
"""
if pvals:
for el in pvals:
if str(el) == "nan":
return np.nan
if node_info["serialization"] == "joblib":
model = joblib.load(node_info["regressor_obj"])
else:
a = node_info["regressor_obj"].encode("latin1")
model = pickle.loads(a)

cond_mean = model.predict(np.array(pvals).reshape(1, -1))[0]
var = node_info["variance"]
return random.gauss(cond_mean, var)
else:
return random.gauss(node_info["mean"], math.sqrt(node_info["variance"]))

@staticmethod
def predict(node_info: GaussianParams, pvals: List[float]) -> float:
"""
Return prediction from Logit node
params:
node_info: nodes info from distributions
pvals: parent values
"""

def choose(self, node_info: NodeInfo, pvals: List[Union[str, float]]) -> float:
return GaussianNode(self.name, self.regressor).choose(node_info, pvals)
if pvals:
for el in pvals:
if str(el) == "nan":
return np.nan
if node_info["serialization"] == "joblib":
model = joblib.load(node_info["regressor_obj"])
else:
a = node_info["regressor_obj"].encode("latin1")
model = pickle.loads(a)

def predict(self, node_info: NodeInfo, pvals: List[Union[str, float]]) -> float:
return GaussianNode(self.name, self.regressor).predict(node_info, pvals)
pred = model.predict(np.array(pvals).reshape(1, -1))[0]
return pred
else:
return node_info["mean"]
Loading

0 comments on commit 7eea3da

Please sign in to comment.