From 21a70eb0a4de96b86e9da189affdfc5cd006c610 Mon Sep 17 00:00:00 2001 From: Kartik Ohri Date: Fri, 26 Apr 2024 16:26:57 +0530 Subject: [PATCH] Rollback transaction after processing message in spark reader (#2852) This helps reset the connection after every message in case of temporary connection issues or invalid queries causing transaction errors. --- listenbrainz/spark/spark_reader.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/listenbrainz/spark/spark_reader.py b/listenbrainz/spark/spark_reader.py index 76798115f5..c5523a5d63 100644 --- a/listenbrainz/spark/spark_reader.py +++ b/listenbrainz/spark/spark_reader.py @@ -46,7 +46,7 @@ ) from listenbrainz.spark.spark_dataset import CouchDbDataset from listenbrainz.utils import get_fallback_connection_name -from listenbrainz.webserver import create_app +from listenbrainz.webserver import create_app, db_conn, ts_conn class SparkReader(ConsumerMixin): @@ -127,7 +127,9 @@ def process_response(self, response): self.app.logger.error("Error in the spark reader response handler: data: %s", json.dumps(response, indent=4), exc_info=True) sentry_sdk.capture_exception(e) - return + finally: + db_conn.rollback() + ts_conn.rollback() def callback(self, message: Message): """ Handle the data received from the queue and