Skip to content

Commit

Permalink
Pin submission post processing actions to master database (#2694)
Browse files Browse the repository at this point in the history
* pin submission post processing actions to master database

The post processing actions are part of the write flow and should be pinned to master database. This helps the replicas be more available to serve clients

* fix module duplicate import

* suppress warning import-outside-toplevel
  • Loading branch information
kelvin-muchiri authored Sep 4, 2024
1 parent 2bf3f6a commit 2f530ae
Showing 1 changed file with 16 additions and 12 deletions.
28 changes: 16 additions & 12 deletions onadata/apps/logger/models/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from django.urls import reverse
from django.utils import timezone
from django.utils.translation import gettext as _
from multidb.pinning import use_master

from celery import current_task
from deprecated import deprecated
Expand Down Expand Up @@ -227,9 +228,6 @@ def update_xform_submission_count_async(self, instance_id, created):
def update_xform_submission_count(instance_id, created):
"""Updates the XForm submissions count on a new submission being created."""
if created:
# pylint: disable=import-outside-toplevel
from multidb.pinning import use_master

with use_master:
try:
instance = (
Expand Down Expand Up @@ -268,11 +266,13 @@ def update_xform_submission_count(instance_id, created):
safe_delete(f"{DATAVIEW_COUNT}{instance.xform_id}")
safe_delete(f"{XFORM_COUNT}{instance.xform_id}")
# Clear project cache
# pylint: disable=import-outside-toplevel
from onadata.apps.logger.models.xform import clear_project_cache

clear_project_cache(instance.xform.project_id)


@use_master
@transaction.atomic()
def _update_xform_submission_count_delete(instance):
"""Updates the XForm submissions count on deletion of a submission."""
Expand Down Expand Up @@ -335,17 +335,19 @@ def save_full_json_async(self, instance_id):
Celery task to asynchrounously generate and save an Instances JSON
once a submission has been made
"""
try:
instance = Instance.objects.get(pk=instance_id)
except Instance.DoesNotExist as e:
if self.request.retries > 2:
msg = f"Failed to save full JSON for Instance {instance_id}"
report_exception(msg, e, sys.exc_info())
self.retry(exc=e, countdown=60 * self.request.retries)
else:
save_full_json(instance)
with use_master:
try:
instance = Instance.objects.get(pk=instance_id)
except Instance.DoesNotExist as e:
if self.request.retries > 2:
msg = f"Failed to save full JSON for Instance {instance_id}"
report_exception(msg, e, sys.exc_info())
self.retry(exc=e, countdown=60 * self.request.retries)
else:
save_full_json(instance)


@use_master
def save_full_json(instance: "Instance", include_related=True):
"""Save full json dict
Expand Down Expand Up @@ -374,6 +376,7 @@ def update_project_date_modified_async(self, instance_id, created):
self.retry(exc=e, countdown=60 * self.request.retries)


@use_master
def update_project_date_modified(instance_id, _):
"""Update the project's date_modified
Expand Down Expand Up @@ -861,6 +864,7 @@ def post_save_submission(sender, instance=None, created=False, **kwargs):


# pylint: disable=unused-argument
@use_master
def permanently_delete_attachments(sender, instance=None, created=False, **kwargs):
if instance:
attachments = instance.attachments.all()
Expand Down

0 comments on commit 2f530ae

Please sign in to comment.