diff --git a/src/evidently/calculations/classification_performance.py b/src/evidently/calculations/classification_performance.py
index b45a73b9c2..3ffbb5675a 100644
--- a/src/evidently/calculations/classification_performance.py
+++ b/src/evidently/calculations/classification_performance.py
@@ -268,6 +268,50 @@ def calculate_pr_table(binded):
     return result
 
 
+def calculate_lift_table(binded):
+    result = []
+    binded.sort(key=lambda item: item[1], reverse=True)
+    data_size = len(binded)
+    target_class_size = sum([x[0] for x in binded])
+    # we don't use the declared STEP_SIZE due to specifics
+    # of lift metric calculation and visualization
+    offset = int(max(np.floor(data_size * 0.01), 1))
+
+    for step in np.arange(offset, data_size + 1, offset):
+        count = min(step, data_size)
+        prob = round(binded[min(step, data_size - 1)][1], 2)
+        top = round(100.0 * count / data_size)
+        tp = sum([x[0] for x in binded[:count]])
+        fp = count - tp
+        precision = round(100.0 * tp / count, 1)
+        recall = round(100.0 * tp / target_class_size, 1)
+        f1_score = round(2 * precision * recall / (precision + recall), 1) if precision + recall > 0 else 0.0
+        lift = round(recall / top, 2)
+        if count <= target_class_size:
+            max_lift = round(100.0 * count / target_class_size / top, 2)
+        else:
+            max_lift = round(100.0 / top, 2)
+        relative_lift = round(lift / max_lift, 2)
+        percent = round(100 * target_class_size / data_size, 2)
+        result.append(
+            [
+                top,
+                int(count),
+                prob,
+                int(tp),
+                int(fp),
+                precision,
+                recall,
+                f1_score,
+                lift,
+                max_lift,
+                relative_lift,
+                percent,
+            ]
+        )
+    return result
+
+
 def calculate_matrix(target: pd.Series, prediction: pd.Series, labels: List[Union[str, int]]) -> ConfusionMatrix:
     sorted_labels = sorted(labels)
     matrix = metrics.confusion_matrix(target, prediction, labels=sorted_labels)
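[Review note, not part of the patch] A standalone sketch of the arithmetic the new table encodes, with invented numbers: at a given "top" percent of ranked predictions, lift is recall divided by that percent, and max_lift is what a perfect ranking would reach at the same depth.

# Illustration only: assume 10,000 rows, 1,000 positives, and the top 5% of scores.
data_size = 10_000
target_class_size = 1_000                            # 10% positives overall
top = 5                                              # top 5% of ranked predictions
count = data_size * top // 100                       # 500 rows in the slice
tp = 400                                             # assumed positives found in the slice
recall = 100.0 * tp / target_class_size              # 40.0
lift = recall / top                                  # 8.0 -> 8x better than random targeting
max_lift = 100.0 * count / target_class_size / top   # 10.0, the best achievable at this depth
relative_lift = lift / max_lift                      # 0.8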
diff --git a/src/evidently/metrics/classification_performance/lift_curve_metric.py b/src/evidently/metrics/classification_performance/lift_curve_metric.py
new file mode 100644
index 0000000000..cb9ef751e7
--- /dev/null
+++ b/src/evidently/metrics/classification_performance/lift_curve_metric.py
@@ -0,0 +1,133 @@
+from typing import List
+from typing import Optional
+
+import pandas as pd
+
+from evidently.base_metric import InputData
+from evidently.base_metric import MetricResult
+from evidently.calculations.classification_performance import PredictionData
+from evidently.calculations.classification_performance import calculate_lift_table
+from evidently.calculations.classification_performance import get_prediction_data
+from evidently.metrics.base_metric import Metric
+from evidently.model.widget import BaseWidgetInfo
+from evidently.renderers.base_renderer import MetricRenderer
+from evidently.renderers.base_renderer import default_renderer
+from evidently.renderers.html_widgets import TabData
+from evidently.renderers.html_widgets import get_lift_plot_data
+from evidently.renderers.html_widgets import header_text
+from evidently.renderers.html_widgets import widget_tabs
+from evidently.utils.data_operations import process_columns
+
+
+class ClassificationLiftCurveResults(MetricResult):
+    current_lift_curve: Optional[dict] = None
+    reference_lift_curve: Optional[dict] = None
+
+
+class ClassificationLiftCurve(Metric[ClassificationLiftCurveResults]):
+    def calculate(self, data: InputData) -> ClassificationLiftCurveResults:
+        dataset_columns = process_columns(data.current_data, data.column_mapping)
+        target_name = dataset_columns.utility_columns.target
+        prediction_name = dataset_columns.utility_columns.prediction
+        if target_name is None or prediction_name is None:
+            raise ValueError("The columns 'target' and 'prediction' should be present")
+        curr_predictions = get_prediction_data(data.current_data, dataset_columns, data.column_mapping.pos_label)
+        curr_lift_curve = self.calculate_metrics(data.current_data[target_name], curr_predictions)
+        ref_lift_curve = None
+        if data.reference_data is not None:
+            ref_predictions = get_prediction_data(
+                data.reference_data,
+                dataset_columns,
+                data.column_mapping.pos_label,
+            )
+            ref_lift_curve = self.calculate_metrics(data.reference_data[target_name], ref_predictions)
+        return ClassificationLiftCurveResults(
+            current_lift_curve=curr_lift_curve,
+            reference_lift_curve=ref_lift_curve,
+        )
+
+    def calculate_metrics(self, target_data: pd.Series, prediction: PredictionData):
+        labels = prediction.labels
+        if prediction.prediction_probas is None:
+            raise ValueError("Lift Curve can be calculated only on probabilistic predictions")
+        binarized_target = (target_data.values.reshape(-1, 1) == labels).astype(int)
+        lift_curve = {}
+        lift_table = {}
+        if len(labels) <= 2:
+            binarized_target = pd.DataFrame(binarized_target[:, 0])
+            binarized_target.columns = ["target"]
+
+            binded = list(
+                zip(
+                    binarized_target["target"].tolist(),
+                    prediction.prediction_probas.iloc[:, 0].tolist(),
+                )
+            )
+            key = int(prediction.prediction_probas.columns[0])
+            lift_table[key] = calculate_lift_table(binded)
+            lift_curve[key] = {
+                "lift": [row[8] for row in lift_table[key]],
+                "top": [row[0] for row in lift_table[key]],
+                "count": [row[1] for row in lift_table[key]],
+                "prob": [row[2] for row in lift_table[key]],
+                "tp": [row[3] for row in lift_table[key]],
+                "fp": [row[4] for row in lift_table[key]],
+                "precision": [row[5] for row in lift_table[key]],
+                "recall": [row[6] for row in lift_table[key]],
+                "f1_score": [row[7] for row in lift_table[key]],
+                "max_lift": [row[9] for row in lift_table[key]],
+                "relative_lift": [row[10] for row in lift_table[key]],
+                "percent": lift_table[key][0][11],
+            }
+        else:
+            binarized_target = pd.DataFrame(binarized_target)
+            binarized_target.columns = labels
+
+            for label in labels:
+                binded = list(
+                    zip(
+                        binarized_target[label].tolist(),
+                        prediction.prediction_probas[label],
+                    )
+                )
+                lift_table[int(label)] = calculate_lift_table(binded)
+
+            for label in labels:
+                key = int(label)
+                lift_curve[key] = {
+                    "lift": [row[8] for row in lift_table[key]],
+                    "top": [row[0] for row in lift_table[key]],
+                    "count": [row[1] for row in lift_table[key]],
+                    "prob": [row[2] for row in lift_table[key]],
+                    "tp": [row[3] for row in lift_table[key]],
+                    "fp": [row[4] for row in lift_table[key]],
+                    "precision": [row[5] for row in lift_table[key]],
+                    "recall": [row[6] for row in lift_table[key]],
+                    "f1_score": [row[7] for row in lift_table[key]],
+                    "max_lift": [row[9] for row in lift_table[key]],
+                    "relative_lift": [row[10] for row in lift_table[key]],
+                    "percent": lift_table[key][0][11],
+                }
+        return lift_curve
+
+
+@default_renderer(wrap_type=ClassificationLiftCurve)
+class ClassificationLiftCurveRenderer(MetricRenderer):
+    def render_html(self, obj: ClassificationLiftCurve) -> List[BaseWidgetInfo]:
+        current_lift_curve = obj.get_result().current_lift_curve
+        reference_lift_curve = obj.get_result().reference_lift_curve
+        if current_lift_curve is None:
+            return []
+
+        tab_data = get_lift_plot_data(
+            current_lift_curve,
+            reference_lift_curve,
+            color_options=self.color_options,
+        )
+        if len(tab_data) == 1:
+            return [header_text(label="Lift Curve"), tab_data[0][1]]
+        tabs = [TabData(name, widget) for name, widget in tab_data]
+        return [
+            header_text(label="Lift Curve"),
+            widget_tabs(title="", tabs=tabs),
+        ]
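[Review note, not part of the patch] A sketch of how the new metric is expected to be plugged into a report, assuming the existing Report/ColumnMapping API and a binary setup where the prediction column holds the positive-class probability; the frames and column names below are invented.

import pandas as pd

from evidently import ColumnMapping
from evidently.report import Report
from evidently.metrics.classification_performance.lift_curve_metric import ClassificationLiftCurve

# toy frames; "target" is 0/1, "prediction" is the probability of the positive class
current = pd.DataFrame({"target": [0, 1, 0, 1], "prediction": [0.1, 0.8, 0.4, 0.9]})
reference = pd.DataFrame({"target": [0, 1, 1, 0], "prediction": [0.2, 0.7, 0.6, 0.3]})

report = Report(metrics=[ClassificationLiftCurve()])
report.run(
    current_data=current,
    reference_data=reference,
    column_mapping=ColumnMapping(target="target", prediction="prediction"),
)
report.save_html("lift_curve.html")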
diff --git a/src/evidently/metrics/classification_performance/lift_table_metric.py b/src/evidently/metrics/classification_performance/lift_table_metric.py
new file mode 100644
index 0000000000..adcf196ee2
--- /dev/null
+++ b/src/evidently/metrics/classification_performance/lift_table_metric.py
@@ -0,0 +1,162 @@
+from typing import List
+from typing import Optional
+
+import pandas as pd
+
+from evidently.base_metric import InputData
+from evidently.base_metric import MetricResult
+from evidently.calculations.classification_performance import PredictionData
+from evidently.calculations.classification_performance import calculate_lift_table
+from evidently.calculations.classification_performance import get_prediction_data
+from evidently.metrics.base_metric import Metric
+from evidently.model.widget import BaseWidgetInfo
+from evidently.renderers.base_renderer import MetricRenderer
+from evidently.renderers.base_renderer import default_renderer
+from evidently.renderers.html_widgets import TabData
+from evidently.renderers.html_widgets import WidgetSize
+from evidently.renderers.html_widgets import table_data
+from evidently.renderers.html_widgets import widget_tabs
+from evidently.utils.data_operations import process_columns
+
+
+class ClassificationLiftTableResults(MetricResult):
+    current_lift_table: Optional[dict] = None
+    reference_lift_table: Optional[dict] = None
+    top: Optional[int] = 10
+
+
+class ClassificationLiftTable(Metric[ClassificationLiftTableResults]):
+    """
+    Evidently metric that provides data for lift analysis.
+
+    Parameters
+    ----------
+    top: int = 10
+        Limits the number of top percentile rows displayed in the report
+
+    """
+
+    top: int
+
+    def __init__(self, top: int = 10) -> None:
+        self.top = top
+
+    def calculate(self, data: InputData) -> ClassificationLiftTableResults:
+        dataset_columns = process_columns(data.current_data, data.column_mapping)
+        target_name = dataset_columns.utility_columns.target
+        prediction_name = dataset_columns.utility_columns.prediction
+        if target_name is None or prediction_name is None:
+            raise ValueError("The columns 'target' and 'prediction' should be present")
+        curr_prediction = get_prediction_data(data.current_data, dataset_columns, data.column_mapping.pos_label)
+        curr_lift_table = self.calculate_metrics(data.current_data[target_name], curr_prediction)
+        ref_lift_table = None
+        if data.reference_data is not None:
+            ref_prediction = get_prediction_data(
+                data.reference_data,
+                dataset_columns,
+                data.column_mapping.pos_label,
+            )
+            ref_lift_table = self.calculate_metrics(data.reference_data[target_name], ref_prediction)
+        return ClassificationLiftTableResults(
+            current_lift_table=curr_lift_table,
+            reference_lift_table=ref_lift_table,
+            top=self.top,
+        )
+
+    def calculate_metrics(self, target_data: pd.Series, prediction: PredictionData):
+        labels = prediction.labels
+        if prediction.prediction_probas is None:
+            raise ValueError("Lift Table can be calculated only on probabilistic predictions")
+        binarized_target = (target_data.values.reshape(-1, 1) == labels).astype(int)
+        lift_table = {}
+        if len(labels) <= 2:
+            binarized_target = pd.DataFrame(binarized_target[:, 0])
+            binarized_target.columns = ["target"]
+
+            binded = list(
+                zip(
+                    binarized_target["target"].tolist(),
+                    prediction.prediction_probas.iloc[:, 0].tolist(),
+                )
+            )
+            lift_table[int(prediction.prediction_probas.columns[0])] = calculate_lift_table(binded)
+        else:
+            binarized_target = pd.DataFrame(binarized_target)
+            binarized_target.columns = labels
+
+            for label in labels:
+                binded = list(
+                    zip(
+                        binarized_target[label].tolist(),
+                        prediction.prediction_probas[label],
+                    )
+                )
+                lift_table[int(label)] = calculate_lift_table(binded)
+        return lift_table
+
+
+@default_renderer(wrap_type=ClassificationLiftTable)
+class ClassificationLiftTableRenderer(MetricRenderer):
+    def render_html(self, obj: ClassificationLiftTable) -> List[BaseWidgetInfo]:
+        reference_lift_table = obj.get_result().reference_lift_table
+        current_lift_table = obj.get_result().current_lift_table
+        top = obj.get_result().top
+        columns = [
+            "Top(%)",
+            "Count",
+            "Prob",
+            "TP",
+            "FP",
+            "Precision",
+            "Recall",
+            "F1 score",
+            "Lift",
+            "Max lift",
+            "Relative lift",
+            "Percent",
+        ]
+        result = []
+        size = WidgetSize.FULL
+        if current_lift_table is not None:
+            if len(current_lift_table.keys()) == 1:
+                result.append(
+                    table_data(
+                        column_names=columns,
+                        data=current_lift_table[list(current_lift_table.keys())[0]][:top],
+                        title="Current: Lift Table",
+                        size=size,
+                    )
+                )
+            else:
+                tab_data = []
+                for label in current_lift_table.keys():
+                    table = table_data(
+                        column_names=columns,
+                        data=current_lift_table[label][:top],
+                        title="",
+                        size=size,
+                    )
+                    tab_data.append(TabData(str(label), table))
+                result.append(widget_tabs(title="Current: Lift Table", tabs=tab_data))
+        if reference_lift_table is not None:
+            if len(reference_lift_table.keys()) == 1:
+                result.append(
+                    table_data(
+                        column_names=columns,
+                        data=reference_lift_table[list(reference_lift_table.keys())[0]][:top],
+                        title="Reference: Lift Table",
+                        size=size,
+                    )
+                )
+            else:
+                tab_data = []
+                for label in reference_lift_table.keys():
+                    table = table_data(
+                        column_names=columns,
+                        data=reference_lift_table[label][:top],
+                        title="",
+                        size=size,
+                    )
+                    tab_data.append(TabData(str(label), table))
+                result.append(widget_tabs(title="Reference: Lift Table", tabs=tab_data))
+        return result
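[Review note, not part of the patch] Worth flagging: `top` is applied only by the renderer, so the full table is always computed and stored on the result. A sketch of passing it, reusing the toy frames and imports from the previous note (all names invented):

from evidently.metrics.classification_performance.lift_table_metric import ClassificationLiftTable

# render only the first 5 percentile rows; the complete table stays available in the raw result
report = Report(metrics=[ClassificationLiftTable(top=5)])
report.run(
    current_data=current,
    reference_data=reference,
    column_mapping=ColumnMapping(target="target", prediction="prediction"),
)
raw = report.as_dict()  # inspect the computed lift table programmatically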
diff --git a/src/evidently/renderers/html_widgets.py b/src/evidently/renderers/html_widgets.py
index 0b1079100e..51a7e19800 100644
--- a/src/evidently/renderers/html_widgets.py
+++ b/src/evidently/renderers/html_widgets.py
@@ -728,6 +728,82 @@ def get_pr_rec_plot_data(
     return additional_plots
 
 
+def get_lift_plot_data(
+    current_lift_curve: dict,
+    reference_lift_curve: Optional[dict],
+    color_options: ColorOptions,
+) -> List[Tuple[str, BaseWidgetInfo]]:
+    """
+    Forms plot data for lift metric visualization
+
+    Parameters
+    ----------
+    current_lift_curve: dict
+        Calculated lift table data for the current sample
+    reference_lift_curve: Optional[dict]
+        Calculated lift table data for the reference sample
+    color_options: ColorOptions
+        Standard Evidently collection of colors for data visualization
+
+    Returns
+    -------
+    additional_plots: List[Tuple[str, BaseWidgetInfo]]
+        List of (label, plot widget) tuples, one per class label
+    """
+    additional_plots = []
+    cols = 1
+    subplot_titles = [""]
+    if reference_lift_curve is not None:
+        cols = 2
+        subplot_titles = ["current", "reference"]
+    for label in current_lift_curve.keys():
+        fig = make_subplots(rows=1, cols=cols, subplot_titles=subplot_titles, shared_yaxes=True)
+        trace = go.Scatter(
+            x=current_lift_curve[label]["top"],
+            y=current_lift_curve[label]["lift"],
+            mode="lines+markers",
+            name="Lift",
+            hoverinfo="text",
+            text=[
+                f"top: {int(current_lift_curve[label]['top'][i])}, "
+                f"lift={current_lift_curve[label]['lift'][i]}"
+                for i in range(len(current_lift_curve[label]["top"]))
+            ],
+            legendgroup="Lift",
+            marker=dict(
+                size=6,
+                color=color_options.get_current_data_color(),
+            ),
+        )
+        fig.add_trace(trace, 1, 1)
+        fig.update_xaxes(title_text="Top", row=1, col=1)
+        if reference_lift_curve is not None:
+            trace = go.Scatter(
+                x=reference_lift_curve[label]["top"],
+                y=reference_lift_curve[label]["lift"],
+                mode="lines+markers",
+                name="Lift",
+                hoverinfo="text",
+                text=[
+                    f"top: {int(reference_lift_curve[label]['top'][i])}, "
+                    f"lift={reference_lift_curve[label]['lift'][i]}"
+                    for i in range(len(reference_lift_curve[label]["top"]))
+                ],
+                legendgroup="Lift",
+                showlegend=False,
+                marker=dict(
+                    size=6,
+                    color=color_options.get_reference_data_color(),
+                ),
+            )
+            fig.add_trace(trace, 1, 2)
+            fig.update_xaxes(title_text="Top", row=1, col=2)
+        fig.update_layout(yaxis_title="Lift", showlegend=True)
+
+        additional_plots.append((str(label), plotly_figure(title="", figure=fig)))
+    return additional_plots
+
+
 def class_separation_traces_raw(df, label, target_name, color_options):
     traces = []
     traces.append(
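[Review note, not part of the patch] For reference, the shape of the dict that get_lift_plot_data consumes, as produced by ClassificationLiftCurve.calculate_metrics; the numbers below are invented but internally consistent (10,000 rows, 1,000 positives).

current_lift_curve = {
    1: {  # keyed by class label
        "top": [1, 2, 3],            # top percent of ranked predictions
        "count": [100, 200, 300],    # rows in each slice
        "prob": [0.97, 0.95, 0.93],  # score at the slice boundary
        "tp": [98, 190, 273],
        "fp": [2, 10, 27],
        "precision": [98.0, 95.0, 91.0],
        "recall": [9.8, 19.0, 27.3],
        "f1_score": [17.8, 31.7, 42.0],
        "lift": [9.8, 9.5, 9.1],              # recall / top
        "max_lift": [10.0, 10.0, 10.0],       # best achievable at this depth
        "relative_lift": [0.98, 0.95, 0.91],
        "percent": 10.0,                      # share of positives in the sample (scalar)
    }
}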