-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(locked_tasks): Add timeout to auto unlock tasks #16
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,7 @@ class TaskHandler(typing.Generic[P]): | |
"priority", | ||
"transaction_on_commit", | ||
"task_identifier", | ||
"timeout", | ||
) | ||
|
||
unwrapped: typing.Callable[P, None] | ||
|
@@ -38,11 +39,13 @@ def __init__( | |
queue_size: int | None = None, | ||
priority: TaskPriority = TaskPriority.NORMAL, | ||
transaction_on_commit: bool = True, | ||
timeout: timedelta | None = None, | ||
) -> None: | ||
self.unwrapped = f | ||
self.queue_size = queue_size | ||
self.priority = priority | ||
self.transaction_on_commit = transaction_on_commit | ||
self.timeout = timeout | ||
|
||
task_name = task_name or f.__name__ | ||
task_module = getmodule(f).__name__.rsplit(".")[-1] | ||
|
@@ -87,6 +90,7 @@ def delay( | |
scheduled_for=delay_until or timezone.now(), | ||
priority=self.priority, | ||
queue_size=self.queue_size, | ||
timeout=self.timeout, | ||
args=args, | ||
kwargs=kwargs, | ||
) | ||
|
@@ -124,6 +128,7 @@ def register_task_handler( # noqa: C901 | |
queue_size: int | None = None, | ||
priority: TaskPriority = TaskPriority.NORMAL, | ||
transaction_on_commit: bool = True, | ||
timeout: timedelta | None = timedelta(seconds=60), | ||
) -> typing.Callable[[typing.Callable[P, None]], TaskHandler[P]]: | ||
""" | ||
Turn a function into an asynchronous task. | ||
|
@@ -150,6 +155,7 @@ def wrapper(f: typing.Callable[P, None]) -> TaskHandler[P]: | |
queue_size=queue_size, | ||
priority=priority, | ||
transaction_on_commit=transaction_on_commit, | ||
timeout=timeout, | ||
) | ||
|
||
return wrapper | ||
|
@@ -161,6 +167,7 @@ def register_recurring_task( | |
args: tuple[typing.Any] = (), | ||
kwargs: dict[str, typing.Any] | None = None, | ||
first_run_time: time | None = None, | ||
timeout: timedelta | None = timedelta(minutes=30), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The longest run duration for a recurring task in production is |
||
) -> typing.Callable[[typing.Callable[..., None]], RecurringTask]: | ||
if not os.environ.get("RUN_BY_PROCESSOR"): | ||
# Do not register recurring tasks if not invoked by task processor | ||
|
@@ -182,6 +189,7 @@ def decorator(f: typing.Callable[..., None]) -> RecurringTask: | |
"serialized_kwargs": RecurringTask.serialize_data(kwargs or {}), | ||
"run_every": run_every, | ||
"first_run_time": first_run_time, | ||
"timeout": timeout, | ||
}, | ||
) | ||
return task | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
# Generated by Django 3.2.23 on 2025-01-06 04:51 | ||
|
||
from task_processor.migrations.helpers import PostgresOnlyRunSQL | ||
import datetime | ||
from django.db import migrations, models | ||
import os | ||
|
||
|
||
class Migration(migrations.Migration):
    """Add lock bookkeeping columns and refresh the task-selection SQL functions.

    Schema changes:
      * ``locked_at`` (nullable datetime) on ``recurringtask`` and ``task``.
      * ``timeout`` (duration) on ``recurringtask`` (default 30 minutes) and
        on ``task`` (default 1 minute).

    The two ``PostgresOnlyRunSQL`` operations load the ``0012_*`` SQL files
    from the ``sql`` directory next to this migration. The ``reverse_sql``
    argument is given the path of the earlier SQL file for the same function
    (NOTE(review): presumably ``from_sql_file`` reads that path when the
    migration is reversed - confirm against the helper).
    """

    dependencies = [
        ("task_processor", "0011_add_priority_to_get_tasks_to_process"),
    ]

    operations = [
        # AddField arguments are (model_name, name, field).
        migrations.AddField(
            "recurringtask",
            "locked_at",
            models.DateTimeField(null=True, blank=True),
        ),
        migrations.AddField(
            "task",
            "locked_at",
            models.DateTimeField(null=True, blank=True),
        ),
        migrations.AddField(
            "recurringtask",
            "timeout",
            models.DurationField(default=datetime.timedelta(minutes=30)),
        ),
        migrations.AddField(
            "task",
            "timeout",
            models.DurationField(default=datetime.timedelta(minutes=1)),
        ),
        # Recurring-task selection function: forward file is 0012, reverse is 0008.
        PostgresOnlyRunSQL.from_sql_file(
            os.path.join(
                os.path.dirname(__file__),
                "sql",
                "0012_get_recurringtasks_to_process.sql",
            ),
            reverse_sql=os.path.join(
                os.path.dirname(__file__),
                "sql",
                "0008_get_recurringtasks_to_process.sql",
            ),
        ),
        # Task selection function: forward file is 0012, reverse is 0011.
        PostgresOnlyRunSQL.from_sql_file(
            os.path.join(
                os.path.dirname(__file__),
                "sql",
                "0012_get_tasks_to_process.sql",
            ),
            reverse_sql=os.path.join(
                os.path.dirname(__file__),
                "sql",
                "0011_get_tasks_to_process.sql",
            ),
        ),
    ]
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
CREATE OR REPLACE FUNCTION get_recurringtasks_to_process(num_tasks integer)
RETURNS SETOF task_processor_recurringtask AS $$
DECLARE
    claimed_task task_processor_recurringtask;
BEGIN
    -- Review note: compared with the previous version of this function, only
    -- the WHERE clause, the SET clause and the `locked_at` assignment changed.

    -- Pick up to `num_tasks` recurring tasks that are either unlocked or whose
    -- lock is older than the task `timeout` (stale lock, eligible again).
    FOR claimed_task IN
        SELECT *
        FROM task_processor_recurringtask
        WHERE NOT is_locked
           OR (locked_at IS NOT NULL AND locked_at < NOW() - timeout)
        ORDER BY id
        LIMIT num_tasks
        -- Row-level lock so that no other worker can select these rows while
        -- this transaction is open. Rows already locked elsewhere are skipped.
        FOR UPDATE SKIP LOCKED
    LOOP
        -- Persist the application-level lock so that other workers keep
        -- ignoring this task after the transaction commits, while the current
        -- worker is still executing it.
        UPDATE task_processor_recurringtask
        SET locked_at = NOW(), is_locked = TRUE
        WHERE id = claimed_task.id;

        -- The loop variable was read before the UPDATE, so mirror the new
        -- values onto it. Otherwise the client would receive a locked row that
        -- still shows `is_locked` as `False` and `locked_at` as `None`.
        -- NOW() returns the same value on every call within a transaction, so
        -- this matches the timestamp written by the UPDATE above.
        claimed_task.locked_at := NOW();
        claimed_task.is_locked := TRUE;

        RETURN NEXT claimed_task;
    END LOOP;

    RETURN;
END;
$$ LANGUAGE plpgsql
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
CREATE OR REPLACE FUNCTION get_tasks_to_process(num_tasks integer)
RETURNS SETOF task_processor_task AS $$
DECLARE
    row_to_return task_processor_task;
BEGIN
    -- Select the tasks that need to be processed: fewer than 3 failures,
    -- already due, not completed, and either unlocked or holding a lock that
    -- is older than the task `timeout` (stale lock, eligible again).
    FOR row_to_return IN
        SELECT *
        FROM task_processor_task
        WHERE num_failures < 3
          AND scheduled_for < NOW()
          AND completed = FALSE
          AND (is_locked = FALSE OR (locked_at IS NOT NULL AND locked_at < NOW() - timeout))
        ORDER BY priority ASC, scheduled_for ASC, created_at ASC
        LIMIT num_tasks
        -- Select for update to ensure that no other workers can select these tasks while in this transaction block
        FOR UPDATE SKIP LOCKED
    LOOP
        -- Lock every selected task (by updating `is_locked` to true)
        UPDATE task_processor_task
        -- Lock this row by setting is_locked True, so that no other workers can select these tasks after this
        -- transaction is complete (but the tasks are still being executed by the current worker)
        SET is_locked = TRUE, locked_at = NOW()
        WHERE id = row_to_return.id;
        -- The loop variable was read before the UPDATE, so both columns must be
        -- mirrored onto it explicitly. Otherwise the client receives a row that
        -- is locked but still shows `is_locked` as `False` and `locked_at` as `None`.
        row_to_return.is_locked := TRUE;
        -- Fix (raised in review): `locked_at` was written by the UPDATE but not
        -- copied onto the returned row. NOW() returns the same value on every
        -- call within a transaction, so this equals the stored timestamp.
        row_to_return.locked_at := NOW();

        RETURN NEXT row_to_return;
    END LOOP;

    RETURN;
END;
$$ LANGUAGE plpgsql
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The longest run duration for a task in production is 00:00:30.459713.