-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(locked_tasks): Add timeout to auto unlock tasks #16
base: main
Are you sure you want to change the base?
Conversation
28e82e6
to
dc63661
Compare
@@ -161,6 +167,7 @@ def register_recurring_task( | |||
args: tuple[typing.Any] = (), | |||
kwargs: dict[str, typing.Any] | None = None, | |||
first_run_time: time | None = None, | |||
timeout: timedelta | None = timedelta(minutes=30), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The longest run duration for a recurring task in production is 00:11:49.801789
migrations.AddField( | ||
model_name="task", | ||
name="timeout", | ||
field=models.DurationField(blank=True, null=True), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Default not set here to avoid locking the table.
dc63661
to
cba7ead
Compare
@@ -124,6 +128,7 @@ def register_task_handler( # noqa: C901 | |||
queue_size: int | None = None, | |||
priority: TaskPriority = TaskPriority.NORMAL, | |||
transaction_on_commit: bool = True, | |||
timeout: timedelta | None = timedelta(seconds=60), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The longest run duration for a task in production is 00:00:30.459713
cba7ead
to
f0ac380
Compare
@@ -0,0 +1,32 @@ | |||
CREATE OR REPLACE FUNCTION get_recurringtasks_to_process(num_tasks integer) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only lines 10, 20, and 25 were updated here.
dfd51cd
to
6e07161
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a few fairly minor comments / questions, but overall this looks excellent 👌
task_processor/models.py
Outdated
@@ -146,6 +151,9 @@ def mark_success(self): | |||
class RecurringTask(AbstractBaseTask): | |||
run_every = models.DurationField() | |||
first_run_time = models.TimeField(blank=True, null=True) | |||
locked_at = models.DateTimeField(blank=True, null=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess we don't need this on the regular Task
model because we can use scheduled_for
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I initially planned to add the unlock feature only for recurring tasks, but on second thought, I have now added it to regular tasks as well.
task_processor/migrations/sql/0012_get_recurringtasks_to_process.sql
Outdated
Show resolved
Hide resolved
task_runs = run_recurring_tasks() | ||
|
||
# Then | ||
assert cache.get(DEFAULT_CACHE_KEY) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assert cache.get(DEFAULT_CACHE_KEY) | |
assert cache.get(DEFAULT_CACHE_KEY) == DEFAULT_CACHE_VALUE |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do also think it's worth adding an assert to the beginning of the test that DEFAULT_CACHE_KEY
doesn't already exist in the cache, or a cache.delete(DEFAULT_CACHE_KEY)
perhaps?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
ed052fa
to
d8a4c37
Compare
@@ -63,6 +70,116 @@ def test_run_task_runs_task_and_creates_task_run_object_when_success(db): | |||
assert task.completed | |||
|
|||
|
|||
def test_run_tasks_runs_locked_task_after_tiemout( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def test_run_tasks_runs_locked_task_after_tiemout( | |
def test_run_tasks_runs_locked_task_after_timeout( |
WHERE id = row_to_return.id; | ||
-- If we don't explicitly update the columns here, the client will receive a row | ||
-- that is locked but still shows `is_locked` as `False` and `locked_at` as `None`. | ||
row_to_return.is_locked := TRUE; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this also do row_to_return.locked_at = NOW()
(or maybe NOW()
should be stored in a variable for exact accuracy)?
-- If we don't explicitly update the columns here, the client will receive a row | ||
-- that is locked but still shows `is_locked` as `False` and `locked_at` as `None`. | ||
row_to_return.is_locked := TRUE; | ||
row_to_return.locked_at := NOW(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpicking, but we should set NOW()
to some variable for exact accuracy here, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, NOW() returns the same value on every call in a transaction
Fixes #17 by picking up locked tasks for processing after a timeout.