Vectorized sampling (WIP) #101

Draft
wants to merge 14 commits into base: master
217 changes: 114 additions & 103 deletions bamt/external/pyitlib/DiscreteRandomVariableUtils.py
@@ -8,17 +8,17 @@


def information_mutual_conditional(
- x,
- y,
- z,
- cartesian_product=False,
- base=2,
- fill_value=-1,
- estimator="ML",
- alphabet_x=None,
- alphabet_y=None,
- Alphabet_Z=None,
- keep_dims=False,
+ x,
+ y,
+ z,
+ cartesian_product=False,
+ base=2,
+ fill_value=-1,
+ estimator="ML",
+ alphabet_x=None,
+ alphabet_y=None,
+ Alphabet_Z=None,
+ keep_dims=False,
):
x, fill_value_X = _sanitise_array_input(x, fill_value)
y, fill_value_Y = _sanitise_array_input(y, fill_value)
@@ -113,6 +113,7 @@ def information_mutual_conditional(
Alphabet_Z = np.reshape(Alphabet_Z, (-1, Alphabet_Z.shape[-1]))
I = []
for i in range(z.shape[0]):
+
def f(X, Y, Alphabet_X, Alphabet_Y):
return information_mutual_conditional(
X,
@@ -150,28 +151,28 @@ def f(X, Y, Alphabet_X, Alphabet_Y):

for i in range(x.shape[0]):
I_ = (
- entropy_joint(
- np.vstack((x[i], z[i])),
- base,
- fill_value,
- estimator,
- _vstack_pad((alphabet_x[i], Alphabet_Z[i]), fill_value),
- )
- + entropy_joint(
- np.vstack((y[i], z[i])),
- base,
- fill_value,
- estimator,
- _vstack_pad((alphabet_y[i], Alphabet_Z[i]), fill_value),
- )
- - entropy_joint(
- np.vstack((x[i], y[i], z[i])),
- base,
- fill_value,
- estimator,
- _vstack_pad((alphabet_x[i], alphabet_y[i], Alphabet_Z[i]), fill_value),
- )
- - entropy_joint(z[i], base, fill_value, estimator, Alphabet_Z[i])
+ entropy_joint(
+ np.vstack((x[i], z[i])),
+ base,
+ fill_value,
+ estimator,
+ _vstack_pad((alphabet_x[i], Alphabet_Z[i]), fill_value),
+ )
+ + entropy_joint(
+ np.vstack((y[i], z[i])),
+ base,
+ fill_value,
+ estimator,
+ _vstack_pad((alphabet_y[i], Alphabet_Z[i]), fill_value),
+ )
+ - entropy_joint(
+ np.vstack((x[i], y[i], z[i])),
+ base,
+ fill_value,
+ estimator,
+ _vstack_pad((alphabet_x[i], alphabet_y[i], Alphabet_Z[i]), fill_value),
+ )
+ - entropy_joint(z[i], base, fill_value, estimator, Alphabet_Z[i])
)
I[i] = I_
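Note (reviewer aside, not part of the diff): the per-row quantity assembled above is the standard identity I(X;Y|Z) = H(X,Z) + H(Y,Z) - H(X,Y,Z) - H(Z). A minimal sketch of that identity with plug-in (ML) entropies; the `h_joint` helper below is hypothetical, not this module's API:

```python
import numpy as np
from collections import Counter

def h_joint(*cols, base=2):
    # Plug-in joint entropy of the samples obtained by zipping the columns.
    n = len(cols[0])
    p = np.array(list(Counter(zip(*cols)).values())) / n
    return -(p * np.log(p)).sum() / np.log(base)

rng = np.random.default_rng(0)
x = rng.integers(0, 3, 1000)
z = rng.integers(0, 2, 1000)
y = (x + z) % 3  # y is a function of (x, z), so I(x;y|z) should be large
cmi = h_joint(x, z) + h_joint(y, z) - h_joint(x, y, z) - h_joint(z)
print(round(cmi, 3))  # close to log2(3) ~ 1.585 bits
```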

@@ -185,15 +186,15 @@ def f(X, Y, Alphabet_X, Alphabet_Y):


def information_mutual(
- X,
- Y=None,
- cartesian_product=False,
- base=2,
- fill_value=-1,
- estimator="ML",
- Alphabet_X=None,
- Alphabet_Y=None,
- keep_dims=False,
+ X,
+ Y=None,
+ cartesian_product=False,
+ base=2,
+ fill_value=-1,
+ estimator="ML",
+ Alphabet_X=None,
+ Alphabet_Y=None,
+ keep_dims=False,
):
H_conditional = entropy_conditional(
X, Y, cartesian_product, base, fill_value, estimator, Alphabet_X, Alphabet_Y
@@ -236,7 +237,7 @@ def entropy_pmf(P, base=2, require_valid_pmf=True, keep_dims=False):


def entropy_joint(
- X, base=2, fill_value=-1, estimator="ML", Alphabet_X=None, keep_dims=False
+ X, base=2, fill_value=-1, estimator="ML", Alphabet_X=None, keep_dims=False
):
X, fill_value_X = _sanitise_array_input(X, fill_value)
if Alphabet_X is not None:
@@ -302,15 +303,15 @@ def entropy_joint(


def entropy_conditional(
- X,
- Y=None,
- cartesian_product=False,
- base=2,
- fill_value=-1,
- estimator="ML",
- Alphabet_X=None,
- Alphabet_Y=None,
- keep_dims=False,
+ X,
+ Y=None,
+ cartesian_product=False,
+ base=2,
+ fill_value=-1,
+ estimator="ML",
+ Alphabet_X=None,
+ Alphabet_Y=None,
+ keep_dims=False,
):
if Y is None:
Y = X
@@ -485,10 +486,10 @@ def entropy(X, base=2, fill_value=-1, estimator="ML", Alphabet_X=None, keep_dims
# P_0 is the probability mass assigned to each additional empty bin
P, P_0 = _estimate_probabilities(L, estimator, n_additional_empty_bins)
H_0 = (
- n_additional_empty_bins
- * P_0
- * -np.log2(P_0 + np.spacing(0))
- / np.log2(base)
+ n_additional_empty_bins
+ * P_0
+ * -np.log2(P_0 + np.spacing(0))
+ / np.log2(base)
)
H[i] = entropy_pmf(P, base, require_valid_pmf=False) + H_0
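Note: `H_0` here is the correction for alphabet bins that exist but were never observed; each of the `n_additional_empty_bins` bins carries estimated mass `P_0`, and `np.spacing(0)` guards against log2(0). A toy recomputation with assumed values:

```python
import numpy as np

P_0, n_additional_empty_bins, base = 0.01, 4, 2
H_0 = n_additional_empty_bins * P_0 * -np.log2(P_0 + np.spacing(0)) / np.log2(base)
print(H_0)  # ~0.266 bits, added on top of entropy_pmf(P, ...)
```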

@@ -571,7 +572,7 @@ def _map_observations_to_integers(Symbol_matrices, Fill_values):
assert len(Symbol_matrices) == len(Fill_values)
FILL_VALUE = -1
if np.any([A.dtype != "int" for A in Symbol_matrices]) or np.any(
- np.array(Fill_values) != FILL_VALUE
+ np.array(Fill_values) != FILL_VALUE
):
L = sklearn.preprocessing.LabelEncoder()
F = [np.atleast_1d(v) for v in Fill_values]
@@ -606,7 +607,7 @@ def _isnan_element(x):


def _determine_number_additional_empty_bins(
- Counts, Alphabet, Full_Alphabet, fill_value
+ Counts, Alphabet, Full_Alphabet, fill_value
):
alphabet_sizes = np.sum(np.atleast_2d(Full_Alphabet) != fill_value, axis=-1)
if np.any(alphabet_sizes != fill_value):
@@ -630,34 +631,35 @@ def _estimate_probabilities(Counts, estimator, n_additional_empty_bins=0):
# 2) James-Stein approach may be used as an alternative
# 3) Dirichlet prior may be used in all other cases

- assert (np.sum(Counts) > 0)
- assert (np.all(Counts.astype('int') == Counts))
- assert (n_additional_empty_bins >= 0)
- Counts = Counts.astype('int')
+ assert np.sum(Counts) > 0
+ assert np.all(Counts.astype("int") == Counts)
+ assert n_additional_empty_bins >= 0
+ Counts = Counts.astype("int")

if isinstance(estimator, str):
- estimator = estimator.upper().replace(' ', '')
+ estimator = estimator.upper().replace(" ", "")

- if np.isreal(estimator) or estimator in ('ML', 'PERKS', 'MINIMAX'):
+ if np.isreal(estimator) or estimator in ("ML", "PERKS", "MINIMAX"):
if np.isreal(estimator):
alpha = estimator
- elif estimator == 'PERKS':
+ elif estimator == "PERKS":
alpha = 1.0 / (Counts.size + n_additional_empty_bins)
- elif estimator == 'MINIMAX':
- alpha = np.sqrt(np.sum(Counts)) / \
- (Counts.size + n_additional_empty_bins)
+ elif estimator == "MINIMAX":
+ alpha = np.sqrt(np.sum(Counts)) / (Counts.size + n_additional_empty_bins)
else:
alpha = 0
- Theta = (Counts + alpha) / \
- (1.0 * np.sum(Counts) + alpha * (Counts.size + n_additional_empty_bins))
+ Theta = (Counts + alpha) / (
+ 1.0 * np.sum(Counts) + alpha * (Counts.size + n_additional_empty_bins)
+ )
# Theta_0 is the probability mass assigned to each additional empty bin
if n_additional_empty_bins > 0:
- Theta_0 = alpha / (1.0 * np.sum(Counts) +
- alpha * (Counts.size + n_additional_empty_bins))
+ Theta_0 = alpha / (
+ 1.0 * np.sum(Counts) + alpha * (Counts.size + n_additional_empty_bins)
+ )
else:
Theta_0 = 0
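Note: this branch is additive (Dirichlet) smoothing, theta_k = (n_k + alpha) / (N + alpha * K), where K includes the additional empty bins; alpha = 0 gives ML, alpha = 1/K Perks, and alpha = sqrt(N)/K minimax. A minimal sketch under those assumptions:

```python
import numpy as np

counts = np.array([5, 3, 0, 2])
n_extra = 0                      # stands in for n_additional_empty_bins
K = counts.size + n_extra
alpha = 1.0 / K                  # Perks prior
theta = (counts + alpha) / (counts.sum() + alpha * K)
print(theta, theta.sum())        # sums to 1.0 when n_extra == 0
```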

- elif estimator == 'GOOD-TURING':
+ elif estimator == "GOOD-TURING":
# TODO We could also add a Chen-Chao vocabulary size estimator (See
# Bhat Suma's thesis)

@@ -679,34 +681,35 @@ def _estimate_probabilities(Counts, estimator, n_additional_empty_bins=0):

# Fit least squares regression line to plot of log(Z_r) versus log(r)
x = np.log10(np.arange(1, Z_r.size))
- with np.errstate(invalid='ignore', divide='ignore'):
+ with np.errstate(invalid="ignore", divide="ignore"):
y = np.log10(Z_r[1:])
x = x[np.isfinite(y)]
y = y[np.isfinite(y)]
- m, c = np.linalg.lstsq(np.vstack([x, np.ones(x.size)]).T, y,
- rcond=None)[0]
+ m, c = np.linalg.lstsq(np.vstack([x, np.ones(x.size)]).T, y, rcond=None)[0]
if m >= -1:
warnings.warn("Regression slope < -1 requirement in linear "
"Good-Turing estimate not satisfied")
warnings.warn(
"Regression slope < -1 requirement in linear "
"Good-Turing estimate not satisfied"
)
# Compute smoothed value of N_r based on interpolation
# We need to refer to SmoothedN_{r+1} for all observed values of r
SmoothedN_r = np.zeros(N_r.size + 1)
- SmoothedN_r[1:] = 10 ** (np.log10(np.arange(1, SmoothedN_r.size)) *
- m + c)
+ SmoothedN_r[1:] = 10 ** (np.log10(np.arange(1, SmoothedN_r.size)) * m + c)

# Determine threshold value of r at which to use smoothed values of N_r
# (SmoothedN_r), as opposed to straightforward N_r.
# Variance of Turing estimate
- with np.errstate(invalid='ignore', divide='ignore'):
- VARr_T = (np.arange(N_r.size) + 1) ** 2 * \
- (1.0 * np.append(N_r[1:], 0) / (N_r ** 2)) * \
- (1 + np.append(N_r[1:], 0) / N_r)
+ with np.errstate(invalid="ignore", divide="ignore"):
+ VARr_T = (
+ (np.arange(N_r.size) + 1) ** 2
+ * (1.0 * np.append(N_r[1:], 0) / (N_r**2))
+ * (1 + np.append(N_r[1:], 0) / N_r)
+ )
x = (np.arange(N_r.size) + 1) * 1.0 * np.append(N_r[1:], 0) / N_r
- y = (np.arange(N_r.size) + 1) * \
- 1.0 * SmoothedN_r[1:] / (SmoothedN_r[:-1])
- assert (np.isinf(VARr_T[0]) or np.isnan(VARr_T[0]))
+ y = (np.arange(N_r.size) + 1) * 1.0 * SmoothedN_r[1:] / (SmoothedN_r[:-1])
+ assert np.isinf(VARr_T[0]) or np.isnan(VARr_T[0])
turing_is_sig_diff = np.abs(x - y) > 1.96 * np.sqrt(VARr_T)
- assert (turing_is_sig_diff[0] == np.array(False))
+ assert turing_is_sig_diff[0] == np.array(False)
# NB: 0th element can be safely ignored, since always 0
T = np.where(turing_is_sig_diff == np.array(False))[0]
if T.size > 1:
@@ -722,8 +725,12 @@ def _estimate_probabilities(Counts, estimator, n_additional_empty_bins=0):
# objects observed r times, r>0
p_r = np.zeros(N_r.size)
N = np.sum(Counts)
- p_r[1:] = (np.arange(1, N_r.size) + 1) * \
- 1.0 * SmoothedN_r[2:] / (SmoothedN_r[1:-1] * N)
+ p_r[1:] = (
+ (np.arange(1, N_r.size) + 1)
+ * 1.0
+ * SmoothedN_r[2:]
+ / (SmoothedN_r[1:-1] * N)
+ )
# Estimate probability of observing any unseen symbol
p_r[0] = 1.0 * N_r[1] / N

@@ -735,30 +742,34 @@ def _estimate_probabilities(Counts, estimator, n_additional_empty_bins=0):
if np.any(Counts == 0) or n_additional_empty_bins > 0:
Theta = (1 - p_r[0]) * Theta / np.sum(Theta)
else:
warnings.warn("No unobserved outcomes specified. Disregarding the "
"probability mass allocated to any unobserved "
"outcomes.")
warnings.warn(
"No unobserved outcomes specified. Disregarding the "
"probability mass allocated to any unobserved "
"outcomes."
)
Theta = Theta / np.sum(Theta)

# Divide p_0 among unobserved symbols
- with np.errstate(invalid='ignore', divide='ignore'):
- p_emptybin = p_r[0] / (np.sum(Counts == 0) +
- n_additional_empty_bins)
+ with np.errstate(invalid="ignore", divide="ignore"):
+ p_emptybin = p_r[0] / (np.sum(Counts == 0) + n_additional_empty_bins)
Theta[Counts == 0] = p_emptybin
# Theta_0 is the probability mass assigned to each additional empty bin
if n_additional_empty_bins > 0:
Theta_0 = p_emptybin
else:
Theta_0 = 0
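Note: the block above implements linear ("simple") Good-Turing smoothing in the style of Gale and Sampson: regress log10(N_r) on log10(r), smooth the frequency-of-frequencies, form adjusted counts r* = (r + 1) * S(r + 1) / S(r), and reserve mass N_1 / N for unseen outcomes. A compressed sketch of that core step; variable names are illustrative, not the module's:

```python
import numpy as np

counts = np.array([1, 1, 1, 1, 2, 2, 3, 5])       # per-symbol observation counts
r, N_r = np.unique(counts, return_counts=True)    # frequencies of frequencies
m, c = np.polyfit(np.log10(r), np.log10(N_r), 1)  # least-squares line; slope < -1 expected
S = lambda q: 10 ** (m * np.log10(q) + c)         # smoothed N_r
r_star = (r + 1) * S(r + 1) / S(r)                # adjusted Good-Turing counts
p_unseen = N_r[r == 1][0] / counts.sum()          # mass reserved for unseen symbols
print(dict(zip(r, np.round(r_star, 2))), p_unseen)
```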

- elif estimator == 'JAMES-STEIN':
- Theta, _ = _estimate_probabilities(Counts, 'ML')
+ elif estimator == "JAMES-STEIN":
+ Theta, _ = _estimate_probabilities(Counts, "ML")
p_uniform = 1.0 / (Counts.size + n_additional_empty_bins)
- with np.errstate(invalid='ignore', divide='ignore'):
- Lambda = (1 - np.sum(Theta ** 2)) / \
- ((np.sum(Counts) - 1) *
- (np.sum((p_uniform - Theta) ** 2) +
- n_additional_empty_bins * p_uniform ** 2))
+ with np.errstate(invalid="ignore", divide="ignore"):
+ Lambda = (1 - np.sum(Theta**2)) / (
+ (np.sum(Counts) - 1)
+ * (
+ np.sum((p_uniform - Theta) ** 2)
+ + n_additional_empty_bins * p_uniform**2
+ )
+ )

if Lambda > 1:
Lambda = 1
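Note: the James-Stein branch shrinks the ML estimate toward the uniform target, theta = lambda * u + (1 - lambda) * theta_ML, with the closed-form lambda computed above and clipped to [0, 1] (Hausser and Strimmer, 2009). A minimal sketch without the empty-bin bookkeeping:

```python
import numpy as np

counts = np.array([8, 2, 1, 1])
theta_ml = counts / counts.sum()
u = np.full_like(theta_ml, 1.0 / counts.size)  # uniform shrinkage target
lam = (1 - np.sum(theta_ml**2)) / (
    (counts.sum() - 1) * np.sum((u - theta_ml) ** 2)
)
lam = float(np.clip(lam, 0.0, 1.0))
theta_js = lam * u + (1 - lam) * theta_ml      # still sums to 1, less extreme than ML
print(lam, theta_js)
```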
4 changes: 3 additions & 1 deletion bamt/log.py
@@ -39,7 +39,9 @@ def switch_console_out(self, value: bool):
for logger in self.loggers.values():
if self.has_handler(logger, handler_class):
self.remove_handler_type(logger, handler_class)
- logger.addHandler(logging.NullHandler() if not value else logging.root.handlers[0])
+ logger.addHandler(
+ logging.NullHandler() if not value else logging.root.handlers[0]
+ )
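Note on this hunk: the reflow is behavior-preserving; passing False swaps in a logging.NullHandler(), passing True reattaches the root console handler. A hedged usage sketch; the `bamt_logger` instance name is an assumption based on the module's context, not shown in this diff:

```python
from bamt.log import bamt_logger  # assumed module-level singleton

bamt_logger.switch_console_out(False)  # silence console output via NullHandler
# ... run long vectorized sampling without console noise ...
bamt_logger.switch_console_out(True)   # restore the root console handler
```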

def switch_file_out(self, value: bool, log_file: str):
"""