Commit
black formatting and imports
jrzkaminski committed Dec 29, 2023
1 parent af4d8dc commit 689f164
Showing 6 changed files with 121 additions and 111 deletions.
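Most of the diff below is mechanical restyling by black. As a rough sketch of the recurring pattern (an illustrative snippet with made-up names, not code taken from this commit), black normalizes strings to double quotes and replaces backslash continuations with single lines or parenthesized blocks:

import numpy as np

counts = np.array([3, 0, 1, 5])
n_extra_bins = 2

# Pre-black style, as in the removed lines below:
#     alpha = np.sqrt(np.sum(counts)) / \
#         (counts.size + n_extra_bins)
#     estimator = 'minimax'.upper().replace(' ', '')

# Post-black style, as in the added lines below:
alpha = np.sqrt(np.sum(counts)) / (counts.size + n_extra_bins)
estimator = "minimax".upper().replace(" ", "")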
217 changes: 114 additions & 103 deletions bamt/external/pyitlib/DiscreteRandomVariableUtils.py
@@ -8,17 +8,17 @@


def information_mutual_conditional(
- x,
- y,
- z,
- cartesian_product=False,
- base=2,
- fill_value=-1,
- estimator="ML",
- alphabet_x=None,
- alphabet_y=None,
- Alphabet_Z=None,
- keep_dims=False,
+ x,
+ y,
+ z,
+ cartesian_product=False,
+ base=2,
+ fill_value=-1,
+ estimator="ML",
+ alphabet_x=None,
+ alphabet_y=None,
+ Alphabet_Z=None,
+ keep_dims=False,
):
x, fill_value_X = _sanitise_array_input(x, fill_value)
y, fill_value_Y = _sanitise_array_input(y, fill_value)
@@ -113,6 +113,7 @@ def information_mutual_conditional(
Alphabet_Z = np.reshape(Alphabet_Z, (-1, Alphabet_Z.shape[-1]))
I = []
for i in range(z.shape[0]):
+
def f(X, Y, Alphabet_X, Alphabet_Y):
return information_mutual_conditional(
X,
@@ -150,28 +151,28 @@ def f(X, Y, Alphabet_X, Alphabet_Y):

for i in range(x.shape[0]):
I_ = (
- entropy_joint(
- np.vstack((x[i], z[i])),
- base,
- fill_value,
- estimator,
- _vstack_pad((alphabet_x[i], Alphabet_Z[i]), fill_value),
- )
- + entropy_joint(
- np.vstack((y[i], z[i])),
- base,
- fill_value,
- estimator,
- _vstack_pad((alphabet_y[i], Alphabet_Z[i]), fill_value),
- )
- - entropy_joint(
- np.vstack((x[i], y[i], z[i])),
- base,
- fill_value,
- estimator,
- _vstack_pad((alphabet_x[i], alphabet_y[i], Alphabet_Z[i]), fill_value),
- )
- - entropy_joint(z[i], base, fill_value, estimator, Alphabet_Z[i])
+ entropy_joint(
+ np.vstack((x[i], z[i])),
+ base,
+ fill_value,
+ estimator,
+ _vstack_pad((alphabet_x[i], Alphabet_Z[i]), fill_value),
+ )
+ + entropy_joint(
+ np.vstack((y[i], z[i])),
+ base,
+ fill_value,
+ estimator,
+ _vstack_pad((alphabet_y[i], Alphabet_Z[i]), fill_value),
+ )
+ - entropy_joint(
+ np.vstack((x[i], y[i], z[i])),
+ base,
+ fill_value,
+ estimator,
+ _vstack_pad((alphabet_x[i], alphabet_y[i], Alphabet_Z[i]), fill_value),
+ )
+ - entropy_joint(z[i], base, fill_value, estimator, Alphabet_Z[i])
)
I[i] = I_

@@ -185,15 +186,15 @@ def f(X, Y, Alphabet_X, Alphabet_Y):


def information_mutual(
- X,
- Y=None,
- cartesian_product=False,
- base=2,
- fill_value=-1,
- estimator="ML",
- Alphabet_X=None,
- Alphabet_Y=None,
- keep_dims=False,
+ X,
+ Y=None,
+ cartesian_product=False,
+ base=2,
+ fill_value=-1,
+ estimator="ML",
+ Alphabet_X=None,
+ Alphabet_Y=None,
+ keep_dims=False,
):
H_conditional = entropy_conditional(
X, Y, cartesian_product, base, fill_value, estimator, Alphabet_X, Alphabet_Y
@@ -236,7 +237,7 @@ def entropy_pmf(P, base=2, require_valid_pmf=True, keep_dims=False):


def entropy_joint(
- X, base=2, fill_value=-1, estimator="ML", Alphabet_X=None, keep_dims=False
+ X, base=2, fill_value=-1, estimator="ML", Alphabet_X=None, keep_dims=False
):
X, fill_value_X = _sanitise_array_input(X, fill_value)
if Alphabet_X is not None:
@@ -302,15 +303,15 @@ def entropy_joint(


def entropy_conditional(
- X,
- Y=None,
- cartesian_product=False,
- base=2,
- fill_value=-1,
- estimator="ML",
- Alphabet_X=None,
- Alphabet_Y=None,
- keep_dims=False,
+ X,
+ Y=None,
+ cartesian_product=False,
+ base=2,
+ fill_value=-1,
+ estimator="ML",
+ Alphabet_X=None,
+ Alphabet_Y=None,
+ keep_dims=False,
):
if Y is None:
Y = X
@@ -485,10 +486,10 @@ def entropy(X, base=2, fill_value=-1, estimator="ML", Alphabet_X=None, keep_dims
# P_0 is the probability mass assigned to each additional empty bin
P, P_0 = _estimate_probabilities(L, estimator, n_additional_empty_bins)
H_0 = (
- n_additional_empty_bins
- * P_0
- * -np.log2(P_0 + np.spacing(0))
- / np.log2(base)
+ n_additional_empty_bins
+ * P_0
+ * -np.log2(P_0 + np.spacing(0))
+ / np.log2(base)
)
H[i] = entropy_pmf(P, base, require_valid_pmf=False) + H_0

@@ -571,7 +572,7 @@ def _map_observations_to_integers(Symbol_matrices, Fill_values):
assert len(Symbol_matrices) == len(Fill_values)
FILL_VALUE = -1
if np.any([A.dtype != "int" for A in Symbol_matrices]) or np.any(
- np.array(Fill_values) != FILL_VALUE
+ np.array(Fill_values) != FILL_VALUE
):
L = sklearn.preprocessing.LabelEncoder()
F = [np.atleast_1d(v) for v in Fill_values]
@@ -606,7 +607,7 @@ def _isnan_element(x):


def _determine_number_additional_empty_bins(
- Counts, Alphabet, Full_Alphabet, fill_value
+ Counts, Alphabet, Full_Alphabet, fill_value
):
alphabet_sizes = np.sum(np.atleast_2d(Full_Alphabet) != fill_value, axis=-1)
if np.any(alphabet_sizes != fill_value):
@@ -630,34 +631,35 @@ def _estimate_probabilities(Counts, estimator, n_additional_empty_bins=0):
# 2) James-Stein approach may be used as an alternative
# 3) Dirichlet prior may be used in all other cases

- assert (np.sum(Counts) > 0)
- assert (np.all(Counts.astype('int') == Counts))
- assert (n_additional_empty_bins >= 0)
- Counts = Counts.astype('int')
+ assert np.sum(Counts) > 0
+ assert np.all(Counts.astype("int") == Counts)
+ assert n_additional_empty_bins >= 0
+ Counts = Counts.astype("int")

if isinstance(estimator, str):
- estimator = estimator.upper().replace(' ', '')
+ estimator = estimator.upper().replace(" ", "")

- if np.isreal(estimator) or estimator in ('ML', 'PERKS', 'MINIMAX'):
+ if np.isreal(estimator) or estimator in ("ML", "PERKS", "MINIMAX"):
if np.isreal(estimator):
alpha = estimator
- elif estimator == 'PERKS':
+ elif estimator == "PERKS":
alpha = 1.0 / (Counts.size + n_additional_empty_bins)
- elif estimator == 'MINIMAX':
- alpha = np.sqrt(np.sum(Counts)) / \
- (Counts.size + n_additional_empty_bins)
+ elif estimator == "MINIMAX":
+ alpha = np.sqrt(np.sum(Counts)) / (Counts.size + n_additional_empty_bins)
else:
alpha = 0
- Theta = (Counts + alpha) / \
- (1.0 * np.sum(Counts) + alpha * (Counts.size + n_additional_empty_bins))
+ Theta = (Counts + alpha) / (
+ 1.0 * np.sum(Counts) + alpha * (Counts.size + n_additional_empty_bins)
+ )
# Theta_0 is the probability mass assigned to each additional empty bin
if n_additional_empty_bins > 0:
- Theta_0 = alpha / (1.0 * np.sum(Counts) +
- alpha * (Counts.size + n_additional_empty_bins))
+ Theta_0 = alpha / (
+ 1.0 * np.sum(Counts) + alpha * (Counts.size + n_additional_empty_bins)
+ )
else:
Theta_0 = 0

- elif estimator == 'GOOD-TURING':
+ elif estimator == "GOOD-TURING":
# TODO We could also add a Chen-Chao vocabulary size estimator (See
# Bhat Suma's thesis)

@@ -679,34 +681,35 @@ def _estimate_probabilities(Counts, estimator, n_additional_empty_bins=0):

# Fit least squares regression line to plot of log(Z_r) versus log(r)
x = np.log10(np.arange(1, Z_r.size))
- with np.errstate(invalid='ignore', divide='ignore'):
+ with np.errstate(invalid="ignore", divide="ignore"):
y = np.log10(Z_r[1:])
x = x[np.isfinite(y)]
y = y[np.isfinite(y)]
- m, c = np.linalg.lstsq(np.vstack([x, np.ones(x.size)]).T, y,
- rcond=None)[0]
+ m, c = np.linalg.lstsq(np.vstack([x, np.ones(x.size)]).T, y, rcond=None)[0]
if m >= -1:
warnings.warn("Regression slope < -1 requirement in linear "
"Good-Turing estimate not satisfied")
warnings.warn(
"Regression slope < -1 requirement in linear "
"Good-Turing estimate not satisfied"
)
# Compute smoothed value of N_r based on interpolation
# We need to refer to SmoothedN_{r+1} for all observed values of r
SmoothedN_r = np.zeros(N_r.size + 1)
- SmoothedN_r[1:] = 10 ** (np.log10(np.arange(1, SmoothedN_r.size)) *
- m + c)
+ SmoothedN_r[1:] = 10 ** (np.log10(np.arange(1, SmoothedN_r.size)) * m + c)

# Determine threshold value of r at which to use smoothed values of N_r
# (SmoothedN_r), as apposed to straightforward N_r.
# Variance of Turing estimate
- with np.errstate(invalid='ignore', divide='ignore'):
- VARr_T = (np.arange(N_r.size) + 1) ** 2 * \
- (1.0 * np.append(N_r[1:], 0) / (N_r ** 2)) * \
- (1 + np.append(N_r[1:], 0) / N_r)
+ with np.errstate(invalid="ignore", divide="ignore"):
+ VARr_T = (
+ (np.arange(N_r.size) + 1) ** 2
+ * (1.0 * np.append(N_r[1:], 0) / (N_r**2))
+ * (1 + np.append(N_r[1:], 0) / N_r)
+ )
x = (np.arange(N_r.size) + 1) * 1.0 * np.append(N_r[1:], 0) / N_r
- y = (np.arange(N_r.size) + 1) * \
- 1.0 * SmoothedN_r[1:] / (SmoothedN_r[:-1])
- assert (np.isinf(VARr_T[0]) or np.isnan(VARr_T[0]))
+ y = (np.arange(N_r.size) + 1) * 1.0 * SmoothedN_r[1:] / (SmoothedN_r[:-1])
+ assert np.isinf(VARr_T[0]) or np.isnan(VARr_T[0])
turing_is_sig_diff = np.abs(x - y) > 1.96 * np.sqrt(VARr_T)
- assert (turing_is_sig_diff[0] == np.array(False))
+ assert turing_is_sig_diff[0] == np.array(False)
# NB: 0th element can be safely ignored, since always 0
T = np.where(turing_is_sig_diff == np.array(False))[0]
if T.size > 1:
@@ -722,8 +725,12 @@ def _estimate_probabilities(Counts, estimator, n_additional_empty_bins=0):
# objects observed r times, r>0
p_r = np.zeros(N_r.size)
N = np.sum(Counts)
- p_r[1:] = (np.arange(1, N_r.size) + 1) * \
- 1.0 * SmoothedN_r[2:] / (SmoothedN_r[1:-1] * N)
+ p_r[1:] = (
+ (np.arange(1, N_r.size) + 1)
+ * 1.0
+ * SmoothedN_r[2:]
+ / (SmoothedN_r[1:-1] * N)
+ )
# Estimate probability of observing any unseen symbol
p_r[0] = 1.0 * N_r[1] / N

@@ -735,30 +742,34 @@ def _estimate_probabilities(Counts, estimator, n_additional_empty_bins=0):
if np.any(Counts == 0) or n_additional_empty_bins > 0:
Theta = (1 - p_r[0]) * Theta / np.sum(Theta)
else:
warnings.warn("No unobserved outcomes specified. Disregarding the "
"probability mass allocated to any unobserved "
"outcomes.")
warnings.warn(
"No unobserved outcomes specified. Disregarding the "
"probability mass allocated to any unobserved "
"outcomes."
)
Theta = Theta / np.sum(Theta)

# Divide p_0 among unobserved symbols
- with np.errstate(invalid='ignore', divide='ignore'):
- p_emptybin = p_r[0] / (np.sum(Counts == 0) +
- n_additional_empty_bins)
+ with np.errstate(invalid="ignore", divide="ignore"):
+ p_emptybin = p_r[0] / (np.sum(Counts == 0) + n_additional_empty_bins)
Theta[Counts == 0] = p_emptybin
# Theta_0 is the probability mass assigned to each additional empty bin
if n_additional_empty_bins > 0:
Theta_0 = p_emptybin
else:
Theta_0 = 0

- elif estimator == 'JAMES-STEIN':
- Theta, _ = _estimate_probabilities(Counts, 'ML')
+ elif estimator == "JAMES-STEIN":
+ Theta, _ = _estimate_probabilities(Counts, "ML")
p_uniform = 1.0 / (Counts.size + n_additional_empty_bins)
- with np.errstate(invalid='ignore', divide='ignore'):
- Lambda = (1 - np.sum(Theta ** 2)) / \
- ((np.sum(Counts) - 1) *
- (np.sum((p_uniform - Theta) ** 2) +
- n_additional_empty_bins * p_uniform ** 2))
+ with np.errstate(invalid="ignore", divide="ignore"):
+ Lambda = (1 - np.sum(Theta**2)) / (
+ (np.sum(Counts) - 1)
+ * (
+ np.sum((p_uniform - Theta) ** 2)
+ + n_additional_empty_bins * p_uniform**2
+ )
+ )

if Lambda > 1:
Lambda = 1
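The functions reformatted above are BAMT's vendored copy of the pyitlib discrete-random-variable estimators. A minimal usage sketch, assuming the module is importable from the path in the file header and exposes the signatures shown in the hunks above (illustrative data, not part of the commit):

import numpy as np

from bamt.external.pyitlib.DiscreteRandomVariableUtils import (
    entropy,
    entropy_joint,
    information_mutual,
)

x = np.array([0, 0, 1, 1, 2, 2])
y = np.array([0, 1, 0, 1, 0, 1])

# Marginal and joint entropies in bits (base=2 is the default)
print(entropy(x, estimator="ML"))
print(entropy_joint(np.vstack((x, y)), estimator="ML"))

# Mutual information I(x; y) under the maximum-likelihood estimator
print(information_mutual(x, y, estimator="ML"))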
4 changes: 3 additions & 1 deletion bamt/log.py
@@ -39,7 +39,9 @@ def switch_console_out(self, value: bool):
for logger in self.loggers.values():
if self.has_handler(logger, handler_class):
self.remove_handler_type(logger, handler_class)
- logger.addHandler(logging.NullHandler() if not value else logging.root.handlers[0])
+ logger.addHandler(
+ logging.NullHandler() if not value else logging.root.handlers[0]
+ )

def switch_file_out(self, value: bool, log_file: str):
"""
6 changes: 3 additions & 3 deletions bamt/nodes/conditional_mixture_gaussian_node.py
@@ -165,11 +165,11 @@ def choose(
mean, covariance, w = self.get_dist(node_info, pvals)

# check if w is nan or list of weights
- if not isinstance(w, np.ndarray):
+ if not isinstance(w, np.ndarray):
return np.nan

n_comp = len(w)

gmm = GMM(
n_components=n_comp,
priors=w,
3 changes: 1 addition & 2 deletions bamt/utils/composite_utils/MLUtils.py
@@ -1,9 +1,8 @@
import json
from random import choice

- import pkg_resources
from typing import Union

+ import pkg_resources
from catboost import CatBoostClassifier, CatBoostRegressor
from golem.core.dag.graph_node import GraphNode
from sklearn.cluster import KMeans
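The MLUtils.py hunk above only moves import pkg_resources below the typing import, which matches the common standard-library-first, third-party-second grouping of imports (a convention note, not something stated in the commit message). Illustratively:

import json  # standard library
from random import choice
from typing import Union

import pkg_resources  # third-party (ships with setuptools)
from catboost import CatBoostClassifier, CatBoostRegressor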
1 change: 0 additions & 1 deletion docs/source/conf.py
@@ -9,7 +9,6 @@
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.

- import datetime
import sys
from pathlib import Path

1 change: 0 additions & 1 deletion tests/sendingRegressors.py
@@ -4,7 +4,6 @@
from catboost import CatBoostRegressor
from sklearn import preprocessing as pp
from sklearn.ensemble import RandomForestRegressor
-
# from sklearn.linear_model import ElasticNet
from sklearn.tree import DecisionTreeRegressor

