This repository has been archived by the owner on Aug 24, 2024. It is now read-only.

Race conditions thrown by MongoDB #77

Open
Jean-PhilippeD opened this issue Jul 30, 2023 · 4 comments

Comments

@Jean-PhilippeD

Jean-PhilippeD commented Jul 30, 2023

Hi,

I get race condition errors, and sometimes, when I edit a schedule, a bunch of other tasks are executed without respecting their schedules.

Reading the code, I think it could be optimized, but I'm not sure enough, so I'm posting it here in case someone reads this ;)

We have this in MongoScheduler:

    def get_from_database(self):
        self.sync()
        d = {}
        for doc in self.Model.objects.filter(enabled=True):
            d[doc.name] = self.Entry(doc)
        return d

    @property
    def schedule(self):
        if self.requires_update():
            self._schedule = self.get_from_database()
            self._last_updated = datetime.datetime.now()
        return self._schedule

    def sync(self):
        logger.debug('Writing entries...')
        for entry in self._schedule.values():
            entry.save()

Meanwhile, in Celery's beat.py, every tick compares the previous schedule with the current one by reading the schedule property. Because requires_update() becomes true every 5 seconds, the same as the tick interval, get_from_database() is triggered quite often.

So every 5 seconds we sync, we fetch from the database, and once that finishes we check whether we need to sync again, which we probably do.
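To make the timing concrete, here is a toy simulation of the refresh logic described above. It assumes requires_update() simply checks whether at least 5 seconds have passed since the last refresh (the `>=` comparison is a simplification), and that beat reads the schedule property once per tick; `ScheduleCache` and `simulate` are hypothetical names, not part of celerybeat-mongo or Celery.

```python
import datetime

# Assumption: a 5-second refresh interval, as described in the issue.
UPDATE_INTERVAL = datetime.timedelta(seconds=5)


class ScheduleCache:
    """Hypothetical stand-in for the scheduler's cached-schedule logic."""

    def __init__(self, start: datetime.datetime) -> None:
        self._last_updated = start
        self.db_reads = 0  # how many times get_from_database() would run

    def requires_update(self, now: datetime.datetime) -> bool:
        # True once at least UPDATE_INTERVAL has elapsed since the last refresh.
        return now - self._last_updated >= UPDATE_INTERVAL

    def schedule(self, now: datetime.datetime) -> None:
        # Mirrors the `schedule` property: a stale cache triggers sync + reload.
        if self.requires_update(now):
            self.db_reads += 1
            self._last_updated = now


def simulate(ticks: int, tick_seconds: int) -> int:
    """Count database refreshes when beat reads the schedule once per tick."""
    start = datetime.datetime(2023, 7, 30)
    cache = ScheduleCache(start)
    for i in range(1, ticks + 1):
        cache.schedule(start + datetime.timedelta(seconds=i * tick_seconds))
    return cache.db_reads
```

With 5-second ticks, `simulate(12, 5)` returns 12: every tick in that minute does a full write-and-reload. With 1-second ticks, `simulate(12, 1)` returns 2, because only the reads at 5 s and 10 s find the cache stale.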

So we could let beat.py decide when to sync, using its own should_sync():

    def should_sync(self):
        return (
            (not self._last_sync or
             (time.monotonic() - self._last_sync) > self.sync_every) or
            (self.sync_every_tasks and
             self._tasks_since_sync >= self.sync_every_tasks)
        )

and reduce the scheduler to:

    @property
    def schedule(self):
        return self._schedule

    def sync(self):
        logger.debug('Writing entries...')
        for entry in self._schedule.values():
            entry.save()
        self._schedule = {}
        for doc in self.Model.objects.filter(enabled=True):
            self._schedule[doc.name] = self.Entry(doc)

I need to test it.

@Jean-PhilippeD
Author

In fact, beat syncs every 180 s by default, so with this change you would have to wait up to 3 minutes before new schedules are taken into account, which is not satisfying...
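The 3-minute delay can be checked against the should_sync() logic quoted earlier. This sketch is a standalone copy of that decision with the clock and counters passed in as arguments; it assumes the proposal above (the schedule is reloaded only inside sync()), the Celery default of 180 s for `sync_every`, `sync_every_tasks` left at 0, and a hypothetical 5-second tick. `worst_case_pickup_delay` is an illustrative helper, not a Celery function.

```python
from typing import Optional

SYNC_EVERY = 3 * 60  # Celery beat's default sync interval, in seconds


def should_sync(now: float, last_sync: Optional[float], tasks_since_sync: int,
                sync_every: float = SYNC_EVERY,
                sync_every_tasks: int = 0) -> bool:
    # Same decision as beat's should_sync(), with time and counters injected.
    return bool(
        (not last_sync or (now - last_sync) > sync_every)
        or (sync_every_tasks and tasks_since_sync >= sync_every_tasks)
    )


def worst_case_pickup_delay(tick: float = 5.0) -> float:
    """Seconds before an edit made right after a sync gets reloaded,
    assuming the schedule is refreshed only when beat decides to sync."""
    last_sync = 1000.0  # hypothetical monotonic timestamp of the last sync
    now = last_sync
    while True:
        now += tick
        if should_sync(now, last_sync, tasks_since_sync=0):
            return now - last_sync
```

`worst_case_pickup_delay()` returns 185.0: the first 5-second tick strictly past the 180 s threshold. Setting `sync_every_tasks` would shorten this only when tasks are actually being sent, so it does not help with picking up edited schedules.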

I'm still investigating why I get race conditions.

@Jean-PhilippeD
Author

Jean-PhilippeD commented Aug 1, 2023

I've added the following to schedulers.py:

line 7: import pytz

line 45: elif self._task.last_run_at.tzinfo != self._default_now().tzinfo:
line 46:             self._task.last_run_at = self._task.last_run_at.replace(tzinfo=pytz.UTC).astimezone(tz=pytz.timezone(current_app.conf.get('timezone') or 'UTC'))

This handles the timezone mismatch: MongoDB returns last_run_at in UTC, while Celery beat works in the configured local timezone, which is not the same.

This works with Celery 5, using the timezone config key set when Celery is loaded.
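The same conversion can be written with only the standard-library datetime module. This is a minimal sketch, assuming the stored value comes back naive and in UTC (PyMongo's default behaviour) and that the target tzinfo stands in for the Celery-configured zone; `to_configured_tz` and `PLUS_TWO` are hypothetical names, and a fixed UTC+2 offset is used instead of a real zone so the example does not depend on the system tz database.

```python
import datetime


def to_configured_tz(last_run_at: datetime.datetime,
                     tz: datetime.tzinfo) -> datetime.datetime:
    """Interpret a naive datetime read from MongoDB as UTC and convert it to tz."""
    if last_run_at.tzinfo is None:
        # Assumption: naive values from the database are UTC.
        last_run_at = last_run_at.replace(tzinfo=datetime.timezone.utc)
    # Already-aware values are simply converted, not relabelled.
    return last_run_at.astimezone(tz)


# Hypothetical configured zone: a fixed UTC+2 offset (e.g. Paris in summer).
PLUS_TWO = datetime.timezone(datetime.timedelta(hours=2))

stored = datetime.datetime(2023, 8, 1, 12, 0)  # naive, as read back from the DB
local = to_configured_tz(stored, PLUS_TWO)     # 14:00+02:00, the same instant
```

Only naive values are given a UTC tzinfo here; aware values go straight to `astimezone()`, so a value that already carries the right offset is not shifted twice. Leaving the mismatch in place is not just cosmetic: ordering comparisons such as `stored < local` raise `TypeError` in Python because one side is naive and the other aware.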

@Jean-PhilippeD
Author

The race conditions appear when a sync triggered by beat runs at the same time as a sync from beatmongo.
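The interleaving can be reproduced with a toy store that only accepts a write if the document has not changed since it was read. This is not how mongoengine implements `save_condition`; `InMemoryCollection`, its `version` field and the local `SaveConditionError` class are all hypothetical, used only to show two overlapping syncs reading the same document before either writes it back.

```python
class SaveConditionError(Exception):
    """Hypothetical stand-in for the error raised when a conditional save matches nothing."""


class InMemoryCollection:
    """Toy document store with a compare-and-set save keyed on a version field."""

    def __init__(self) -> None:
        self.docs: dict = {}

    def load(self, name: str) -> dict:
        return dict(self.docs[name])  # each writer works on its own copy

    def save(self, doc: dict, expected_version: int) -> None:
        current = self.docs.get(doc["name"])
        if current is not None and current["version"] != expected_version:
            raise SaveConditionError(f"{doc['name']}: changed since it was read")
        doc["version"] = expected_version + 1
        self.docs[doc["name"]] = dict(doc)


store = InMemoryCollection()
store.docs["cleanup"] = {"name": "cleanup", "total_run_count": 3, "version": 1}

# Two syncs (beat's periodic sync and the scheduler's own refresh) read the
# same document before either of them writes it back.
from_beat = store.load("cleanup")
from_refresh = store.load("cleanup")

from_beat["total_run_count"] = 4
store.save(from_beat, expected_version=1)         # succeeds, version -> 2

try:
    store.save(from_refresh, expected_version=1)  # stale copy, rejected
    conflict = False
except SaveConditionError:
    conflict = True
```

The second writer is rejected instead of silently overwriting the newer run count with its stale value of 3; the open question is what it should do next.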

@Jean-PhilippeD
Copy link
Author

Jean-PhilippeD commented Aug 2, 2023

To fix this quickly, I just changed save() to this:

# module-level imports used by save()
import mongoengine
from celery.utils.log import get_logger

    def save(self):
        # Only move counters forward, so a stale in-memory entry never rolls them back.
        if self.total_run_count > self._task.total_run_count:
            self._task.total_run_count = self.total_run_count
        if self.last_run_at and (self._task.last_run_at is None or self.last_run_at > self._task.last_run_at):
            self._task.last_run_at = self.last_run_at
        self._task.run_immediately = False
        try:
            self._task.save(save_condition={})
        except mongoengine.errors.SaveConditionError:
            get_logger(__name__).warning('Race condition detected, re-saving')
            self._task.save(save_condition={})
        except Exception:
            self._task.save(save_condition={})
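The merge rule behind this save() is to keep the larger total_run_count and the later last_run_at. Because max() does not care about argument order, two overlapping writers end up with the same values whichever lands first. The sketch below models just that rule as a pure function; `RunState` and `merge` are hypothetical names, and the sketch also takes the in-memory last_run_at when the stored one is missing.

```python
import datetime
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class RunState:
    """Hypothetical snapshot of the two fields save() reconciles."""
    total_run_count: int
    last_run_at: Optional[datetime.datetime]


def merge(stored: RunState, in_memory: RunState) -> RunState:
    """Keep whichever value is further along, so no write moves state backwards."""
    if stored.last_run_at is None:
        last = in_memory.last_run_at
    elif in_memory.last_run_at is None:
        last = stored.last_run_at
    else:
        last = max(stored.last_run_at, in_memory.last_run_at)
    return RunState(max(stored.total_run_count, in_memory.total_run_count), last)


a = RunState(4, datetime.datetime(2023, 8, 2, 10, 0))
b = RunState(3, datetime.datetime(2023, 8, 2, 10, 5))
```

Here `merge(a, b)` and `merge(b, a)` both give `RunState(4, 10:05)`, mixing the higher count from one side with the later timestamp from the other. This is why the fix tolerates lost races: whichever write arrives second still never moves the stored values backwards.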

Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant