Use multiple threads to insert stats in couchdb #2963

Merged 2 commits on Aug 23, 2024
listenbrainz/spark/background.py (22 additions & 17 deletions)

@@ -12,9 +12,6 @@
     handle_dump_imported,
     handle_model,
     handle_recommendations,
-    handle_user_daily_activity,
-    handle_user_entity,
-    handle_user_listening_activity,
     handle_sitewide_entity,
     notify_artist_relation_import,
     notify_mapping_import,
@@ -30,7 +27,6 @@
     handle_yim_listens_per_day,
     handle_yim_listen_counts,
     handle_fresh_releases,
-    handle_entity_listener,
     handle_yim_listening_time,
     handle_yim_new_artists_discovered_count,
     handle_yim_artist_map,
@@ -40,7 +36,8 @@
     handle_yim_playlists,
     handle_yim_playlists_end, handle_echo
 )
-from listenbrainz.spark.spark_dataset import CouchDbDataset
+from listenbrainz.spark.spark_dataset import CouchDbDataset, UserEntityStatsDataset, DailyActivityStatsDataset, \
+    ListeningActivityStatsDataset, EntityListenerStatsDataset
 from listenbrainz.db.popularity import get_all_popularity_datasets
 from listenbrainz.db.similarity import SimilarRecordingsDataset, SimilarArtistsDataset
 from listenbrainz.db.tags import TagsDataset
@@ -77,6 +74,17 @@ def __init__(self, app):
         self.internal_message_queue = Queue()
         self.internal_message_ack_queue = Queue()
 
+        self.datasets = [
+            CouchDbDataset,
+            UserEntityStatsDataset,
+            DailyActivityStatsDataset,
+            ListeningActivityStatsDataset,
+            EntityListenerStatsDataset,
+            SimilarRecordingsDataset,
+            SimilarArtistsDataset,
+            TagsDataset,
+            *get_all_popularity_datasets()
+        ]
         self.response_handlers = {}
         self.register_handlers()
@@ -113,29 +121,26 @@ def run(self):
             except Empty:
                 self.app.logger.debug("Empty internal message queue")
 
+        self.stop()
+
+
+    def stop(self):
+        """ Stop running spark reader and associated handlers """
+        for dataset in self.datasets:
+            dataset.handle_shutdown()
+
     def start(self):
         """ Start running the background job processor in its own thread """
         self.thread = Thread(target=self.run, name="SparkReaderBackgroundJobProcessor")
         self.thread.start()
 
     def register_handlers(self):
         """ Register handlers for the Spark reader """
-        datasets = [
-            CouchDbDataset,
-            SimilarRecordingsDataset,
-            SimilarArtistsDataset,
-            TagsDataset,
-            *get_all_popularity_datasets()
-        ]
-        for dataset in datasets:
+        for dataset in self.datasets:
             self.response_handlers.update(dataset.get_handlers())
 
         self.response_handlers.update({
             "echo": handle_echo,
-            "user_entity": handle_user_entity,
-            "entity_listener": handle_entity_listener,
-            "user_listening_activity": handle_user_listening_activity,
-            "user_daily_activity": handle_user_daily_activity,
             "sitewide_entity": handle_sitewide_entity,
             "sitewide_listening_activity": handle_sitewide_listening_activity,
             "fresh_releases": handle_fresh_releases,
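
The background.py side of the change is a registry refactor: the per-stat handler imports go away, and one self.datasets list drives both register_handlers() and the new stop() hook, so handler registration and shutdown can never drift apart. A minimal sketch of that pattern, assuming a simplified stand-in for the SparkDataset interface (the Dataset class and message shapes below are illustrative, not code from this PR):

    class Dataset:
        """ Stand-in for SparkDataset: handlers keyed by message type, plus a shutdown hook. """

        def __init__(self, name):
            self.name = name

        def get_handlers(self):
            # same shape as SparkDataset.get_handlers(): message type -> bound method
            return {f"{self.name}_insert": self.handle_insert}

        def handle_insert(self, message):
            print(f"insert into {self.name}:", message)

        def handle_shutdown(self):
            # release thread pools, connections, etc. on reader exit
            print(f"shutting down {self.name}")


    datasets = [Dataset("user_entity"), Dataset("entity_listener")]

    # register_handlers() now folds every dataset into one dispatch table
    response_handlers = {}
    for dataset in datasets:
        response_handlers.update(dataset.get_handlers())

    response_handlers["user_entity_insert"]({"data": []})

    # stop() walks the same list, so no dataset can be missed at shutdown
    for dataset in datasets:
        dataset.handle_shutdown()
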
listenbrainz/spark/spark_dataset.py (91 additions & 0 deletions)

@@ -1,13 +1,18 @@
 import abc
 import time
 from abc import ABC
+from concurrent.futures import as_completed
+from concurrent.futures.thread import ThreadPoolExecutor
 from urllib.error import HTTPError
 
 from flask import current_app
+from more_itertools import chunked
 from psycopg2.extras import execute_values
 from psycopg2.sql import Identifier, SQL, Literal
+from sentry_sdk import start_transaction
 
 from listenbrainz.db import couchdb, timescale
+import listenbrainz.db.stats as db_stats
 
 
 class SparkDataset(ABC):
@@ -50,6 +55,10 @@ def get_handlers(self):
             f"{self.name}_end": self.handle_end
         }
 
+    def handle_shutdown(self):
+        """ Shutdown method invoked when spark reader is stopping """
+        pass
+
 
 class _CouchDbDataset(SparkDataset):
     """ Base class for bulk datasets stored in couchdb. """
@@ -102,6 +111,88 @@ def handle_end(self, message):
 CouchDbDataset = _CouchDbDataset()
 
 
+class _StatsDataset(SparkDataset):
+
+    def __init__(self, stats_type):
+        super().__init__(stats_type)
+        # doing empirical testing for various numbers of workers, no speedup
+        # was observed by raising workers to more than 2 because the bottleneck
+        # shifted to stats generation in spark
+        self.workers = 2
+        self.executor = ThreadPoolExecutor(max_workers=self.workers)
+
+    @abc.abstractmethod
+    def get_key(self, message):
+        pass
+
+    def insert_stats(self, database, stats_range, from_ts, to_ts, data, key):
+        with start_transaction(op="insert", name=f"insert {self.name} - {stats_range} stats"):
+            db_stats.insert(
+                database,
+                from_ts,
+                to_ts,
+                data,
+                key
+            )
+
+    def handle_insert(self, message):
+        database = message["database"]
+        stats_range = message["stats_range"]
+        from_ts = message["from_ts"]
+        to_ts = message["to_ts"]
+
+        key = self.get_key(message)
+
+        futures = []
+        chunk_size = len(message["data"]) // self.workers
+        for chunk in chunked(message["data"], chunk_size):
+            f = self.executor.submit(self.insert_stats, database, stats_range, from_ts, to_ts, chunk, key)
+            futures.append(f)
+
+        for f in as_completed(futures):
+            try:
+                f.result()
+            except Exception:
+                current_app.logger.error(f"Error in writing {self.name} stats: %s", exc_info=True)
+
+    def handle_start(self, message):
+        raise NotImplementedError()
+
+    def handle_end(self, message):
+        raise NotImplementedError()
+
+    def handle_shutdown(self):
+        self.executor.shutdown()
+
+
+class _UserStatsDataset(_StatsDataset):
+
+    def get_key(self, message):
+        return "user_id"
+
+UserEntityStatsDataset = _UserStatsDataset("user_entity")
+DailyActivityStatsDataset = _UserStatsDataset("user_daily_activity")
+ListeningActivityStatsDataset = _UserStatsDataset("user_listening_activity")
+
+
+class _EntityListenerStatsDataset(_StatsDataset):
+
+    def __init__(self):
+        super().__init__("entity_listener")
+
+    def get_key(self, message):
+        if message["entity"] == "artists":
+            return "artist_mbid"
+        elif message["entity"] == "releases":
+            return "release_mbid"
+        elif message["entity"] == "release_groups":
+            return "release_group_mbid"
+        else:
+            return "recording_mbid"
+
+EntityListenerStatsDataset = _EntityListenerStatsDataset()
+
+
 class DatabaseDataset(SparkDataset, ABC):
     """ A base class that makes it simple to setup swap tables workflow for datasets generated
     in spark and stored in timescale db.
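
The heart of the PR is _StatsDataset.handle_insert above: it splits the message payload into len(data) // workers sized chunks with more_itertools.chunked, submits each chunk to a shared two-worker ThreadPoolExecutor, and drains the futures with as_completed so one failed chunk is logged without aborting the others. A self-contained sketch of the same fan-out pattern, assuming a lock-guarded list as a stand-in for db_stats.insert writing to couchdb; the max(1, ...) guard is an addition of this sketch, not in the PR, covering payloads smaller than the worker count where the floor division would otherwise give a chunk size of 0:

    from concurrent.futures import as_completed
    from concurrent.futures.thread import ThreadPoolExecutor
    from threading import Lock

    from more_itertools import chunked

    WORKERS = 2  # the PR saw no speedup past 2 workers; spark becomes the bottleneck

    executor = ThreadPoolExecutor(max_workers=WORKERS)
    inserted = []
    inserted_lock = Lock()

    def insert_stats(chunk):
        # stand-in for db_stats.insert() writing one chunk of stats documents
        with inserted_lock:
            inserted.extend(chunk)

    def handle_insert(data):
        # guard (not in the PR): avoid a chunk size of 0 when len(data) < WORKERS
        chunk_size = max(1, len(data) // WORKERS)
        futures = [executor.submit(insert_stats, chunk) for chunk in chunked(data, chunk_size)]
        for future in as_completed(futures):
            try:
                future.result()  # re-raises any exception from the worker thread
            except Exception as exc:
                print(f"error writing stats chunk: {exc}")

    handle_insert([{"user_id": i} for i in range(5)])  # 5 items -> chunks of 2, 2 and 1
    executor.shutdown()  # mirrors handle_shutdown() when the reader stops
    assert len(inserted) == 5

With two workers and an odd-length payload, the floor division produces a third remainder chunk; the executor simply queues it, so the chunk math does not need to divide evenly.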