Task Execution Options are not applied when using admin action #803

Open
leandrodesouzadev opened this issue Sep 12, 2024 · 0 comments

Summary:

A PeriodicTask that has expire_seconds defined does not get an expires option set when the task is dispatched by the admin action "Run Selected Tasks".

  • Celery Version: 5.4.0
  • Celery-Beat Version: 2.6.0

Exact steps to reproduce the issue:

  1. Create a PeriodicTask entry with expire_seconds set
  2. Run the task via the admin action "Run Selected Tasks"
  3. Read task.request.expires inside the running task; it is None

Detailed information

The expire_seconds option is only applied when the task is dispatched by the celery beat process, not when it is dispatched from the admin.
In my case, receiving the expires option is crucial, because I use it to work out when the task was queued.
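
A minimal sketch of the difference described above, using only the standard library. `PeriodicTaskRecord`, `admin_style_options` and `beat_style_options` are hypothetical names made up for illustration; they are not django-celery-beat's actual code, only a model of the reported behaviour (one path forwards `expire_seconds` as `expires`, the other does not).

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class PeriodicTaskRecord:
    """Hypothetical stand-in for the PeriodicTask fields relevant here."""
    name: str
    queue: Optional[str] = None
    expire_seconds: Optional[int] = None


def admin_style_options(record: PeriodicTaskRecord) -> Dict[str, Any]:
    """Options that only carry the queue, modelling the behaviour reported above."""
    return {"queue": record.queue} if record.queue else {}


def beat_style_options(record: PeriodicTaskRecord) -> Dict[str, Any]:
    """Options that also forward expire_seconds as the `expires` option."""
    options = admin_style_options(record)
    if record.expire_seconds is not None:
        # Interpreted as "seconds from now" when given as a number.
        options["expires"] = record.expire_seconds
    return options


record = PeriodicTaskRecord(name="example", queue="default", expire_seconds=60)
print(admin_style_options(record))  # {'queue': 'default'}
print(beat_style_options(record))   # {'queue': 'default', 'expires': 60}
```

The resulting dict would be passed along the lines of `task.apply_async(args=..., kwargs=..., **options)`; Celery's `apply_async` accepts `expires` either as a number of seconds or as a datetime.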

More information about my setup that may help to reproduce the error.

# custom_task.py
from datetime import datetime
from datetime import timedelta

from celery import Task


class ScheduledAtAwareTask(Task):
    @property
    def scheduled_at(self) -> datetime:
        """The exact time this task was sent to the queue.

        The value stays the same across retries of the task.
        """
        if not self.request.expires:
            raise ValueError(
                "Missing `expires` option, this task was not scheduled or the `expires` "
                "option was not given on BEAT_SCHEDULE"
            )
        # `expiration` is this task's time-to-live in milliseconds. Subtracting it from
        # `expires` recovers the enqueue time on a per-task basis, since each task can
        # be configured with a different expiry.
        task_expiration = int(self.request.properties["expiration"]) / 1000
        dt = datetime.fromisoformat(self.request.expires).astimezone()
        return dt - timedelta(seconds=task_expiration)


# tasks.py
from celery import shared_task
from custom_task import ScheduledAtAwareTask


@shared_task(bind=True, base=ScheduledAtAwareTask)
def example_task(self: ScheduledAtAwareTask):
    self.scheduled_at  # raises ValueError
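
The arithmetic behind `scheduled_at` can be checked in isolation, without a broker. This is a sketch under assumptions: `scheduled_at_from` is a hypothetical helper, and the sample values assume that `expires` arrives as an ISO 8601 string and `expiration` as a string of milliseconds, as the property above expects. It skips the `.astimezone()` conversion to local time so the printed value is deterministic.

```python
from datetime import datetime, timedelta, timezone


def scheduled_at_from(expires_iso: str, expiration_ms: str) -> datetime:
    """Recover the enqueue time from the expiry timestamp and the TTL."""
    ttl = timedelta(milliseconds=int(expiration_ms))
    return datetime.fromisoformat(expires_iso) - ttl


# A task queued at 12:00:00 UTC with expire_seconds=60 expires at 12:01:00 UTC.
queued = scheduled_at_from("2024-09-12T12:01:00+00:00", "60000")
print(queued.isoformat())  # 2024-09-12T12:00:00+00:00
```

When the task is dispatched without `expires`, neither input is available, which is why the property raises instead of returning a value.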