feat: Add BERTScore feature descriptor and feature (#1335)
* feat: Add BERTScore feature descriptor and feature
* refactor: Refactor BERTScoreFeature class and add TF-IDF weighting option
* update token_counts type
* Refactor BERTScoreFeature class and convert IDF scores to numpy arrays
* Refactor BERTScoreFeature class and import dependencies locally
* added documentation

---------

Co-authored-by: Emeli Dral <[email protected]>
1 parent 89baac3 · commit 6bb10cb

Showing 7 changed files with 187 additions and 1 deletion.
`@@ -0,0 +1,13 @@` (new file: the `BERTScore` descriptor)

```python
from evidently.features.BERTScore_feature import BERTScoreFeature
from evidently.features.generated_features import FeatureDescriptor
from evidently.features.generated_features import GeneratedFeatures


class BERTScore(FeatureDescriptor):
    class Config:
        type_alias = "evidently:descriptor:BERTScore"

    with_column: str

    def feature(self, column_name: str) -> GeneratedFeatures:
        return BERTScoreFeature(columns=[column_name, self.with_column], display_name=self.display_name)
```
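For orientation, here is a minimal sketch of how the new descriptor might be wired into a report. The import path and the surrounding `Report`/`ColumnSummaryMetric` wiring follow evidently's usual descriptor workflow but are assumptions, since neither appears in this diff:

```python
# Usage sketch (assumptions: BERTScore is re-exported from evidently.descriptors,
# and FeatureDescriptor.for_column() is available as with other descriptors;
# neither is shown in this commit).
import pandas as pd

from evidently.descriptors import BERTScore  # assumed export location
from evidently.metrics import ColumnSummaryMetric
from evidently.report import Report

data = pd.DataFrame(
    {
        "response": ["The quick brown fox jumps over the lazy dog"],
        "reference": ["A fast brown fox leaps over a lazy dog"],
    }
)

# Score each row's "response" against the paired "reference" column.
report = Report(
    metrics=[
        ColumnSummaryMetric(
            column_name=BERTScore(with_column="reference").for_column("response"),
        ),
    ]
)
report.run(reference_data=None, current_data=data)
```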
`@@ -0,0 +1,120 @@` (new file: `evidently/features/BERTScore_feature.py`, the `BERTScoreFeature` implementation)

```python
from collections import defaultdict
from typing import ClassVar
from typing import Dict
from typing import List

import numpy as np
import pandas as pd

from evidently.base_metric import ColumnName
from evidently.core import ColumnType
from evidently.features.generated_features import GeneratedFeature
from evidently.utils.data_preprocessing import DataDefinition


class BERTScoreFeature(GeneratedFeature):
    class Config:
        type_alias = "evidently:feature:BERTScoreFeature"

    __feature_type__: ClassVar = ColumnType.Numerical
    columns: List[str]
    model: str = "bert-base-uncased"  # Pretrained BERT model
    tfidf_weighted: bool = False  # Whether to weight embeddings with IDF

    def generate_feature(self, data: pd.DataFrame, data_definition: DataDefinition) -> pd.DataFrame:
        # Load BERT model and tokenizer
        from transformers import BertModel
        from transformers import BertTokenizer

        tokenizer = BertTokenizer.from_pretrained(self.model)
        model = BertModel.from_pretrained(self.model)

        # Tokenize sentences
        tokens_first = tokenizer(
            data[self.columns[0]].fillna("").tolist(), return_tensors="pt", padding=True, truncation=True
        )
        tokens_second = tokenizer(
            data[self.columns[1]].fillna("").tolist(), return_tensors="pt", padding=True, truncation=True
        )

        # Get embeddings
        embeddings_first = model(**tokens_first).last_hidden_state.detach().numpy()
        embeddings_second = model(**tokens_second).last_hidden_state.detach().numpy()
        # Obtain IDF scores
        idf_scores = self.compute_idf_scores(data[self.columns[0]], data[self.columns[1]], tokenizer)

        scores = []
        for i, (emb1, emb2) in enumerate(zip(embeddings_first, embeddings_second)):
            recall, precision = self.calculate_scores(emb1, emb2, idf_scores, i)
            f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
            scores.append(f1_score)

        # Return as a DataFrame
        return pd.DataFrame(
            {
                self._feature_name(): pd.Series(scores, index=data.index),
            }
        )

    def compute_idf_scores(self, col1: pd.Series, col2: pd.Series, tokenizer) -> tuple:
        # Combine reference sentences
        reference_sentences = pd.concat([col1, col2]).dropna().tolist()
        M = len(reference_sentences)

        # Compute IDF for each unique token
        token_counts: Dict[str, int] = defaultdict(int)
        for sentence in reference_sentences:
            tokens = [tokenizer.cls_token] + tokenizer.tokenize(sentence) + [tokenizer.sep_token]
            unique_tokens = set(tokens)
            for token in unique_tokens:
                token_counts[token] += 1

        idf_scores = {token: -np.log(count / M) for token, count in token_counts.items()}

        # Convert IDF scores to numpy arrays
        def convert_to_idf_arrays(sentences):
            idf_arrays = []
            for sentence in sentences:
                tokens = tokenizer.tokenize(sentence)

                # Add special tokens
                tokens = [tokenizer.cls_token] + tokens + [tokenizer.sep_token]
                # Compute IDF scores for each token, including plus-one smoothing
                idf_array = np.array([idf_scores.get(token, 0) + 1 for token in tokens])
                idf_arrays.append(idf_array)
            # Pad sequences to the same length
            max_len = max(len(arr) for arr in idf_arrays)
            idf_arrays = np.array([np.pad(arr, (0, max_len - len(arr)), "constant") for arr in idf_arrays])
            return idf_arrays

        idf_arrays1 = convert_to_idf_arrays(col1.fillna("").tolist())
        idf_arrays2 = convert_to_idf_arrays(col2.fillna("").tolist())
        return idf_arrays1, idf_arrays2

    def max_similarity(self, embeddings1, embeddings2):
        # Compute max cosine similarity for each token in embeddings1 with respect to embeddings2
        similarity_matrix = np.dot(embeddings1, embeddings2.T) / (
            np.linalg.norm(embeddings1, axis=1, keepdims=True) * np.linalg.norm(embeddings2, axis=1, keepdims=True).T
        )
        return similarity_matrix.max(axis=1)

    def calculate_scores(self, emb1, emb2, idf_scores, index):
        if self.tfidf_weighted:
            weighted_scores = np.multiply(self.max_similarity(emb1, emb2), idf_scores[0][index])
            recall = weighted_scores.sum() / idf_scores[0][index].sum()

            weighted_scores = np.multiply(self.max_similarity(emb2, emb1), idf_scores[1][index])
            precision = weighted_scores.sum() / idf_scores[1][index].sum()
        else:
            recall = self.max_similarity(emb1, emb2).mean()
            precision = self.max_similarity(emb2, emb1).mean()
        return recall, precision

    def _feature_name(self):
        return "|".join(self.columns)

    def _as_column(self) -> "ColumnName":
        return self._create_column(
            self._feature_name(),
            default_display_name=f"BERTScore for {' '.join(self.columns)}.",
        )
```
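At its core, `BERTScoreFeature` performs greedy token matching: every token embedding in one sentence is matched to its most similar token in the other, recall averages those maxima over the first sentence's tokens, precision does the same in the opposite direction, and the reported score is their harmonic mean (F1). A self-contained sketch of that step with toy embeddings, illustrative only, since the feature above does the same thing with real BERT hidden states:

```python
import numpy as np

# Toy "token embeddings": 3 tokens vs. 2 tokens, 4 dims each (illustrative only).
emb1 = np.random.default_rng(0).normal(size=(3, 4))
emb2 = np.random.default_rng(1).normal(size=(2, 4))


def max_similarity(a: np.ndarray, b: np.ndarray) -> np.ndarray:
    # Cosine similarity of every token in `a` against every token in `b`,
    # then the best match per token in `a` (greedy matching).
    sims = (a @ b.T) / (
        np.linalg.norm(a, axis=1, keepdims=True) * np.linalg.norm(b, axis=1, keepdims=True).T
    )
    return sims.max(axis=1)


recall = max_similarity(emb1, emb2).mean()     # how well emb2 covers emb1's tokens
precision = max_similarity(emb2, emb1).mean()  # how well emb1 covers emb2's tokens
f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0
print(recall, precision, f1)
```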
`@@ -0,0 +1,40 @@` (new file: tests for `BERTScoreFeature`)

```python
import pandas as pd
import pytest

from evidently.features.BERTScore_feature import BERTScoreFeature
from evidently.pipeline.column_mapping import ColumnMapping
from evidently.utils.data_preprocessing import create_data_definition

test_data = [
    ("The quick brown fox jumps over the lazy dog", "A fast brown fox leaps over a lazy dog"),
    ("Hello world", "Hi universe"),
    ("Machine learning is fascinating", "Artificial intelligence is intriguing"),
    ("I love apples", "I adore oranges"),
    ("Python is a great programming language", "Python is an excellent coding language"),
]


@pytest.mark.parametrize(
    (
        "column_1",
        "column_2",
        "expected",
    ),  # expected values obtained from the BERTScore library https://github.com/Tiiiger/bert_score
    [
        ("The quick brown fox jumps over the lazy dog", "A fast brown fox leaps over a lazy dog", 0.8917),
        ("Hello world", "Hi universe", 0.7707),
        ("Machine learning is fascinating", "Artificial intelligence is intriguing", 0.8238),
        ("I love apples", "I adore oranges", 0.7017),
        ("Python is a great programming language", "Python is an excellent coding language", 0.8689),
    ],
)
def test_bert_score_feature(column_1: str, column_2: str, expected: float):
    feature_generator = BERTScoreFeature(columns=["column_1", "column_2"], tfidf_weighted=False)
    data = pd.DataFrame(dict(column_1=[column_1], column_2=[column_2]))

    result = feature_generator.generate_feature(
        data=data,
        data_definition=create_data_definition(None, data, ColumnMapping()),
    )
    column_expected = feature_generator._feature_name()
    assert result[column_expected].iloc[0] == pytest.approx(expected, rel=0.1)
```
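The expected values in the parametrization were taken from the reference bert_score package linked in the comment. A sketch of how one might reproduce them, assuming the package's documented `score` API; note that the reference implementation scores a particular hidden layer while `BERTScoreFeature` uses the last hidden state, which is why the test allows the loose `rel=0.1` tolerance:

```python
# Reproduction sketch (assumes the bert_score package from
# https://github.com/Tiiiger/bert_score is installed; its defaults, such as
# which hidden layer is scored, differ from BERTScoreFeature, so only rough
# agreement is expected, hence rel=0.1 in the test above).
from bert_score import score

candidates = ["The quick brown fox jumps over the lazy dog"]
references = ["A fast brown fox leaps over a lazy dog"]

# Returns per-pair precision, recall, and F1 as tensors.
precision, recall, f1 = score(candidates, references, model_type="bert-base-uncased", idf=False)
print(round(f1.mean().item(), 4))  # compare against the expected 0.8917
```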