Deactivate periodic tasks in DB used by Beat if not present in Django (e.g. after code change). #768

Open
m-r-k-f opened this issue Jul 10, 2024 · 2 comments

Comments


m-r-k-f commented Jul 10, 2024

Summary: I found a way to run only those periodic tasks (Beat) that are present in the Django project. All renamed or removed tasks are deactivated in the DB used by Beat.

We had trouble with tasks triggered by Beat. After we renamed a task, the 'old' task entry in the DB kept running as well. Even after we removed task definitions from the code, all previously activated periodic tasks kept running (the DB entries are not reconciled with the tasks currently present in the Django project).

One workaround was to delete all entries used by Beat in the DB between shutting Beat down and starting it up again. But I thought there must be a better way to solve this.

Environment / Packages

  • Celery Version: 5.4.0
  • django-celery-beat Version: 2.6.0
  • Django Version: 5.0.6
  • Python Version: 3.12.0
  • OS: Linux (Ubuntu); same behavior on Windows.

Exact steps to reproduce the issue:

  1. Start from an already initialized Django project with Celery (celery[redis]).
  2. Add 'django-celery-results' to the project.
  3. Add 'django-celery-beat' to the project.
  4. Add both to the installed apps in settings.
  5. In settings, set CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler' to use the DB as storage for Beat.
  6. Migrate.
  7. Configure the other settings as described in the Celery documentation.
  8. Create tasks.
  9. Run the worker.
  10. Run Beat.
  11. Stop both.
  12. Change the names of the tasks.
  13. Start the worker and Beat again.
  14. Both the 'old' task entries and the 'new' (renamed) task entries are running.
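
To make steps 8 to 14 concrete, here is a minimal sketch of the kind of settings change that leaves a stale entry behind. All entry and task names are hypothetical, and it assumes the Celery app loads Django settings with namespace="CELERY" and that "renaming" means changing the key of the schedule entry.

```python
# settings.py fragment; all task and entry names here are hypothetical.
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"

# Schedule as deployed before the code change (step 8).
schedule_before = {
    "cleanup-old": {"task": "myapp.tasks.cleanup", "schedule": 60.0},
}

# Schedule after renaming the entry (step 12). This is what Beat reads,
# assuming the Celery app loads Django settings with namespace="CELERY".
CELERY_BEAT_SCHEDULE = {
    "cleanup-new": {"task": "myapp.tasks.cleanup", "schedule": 60.0},
}

# Entries stored in the DB on the first run that no longer exist in code.
stale_names = set(schedule_before) - set(CELERY_BEAT_SCHEDULE)
print(stale_names)
```

The names in stale_names are the rows that stay enabled in the DB after the restart in step 13.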

Detailed information

I debugged with 'inspect' to understand the flow when Beat is running.
After some minutes I found the solution (... I am joking - after many hours).
My solution is to first disable all periodic tasks in the DB used by Beat.
After that I enable each task that is found in the Django project.
The DB then contains enabled tasks (those currently present in the project)
as well as deactivated tasks that no longer run.

I have only run short tests so far; I will dig deeper later.
With CELERY_BEAT_SCHEDULER = 'celery.beat:PersistentScheduler' I saw the same wrong behavior. I do not use file-based storage, though (maybe I will fix this in the future; it should work the same way).
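
The two-phase idea described above (disable everything, then re-enable what the code still defines) can be sketched without Django. This is only an illustration: the function name reconcile and the plain dicts standing in for the DB table and the beat schedule are assumptions, not part of django-celery-beat.

```python
def reconcile(db_enabled, beat_schedule):
    """Simulate a Beat restart and return the new {name: enabled} state.

    db_enabled: dict of row name -> enabled flag (stand-in for the DB table).
    beat_schedule: dict of entry name -> entry fields (as in settings).
    """
    # Phase 1: disable every row currently known to the DB.
    state = {name: False for name in db_enabled}
    # Phase 2: enable (or create) each entry still present in code,
    # unless the entry sets "enabled" explicitly.
    for name, fields in beat_schedule.items():
        state[name] = fields.get("enabled", True)
    return state


rows = {"cleanup-old": True, "report": True}
schedule = {
    "cleanup-new": {"schedule": 60.0},
    "report": {"schedule": 300.0},
}
print(reconcile(rows, schedule))
```

Rows that are not in the schedule stay in the result, just disabled, which matches the goal of keeping old entries in the DB.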

Here is my change to the original django_celery_beat.schedulers.DatabaseScheduler after debugging (a more practical solution follows in the next section):

lib/python3.12/site-packages/django_celery_beat/schedulers.py

class DatabaseScheduler(Scheduler):
.
.
.
    def setup_schedule(self):
        self.disable_all_periodic_tasks()  # new
        self.install_default_entries(self.schedule)
        self.update_from_dict(self.app.conf.beat_schedule)
    
    def disable_all_periodic_tasks(self):  # new method
        """disable all periodic tasks to be sure that
        tasks removed from Django code are deactivated in DB
        after Beat restart. But disabled tasks are still
        present and reachable in DB (not deleted)."""
        current_periodic_tasks = PeriodicTask.objects.all()
        for periodic_task in current_periodic_tasks:
            periodic_task.enabled = False
            periodic_task.save()
.
.
.
    def update_from_dict(self, mapping):
        s = {}
        for name, entry_fields in mapping.items():
            # enable each task found in code of Django.
            # one exception: "enabled" key is already present 
            # in 'entry_fields' - this means an explicit setting.
            if "enabled" not in entry_fields:  # new
                entry_fields["enabled"] = True  # new
            try:
                entry = self.Entry.from_entry(name,
                                              app=self.app,
                                              **entry_fields)
                if entry.model.enabled:
                    s[name] = entry
            except Exception as exc:
                logger.exception(ADD_ENTRY_ERROR, name, exc, entry_fields)
        self.schedule.update(s)
.
.
.

Useful Solution

I subclassed the original DatabaseScheduler, overrode two of the methods shown above, and added the third. Then I pointed the scheduler setting in settings.py at the subclass: CELERY_BEAT_SCHEDULER = 'myapp.own_scheduler:OwnScheduler'.

project/myapp/own_scheduler.py

from django_celery_beat.schedulers import (
    DatabaseScheduler,
    ADD_ENTRY_ERROR,
    PeriodicTask,
    logger,
)

class OwnScheduler(DatabaseScheduler):
    
    def setup_schedule(self):
        self.disable_all_periodic_tasks()
        self.install_default_entries(self.schedule)
        self.update_from_dict(self.app.conf.beat_schedule)
    
    def disable_all_periodic_tasks(self):
        """disable all periodic tasks to be sure that
        tasks removed from Django code are deactivated in DB
        after Beat restart. But disabled tasks are still
        present and reachable in DB (not deleted)."""
        current_periodic_tasks = PeriodicTask.objects.all()
        for periodic_task in current_periodic_tasks:
            periodic_task.enabled = False
            periodic_task.save()

    def update_from_dict(self, mapping):
        s = {}
        for name, entry_fields in mapping.items():
            # enable each task found in code of Django.
            # one exception: "enabled" key is already present 
            # in 'entry_fields' - this means an explicit setting.
            if "enabled" not in entry_fields:
                entry_fields["enabled"] = True
            try:
                entry = self.Entry.from_entry(name,
                                              app=self.app,
                                              **entry_fields)
                if entry.model.enabled:
                    s[name] = entry
            except Exception as exc:
                logger.exception(ADD_ENTRY_ERROR, name, exc, entry_fields)
        self.schedule.update(s)

Have fun. mrkf


kylebebak commented Jul 11, 2024

This is really basic functionality -- if the records in django_celery_beat_periodictask (the DB) don't match what's in beat_schedule (application code), this library is broken =(

I noted the same thing in #757


On a more productive note, a straightforward way to fix this would be to delete and recreate all the schedule records on startup. For syncing state, delete/recreate is in general slower but easier than diffing. For most use cases, with e.g. ~5-50 scheduled tasks, it won't seriously affect startup time, especially if records are batch inserted

This could be configurable behavior. But yeah, we just ran into this same bug -- we removed a task from the beat schedule, redeployed and restarted beat. Beat kept running the (non-existent) task, raising NotRegistered every time
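
As a rough illustration of the two sync strategies mentioned here, the following sketch contrasts delete/recreate with diffing on plain dicts. The function names recreate and diff and the sample entry names are hypothetical; nothing here touches the real django_celery_beat models.

```python
def recreate(schedule):
    """Delete/recreate: ignore the old rows and build fresh ones from code."""
    return {name: dict(fields) for name, fields in schedule.items()}


def diff(db_rows, schedule):
    """Diffing: classify entry names instead of rebuilding everything."""
    db_names, code_names = set(db_rows), set(schedule)
    return {
        "create": sorted(code_names - db_names),
        "update": sorted(code_names & db_names),
        "remove": sorted(db_names - code_names),
    }


db_rows = {"cleanup-old": {"schedule": 60.0}, "report": {"schedule": 300.0}}
schedule = {"cleanup-new": {"schedule": 60.0}, "report": {"schedule": 300.0}}

print(recreate(schedule))
print(diff(db_rows, schedule))
```

One trade-off to keep in mind: recreating rows would presumably also reset per-row bookkeeping such as last_run_at and total_run_count on PeriodicTask, whereas a diff leaves the surviving rows untouched.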

Author

m-r-k-f commented Jul 11, 2024

Thanks for your reaction.

My interest was in understanding the inner structure of Celery + Beat, and in the question: did they really forget to implement a way to deactivate (or remove) periodic tasks after a change in code?

The second point was (and is): I want to keep my entries in the DB for analysis (also prompted by a request from my mentor).
And this is the difference. I think it depends on the use case.

Have fun. mrkf
