Skip to content

Commit

Permalink
Rollback transaction after processing message in spark reader (#2852)
Browse files Browse the repository at this point in the history
This helps reset the connection after every message in case of temporary connection
issues or invalid queries causing transaction errors.
  • Loading branch information
amCap1712 authored Apr 26, 2024
1 parent b40c5fd commit 21a70eb
Showing 1 changed file with 4 additions and 2 deletions.
6 changes: 4 additions & 2 deletions listenbrainz/spark/spark_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
)
from listenbrainz.spark.spark_dataset import CouchDbDataset
from listenbrainz.utils import get_fallback_connection_name
from listenbrainz.webserver import create_app
from listenbrainz.webserver import create_app, db_conn, ts_conn


class SparkReader(ConsumerMixin):
Expand Down Expand Up @@ -127,7 +127,9 @@ def process_response(self, response):
self.app.logger.error("Error in the spark reader response handler: data: %s",
json.dumps(response, indent=4), exc_info=True)
sentry_sdk.capture_exception(e)
return
finally:
db_conn.rollback()
ts_conn.rollback()

def callback(self, message: Message):
""" Handle the data received from the queue and
Expand Down

0 comments on commit 21a70eb

Please sign in to comment.