MobAd BigData Beeline Kazakhstan: add lift table and lift curve metrics (#544)

* Add new method for lift table calculation

* Add annotations, docstrings, and some explanations

* Add lift curve and lift table metrics

* Update functions to match requirements

* Make some minor updates

* Update according to linter requirements

* Update according to mypy linter requirements

---------

Co-authored-by: ITokhtakhunov <[email protected]>
IlmuratTokhtakhunov and ITokhtakhunov authored Oct 25, 2023
1 parent 20ab57e commit 53bda86
Showing 4 changed files with 415 additions and 0 deletions.
44 changes: 44 additions & 0 deletions src/evidently/calculations/classification_performance.py
@@ -268,6 +268,50 @@ def calculate_pr_table(binded):
    return result


def calculate_lift_table(binded):
    """Calculate lift table rows for probabilistic classification results.

    `binded` is a list of (target, probability) pairs for one class. Each
    result row describes the top-N% slice of the data, sorted by predicted
    probability in descending order, and contains: top (%), count, prob, tp,
    fp, precision, recall, f1_score, lift, max_lift, relative_lift, percent.
    """
    result = []
    binded.sort(key=lambda item: item[1], reverse=True)
    data_size = len(binded)
    target_class_size = sum(x[0] for x in binded)
    # we don't use the declared STEP_SIZE here because of the specifics
    # of lift metric calculation and visualization
    offset = int(max(np.floor(data_size * 0.01), 1))

    for step in np.arange(offset, data_size + 1, offset):
        count = min(step, data_size)
        # probability of the last item included in this top slice
        prob = round(binded[count - 1][1], 2)
        top = round(100.0 * count / data_size)
        tp = sum(x[0] for x in binded[:count])
        fp = count - tp
        precision = round(100.0 * tp / count, 1)
        recall = round(100.0 * tp / target_class_size, 1)
        # harmonic mean of precision and recall, guarded against zero division
        if precision + recall > 0:
            f1_score = round(2 * precision * recall / (precision + recall), 1)
        else:
            f1_score = 0.0
        lift = round(recall / top, 2)
        # best achievable lift at this depth: every item in the slice is a hit
        if count <= target_class_size:
            max_lift = round(100.0 * count / target_class_size / top, 2)
        else:
            max_lift = round(100.0 / top, 2)
        relative_lift = round(lift / max_lift, 2)
        percent = round(100 * target_class_size / data_size, 2)
        result.append(
            [
                top,
                int(count),
                prob,
                int(tp),
                int(fp),
                precision,
                recall,
                f1_score,
                lift,
                max_lift,
                relative_lift,
                percent,
            ]
        )
    return result
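
# --- illustrative sketch (editor's addition, not part of this diff) ----------
# A minimal, hedged example of how calculate_lift_table consumes
# (target, probability) pairs; the toy data and the printed column names
# below are assumptions for illustration only.
#
# from evidently.calculations.classification_performance import calculate_lift_table
#
# binded = [
#     (1, 0.97), (0, 0.92), (1, 0.88), (1, 0.71), (0, 0.65),
#     (0, 0.43), (1, 0.31), (0, 0.22), (0, 0.10), (0, 0.05),
# ]
# columns = [
#     "top", "count", "prob", "tp", "fp", "precision",
#     "recall", "f1_score", "lift", "max_lift", "relative_lift", "percent",
# ]
# for row in calculate_lift_table(binded):
#     print(dict(zip(columns, row)))
#
# With 10 items the offset is max(floor(10 * 0.01), 1) = 1, so each row is one
# decile; e.g. at top=30 the slice holds 3 items with tp=2 of 4 positives,
# giving recall = 100 * 2 / 4 = 50.0 and lift = 50.0 / 30 = 1.67.
# ------------------------------------------------------------------------------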


def calculate_matrix(target: pd.Series, prediction: pd.Series, labels: List[Union[str, int]]) -> ConfusionMatrix:
    sorted_labels = sorted(labels)
    matrix = metrics.confusion_matrix(target, prediction, labels=sorted_labels)
133 changes: 133 additions & 0 deletions src/evidently/metrics/classification_performance/lift_curve_metric.py
@@ -0,0 +1,133 @@
from typing import List
from typing import Optional

import pandas as pd

from evidently.base_metric import InputData
from evidently.base_metric import MetricResult
from evidently.calculations.classification_performance import PredictionData
from evidently.calculations.classification_performance import calculate_lift_table
from evidently.calculations.classification_performance import get_prediction_data
from evidently.metrics.base_metric import Metric
from evidently.model.widget import BaseWidgetInfo
from evidently.renderers.base_renderer import MetricRenderer
from evidently.renderers.base_renderer import default_renderer
from evidently.renderers.html_widgets import TabData
from evidently.renderers.html_widgets import get_lift_plot_data
from evidently.renderers.html_widgets import header_text
from evidently.renderers.html_widgets import widget_tabs
from evidently.utils.data_operations import process_columns


class ClassificationLiftCurveResults(MetricResult):
    current_lift_curve: Optional[dict] = None
    reference_lift_curve: Optional[dict] = None


class ClassificationLiftCurve(Metric[ClassificationLiftCurveResults]):
    def calculate(self, data: InputData) -> ClassificationLiftCurveResults:
        dataset_columns = process_columns(data.current_data, data.column_mapping)
        target_name = dataset_columns.utility_columns.target
        prediction_name = dataset_columns.utility_columns.prediction
        if target_name is None or prediction_name is None:
            raise ValueError("The 'target' and 'prediction' columns should be present")
        curr_predictions = get_prediction_data(data.current_data, dataset_columns, data.column_mapping.pos_label)
        curr_lift_curve = self.calculate_metrics(data.current_data[target_name], curr_predictions)
        ref_lift_curve = None
        if data.reference_data is not None:
            ref_predictions = get_prediction_data(
                data.reference_data,
                dataset_columns,
                data.column_mapping.pos_label,
            )
            ref_lift_curve = self.calculate_metrics(data.reference_data[target_name], ref_predictions)
        return ClassificationLiftCurveResults(
            current_lift_curve=curr_lift_curve,
            reference_lift_curve=ref_lift_curve,
        )

    def calculate_metrics(self, target_data: pd.Series, prediction: PredictionData):
        labels = prediction.labels
        if prediction.prediction_probas is None:
            raise ValueError("Lift Curve can be calculated only on binary probabilistic predictions")
        binaraized_target = (target_data.values.reshape(-1, 1) == labels).astype(int)
        lift_curve = {}
        lift_table = {}

        def curve_from_table(table):
            # Turn row-oriented lift table entries into named columns for plotting.
            return {
                "lift": [row[8] for row in table],
                "top": [row[0] for row in table],
                "count": [row[1] for row in table],
                "prob": [row[2] for row in table],
                "tp": [row[3] for row in table],
                "fp": [row[4] for row in table],
                "precision": [row[5] for row in table],
                "recall": [row[6] for row in table],
                "f1_score": [row[7] for row in table],
                "max_lift": [row[9] for row in table],
                "relative_lift": [row[10] for row in table],
                "percent": table[0][11],
            }

        if len(labels) <= 2:
            binaraized_target = pd.DataFrame(binaraized_target[:, 0])
            binaraized_target.columns = ["target"]

            binded = list(
                zip(
                    binaraized_target["target"].tolist(),
                    prediction.prediction_probas.iloc[:, 0].tolist(),
                )
            )
            key = int(prediction.prediction_probas.columns[0])
            lift_table[key] = calculate_lift_table(binded)
            lift_curve[key] = curve_from_table(lift_table[key])
        else:
            binaraized_target = pd.DataFrame(binaraized_target)
            binaraized_target.columns = labels

            for label in labels:
                binded = list(
                    zip(
                        binaraized_target[label].tolist(),
                        prediction.prediction_probas[label],
                    )
                )
                lift_table[int(label)] = calculate_lift_table(binded)
                # one curve per class label
                lift_curve[int(label)] = curve_from_table(lift_table[int(label)])
        return lift_curve


@default_renderer(wrap_type=ClassificationLiftCurve)
class ClassificationLiftCurveRenderer(MetricRenderer):
    def render_html(self, obj: ClassificationLiftCurve) -> List[BaseWidgetInfo]:
        current_lift_curve = obj.get_result().current_lift_curve
        reference_lift_curve = obj.get_result().reference_lift_curve
        if current_lift_curve is None:
            return []

        tab_data = get_lift_plot_data(
            current_lift_curve,
            reference_lift_curve,
            color_options=self.color_options,
        )
        if len(tab_data) == 1:
            return [header_text(label="Lift Curve"), tab_data[0][1]]
        tabs = [TabData(name, widget) for name, widget in tab_data]
        return [
            header_text(label="Lift Curve"),
            widget_tabs(title="", tabs=tabs),
        ]
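
As a usage sketch (assumed wiring, not shown in this commit): the metric plugs into an Evidently Report like any other classification metric. The dataset, column names, and pos_label below are invented for illustration.

import pandas as pd

from evidently import ColumnMapping
from evidently.metrics.classification_performance.lift_curve_metric import ClassificationLiftCurve
from evidently.report import Report

current = pd.DataFrame(
    {
        "target": [1, 0, 1, 0, 1, 0, 0, 1],
        "prediction": [0.9, 0.2, 0.8, 0.4, 0.7, 0.1, 0.3, 0.6],  # positive-class probabilities
    }
)
report = Report(metrics=[ClassificationLiftCurve()])
report.run(
    reference_data=None,
    current_data=current,
    column_mapping=ColumnMapping(target="target", prediction="prediction", pos_label=1),
)
report.save_html("lift_curve.html")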
162 changes: 162 additions & 0 deletions src/evidently/metrics/classification_performance/lift_table_metric.py
@@ -0,0 +1,162 @@
from typing import List
from typing import Optional

import pandas as pd

from evidently.base_metric import InputData
from evidently.base_metric import MetricResult
from evidently.calculations.classification_performance import PredictionData
from evidently.calculations.classification_performance import calculate_lift_table
from evidently.calculations.classification_performance import get_prediction_data
from evidently.metrics.base_metric import Metric
from evidently.model.widget import BaseWidgetInfo
from evidently.renderers.base_renderer import MetricRenderer
from evidently.renderers.base_renderer import default_renderer
from evidently.renderers.html_widgets import TabData
from evidently.renderers.html_widgets import WidgetSize
from evidently.renderers.html_widgets import table_data
from evidently.renderers.html_widgets import widget_tabs
from evidently.utils.data_operations import process_columns


class ClassificationLiftTableResults(MetricResult):
    current_lift_table: Optional[dict] = None
    reference_lift_table: Optional[dict] = None
    top: Optional[int] = 10


class ClassificationLiftTable(Metric[ClassificationLiftTableResults]):
    """Evidently metric that provides data for lift analysis.

    Parameters
    ----------
    top : int, default 10
        Number of top percentile rows to display in the report.
    """

    top: int

    def __init__(self, top: int = 10) -> None:
        self.top = top

    def calculate(self, data: InputData) -> ClassificationLiftTableResults:
        dataset_columns = process_columns(data.current_data, data.column_mapping)
        target_name = dataset_columns.utility_columns.target
        prediction_name = dataset_columns.utility_columns.prediction
        if target_name is None or prediction_name is None:
            raise ValueError("The 'target' and 'prediction' columns should be present")
        curr_prediction = get_prediction_data(data.current_data, dataset_columns, data.column_mapping.pos_label)
        curr_lift_table = self.calculate_metrics(data.current_data[target_name], curr_prediction)
        ref_lift_table = None
        if data.reference_data is not None:
            ref_prediction = get_prediction_data(
                data.reference_data,
                dataset_columns,
                data.column_mapping.pos_label,
            )
            ref_lift_table = self.calculate_metrics(data.reference_data[target_name], ref_prediction)
        return ClassificationLiftTableResults(
            current_lift_table=curr_lift_table,
            reference_lift_table=ref_lift_table,
            top=self.top,
        )

    def calculate_metrics(self, target_data: pd.Series, prediction: PredictionData):
        labels = prediction.labels
        if prediction.prediction_probas is None:
            raise ValueError("Lift Table can be calculated only on binary probabilistic predictions")
        binaraized_target = (target_data.values.reshape(-1, 1) == labels).astype(int)
        lift_table = {}
        if len(labels) <= 2:
            binaraized_target = pd.DataFrame(binaraized_target[:, 0])
            binaraized_target.columns = ["target"]

            binded = list(
                zip(
                    binaraized_target["target"].tolist(),
                    prediction.prediction_probas.iloc[:, 0].tolist(),
                )
            )
            lift_table[int(prediction.prediction_probas.columns[0])] = calculate_lift_table(binded)
        else:
            binaraized_target = pd.DataFrame(binaraized_target)
            binaraized_target.columns = labels

            for label in labels:
                binded = list(
                    zip(
                        binaraized_target[label].tolist(),
                        prediction.prediction_probas[label],
                    )
                )
                lift_table[int(label)] = calculate_lift_table(binded)
        return lift_table


@default_renderer(wrap_type=ClassificationLiftTable)
class ClassificationLiftTableRenderer(MetricRenderer):
    def render_html(self, obj: ClassificationLiftTable) -> List[BaseWidgetInfo]:
        reference_lift_table = obj.get_result().reference_lift_table
        current_lift_table = obj.get_result().current_lift_table
        top = obj.get_result().top
        columns = [
            "Top(%)",
            "Count",
            "Prob",
            "TP",
            "FP",
            "Precision",
            "Recall",
            "F1 score",
            "Lift",
            "Max lift",
            "Relative lift",
            "Percent",
        ]
        result = []
        size = WidgetSize.FULL
        for title, lift_table in (
            ("Current: Lift Table", current_lift_table),
            ("Reference: Lift Table", reference_lift_table),
        ):
            if lift_table is None:
                continue
            if len(lift_table.keys()) == 1:
                result.append(
                    table_data(
                        column_names=columns,
                        data=lift_table[list(lift_table.keys())[0]][:top],
                        title=title,
                        size=size,
                    )
                )
            else:
                tab_data = []
                for label in lift_table.keys():
                    table = table_data(
                        column_names=columns,
                        # limit each per-label table to the requested top percentiles
                        data=lift_table[label][:top],
                        title="",
                        size=size,
                    )
                    tab_data.append(TabData(label, table))
                result.append(widget_tabs(title=title, tabs=tab_data))
        return result
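
An analogous hedged sketch for the table metric; as in the lift-curve example above, the dataset and column mapping are assumptions, and top=5 is an arbitrary choice to show the display limit.

import pandas as pd

from evidently import ColumnMapping
from evidently.metrics.classification_performance.lift_table_metric import ClassificationLiftTable
from evidently.report import Report

current = pd.DataFrame(
    {
        "target": [1, 0, 1, 0, 1, 0, 0, 1],
        "prediction": [0.9, 0.2, 0.8, 0.4, 0.7, 0.1, 0.3, 0.6],
    }
)
# top=5 keeps only the first five percentile rows in the rendered table.
report = Report(metrics=[ClassificationLiftTable(top=5)])
report.run(
    reference_data=None,
    current_data=current,
    column_mapping=ColumnMapping(target="target", prediction="prediction", pos_label=1),
)
report.save_html("lift_table.html")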
