Skip to content

Commit

Permalink
some updates
Browse files Browse the repository at this point in the history
  • Loading branch information
gykovacs committed Oct 29, 2024
1 parent 565df3a commit f4bd4ef
Show file tree
Hide file tree
Showing 22 changed files with 2,960 additions and 1,065 deletions.
1 change: 1 addition & 0 deletions mlscorecheck/auc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@
from ._auc_aggregated import *
from ._acc_aggregated import *
from ._max_acc_aggregated import *
from ._analytic import *
34 changes: 34 additions & 0 deletions mlscorecheck/auc/_analytic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import numpy as np


def auc_analytic_exponents(row):
frac = (row['sens']*row['p'] + (1 - row['spec'])*row['n']) / (row['p'] + row['n'])

print(frac)

exp_tpr = np.log(row['sens'])/np.log(frac)
exp_fpr = np.log(1 - row['spec'])/np.log(frac)



return exp_fpr, exp_tpr


def auc_analytic(row, frac = None):
if frac is None:
frac = (row['sens']*row['p'] + (1 - row['spec'])*row['n']) / (row['p'] + row['n'])

print(frac)

exp_tpr = np.log(row['sens'])/np.log(frac)
exp_fpr = np.log(1 - row['spec'])/np.log(frac)

print(exp_tpr, exp_fpr)

return float(exp_fpr/(exp_fpr + exp_tpr))

x = np.linspace(0, 1, 100)
tpr = x**exp_tpr
fpr = x**exp_fpr

return float(np.sum((fpr[1:] - fpr[:-1])*(tpr[:-1] + tpr[1:])/2))
207 changes: 203 additions & 4 deletions mlscorecheck/auc/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

import numpy as np

from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score

from ..aggregated import determine_fold_configurations

__all__ = [
Expand All @@ -18,7 +21,10 @@
"simplify_roc",
"roc_value_at",
"average_roc_curves_to_1",
"average_n_roc_curves"
"average_n_roc_curves_",
"average_n_roc_curves",
"exponential_fitting",
"exponential_fitting2"
]


Expand Down Expand Up @@ -269,8 +275,14 @@ def simplify_roc(fprs, tprs, ths=None):
if tprs[idx] == tprs[idx+1] and tprs[idx-1] == tprs[idx]:
continue

if np.abs((fprs[idx-1] + fprs[idx+1])/2 - fprs[idx]) < 1e-6 and np.abs((tprs[idx-1] + tprs[idx+1])/2 - tprs[idx]) < 1e-6:
continue
if tprs[idx] != tprs[idx+1] and tprs[idx] != tprs[idx-1]:
if np.abs((fprs[idx-1] - fprs[idx])/(tprs[idx-1] - tprs[idx]) - (fprs[idx] - fprs[idx+1])/(tprs[idx] - tprs[idx+1])) < 1e-6:
continue

if fprs[idx] != fprs[idx+1] and fprs[idx] != fprs[idx-1]:
if np.abs((tprs[idx-1] - tprs[idx])/(fprs[idx-1] - fprs[idx]) - (tprs[idx] - tprs[idx+1])/(fprs[idx] - fprs[idx+1])) < 1e-6:
continue

fprs_simp.append(fprs[idx])
tprs_simp.append(tprs[idx])

Expand All @@ -290,6 +302,17 @@ def simplify_roc(fprs, tprs, ths=None):


def roc_value_at(fpr, fpr_curve, tpr_curve):
"""
Evaluates a ROC curve at a parcitular fpr value
Args:
fpr (float): the point to evaluate at
fpr_curve (np.array): the sequence of fpr values
tpr_curve (np.array): the sequence of tpr values
Returns:
float: the tpr value of the curve at fpr
"""
values = []
for idx in range(len(fpr_curve)):
if fpr_curve[idx] == fpr:
Expand All @@ -309,6 +332,16 @@ def roc_value_at(fpr, fpr_curve, tpr_curve):


def average_roc_curves_to_1(curve0, curves):
"""
Averages ROC curves to one curve
Args:
curve0 (tuple(np.array, np.array)): the curve to average to
curves (list(tuple(np.array, np.array))): the curves to average to curve0
Returns:
np.array, np.array: the average curve
"""
fprs0, tprs0 = curve0

fprs_new = []
Expand All @@ -329,7 +362,16 @@ def average_roc_curves_to_1(curve0, curves):
return np.array(fprs_new), np.array(tprs_new)


def average_n_roc_curves(curves):
def average_n_roc_curves_(curves):
"""
Determines the average of n ROC curves
Args:
curves (list(tuple(np.array, np.array))): the curves to average
Returns:
np.array, np.array: the average curve
"""
fprs_new = []
tprs_new = []

Expand All @@ -353,6 +395,163 @@ def average_n_roc_curves(curves):
fprs5 = rates[:, 0]
tprs5 = rates[:, 1]

assert np.all(np.diff(fprs5) >= -1e-10), np.min(np.diff(fprs5))
assert np.all(np.diff(tprs5) >= -1e-10), np.min(np.diff(fprs5))

fprs5, tprs5 = simplify_roc(fprs5, tprs5)

return fprs5, tprs5


def average_n_roc_curves(curves, random_state=None):
"""
Determines the average of n ROC curves
Args:
curves (list(tuple(np.array, np.array))): the curves to average
random_state (None/int/np.random.RandomState): the random state
governing the selection of the average curve when the averaging
horizontally and vertically leads to curves with the same
number of nodes
Returns:
np.array, np.array: the average curve
"""
if not isinstance(random_state, np.random.RandomState):
random_state = np.random.RandomState(random_state)

fprs0, tprs0 = average_n_roc_curves_(curves)

flipped_curves = []
for fprs, tprs in curves:
flipped_curves.append(((1 - tprs)[::-1], (1 - fprs)[::-1]))

fprs1, tprs1 = average_n_roc_curves_(flipped_curves)

fprs1, tprs1 = (1 - tprs1)[::-1], (1 - fprs1)[::-1]

if len(fprs0) == len(fprs1):
if random_state.randint(2) == 0:
return fprs0, tprs0
return fprs1, tprs1

if len(fprs0) < len(fprs1):
return fprs0, tprs0

return fprs1, tprs1


def exponential_fitting(row, label, frac_label):

values = row[label].copy()
counts = row[frac_label].copy()

mask = values > 1e-6
values_nz = values[mask]
counts_nz = counts[mask]

ln_values = np.log(values_nz)
ln_counts = np.log(counts_nz).reshape(-1, 1)

linreg_a = LinearRegression(fit_intercept=False, positive=True)
pred_values = linreg_a\
.fit(ln_counts, ln_values)\
.predict(ln_counts)

if len(values) > 3:
r2_a = r2_score(ln_values, pred_values)
else:
r2_a = 1.0

values = (1 - values)
counts = (1 - counts)

mask = values > 1e-6
values_nz = values[mask]
counts_nz = counts[mask]

ln_values = np.log(values_nz)
ln_counts = np.log(counts_nz).reshape(-1, 1)

linreg_b = LinearRegression(fit_intercept=False, positive=True)
pred_values = linreg_b\
.fit(ln_counts, ln_values)\
.predict(ln_counts)

if len(values) > 3:
r2_b = r2_score(ln_values, pred_values)
else:
r2_b = 1.0

#print(r2_a, linreg_a.coef_[0], 0)
#print(r2_b, linreg_b.coef_[0], 1)

if r2_a > r2_b:
return (r2_a, linreg_a.coef_[0], 0)
return (r2_b, linreg_b.coef_[0], 1)

if label == 'fprs':
return (r2_b, linreg_b.coef_[0], 1)
return (r2_a, linreg_a.coef_[0], 0)

def exponential_fitting2(row, label, frac_label):
values = row[label].copy()
counts = row[frac_label].copy()

if label == 'tprs':
r2, coef, _ = exponential_fitting2_(values, counts)
return r2, 1/coef, -3
else:
r2, coef, _ = exponential_fitting2_(values, 1 - counts)
return r2, coef, -3

r2a, coefa, _ = exponential_fitting2_(values, counts)
r2b, coefb, _ = exponential_fitting2_(1 - values, 1 - counts)

print(r2a, coefa, r2b, coefb)

if r2a > r2b:
return r2a, coefa, -2
else:
return r2b, 1.0/coefb, -3

def exponential_fitting2_(values, counts):

"""if len(values) <= 3:
return (1.0, 1.0, -1)"""


mask = (values > 1e-6) & (counts > 1e-6)
values_nz = values[mask]
counts_nz = counts[mask]

ln_values = np.log(values_nz)
ln_counts = np.log(counts_nz)

"""values2 = (1 - values)
counts2 = (1 - counts)
mask2 = (values2 > 1e-6) & (values2 < 1)
values2_nz = values2[mask2]
counts2_nz = counts2[mask2]"""

"""ln_values2 = 1/np.log(values2_nz)
ln_counts2 = 1/np.log(counts2_nz)"""

ln_x = np.hstack([ln_counts]).reshape(-1, 1)
ln_y = np.hstack([ln_values])

if len(ln_x) <= 1:
return (1.0, 1.0, -1)

linreg_a = LinearRegression(fit_intercept=False, positive=True)
pred_values = linreg_a\
.fit(ln_x, ln_y)\
.predict(ln_x)

if len(values) >= 3:
r2_a = r2_score(ln_y, pred_values)
else:
r2_a = 1.0

return (r2_a, linreg_a.coef_[0], -1)
Loading

0 comments on commit f4bd4ef

Please sign in to comment.