diff --git a/.github/workflows/push.yml b/.github/workflows/push.yml
index 8ccf126..b1dd87d 100644
--- a/.github/workflows/push.yml
+++ b/.github/workflows/push.yml
@@ -21,4 +21,4 @@ jobs:
           aws-secret-access-key: ${{ secrets.GDBP_AWS_SECRET_ACCESS_KEY }}
           aws-region: us-west-2
       - name: Generalized Deployments
-        uses: brave-intl/general-docker-build-pipeline-action@v1.0.9
+        uses: brave-intl/general-docker-build-pipeline-action@v1
diff --git a/README.md b/README.md
index 8f4c591..c5aa4bd 100644
--- a/README.md
+++ b/README.md
@@ -1,17 +1,32 @@
 # brave-news-source-suggestion
-Pipeline for producing the source embedding representations and similarity matrix needed for source suggestion feature in Brave News.
+Service for producing the source embedding representations and similarity matrix needed for the source suggestion feature in Brave News.
 
-## Scripts
-Run the scripts in the order in which they are presented.
-
-`source-feed-accumulator.py`: parses periodically Brave News's feed, creating articles buckets for each source. These buckets are collected in `articles_history.csv` and catalogued by the `publisher_id` attribute.
-
-`sources-similarity-matrix.py`: takes in the source buckets and produces an 512-dimensional embedding for each source, built as the mean of the 512-dimensional embeddings of all articles belonging to the source, as generated by the Universal Sentence Encoder model (https://arxiv.org/abs/1803.11175).
+## Installation
 
-## Outputs
-
-`source_embeddings.csv`: [`index | publisher_id | 0 | 1 ... | ... 511`] stores all the 512-dimensional embeddings for each source under its `publisher_name`.
-
-`source_similarity_t10.json` stores the top-10 most similar sources, with similarity score, for each source.
+```
+pip install -r requirements.txt
+```
+
+## Scripts
+**source-feed-accumulator.py**: periodically parses the Brave News feed, collecting articles for each source in `articles_history.csv`. For each article, we store the `publisher_id` attribute.
+
+**source-similarity-matrix.py**: takes the article history as input and produces a 384-dimensional embedding for each source (the mean of the embeddings of the source's article titles), using the `sentence-transformers` package. In particular:
+- `all-MiniLM-L6-v2` for English-language sources.
+- `paraphrase-multilingual-MiniLM-L12-v2` for non-English-language sources.
+
+Once all source embeddings are generated, a pairwise source similarity matrix is produced.
+
+## Running locally
+To collect and accumulate article history:
+```
+export NO_UPLOAD=1
+export NO_DOWNLOAD=1
+python source-feed-accumulator.py
+```
+
+To compute source embeddings and produce the source similarity matrix:
+```
+export NO_UPLOAD=1
+export NO_DOWNLOAD=1
+python source-similarity-matrix.py
+```
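For readers new to the `sentence-transformers` workflow the updated README describes, here is a minimal illustrative sketch (not part of this change): a source representation is the mean of the embeddings of its article titles. The model name matches the config below; the titles are made-up examples.

```python
# Illustrative only: average per-title embeddings into a single 384-d source vector.
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

titles = ["Example headline one", "Example headline two"]  # hypothetical titles
source_embedding = model.encode(titles).mean(axis=0)
print(source_embedding.shape)  # (384,)
```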
diff --git a/config.py b/config.py
index c4a963e..1601864 100644
--- a/config.py
+++ b/config.py
@@ -11,12 +11,13 @@
 BRAVE_TODAY_CLOUDFRONT_CANONICAL_ID = os.getenv('BRAVE_TODAY_CLOUDFRONT_CANONICAL_ID', None)
 
 LANG_REGION_MODEL_MAP = os.getenv('LANG_REGION_MODEL_MAP', [
-    ('en_US', "https://tfhub.dev/google/universal-sentence-encoder/4"),
-    ('en_CA', "https://tfhub.dev/google/universal-sentence-encoder/4"),
-    ('es_ES', 'https://tfhub.dev/google/universal-sentence-encoder-multilingual-large/3'),
-    ('es_MX', 'https://tfhub.dev/google/universal-sentence-encoder-multilingual-large/3'),
-    ('pt_BR', 'https://tfhub.dev/google/universal-sentence-encoder-multilingual-large/3'),
-    ('ja_JP', 'https://tfhub.dev/google/universal-sentence-encoder-multilingual-large/3'),
+    ('en_US', "sentence-transformers/all-MiniLM-L6-v2"),
+    ('en_CA', "sentence-transformers/all-MiniLM-L6-v2"),
+    ('en_GB', "sentence-transformers/all-MiniLM-L6-v2"),
+    ('es_ES', 'sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2'),
+    ('es_MX', 'sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2'),
+    ('pt_BR', 'sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2'),
+    ('ja_JP', 'sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2'),
 ])
 
 SOURCES_JSON_FILE = os.getenv('SOURCES_JSON_FILE', 'sources.{LANG_REGION}')
@@ -32,3 +33,7 @@
 SOURCE_SIMILARITY_T10_HR = os.getenv('SOURCE_SIMILARITY_T10_HR', "source_similarity_t10_hr.{LANG_REGION}")
 
 SOURCE_EMBEDDINGS = os.getenv('SOURCE_EMBEDDINGS', "SOURCE_EMBEDDINGS.{LANG_REGION}")
+
+if SENTRY_URL := os.getenv('SENTRY_URL'):
+    import sentry_sdk
+    sentry_sdk.init(dsn=SENTRY_URL, traces_sample_rate=0)
diff --git a/embeddings.py b/embeddings.py
new file mode 100644
index 0000000..bf62d94
--- /dev/null
+++ b/embeddings.py
@@ -0,0 +1,31 @@
+import numpy as np
+from sentence_transformers import util
+from structlog import get_logger
+
+import config
+
+EMBEDDING_DIMENSIONALITY = 384
+
+logger = get_logger()
+
+
+def compute_source_similarity(source_1, source_2, function='cosine'):
+    if function == 'dot':
+        return util.dot_score(source_1, np.transpose(source_2))
+    elif function == 'cosine':
+        return util.pytorch_cos_sim(source_1, source_2)[0][0]
+
+
+def get_source_representation_from_titles(titles, model):
+    if len(titles) < config.MINIMUM_ARTICLE_HISTORY_SIZE:
+        return np.zeros((1, EMBEDDING_DIMENSIONALITY))
+
+    return model.encode(titles).mean(axis=0)
+
+
+def compute_source_representation_from_articles(articles_df, publisher_id, model):
+    publisher_bucket_df = articles_df[articles_df.publisher_id == publisher_id]
+
+    titles = [
+        title for title in publisher_bucket_df.title.to_numpy() if title is not None]
+    return get_source_representation_from_titles(titles, model)
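A hypothetical usage sketch of the new `embeddings` helpers (illustrative, not part of the patch): build representations for two made-up publishers from a tiny articles DataFrame and score them with the default cosine function. Note that a publisher with fewer than `config.MINIMUM_ARTICLE_HISTORY_SIZE` titles gets a zero vector, so its similarity scores come out as 0.

```python
import pandas as pd
from sentence_transformers import SentenceTransformer

from embeddings import (compute_source_representation_from_articles,
                        compute_source_similarity)

model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

# Made-up article history; the real pipeline reads it from articles_history.csv.
articles_df = pd.DataFrame({
    "title": ["Markets rally", "Fed holds rates", "New phone released", "Chip makers surge"],
    "publisher_id": ["pub_a", "pub_a", "pub_b", "pub_b"],
})

repr_a = compute_source_representation_from_articles(articles_df, "pub_a", model)
repr_b = compute_source_representation_from_articles(articles_df, "pub_b", model)

print(float(compute_source_similarity(repr_a, repr_b)))  # cosine score by default
```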
diff --git a/requirements.txt b/requirements.txt
index 37edd34..2e6d53a 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,12 +1,14 @@
 feedparser==6.0.10
 numpy==1.23.5
 pandas==1.5.1
-requests==2.28.1
+requests==2.31.0
 scipy==1.9.3
-tensorflow==2.9.3
-tensorflow_text==2.9.0
-tensorflow_hub==0.12.0
-tqdm==4.64.1
+sentence-transformers==2.2.2
+sentry-sdk==1.28.1
+tqdm==4.65.0
 boto3==1.26.14
 botocore==1.29.14
-structlog==22.2.0
\ No newline at end of file
+structlog==22.3.0
+torch==2.0.1
+torchvision==0.15.2
+transformers==4.31.0
diff --git a/source-similarity-matrix.py b/source-similarity-matrix.py
index 355f8a3..b239163 100644
--- a/source-similarity-matrix.py
+++ b/source-similarity-matrix.py
@@ -4,55 +4,24 @@
 import numpy as np
 import pandas as pd
-import tensorflow as tf
-import tensorflow_hub as hub
-import tensorflow_text
+from sentence_transformers import SentenceTransformer
 from structlog import get_logger
 from tqdm import tqdm
 
 import config
-from utils import download_file, upload_file
+from embeddings import (EMBEDDING_DIMENSIONALITY,
+                        compute_source_representation_from_articles,
+                        compute_source_similarity)
+from utils import (clean_source_similarity_file, download_file,
+                   get_source_id_for_title, upload_file)
 
 logger = get_logger()
 
 
-def embed(input):
-    return model(input)
-
-
-# Take centroid of 512-d embeddings
-def get_source_representation_from_titles(titles):
-    source_repr = np.zeros((1, 512))
-    if len(titles) < config.MINIMUM_ARTICLE_HISTORY_SIZE:
-        return source_repr
-
-    for title in titles:
-        source_repr += embed([title])[0]
-    norm_repr = tf.nn.l2_normalize(source_repr / len(titles), axis=1)
-    return norm_repr.numpy()
-
-
-def compute_source_similarity(source1, source2, t='dot'):
-    cosine_similarities = np.dot(source1, np.transpose(source2))
-    clip_cosine_similarity = tf.clip_by_value(cosine_similarities, -1.0, 1.0)
-    score = 1.0 - tf.acos(clip_cosine_similarity) / math.pi
-    return score
-
-
-def compute_source_representation_from_articles(articles_df, publisher_id):
-    publisher_bucket_df = articles_df[articles_df.publisher_id == publisher_id]
-
-    source_titles = [title for title in publisher_bucket_df.title.to_numpy() if title is not None]
-    return get_source_representation_from_titles(source_titles)
-
-
-def get_source_id_for_title(title, sources_df):
-    return sources_df[sources_df.publisher_name == title].publisher_id.to_numpy()[0]
-
-
 # Compute similarity matrix for all existing LANG_REGION pairs
-for lang_region, model_url in config.LANG_REGION_MODEL_MAP:
-    logger.info(f"Started computing similarity matrix for {lang_region} using {model_url}")
+for lang_region, model_name in config.LANG_REGION_MODEL_MAP:
+    logger.info(
+        f"Started computing similarity matrix for {lang_region} using {model_name}")
 
     pathlib.Path(config.OUTPUT_DIR).mkdir(parents=True, exist_ok=True)
 
@@ -76,10 +45,9 @@ def get_source_id_for_title(title, sources_df):
                                header=None)
     articles_df.columns = ['title', 'description', 'timestamp', 'publisher_id']
 
-    logger.info("Loading Universal Sentence Encoder...")
-    module_url = model_url  # @param ["https://tfhub.dev/google/universal-sentence-encoder/4", "https://tfhub.dev/google/universal-sentence-encoder-large/5"]
-    model = hub.load(module_url)
-    logger.info(f"USE ({module_url}) loaded")
+    logger.info("Loading Embedding Model...")
+    model = SentenceTransformer(model_name)
+    logger.info(f"Model ({model_name}) loaded")
 
     logger.info("Building sources embeddings...")
     publisher_ids = sources_df.publisher_id.to_numpy()
@@ -87,13 +55,18 @@
     # For each publisher, compute source representations from all stored
     # articles for that publisher.
-    reprs = np.zeros((publisher_ids.size, 512))
+    reprs = np.zeros((publisher_ids.size, EMBEDDING_DIMENSIONALITY))
     for i, publisher_id in tqdm(enumerate(publisher_ids)):
-        reprs[i, :] = compute_source_representation_from_articles(articles_df, publisher_id)
+        reprs[i, :] = compute_source_representation_from_articles(
+            articles_df, publisher_id, model)
+        if not reprs[i, :].any():
+            logger.warning(
+                f"Source {sources_df[sources_df.publisher_id == publisher_id].publisher_name.item()} has no articles. Skipping...")
 
     logger.info(f"Computing sources representations for {lang_region}")
     sources_representation = pd.DataFrame({'publisher_id': publisher_ids})
-    sources_representation = pd.concat([sources_representation, pd.DataFrame(reprs)], axis=1)
+    sources_representation = pd.concat(
+        [sources_representation, pd.DataFrame(reprs)], axis=1)
     sources_representation.to_csv(
         f'output/{config.SOURCE_EMBEDDINGS.format(LANG_REGION=lang_region)}.csv', header=None)
     logger.info("Finished building source embeddings.")
@@ -130,19 +103,24 @@ def get_source_id_for_title(title, sources_df):
         # Only include suggestion if within 10% of the best match's score
         top_similarity_score = 0.0
         if sources_ranking:
-            top_similarity_score = sources_ranking[0][1]
+            top_similarity_score = sources_ranking[0][1]
 
         similarity_cutoff = config.SIMILARITY_CUTOFF_RATIO * top_similarity_score
-        top10_dictionary[source_id] = [{'source': get_source_id_for_title(source[0], sources_df), 'score': source[1]} for
+        top10_dictionary[source_id] = [{'source': get_source_id_for_title(source[0], sources_df), 'score': source[1]}
+                                       for
                                        source in sources_ranking[:10] if source[1] > similarity_cutoff]
         top10_dictionary_human_readable[feed] = [{'source': source[0], 'score': source[1]} for
                                                  source in sources_ranking[:10] if source[1] > similarity_cutoff]
 
+    logger.info("Removing un-matched sources")
+    top10_dictionary = clean_source_similarity_file(
+        sources_data, top10_dictionary)
+
     logger.info("Outputting sources similarities files")
-    with open(f'output/{config.SOURCE_SIMILARITY_T10.format(LANG_REGION=lang_region)}.json', 'w', encoding='utf-8') as f:
-        json.dump(top10_dictionary, f, ensure_ascii=True)
-    with open(f'output/{config.SOURCE_SIMILARITY_T10_HR.format(LANG_REGION=lang_region)}.json', 'w', encoding='utf-8') as f:
-        json.dump(top10_dictionary_human_readable,
-                  f, ensure_ascii=True)
+    with open(f'output/{config.SOURCE_SIMILARITY_T10.format(LANG_REGION=lang_region)}.json', 'w') as f:
+        json.dump(top10_dictionary, f)
+    with open(f'output/{config.SOURCE_SIMILARITY_T10_HR.format(LANG_REGION=lang_region)}.json', 'w') as f:
+        json.dump(top10_dictionary_human_readable, f)
+
     logger.info("Script has finished running.")
 
     if not config.NO_UPLOAD:
@@ -150,9 +128,11 @@ def get_source_id_for_title(title, sources_df):
             config.PUB_S3_BUCKET,
             f"source-suggestions/{config.SOURCE_SIMILARITY_T10.format(LANG_REGION=lang_region)}.json")
-        upload_file(config.OUTPUT_DIR + "/" + f'/{config.SOURCE_SIMILARITY_T10_HR.format(LANG_REGION=lang_region)}.json',
-                    config.PUB_S3_BUCKET,
-                    f"source-suggestions/{config.SOURCE_SIMILARITY_T10_HR.format(LANG_REGION=lang_region)}.json")
+        upload_file(
+            config.OUTPUT_DIR + "/" +
+            f'/{config.SOURCE_SIMILARITY_T10_HR.format(LANG_REGION=lang_region)}.json',
+            config.PUB_S3_BUCKET,
+            f"source-suggestions/{config.SOURCE_SIMILARITY_T10_HR.format(LANG_REGION=lang_region)}.json")
         upload_file(config.OUTPUT_DIR + "/" + f'/{config.SOURCE_EMBEDDINGS.format(LANG_REGION=lang_region)}.csv',
                     config.PUB_S3_BUCKET,
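The cutoff logic in the hunk above keeps only suggestions whose score is within `SIMILARITY_CUTOFF_RATIO` of the best match. A standalone illustration with made-up scores (the real ratio comes from `config`; 0.9 is assumed here, matching the "within 10%" comment):

```python
# Illustration only, not the script's code.
sources_ranking = [("Source A", 0.82), ("Source B", 0.79), ("Source C", 0.41)]

SIMILARITY_CUTOFF_RATIO = 0.9  # assumed value for the example
top_similarity_score = sources_ranking[0][1] if sources_ranking else 0.0
similarity_cutoff = SIMILARITY_CUTOFF_RATIO * top_similarity_score  # 0.738

suggestions = [{"source": name, "score": score}
               for name, score in sources_ranking[:10]
               if score > similarity_cutoff]
print(suggestions)  # keeps Source A and Source B, drops Source C
```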
diff --git a/utils.py b/utils.py
index 0c70ac6..06c98f9 100644
--- a/utils.py
+++ b/utils.py
@@ -1,6 +1,8 @@
 import logging
+import mimetypes
 
 import boto3
+import numpy as np
 from botocore.exceptions import ClientError
 
 import config
@@ -17,9 +19,11 @@ def upload_file(file_name, bucket, object_name=None):
     if object_name is None:
         object_name = file_name
     try:
+        content_type = mimetypes.guess_type(file_name)[0] or 'binary/octet-stream'
         s3_client.upload_file(file_name, bucket, object_name, ExtraArgs={
             'GrantRead': f'id={config.BRAVE_TODAY_CLOUDFRONT_CANONICAL_ID}',
-            'GrantFullControl': f'id={config.BRAVE_TODAY_CANONICAL_ID}'
+            'GrantFullControl': f'id={config.BRAVE_TODAY_CANONICAL_ID}',
+            'ContentType': content_type
         })
 
     except ClientError as e:
@@ -39,3 +43,23 @@ def download_file(file_name, bucket, object_name=None):
         logging.error(e)
         return False
     return True
+
+
+def clean_source_similarity_file(sources_data, sources_sim_data):
+    source_ids = {source.get("publisher_id") for source in sources_data}
+
+    # Drop similarity entries for publishers that are no longer in sources_data.
+    for s_id in list(sources_sim_data):
+        if s_id not in source_ids:
+            sources_sim_data.pop(s_id)
+            continue
+
+        # Drop suggestions that point at publishers no longer in sources_data.
+        sources_sim_data[s_id] = [suggestion for suggestion in sources_sim_data[s_id]
+                                  if suggestion["source"] in source_ids]
+
+    return sources_sim_data
+
+
+def get_source_id_for_title(title, sources_df):
+    return sources_df[sources_df.publisher_name == title].publisher_id.to_numpy()[0]
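With the `upload_file` change, uploads now carry a Content-Type guessed from the file name, falling back to `binary/octet-stream`. A quick illustrative check (not part of the patch) of what `mimetypes.guess_type` yields for the pipeline's output names; the extension-less name is a made-up example of the fallback case:

```python
import mimetypes

for name in ["source_similarity_t10.en_US.json",   # per-locale similarity output
             "SOURCE_EMBEDDINGS.en_US.csv",        # per-locale embeddings output
             "some_file_without_extension"]:       # hypothetical, triggers the fallback
    content_type = mimetypes.guess_type(name)[0] or 'binary/octet-stream'
    print(name, "->", content_type)
# Expected: application/json, text/csv, binary/octet-stream
```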