Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use proper fields for finding prunable jobs #1145

Merged
merged 4 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions lib/oban/engines/basic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,10 @@ defmodule Oban.Engines.Basic do
subquery =
queryable
|> select([:id, :queue, :state])
|> where([j], j.state in ~w(completed cancelled discarded))
|> where([j], j.state == "completed" and j.completed_at < ^time)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We switched to the scheduled_at timestamp to make use of the compound index and speed up pruning. There are likely to be many more completed jobs than cancelled or discarded, and a completed job has to execute to end up in that state. So, let's use scheduled_at for the completed state to make use of that index.

Suggested change
|> where([j], j.state == "completed" and j.completed_at < ^time)
|> where([j], j.state == "completed" and j.scheduled_at < ^time)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that makes perfect sense performance-wise

If you do not do some tricky stuff, completed, cancelled and discarded rows never change. Probably they could all use a single timestamp field instead of 3, with a compound index on state and that field

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this redundant? queue column is not nullable

where([j], not is_nil(j.queue))

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The is_nil check is pointless with the current index structure, but it was required to force index usage with the old compound table where queue was before state. The index change was optional (way back when), and some people never chose that option.

We can probably remove it at this point, but it doesn't change the query plan.

|> or_where([j], j.state == "cancelled" and j.cancelled_at < ^time)
|> or_where([j], j.state == "discarded" and j.discarded_at < ^time)
|> where([j], not is_nil(j.queue))
|> where([j], j.scheduled_at < ^time)
|> limit(^limit)

query =
Expand Down
5 changes: 3 additions & 2 deletions lib/oban/engines/lite.ex
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,9 @@ defmodule Oban.Engines.Lite do
select_query =
queryable
|> select([j], map(j, [:id, :queue, :state]))
|> where([j], j.state in ~w(completed cancelled discarded))
|> where([j], j.scheduled_at < ^time)
|> where([j], j.state == "completed" and j.completed_at < ^time)
sorentwo marked this conversation as resolved.
Show resolved Hide resolved
|> or_where([j], j.state == "cancelled" and j.cancelled_at < ^time)
|> or_where([j], j.state == "discarded" and j.discarded_at < ^time)
|> limit(^limit)

pruned = Repo.all(conf, select_query)
Expand Down
8 changes: 7 additions & 1 deletion test/oban/engine_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,13 @@ for engine <- [Oban.Engines.Basic, Oban.Engines.Lite] do
TelemetryHandler.attach_events(span_type: [:job, [:engine, :prune_jobs]])

for state <- Job.states(), seconds <- 59..61 do
opts = [state: to_string(state), scheduled_at: seconds_ago(seconds)]
opts = [
state: to_string(state),
completed_at: seconds_ago(seconds),
discarded_at: seconds_ago(seconds),
cancelled_at: seconds_ago(seconds),
scheduled_at: seconds_ago(59)
]

# Insert one job at a time to avoid a "Cell-wise defaults" error in SQLite.
Oban.insert!(name, Worker.new(%{}, opts))
Expand Down
21 changes: 15 additions & 6 deletions test/oban/plugins/pruner_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,26 @@ defmodule Oban.Plugins.PrunerTest do
test "historic jobs are pruned when they are older than the configured age" do
TelemetryHandler.attach_events()

%Job{id: _id_} = insert!(%{}, state: "completed", scheduled_at: seconds_ago(61))
%Job{id: _id_} = insert!(%{}, state: "cancelled", scheduled_at: seconds_ago(61))
%Job{id: _id_} = insert!(%{}, state: "discarded", scheduled_at: seconds_ago(61))
%Job{id: id_1} = insert!(%{}, state: "completed", scheduled_at: seconds_ago(59))
%Job{id: _id_} = insert_historical("cancelled", :cancelled_at, 61, 61)
%Job{id: _id_} = insert_historical("cancelled", :cancelled_at, 61, 59)
%Job{id: _id_} = insert_historical("discarded", :discarded_at, 61, 61)
%Job{id: _id_} = insert_historical("discarded", :discarded_at, 61, 59)
%Job{id: _id_} = insert_historical("completed", :completed_at, 61, 61)
%Job{id: _id_} = insert_historical("completed", :completed_at, 61, 59)

%Job{id: id_1} = insert_historical("cancelled", :cancelled_at, 59, 61)
%Job{id: id_2} = insert_historical("cancelled", :cancelled_at, 59, 59)
%Job{id: id_3} = insert_historical("discarded", :discarded_at, 59, 61)
%Job{id: id_4} = insert_historical("discarded", :discarded_at, 59, 59)
%Job{id: id_5} = insert_historical("completed", :completed_at, 59, 61)
%Job{id: id_6} = insert_historical("completed", :completed_at, 59, 59)

start_supervised_oban!(plugins: [{Pruner, interval: 10, max_age: 60}])

assert_receive {:event, :stop, _, %{plugin: Pruner} = meta}
assert %{pruned_count: 3, pruned_jobs: [_ | _]} = meta
assert %{pruned_count: 6, pruned_jobs: [_ | _]} = meta

assert [id_1] ==
assert [id_1, id_2, id_3, id_4, id_5, id_6] ==
Job
|> select([j], j.id)
|> order_by(asc: :id)
Expand Down
10 changes: 10 additions & 0 deletions test/support/case.ex
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@ defmodule Oban.Case do
Oban.insert!(oban, changeset)
end

# Pruning test helpers

def insert_historical(state, timestamp_field, timestamp_age, scheduled_age) do
opts =
[state: state, scheduled_at: seconds_ago(scheduled_age)]
|> Keyword.put(timestamp_field, seconds_ago(timestamp_age))

insert!(%{}, opts)
end

# Time

def seconds_from_now(seconds) do
Expand Down
Loading