Skip to content

Commit

Permalink
use channels to process ready tasks (#25)
Browse files Browse the repository at this point in the history
This introduces a trigger which will notify a channel when tasks are
inserted or updated in a "pending" state. Workers now listen on this
channel and if their queue is named as the one the notification is for
begin processing the next task.

Because channel notifications can fail, we retain the polling mechanism
only now it's set to a default polling interval of one minute. This can
be configured to be shorter or longer as well.

Closes #19
  • Loading branch information
maxcountryman authored Oct 25, 2024
1 parent a898a8d commit eeffc47
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 116 deletions.
19 changes: 19 additions & 0 deletions migrations/20241024174106_1.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- function to notify about task changes
create or replace function underway.task_change_notify()
returns trigger as $$
begin
if (new.state = 'pending') then
perform pg_notify('task_change', json_build_object(
'task_queue_name', new.task_queue_name
)::text);
end if;

return new;
end;
$$ language plpgsql;

-- trigger that calls the function after task changes
create trigger task_changed
after insert or update on underway.task
for each row
execute procedure underway.task_change_notify();
2 changes: 1 addition & 1 deletion src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1521,7 +1521,7 @@ mod tests {
job.start();

// Give the job a moment to process.
tokio::time::sleep(tokio::time::Duration::from_millis(250)).await;
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;

assert_eq!(
*state.data.lock().expect("Mutex should not be poisoned"),
Expand Down
Loading

0 comments on commit eeffc47

Please sign in to comment.