Skip to content

Commit

Permalink
Emit queue shutdown telemetry event with logging
Browse files Browse the repository at this point in the history
A new queue shutdown event, `[:oban, :queue, :shutdown]`, is emitted by
each queue when it terminates. The event originates from the `watchman`
process, which tracks the total ellapsed time from when termination
starts to when all jobs complete or the allotted period is exhausted.

Any jobs that take longer than the `:shutdown_grace_period` (by default
15 seconds) are brutally killed and left as orphans. The ids of jobs
left in an executing state are listed in the event's `orphaned` meta.

This also adds `queue:shutdown` logging to the default logger. Only
queues that shutdown with orphaned jobs are logged, which makes it
easier to detect orphaned jobs and which jobs were affected.

Inspired by #1076 from @axelson. Closes #1076.
  • Loading branch information
sorentwo committed May 6, 2024
1 parent a906299 commit e7f91f4
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 42 deletions.
2 changes: 1 addition & 1 deletion lib/oban.ex
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ defmodule Oban do
When running a mix release on a Heroku node, the node is alive even if not part of a
distributed system. In order to use the `DYNO` value, configure the node value using runtime
configuration via `config/runtime.exs:
configuration via `config/runtime.exs`:
config :my_app, Oban,
node: System.get_env("DYNO", "nonode@nohost")
Expand Down
2 changes: 1 addition & 1 deletion lib/oban/queue/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ defmodule Oban.Queue.Supervisor do
|> Keyword.put_new(:dispatch_cooldown, conf.dispatch_cooldown)

watch_opts = [
foreman: fore_name,
conf: conf,
name: watch_name,
producer: prod_name,
shutdown: conf.shutdown_grace_period
Expand Down
43 changes: 19 additions & 24 deletions lib/oban/queue/watchman.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,16 @@ defmodule Oban.Queue.Watchman do
alias Oban.Queue.Producer
alias __MODULE__, as: State

@type option ::
{:foreman, GenServer.name()}
| {:name, module()}
| {:producer, GenServer.name()}
| {:shutdown, timeout()}
defstruct [:conf, :producer, :shutdown, interval: 10]

defstruct [:foreman, :producer, :shutdown, interval: 10]

@spec child_spec([option]) :: Supervisor.child_spec()
@spec child_spec(Keyword.t()) :: Supervisor.child_spec()
def child_spec(opts) do
shutdown =
case opts[:shutdown] do
0 -> :brutal_kill
value -> value
end
shutdown = Keyword.fetch!(opts, :shutdown) + Keyword.get(opts, :interval, 10)

%{id: __MODULE__, start: {__MODULE__, :start_link, [opts]}, shutdown: shutdown}
end

@spec start_link([option]) :: GenServer.on_start()
@spec start_link(Keyword.t()) :: GenServer.on_start()
def start_link(opts) do
{name, opts} = Keyword.pop(opts, :name)

Expand All @@ -41,27 +31,32 @@ defmodule Oban.Queue.Watchman do

@impl GenServer
def terminate(_reason, %State{} = state) do
# There is a chance that the foreman doesn't exist, and we never want to raise another error
# as part of the shut down process.
# The producer may not exist, and we don't want to raise during shutdown.
try do
:ok = Producer.shutdown(state.producer)
:ok = wait_for_executing(state)
:ok = wait_for_executing(0, state)
catch
:exit, _reason -> :ok
end

:ok
end

defp wait_for_executing(state) do
case DynamicSupervisor.count_children(state.foreman) do
%{active: 0} ->
:ok
defp wait_for_executing(ellapsed, state) do
check = Producer.check(state.producer)

if check.running == [] or ellapsed >= state.shutdown do
:telemetry.execute(
[:oban, :queue, :shutdown],
%{ellapsed: ellapsed},
%{conf: state.conf, orphaned: check.running, queue: check.queue}
)

_ ->
:ok = Process.sleep(state.interval)
:ok
else
:ok = Process.sleep(state.interval)

wait_for_executing(state)
wait_for_executing(ellapsed + state.interval, state)
end
end
end
69 changes: 54 additions & 15 deletions lib/oban/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ defmodule Oban.Telemetry do
| `:stop` | `:duration`, `:queue_time` | `:conf`, `:job`, `:state`, `:result` |
| `:exception` | `:duration`, `:queue_time` | `:conf`, `:job`, `:state`, `:kind`, `:reason`, `:result`, `:stacktrace` |
### Metadata
#### Metadata
* `:conf` — the executing Oban instance's config
* `:job` — the executing `Oban.Job`
Expand Down Expand Up @@ -97,7 +97,7 @@ defmodule Oban.Telemetry do
| `:stop` | `:duration` | `:conf`, `:engine`, `:job` |
| `:exception` | `:duration` | `:conf`, `:engine`, `:job`, `:kind`, `:reason`, `:stacktrace` |
### Metadata
#### Metadata
* `:conf` — the Oban supervisor's config
* `:engine` — the module of the engine used
Expand All @@ -117,7 +117,7 @@ defmodule Oban.Telemetry do
| `:stop` | `:duration` | `:conf`, `:channel`, `:payload` |
| `:exception` | `:duration` | `:conf`, `:channel`, `:payload`, `:kind`, `:reason`, `:stacktrace` |
### Metadata
#### Metadata
* `:conf` — the Oban supervisor's config
* `:channel` — the channel on which the notification was sent
Expand All @@ -134,7 +134,7 @@ defmodule Oban.Telemetry do
| ------------ | --------- | ------------------ |
| `:switch` | | `:conf`, `:status` |
### Metadata
#### Metadata
* `:conf` — see the explanation in metadata above
* `:status` — one of `:isolated`, `:solitary`, or `:clustered`, see
Expand Down Expand Up @@ -179,12 +179,29 @@ defmodule Oban.Telemetry do
| `:stop` | `:duration` | `:conf`, `:leader`, `:peer`, |
| `:exception` | `:duration` | `:conf`, `:leader`, `:peer`, `:kind`, `:reason`, `:stacktrace` |
### Metadata
#### Metadata
* `:conf`, `:kind`, `:reason`, `:stacktrace` — see the explanation in notifier metadata above
* `:leader` — whether the peer is the current leader
* `:peer` — the module used for peering
## Queue Shutdown Events
Oban emits an event when a queue shuts down cleanly, e.g. without being brutally killed. Event
emission isn't guaranteed because it is emitted part of a `terminate/1` callback.
* `[:oban, :queue, :shutdown]`
| event | measures | metadata |
| ----------- | ----------- | ------------------------------ |
| `:shutdown` | `:ellapsed` | `:conf`, `:orphaned`, `:queue` |
#### Metadata
* `:conf` — see the explanation in metadata above
* `:orphaned` — a list of job id's left in an `executing` state because they couldn't finish
* `:queue` — the stringified queue name
## Stager Events
Oban emits an event any time the Stager switches between `local` and `global` modes:
Expand All @@ -195,7 +212,7 @@ defmodule Oban.Telemetry do
| ------------ | --------- | ---------------- |
| `:switch` | | `:conf`, `:mode` |
### Metadata
#### Metadata
* `:conf` — see the explanation in metadata above
* `:mode` — either `local` for polling mode or `global` in the more efficient pub-sub mode
Expand Down Expand Up @@ -272,7 +289,10 @@ defmodule Oban.Telemetry do
@doc """
Attaches a default structured JSON Telemetry handler for logging.
This function attaches a handler that outputs logs with the following fields for job events:
This function attaches a handler that outputs logs with `message` and `source` fields, along
with some event specific fields.
#### Job Events
* `args` — a map of the job's raw arguments
* `attempt` — the job's execution atttempt
Expand All @@ -282,27 +302,31 @@ defmodule Oban.Telemetry do
* `id` — the job's id
* `meta` — a map of the job's raw metadata
* `queue` — the job's queue
* `source` — always "oban"
* `state` — the execution state, one of "success", "failure", "cancelled", "discard", or
"snoozed"
* `system_time` — when the job started, in microseconds
* `tags` — the job's tags
* `worker` — the job's worker module
For stager events:
#### Queue Shutdown Events
* `event` — always `stager:switch`
* `message` — information about the mode switch
* `mode` — either `"local"` or `"global"`
* `source` — always "oban"
* `ellapsed` — the amount of time the queue waited for shutdown, in milliseconds
* `event` — always `queue:shutdown`
* `orphaned` — a list of any job id's left in an `executing` state
* `queue` — the queue name
For notifier events:
#### Notifier Events
* `event` — always `notifier:switch`
* `message` — information about the status switch
* `source` — always "oban"
* `status` — either `"isolated"`, `"solitary"`, or `"clustered"`
#### Stager Events
* `event` — always `stager:switch`
* `message` — information about the mode switch
* `mode` — either `"local"` or `"global"`
## Options
* `:level` — The log level to use for logging output, defaults to `:info`
Expand Down Expand Up @@ -336,6 +360,7 @@ defmodule Oban.Telemetry do
[:oban, :job, :stop],
[:oban, :job, :exception],
[:oban, :notifier, :switch],
[:oban, :queue, :shutdown],
[:oban, :stager, :switch]
]

Expand Down Expand Up @@ -428,6 +453,18 @@ defmodule Oban.Telemetry do
end)
end

def handle_event([:oban, :queue, :shutdown], measure, %{orphaned: [_ | _]} = meta, opts) do
log(opts, fn ->
%{
ellapsed: measure.ellapsed,
event: "queue:shutdown",
orphaned: meta.orphaned,
queue: meta.queue,
message: "jobs were orphaned because they didn't finish executing in the allotted time"
}
end)
end

def handle_event([:oban, :stager, :switch], _measure, %{mode: mode}, opts) do
log(opts, fn ->
case mode do
Expand All @@ -450,6 +487,8 @@ defmodule Oban.Telemetry do
end)
end

def handle_event(_event, _measure, _meta, _opts), do: :ok

defp log(opts, fun) do
level = Keyword.fetch!(opts, :level)

Expand Down
2 changes: 1 addition & 1 deletion test/oban/queue/watchman_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Oban.Queue.WatchmanTest do
opts = [
name: WatchmanTest,
shutdown: 1_000,
foreman: Watchman.Foreman
producer: Watchman.Producer
]

start_supervised!({Watchman, opts})
Expand Down
49 changes: 49 additions & 0 deletions test/oban/telemetry_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,22 @@ defmodule Oban.TelemetryTest do
} = error_meta
end

test "reporting jobs still executing after the shutdown period" do
ref = :telemetry_test.attach_event_handlers(self(), [[:oban, :queue, :shutdown]])

name = start_supervised_oban!(queues: [alpha: 10], shutdown_grace_period: 5)

insert!(name, [ref: 1, sleep: 500], queue: :alpha)
insert!(name, [ref: 2, sleep: 500], queue: :alpha)

assert_receive {:started, 1}
assert_receive {:started, 2}

stop_supervised(name)

assert_receive {_event, ^ref, %{ellapsed: 10}, %{orphaned: [_, _], queue: "alpha"}}
end

test "the default handler logs detailed event information" do
:ok = Telemetry.attach_default_logger(:warning)

Expand Down Expand Up @@ -168,6 +184,39 @@ defmodule Oban.TelemetryTest do
Telemetry.detach_default_logger()
end

test "the default handler logs orphaned jobs at queue shutdown" do
:ok = Telemetry.attach_default_logger(:warning)

logged =
capture_log(fn ->
:telemetry.execute([:oban, :queue, :shutdown], %{ellapsed: 500}, %{
queue: "alpha",
orphaned: [100, 101, 102]
})
end)

assert logged =~ ~s|"source":"oban"|
assert logged =~ ~s|"event":"queue:shutdown"|
assert logged =~ ~s|"orphaned":[100,101,102]|
assert logged =~ ~s|"queue":"alpha"|
assert logged =~ ~s|"message":"jobs were orphaned because|
after
Telemetry.detach_default_logger()
end

test "the default handler doesn't log anything on shutdown without orphans" do
:ok = Telemetry.attach_default_logger(:warning)

logged =
capture_log(fn ->
:telemetry.execute([:oban, :queue, :shutdown], %{}, %{queue: "alpha", orphaned: []})
end)

refute logged =~ ~s|"event":"queue:shutdown"|
after
Telemetry.detach_default_logger()
end

test "detaching the logger prevents logging" do
:ok = Telemetry.attach_default_logger(:warning)
:ok = Telemetry.detach_default_logger()
Expand Down

0 comments on commit e7f91f4

Please sign in to comment.