Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use proper fields for finding prunable jobs #1145

Merged
merged 4 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions lib/oban/engines/basic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,10 @@ defmodule Oban.Engines.Basic do
subquery =
queryable
|> select([:id, :queue, :state])
|> where([j], j.state in ~w(completed cancelled discarded))
|> where([j], j.state == "completed" and j.scheduled_at < ^time)
|> or_where([j], j.state == "cancelled" and j.cancelled_at < ^time)
|> or_where([j], j.state == "discarded" and j.discarded_at < ^time)
|> where([j], not is_nil(j.queue))
|> where([j], j.scheduled_at < ^time)
|> limit(^limit)

query =
Expand Down
5 changes: 3 additions & 2 deletions lib/oban/engines/lite.ex
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,9 @@ defmodule Oban.Engines.Lite do
select_query =
queryable
|> select([j], map(j, [:id, :queue, :state]))
|> where([j], j.state in ~w(completed cancelled discarded))
|> where([j], j.scheduled_at < ^time)
|> where([j], j.state == "completed" and j.scheduled_at < ^time)
|> or_where([j], j.state == "cancelled" and j.cancelled_at < ^time)
|> or_where([j], j.state == "discarded" and j.discarded_at < ^time)
|> limit(^limit)

pruned = Repo.all(conf, select_query)
Expand Down
8 changes: 7 additions & 1 deletion test/oban/engine_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,13 @@ for engine <- [Oban.Engines.Basic, Oban.Engines.Lite] do
TelemetryHandler.attach_events(span_type: [:job, [:engine, :prune_jobs]])

for state <- Job.states(), seconds <- 59..61 do
opts = [state: to_string(state), scheduled_at: seconds_ago(seconds)]
opts = [
state: to_string(state),
completed_at: seconds_ago(seconds),
discarded_at: seconds_ago(seconds),
cancelled_at: seconds_ago(seconds),
scheduled_at: seconds_ago(seconds)
]

# Insert one job at a time to avoid a "Cell-wise defaults" error in SQLite.
Oban.insert!(name, Worker.new(%{}, opts))
Expand Down
21 changes: 15 additions & 6 deletions test/oban/plugins/pruner_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,26 @@ defmodule Oban.Plugins.PrunerTest do
test "historic jobs are pruned when they are older than the configured age" do
TelemetryHandler.attach_events()

%Job{id: _id_} = insert!(%{}, state: "completed", scheduled_at: seconds_ago(61))
%Job{id: _id_} = insert!(%{}, state: "cancelled", scheduled_at: seconds_ago(61))
%Job{id: _id_} = insert!(%{}, state: "discarded", scheduled_at: seconds_ago(61))
%Job{id: id_1} = insert!(%{}, state: "completed", scheduled_at: seconds_ago(59))
%Job{id: _id_} = insert_historical("cancelled", :cancelled_at, 61, 61)
%Job{id: _id_} = insert_historical("cancelled", :cancelled_at, 61, 59)
%Job{id: _id_} = insert_historical("discarded", :discarded_at, 61, 61)
%Job{id: _id_} = insert_historical("discarded", :discarded_at, 61, 59)
%Job{id: _id_} = insert_historical("completed", :completed_at, 61, 61)
%Job{id: _id_} = insert_historical("completed", :completed_at, 59, 61)

%Job{id: id_1} = insert_historical("cancelled", :cancelled_at, 59, 61)
%Job{id: id_2} = insert_historical("cancelled", :cancelled_at, 59, 59)
%Job{id: id_3} = insert_historical("discarded", :discarded_at, 59, 61)
%Job{id: id_4} = insert_historical("discarded", :discarded_at, 59, 59)
%Job{id: id_5} = insert_historical("completed", :completed_at, 59, 59)
%Job{id: id_6} = insert_historical("completed", :completed_at, 61, 59)

start_supervised_oban!(plugins: [{Pruner, interval: 10, max_age: 60}])

assert_receive {:event, :stop, _, %{plugin: Pruner} = meta}
assert %{pruned_count: 3, pruned_jobs: [_ | _]} = meta
assert %{pruned_count: 6, pruned_jobs: [_ | _]} = meta

assert [id_1] ==
assert [id_1, id_2, id_3, id_4, id_5, id_6] ==
Job
|> select([j], j.id)
|> order_by(asc: :id)
Expand Down
13 changes: 13 additions & 0 deletions test/support/case.ex
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,19 @@ defmodule Oban.Case do
Oban.insert!(oban, changeset)
end

# Pruning test helpers

def insert_historical(state, timestamp_field, timestamp_age, scheduled_age) do
opts =
Keyword.put(
[state: state, scheduled_at: seconds_ago(scheduled_age)],
timestamp_field,
seconds_ago(timestamp_age)
)

insert!(%{}, opts)
end

# Time

def seconds_from_now(seconds) do
Expand Down