From e7f91f4853837645b3904cad8b1719f42d4adb16 Mon Sep 17 00:00:00 2001 From: Parker Selbert Date: Mon, 6 May 2024 10:09:44 +0100 Subject: [PATCH] Emit queue shutdown telemetry event with logging A new queue shutdown event, `[:oban, :queue, :shutdown]`, is emitted by each queue when it terminates. The event originates from the `watchman` process, which tracks the total ellapsed time from when termination starts to when all jobs complete or the allotted period is exhausted. Any jobs that take longer than the `:shutdown_grace_period` (by default 15 seconds) are brutally killed and left as orphans. The ids of jobs left in an executing state are listed in the event's `orphaned` meta. This also adds `queue:shutdown` logging to the default logger. Only queues that shutdown with orphaned jobs are logged, which makes it easier to detect orphaned jobs and which jobs were affected. Inspired by #1076 from @axelson. Closes #1076. --- lib/oban.ex | 2 +- lib/oban/queue/supervisor.ex | 2 +- lib/oban/queue/watchman.ex | 43 +++++++++---------- lib/oban/telemetry.ex | 69 ++++++++++++++++++++++++------- test/oban/queue/watchman_test.exs | 2 +- test/oban/telemetry_test.exs | 49 ++++++++++++++++++++++ 6 files changed, 125 insertions(+), 42 deletions(-) diff --git a/lib/oban.ex b/lib/oban.ex index fc0ec145..cdcbf598 100644 --- a/lib/oban.ex +++ b/lib/oban.ex @@ -409,7 +409,7 @@ defmodule Oban do When running a mix release on a Heroku node, the node is alive even if not part of a distributed system. In order to use the `DYNO` value, configure the node value using runtime - configuration via `config/runtime.exs: + configuration via `config/runtime.exs`: config :my_app, Oban, node: System.get_env("DYNO", "nonode@nohost") diff --git a/lib/oban/queue/supervisor.ex b/lib/oban/queue/supervisor.ex index f9e06ef6..4e9c603e 100644 --- a/lib/oban/queue/supervisor.ex +++ b/lib/oban/queue/supervisor.ex @@ -36,7 +36,7 @@ defmodule Oban.Queue.Supervisor do |> Keyword.put_new(:dispatch_cooldown, conf.dispatch_cooldown) watch_opts = [ - foreman: fore_name, + conf: conf, name: watch_name, producer: prod_name, shutdown: conf.shutdown_grace_period diff --git a/lib/oban/queue/watchman.ex b/lib/oban/queue/watchman.ex index 5d3bc243..fb5a3f0a 100644 --- a/lib/oban/queue/watchman.ex +++ b/lib/oban/queue/watchman.ex @@ -6,26 +6,16 @@ defmodule Oban.Queue.Watchman do alias Oban.Queue.Producer alias __MODULE__, as: State - @type option :: - {:foreman, GenServer.name()} - | {:name, module()} - | {:producer, GenServer.name()} - | {:shutdown, timeout()} + defstruct [:conf, :producer, :shutdown, interval: 10] - defstruct [:foreman, :producer, :shutdown, interval: 10] - - @spec child_spec([option]) :: Supervisor.child_spec() + @spec child_spec(Keyword.t()) :: Supervisor.child_spec() def child_spec(opts) do - shutdown = - case opts[:shutdown] do - 0 -> :brutal_kill - value -> value - end + shutdown = Keyword.fetch!(opts, :shutdown) + Keyword.get(opts, :interval, 10) %{id: __MODULE__, start: {__MODULE__, :start_link, [opts]}, shutdown: shutdown} end - @spec start_link([option]) :: GenServer.on_start() + @spec start_link(Keyword.t()) :: GenServer.on_start() def start_link(opts) do {name, opts} = Keyword.pop(opts, :name) @@ -41,11 +31,10 @@ defmodule Oban.Queue.Watchman do @impl GenServer def terminate(_reason, %State{} = state) do - # There is a chance that the foreman doesn't exist, and we never want to raise another error - # as part of the shut down process. + # The producer may not exist, and we don't want to raise during shutdown. try do :ok = Producer.shutdown(state.producer) - :ok = wait_for_executing(state) + :ok = wait_for_executing(0, state) catch :exit, _reason -> :ok end @@ -53,15 +42,21 @@ defmodule Oban.Queue.Watchman do :ok end - defp wait_for_executing(state) do - case DynamicSupervisor.count_children(state.foreman) do - %{active: 0} -> - :ok + defp wait_for_executing(ellapsed, state) do + check = Producer.check(state.producer) + + if check.running == [] or ellapsed >= state.shutdown do + :telemetry.execute( + [:oban, :queue, :shutdown], + %{ellapsed: ellapsed}, + %{conf: state.conf, orphaned: check.running, queue: check.queue} + ) - _ -> - :ok = Process.sleep(state.interval) + :ok + else + :ok = Process.sleep(state.interval) - wait_for_executing(state) + wait_for_executing(ellapsed + state.interval, state) end end end diff --git a/lib/oban/telemetry.ex b/lib/oban/telemetry.ex index 843e6d2c..626922f1 100644 --- a/lib/oban/telemetry.ex +++ b/lib/oban/telemetry.ex @@ -35,7 +35,7 @@ defmodule Oban.Telemetry do | `:stop` | `:duration`, `:queue_time` | `:conf`, `:job`, `:state`, `:result` | | `:exception` | `:duration`, `:queue_time` | `:conf`, `:job`, `:state`, `:kind`, `:reason`, `:result`, `:stacktrace` | - ### Metadata + #### Metadata * `:conf` — the executing Oban instance's config * `:job` — the executing `Oban.Job` @@ -97,7 +97,7 @@ defmodule Oban.Telemetry do | `:stop` | `:duration` | `:conf`, `:engine`, `:job` | | `:exception` | `:duration` | `:conf`, `:engine`, `:job`, `:kind`, `:reason`, `:stacktrace` | - ### Metadata + #### Metadata * `:conf` — the Oban supervisor's config * `:engine` — the module of the engine used @@ -117,7 +117,7 @@ defmodule Oban.Telemetry do | `:stop` | `:duration` | `:conf`, `:channel`, `:payload` | | `:exception` | `:duration` | `:conf`, `:channel`, `:payload`, `:kind`, `:reason`, `:stacktrace` | - ### Metadata + #### Metadata * `:conf` — the Oban supervisor's config * `:channel` — the channel on which the notification was sent @@ -134,7 +134,7 @@ defmodule Oban.Telemetry do | ------------ | --------- | ------------------ | | `:switch` | | `:conf`, `:status` | - ### Metadata + #### Metadata * `:conf` — see the explanation in metadata above * `:status` — one of `:isolated`, `:solitary`, or `:clustered`, see @@ -179,12 +179,29 @@ defmodule Oban.Telemetry do | `:stop` | `:duration` | `:conf`, `:leader`, `:peer`, | | `:exception` | `:duration` | `:conf`, `:leader`, `:peer`, `:kind`, `:reason`, `:stacktrace` | - ### Metadata + #### Metadata * `:conf`, `:kind`, `:reason`, `:stacktrace` — see the explanation in notifier metadata above * `:leader` — whether the peer is the current leader * `:peer` — the module used for peering + ## Queue Shutdown Events + + Oban emits an event when a queue shuts down cleanly, e.g. without being brutally killed. Event + emission isn't guaranteed because it is emitted part of a `terminate/1` callback. + + * `[:oban, :queue, :shutdown]` + + | event | measures | metadata | + | ----------- | ----------- | ------------------------------ | + | `:shutdown` | `:ellapsed` | `:conf`, `:orphaned`, `:queue` | + + #### Metadata + + * `:conf` — see the explanation in metadata above + * `:orphaned` — a list of job id's left in an `executing` state because they couldn't finish + * `:queue` — the stringified queue name + ## Stager Events Oban emits an event any time the Stager switches between `local` and `global` modes: @@ -195,7 +212,7 @@ defmodule Oban.Telemetry do | ------------ | --------- | ---------------- | | `:switch` | | `:conf`, `:mode` | - ### Metadata + #### Metadata * `:conf` — see the explanation in metadata above * `:mode` — either `local` for polling mode or `global` in the more efficient pub-sub mode @@ -272,7 +289,10 @@ defmodule Oban.Telemetry do @doc """ Attaches a default structured JSON Telemetry handler for logging. - This function attaches a handler that outputs logs with the following fields for job events: + This function attaches a handler that outputs logs with `message` and `source` fields, along + with some event specific fields. + + #### Job Events * `args` — a map of the job's raw arguments * `attempt` — the job's execution atttempt @@ -282,27 +302,31 @@ defmodule Oban.Telemetry do * `id` — the job's id * `meta` — a map of the job's raw metadata * `queue` — the job's queue - * `source` — always "oban" * `state` — the execution state, one of "success", "failure", "cancelled", "discard", or "snoozed" * `system_time` — when the job started, in microseconds * `tags` — the job's tags * `worker` — the job's worker module - For stager events: + #### Queue Shutdown Events - * `event` — always `stager:switch` - * `message` — information about the mode switch - * `mode` — either `"local"` or `"global"` - * `source` — always "oban" + * `ellapsed` — the amount of time the queue waited for shutdown, in milliseconds + * `event` — always `queue:shutdown` + * `orphaned` — a list of any job id's left in an `executing` state + * `queue` — the queue name - For notifier events: + #### Notifier Events * `event` — always `notifier:switch` * `message` — information about the status switch - * `source` — always "oban" * `status` — either `"isolated"`, `"solitary"`, or `"clustered"` + #### Stager Events + + * `event` — always `stager:switch` + * `message` — information about the mode switch + * `mode` — either `"local"` or `"global"` + ## Options * `:level` — The log level to use for logging output, defaults to `:info` @@ -336,6 +360,7 @@ defmodule Oban.Telemetry do [:oban, :job, :stop], [:oban, :job, :exception], [:oban, :notifier, :switch], + [:oban, :queue, :shutdown], [:oban, :stager, :switch] ] @@ -428,6 +453,18 @@ defmodule Oban.Telemetry do end) end + def handle_event([:oban, :queue, :shutdown], measure, %{orphaned: [_ | _]} = meta, opts) do + log(opts, fn -> + %{ + ellapsed: measure.ellapsed, + event: "queue:shutdown", + orphaned: meta.orphaned, + queue: meta.queue, + message: "jobs were orphaned because they didn't finish executing in the allotted time" + } + end) + end + def handle_event([:oban, :stager, :switch], _measure, %{mode: mode}, opts) do log(opts, fn -> case mode do @@ -450,6 +487,8 @@ defmodule Oban.Telemetry do end) end + def handle_event(_event, _measure, _meta, _opts), do: :ok + defp log(opts, fun) do level = Keyword.fetch!(opts, :level) diff --git a/test/oban/queue/watchman_test.exs b/test/oban/queue/watchman_test.exs index a9bb82db..b60d6ee2 100644 --- a/test/oban/queue/watchman_test.exs +++ b/test/oban/queue/watchman_test.exs @@ -8,7 +8,7 @@ defmodule Oban.Queue.WatchmanTest do opts = [ name: WatchmanTest, shutdown: 1_000, - foreman: Watchman.Foreman + producer: Watchman.Producer ] start_supervised!({Watchman, opts}) diff --git a/test/oban/telemetry_test.exs b/test/oban/telemetry_test.exs index 7ca2fb4f..42c9f945 100644 --- a/test/oban/telemetry_test.exs +++ b/test/oban/telemetry_test.exs @@ -92,6 +92,22 @@ defmodule Oban.TelemetryTest do } = error_meta end + test "reporting jobs still executing after the shutdown period" do + ref = :telemetry_test.attach_event_handlers(self(), [[:oban, :queue, :shutdown]]) + + name = start_supervised_oban!(queues: [alpha: 10], shutdown_grace_period: 5) + + insert!(name, [ref: 1, sleep: 500], queue: :alpha) + insert!(name, [ref: 2, sleep: 500], queue: :alpha) + + assert_receive {:started, 1} + assert_receive {:started, 2} + + stop_supervised(name) + + assert_receive {_event, ^ref, %{ellapsed: 10}, %{orphaned: [_, _], queue: "alpha"}} + end + test "the default handler logs detailed event information" do :ok = Telemetry.attach_default_logger(:warning) @@ -168,6 +184,39 @@ defmodule Oban.TelemetryTest do Telemetry.detach_default_logger() end + test "the default handler logs orphaned jobs at queue shutdown" do + :ok = Telemetry.attach_default_logger(:warning) + + logged = + capture_log(fn -> + :telemetry.execute([:oban, :queue, :shutdown], %{ellapsed: 500}, %{ + queue: "alpha", + orphaned: [100, 101, 102] + }) + end) + + assert logged =~ ~s|"source":"oban"| + assert logged =~ ~s|"event":"queue:shutdown"| + assert logged =~ ~s|"orphaned":[100,101,102]| + assert logged =~ ~s|"queue":"alpha"| + assert logged =~ ~s|"message":"jobs were orphaned because| + after + Telemetry.detach_default_logger() + end + + test "the default handler doesn't log anything on shutdown without orphans" do + :ok = Telemetry.attach_default_logger(:warning) + + logged = + capture_log(fn -> + :telemetry.execute([:oban, :queue, :shutdown], %{}, %{queue: "alpha", orphaned: []}) + end) + + refute logged =~ ~s|"event":"queue:shutdown"| + after + Telemetry.detach_default_logger() + end + test "detaching the logger prevents logging" do :ok = Telemetry.attach_default_logger(:warning) :ok = Telemetry.detach_default_logger()