Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(locked_tasks): Add timeout to auto unlock tasks #16

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

gagantrivedi
Copy link
Member

@gagantrivedi gagantrivedi commented Jan 6, 2025

Fixes #17 by picking up locked tasks for processing after a timeout.

@gagantrivedi gagantrivedi force-pushed the fix/locked-tasks branch 3 times, most recently from 28e82e6 to dc63661 Compare January 6, 2025 08:13
@@ -161,6 +167,7 @@ def register_recurring_task(
args: tuple[typing.Any] = (),
kwargs: dict[str, typing.Any] | None = None,
first_run_time: time | None = None,
timeout: timedelta | None = timedelta(minutes=30),
Copy link
Member Author

@gagantrivedi gagantrivedi Jan 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The longest run duration for a recurring task in production is 00:11:49.801789

migrations.AddField(
model_name="task",
name="timeout",
field=models.DurationField(blank=True, null=True),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default not set here to avoid locking the table.

@@ -124,6 +128,7 @@ def register_task_handler( # noqa: C901
queue_size: int | None = None,
priority: TaskPriority = TaskPriority.NORMAL,
transaction_on_commit: bool = True,
timeout: timedelta | None = timedelta(seconds=60),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The longest run duration for a task in production is 00:00:30.459713

@gagantrivedi gagantrivedi changed the title fix(locked-tasks): wip fix(17/recurring-task-lock): Add timeout to auto unlock task Jan 6, 2025
@gagantrivedi gagantrivedi linked an issue Jan 6, 2025 that may be closed by this pull request
@@ -0,0 +1,32 @@
CREATE OR REPLACE FUNCTION get_recurringtasks_to_process(num_tasks integer)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only lines 10, 20, and 25 were updated here.

@gagantrivedi gagantrivedi force-pushed the fix/locked-tasks branch 2 times, most recently from dfd51cd to 6e07161 Compare January 6, 2025 08:55
@gagantrivedi gagantrivedi marked this pull request as ready for review January 6, 2025 10:14
@gagantrivedi gagantrivedi requested review from a team and matthewelwell and removed request for a team January 6, 2025 10:14
Copy link
Contributor

@matthewelwell matthewelwell left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a few fairly minor comments / questions, but overall this looks excellent 👌

@@ -146,6 +151,9 @@ def mark_success(self):
class RecurringTask(AbstractBaseTask):
run_every = models.DurationField()
first_run_time = models.TimeField(blank=True, null=True)
locked_at = models.DateTimeField(blank=True, null=True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we don't need this on the regular Task model because we can use scheduled_for ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I initially planned to add the unlock feature only for recurring tasks, but on second thought, I have now added it to regular tasks as well.

task_runs = run_recurring_tasks()

# Then
assert cache.get(DEFAULT_CACHE_KEY)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assert cache.get(DEFAULT_CACHE_KEY)
assert cache.get(DEFAULT_CACHE_KEY) == DEFAULT_CACHE_VALUE

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do also think it's worth adding an assert to the beginning of the test that DEFAULT_CACHE_KEY doesn't already exist in the cache, or a cache.delete(DEFAULT_CACHE_KEY) perhaps?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@gagantrivedi gagantrivedi changed the title fix(17/recurring-task-lock): Add timeout to auto unlock task fix(locked_tasks): Add timeout to auto unlock tasks Jan 8, 2025
@@ -63,6 +70,116 @@ def test_run_task_runs_task_and_creates_task_run_object_when_success(db):
assert task.completed


def test_run_tasks_runs_locked_task_after_tiemout(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def test_run_tasks_runs_locked_task_after_tiemout(
def test_run_tasks_runs_locked_task_after_timeout(

WHERE id = row_to_return.id;
-- If we don't explicitly update the columns here, the client will receive a row
-- that is locked but still shows `is_locked` as `False` and `locked_at` as `None`.
row_to_return.is_locked := TRUE;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this also do row_to_return.locked_at = NOW() (or maybe NOW() should be stored in a variable for exact accuracy)?

-- If we don't explicitly update the columns here, the client will receive a row
-- that is locked but still shows `is_locked` as `False` and `locked_at` as `None`.
row_to_return.is_locked := TRUE;
row_to_return.locked_at := NOW();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpicking, but we should set NOW() to some variable for exact accuracy here, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, NOW() returns the same value on every call in a transaction

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Recurring Tasks can get stuck in locked status
2 participants