
Commit: pre-alpha
Roman223 committed Feb 5, 2022
1 parent 48db6b0 commit 681c255
Showing 11 changed files with 960 additions and 140 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -3,4 +3,5 @@
*\.ipynb_checkpoints
__pycache__/
*.log
+*.ini

31 changes: 22 additions & 9 deletions Networks.py
@@ -8,7 +8,9 @@
import pandas as pd
import numpy as np
import json
+import os

+from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
from Utils import GraphUtils as gru
from pyvis.network import Network
@@ -17,7 +19,9 @@
from Builders import ParamDict

from log import logger_network
+from config import config

+STORAGE = config.get('NODES', 'models_storage', fallback='models_storage is not defined')

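The added STORAGE constant resolves the model-storage directory from an ini file. A minimal sketch of that lookup, assuming a config.ini with a [NODES] section (the file name and path value here are illustrative, not taken from the commit):

```python
# Hypothetical config.ini contents assumed for this sketch:
#   [NODES]
#   models_storage = models_storage
from configparser import ConfigParser

config = ConfigParser()
config.read('config.ini')  # assumed file name
storage = config.get('NODES', 'models_storage',
                     fallback='models_storage is not defined')
print(storage)  # -> the configured path, or the fallback string if the key is absent
```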
class BaseNetwork(object):
"""
@@ -40,6 +44,7 @@ def __init__(self):
self.has_logit = False
self.use_mixture = False


@property
def nodes_names(self) -> List[str]:
return [node.name for node in self.nodes]
@@ -81,7 +86,7 @@ def add_nodes(self, descriptor: Dict[str, Dict[str, str]]):
self.nodes = worker_1.vertices

def add_edges(self, data: pd.DataFrame, scoring_function: Union[Tuple[str, Callable], Tuple[str]],
-                  classifier: object = None,
+                  classifier: Optional[object] = None,
params: Optional[ParamDict] = None, optimizer: str = 'HC'):
"""
Base function for Structure learning
@@ -91,7 +96,7 @@ def add_edges(self, data: pd.DataFrame, scoring_function: Union[Tuple[str, Calla
remove_init_edges: allows changes in model defined by user
white_list: list of allowed edges
"""
-        if not self.has_logit:
+        if not self.has_logit and classifier:
logger_network.error("Classifiers dict will be ignored since logit nodes are forbidden.")
return None

@@ -143,6 +148,7 @@ def get_params_tree(self, outdir: str):
return None
with open(outdir, 'w+') as out:
json.dump(self.distributions, out)
+        return True

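With the added `return True`, callers can now tell whether the dump actually happened. A hedged usage sketch (`bn` stands for any fitted network; the file name is arbitrary):

```python
# Assumed usage sketch; bn is a fitted BaseNetwork subclass instance.
if bn.get_params_tree("params.json"):   # writes bn.distributions as JSON
    print("parameters saved")
else:
    print("nothing written")            # the method bailed out and returned None
```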
def fit_parameters(self, data: pd.DataFrame, dropna: bool = True):
"""
@@ -152,6 +158,16 @@ def fit_parameters(self, data: pd.DataFrame, dropna: bool = True):
data = data.dropna()
data.reset_index(inplace=True, drop=True)

+        if self.has_logit:
+            if any(['Logit' in node.type for node in self.nodes]):
+                if not os.path.isdir(STORAGE):
+                    os.makedirs(os.path.join(STORAGE, "0"))
+                elif os.listdir(STORAGE):
+                    index = sorted(
+                        [int(id) for id in os.listdir(STORAGE)]
+                    )[-1] + 1
+                    os.makedirs(os.path.join(STORAGE, str(index)))

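The new block provisions a numbered run directory under STORAGE whenever logit nodes are present. A standalone restatement of the indexing rule (the STORAGE value is assumed for the sketch):

```python
import os

STORAGE = 'models_storage'  # assumed value for this sketch

# First run: STORAGE does not exist yet -> create STORAGE/0.
if not os.path.isdir(STORAGE):
    os.makedirs(os.path.join(STORAGE, "0"))
# Later runs: use one past the largest existing numeric subdirectory.
elif os.listdir(STORAGE):
    index = sorted(int(d) for d in os.listdir(STORAGE))[-1] + 1
    os.makedirs(os.path.join(STORAGE, str(index)))
# Note: an existing but empty STORAGE falls through both branches unchanged.
```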
# Turn all discrete values to str for learning algorithm
if 'disc_num' in self.descriptor['types'].values():
columns_names = [name for name, t in self.descriptor['types'].items() if t in ['disc_num']]
@@ -179,7 +195,6 @@ def get_info(self, as_df: bool = True) -> Optional[pd.DataFrame]:
types_d = []
parents = []
parents_types = []
-        import pandas as pd
for n in self.nodes:
names.append(n)
types_n.append(n.type)
@@ -229,7 +244,6 @@ def sample(self, n: int, evidence: Optional[Dict[str, Union[str, int, float]]] =
return seq

def predict(self, test: pd.DataFrame, parall_count: int = 1) -> Dict[str, Union[List[str], List[int], List[float]]]:
-        from joblib import Parallel, delayed
"""
Function to predict columns from given data.
Note that train data and test data must have different columns.
@@ -242,23 +256,22 @@ def predict(self, test: pd.DataFrame, parall_count: int = 1) -> Dict[str, Union[
Returns:
predicted data (dict): dict with column as key and predicted data as value
"""
+        from joblib import Parallel, delayed

def wrapper(bn: HybridBN, test: pd.DataFrame, columns: List[str]):
preds = {column_name: list() for column_name in columns}

            if len(test) == 1:
                for i in range(test.shape[0]):
-                    print(i)
                    test_row = dict(test.iloc[i, :])
                    for n, key in enumerate(columns):
                        try:
+                            sample = bn.sample(2000, evidence=test_row)
                            if bn[key].type.startswith(('Discrete', 'Logit', 'ConditionalLogit',)):
-                                sample = self.sample(2000, evidence=test_row)
                                count_stats = sample[key].value_counts()
                                preds[key].append(count_stats.index[0])
                            else:
-                                sample = bn.sample(2000, evidence=test_row)
-                                if self.descriptor['signs'][key] == 'pos':
+                                if bn.descriptor['signs'][key] == 'pos':
                                    sample = sample.loc[sample[key] >= 0]
                                if sample.shape[0] == 0:
                                    preds[key].append(np.nan)
@@ -281,7 +294,7 @@ def wrapper(bn: HybridBN, test: pd.DataFrame, columns: List[str]):
preds = {column_name: list() for column_name in columns}

processed_list = Parallel(n_jobs=parall_count)(
-            delayed(wrapper)(self, test.loc[[i]], columns) for i in test.index)
+            delayed(wrapper)(self, test.loc[[i]], columns) for i in tqdm(test.index, position=0, leave=True))

for i in range(test.shape[0]):
curr_pred = processed_list[i]
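predict() now wraps the per-row jobs in tqdm for progress reporting while joblib fans them out. A self-contained sketch of the same pattern; `worker` is a stand-in for the wrapper() above and the data is made up:

```python
import pandas as pd
from joblib import Parallel, delayed
from tqdm import tqdm

def worker(row: pd.DataFrame, columns):      # stand-in for wrapper()
    return {c: [row[c].iloc[0] * 2] for c in columns}

test = pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})
columns = ['a', 'b']

# One job per row, with a progress bar over the row index.
processed = Parallel(n_jobs=2)(
    delayed(worker)(test.loc[[i]], columns)
    for i in tqdm(test.index, position=0, leave=True))

# Merge the per-row dicts back into one prediction dict, preserving row order.
preds = {c: [] for c in columns}
for row_result in processed:
    for c in columns:
        preds[c].extend(row_result[c])
print(preds)  # {'a': [2, 4, 6], 'b': [8, 10, 12]}
```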
135 changes: 77 additions & 58 deletions Nodes.py
@@ -11,9 +11,13 @@
import itertools
import random
import joblib
+import pickle
+import os

from log import logger_nodes
+from config import config

+STORAGE = config.get('NODES', 'models_storage', fallback='models_storage is not defined')


class DiscreteParams(TypedDict):
@@ -44,6 +48,17 @@ def __init__(self, name: str):
def __repr__(self):
return f"{self.name}"

+    @staticmethod
+    def choose_serialization(model) -> Union[str, Exception]:
+        try:
+            ex_b = pickle.dumps(model, protocol=4)
+            model_ser = ex_b.decode('latin1').replace('\'', '\"')
+            a = model_ser.replace('\"', '\'').encode('latin1')
+            classifier_body = pickle.loads(a)
+            return 'pickle'
+        except Exception as ex:
+            return ex

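The new choose_serialization() probes whether a model survives a pickle round-trip, returning the string 'pickle' on success or the raised exception so the caller can log it and fall back to joblib. A hedged usage sketch (the scikit-learn classifier is an assumption for illustration):

```python
import pickle
from sklearn.linear_model import LogisticRegression

clf = LogisticRegression()
result = BaseNode.choose_serialization(clf)
if result == 'pickle':
    # Safe to embed the model inline as a latin1-decoded pickle string.
    model_ser = pickle.dumps(clf, protocol=4).decode('latin1')
else:
    # result is the exception raised during the round-trip.
    print(f"pickle failed, falling back to joblib: {result}")
```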

class DiscreteNode(BaseNode):
"""
@@ -463,7 +478,8 @@ def choose(node_info: Dict[str, Dict[str, CondMixtureGaussParams]],
class LogitParams(TypedDict):
classes: List[int]
classifier: str
-    classifier_obj: Optional[Union[str, bool, object]]
+    classifier_obj: Optional[Union[str, bool, bytes]]
+    serialization: str

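The widened classifier_obj field plus the new serialization tag let a LogitParams dict carry either an inline pickled model (a latin1-decoded string) or a joblib file path. A minimal round-trip sketch, assuming a fitted scikit-learn classifier:

```python
import pickle
import numpy as np
from sklearn.linear_model import LogisticRegression

clf = LogisticRegression().fit(np.array([[0.0], [1.0]]), [0, 1])

params: LogitParams = {
    'classes': [0, 1],
    'classifier': type(clf).__name__,
    'classifier_obj': pickle.dumps(clf, protocol=4).decode('latin1'),
    'serialization': 'pickle',
}

# latin1 maps bytes to code points one-to-one, so the round-trip is lossless.
restored = pickle.loads(params['classifier_obj'].encode('latin1'))
print(restored.predict_proba(np.array([[0.5]])))
```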

class LogitNode(BaseNode):
@@ -479,28 +495,33 @@ def __init__(self, name, classifier: Optional[object] = None):
self.type = 'Logit' + f" ({type(self.classifier).__name__})"

    def fit_parameters(self, data: DataFrame) -> LogitParams:
-        if not os.path.isdir(f"{self.name.replace(' ', '_')}"):
-            os.mkdir(f"{self.name.replace(' ', '_')}")
        parents = self.disc_parents + self.cont_parents
        self.classifier.fit(data[parents].values, data[self.name].values)
-        # Saving model by serializing it with Joblib module
-        try:
-            path = os.path.abspath(f"{self.name.replace(' ', '_')}/{self.name.replace(' ', '_')}.joblib.compressed")
-            joblib.dump(self.classifier, path, compress=True, protocol=4)
-            model_ser = joblib.load(path)
-        except Exception as ex:
-            path = False
-            logger_nodes.warning(
-                f"{self.name}::Joblib failed. BAMT will save entire model as python object. | " + ex.args[0])
+        serialization = self.choose_serialization(self.classifier)

-        if path:
+        if serialization == 'pickle':
+            ex_b = pickle.dumps(self.classifier, protocol=4)
+            # model_ser = ex_b.decode('latin1').replace('\'', '\"')
+            model_ser = ex_b.decode('latin1')
            return {'classes': list(self.classifier.classes_),
-                    'classifier_obj': path,
-                    'classifier': type(self.classifier).__name__}
+                    'classifier_obj': model_ser,
+                    'classifier': type(self.classifier).__name__,
+                    'serialization': 'pickle'}
        else:
+            logger_nodes.warning(f"{self.name}::Pickle failed. BAMT will use Joblib. | " + str(serialization.args[0]))
+            index = str(int(os.listdir(STORAGE)[-1]))
+            if not os.path.isdir(os.path.join(STORAGE, index, f"{self.name.replace(' ', '_')}")):
+                os.makedirs(os.path.join(STORAGE, index, f"{self.name.replace(' ', '_')}"))
+            path = os.path.abspath(os.path.join(STORAGE,
+                                                index,
+                                                f"{self.name.replace(' ', '_')}",
+                                                f"{self.name.replace(' ', '_')}.joblib.compressed"))
+
+            joblib.dump(self.classifier, path, compress=True, protocol=4)
            return {'classes': list(self.classifier.classes_),
-                    'classifier_obj': self.classifier,
-                    'classifier': type(self.classifier).__name__}
+                    'classifier_obj': path,
+                    'classifier': type(self.classifier).__name__,
+                    'serialization': 'joblib'}

def choose(self, node_info: LogitParams, pvals: List[Union[str, float]]) -> str:
"""
@@ -512,12 +533,14 @@ def choose(self, node_info: LogitParams, pvals: List[Union[str, float]]) -> str:
pvals = [str(p) for p in pvals]

        rindex = 0
-        # JOBLIB

        if len(node_info["classes"]) > 1:
-            if isinstance(node_info["classifier_obj"], str):
+            if node_info["serialization"] == 'joblib':
                model = joblib.load(node_info["classifier_obj"])
            else:
-                model = node_info["classifier_obj"]
+                # str_model = node_info["classifier_obj"].decode('latin1').replace('\'', '\"')
+                a = node_info["classifier_obj"].encode('latin1')
+                model = pickle.loads(a)

distribution = model.predict_proba(np.array(pvals).reshape(1, -1))[0]

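After deserializing the model, choose() turns the predict_proba output into a single sampled class label (the committed code walks the distribution with rindex; random.choices below is an equivalent stand-in for that final step):

```python
import random

classes = ['low', 'mid', 'high']   # assumed node_info['classes']
distribution = [0.2, 0.5, 0.3]     # example predict_proba(...)[0]

# Draw one label with probability proportional to the predicted distribution.
print(random.choices(classes, weights=distribution, k=1)[0])
```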
@@ -557,8 +580,6 @@ def fit_parameters(self, data: DataFrame) -> Dict[str, Dict[str, LogitParams]]:
Return:
{"hybcprob": {<combination of outputs from discrete parents> : LogitParams}}
"""
-        if not os.path.isdir(f"{self.name.replace(' ', '_')}"):
-            os.mkdir(f"{self.name.replace(' ', '_')}")
hycprob = dict()
values = []
combinations = []
@@ -578,46 +599,43 @@
model = self.classifier
values = set(new_data[self.name])
if len(values) > 1:
-                    if self.name == 'Period' or comb in [['BACKARC', 'LIMESTONE'], ['BACKARC', 'LOW-RESISTIVITY SANDSTONE']]:
-                        # print('BP')
-                        pass
                    model.fit(new_data[self.cont_parents].values, new_data[self.name].values)
-                    classes = model.classes_
-                    # Saving model by serializing it with Pickle module
-                    # try:
-                    #     ex_b = pickle.dumps(model, protocol=4)
-                    #     model_ser = ex_b.decode('latin1').replace('\'', '\"')
-                    #     a = model_ser.replace('\"', '\'').encode('latin1')
-                    #     classifier_body = pickle.loads(a)
-                    #     model = None  # clear memory
-                    # except Exception as ex:
-                    #     model_ser = False
-                    #     logger_nodes.warning(
-                    #         f"{self.name} {comb}::Pickle failed. BAMT will save entire model as python object. | " +
-                    #         ex.args[
-                    #             0])
-                    try:
-                        path = os.path.abspath(f"{self.name.replace(' ', '_')}/{str(key_comb)}.joblib.compressed")
-                        joblib.dump(self.classifier, path, compress=True, protocol=4)
-                        model_ser = joblib.load(path)
-                    except Exception as ex:
-                        path = False
+                    classes = list(model.classes_)
+                    serialization = self.choose_serialization(model)

+                    if serialization == 'pickle':
+                        ex_b = pickle.dumps(self.classifier, protocol=4)
+                        model_ser = ex_b.decode('latin1')

+                        # model_ser = pickle.dumps(self.classifier, protocol=4)
+                        hycprob[str(key_comb)] = {'classes': classes,
+                                                  'classifier_obj': model_ser,
+                                                  'classifier': type(self.classifier).__name__,
+                                                  'serialization': 'pickle'}
+                    else:
                        logger_nodes.warning(
-                            f"{self.name} {comb}::Joblib failed. BAMT will save entire model as python object. | " +
-                            ex.args[0])
+                            f"{self.name} {comb}::Pickle failed. BAMT will use Joblib. | " + str(serialization.args[0]))
+                        index = str(int(os.listdir(STORAGE)[-1]))
+                        if not os.path.isdir(os.path.join(STORAGE, index, f"{self.name.replace(' ', '_')}")):
+                            os.makedirs(os.path.join(STORAGE, index, f"{self.name.replace(' ', '_')}"))
+                        path = os.path.abspath(os.path.join(STORAGE,
+                                                            index,
+                                                            f"{self.name.replace(' ', '_')}",
+                                                            f"{comb}.joblib.compressed"))

+                        joblib.dump(model, path, compress=True, protocol=4)
+                        hycprob[str(key_comb)] = {'classes': classes,
+                                                  'classifier_obj': path,
+                                                  'classifier': type(self.classifier).__name__,
+                                                  'serialization': 'joblib'}
                else:
-                    path = 'unknown'
                    classes = list(values)
+                    hycprob[str(key_comb)] = {'classes': classes, 'classifier': type(self.classifier).__name__,
+                                              'classifier_obj': None, 'serialization': None}

-                if not path:
-                    hycprob[str(key_comb)] = {'classes': list(classes), 'classifier': type(model).__name__,
-                                              'classifier_obj': model}
-                else:
-                    hycprob[str(key_comb)] = {'classes': list(classes), 'classifier': type(model).__name__,
-                                              'classifier_obj': path}
            else:
                hycprob[str(key_comb)] = {'classes': list(classes), 'classifier': type(self.classifier).__name__,
-                                          'classifier_obj': None}
+                                          'classifier_obj': None, 'serialization': None}
        return {"hybcprob": hycprob}

def choose(self, node_info: Dict[str, Dict[str, LogitParams]], pvals: List[Union[str, float]]) -> str:
@@ -640,11 +658,12 @@ def choose(self, node_info: Dict[str, Dict[str, LogitParams]], pvals: List[Union

# JOBLIB
if len(lgdistribution["classes"]) > 1:
-            if isinstance(lgdistribution["classifier_obj"], str):
+            if lgdistribution["serialization"] == 'joblib':
                model = joblib.load(lgdistribution["classifier_obj"])
            else:
-                model = lgdistribution["classifier_obj"]

+                # str_model = lgdistribution["classifier_obj"].decode('latin1').replace('\'', '\"')
+                bytes_model = lgdistribution["classifier_obj"].encode('latin1')
+                model = pickle.loads(bytes_model)

distribution = model.predict_proba(np.array(lgpvals).reshape(1, -1))[0]

2 changes: 2 additions & 0 deletions Tests/ConditionalGaussiansTest.py
@@ -47,6 +47,8 @@
t2 = time.time()
print(f'PL elapsed: {t2 - t1}')

+bn.get_params_tree("final.json")

# # bn.plot('Hybrid_hackp')
# for num, el in enumerate(bn.sample(10, as_df=False), 1):
# print('\n', num)
4 changes: 0 additions & 4 deletions Tests/HybridTest1.py
@@ -78,7 +78,3 @@
print(f'PL elapsed: {t2-t1}')
# for node, d in bn.distributions.items():
# print(node,":", d)

-import json
-with open("../test_hybrid_out.json", 'w+') as out:
-    json.dump(bn.distributions, out)
(diffs for the remaining 6 changed files are not shown)
