Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trace all Oban telemetry events #437

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,17 @@ defmodule OpentelemetryOban do
@doc """
Initializes and configures telemetry handlers.

By default jobs and plugins are traced. If you wish to trace only jobs then
use:
By default everything is traced. If you wish to trace only jobs then use:

OpentelemetryOban.setup(trace: [:jobs])

Note that if you don't trace plugins, but inside the plugins, there are spans
from other instrumentation libraries (e.g. ecto) then these will still be
traced. This setting controls only the spans that are created by
opentelemetry_oban.
Note that if you don't trace plugins or internal, there will be spans from
other instrumentation libraries (e.g. ecto) that would be traced. This setting
controls only the spans that are created by opentelemetry_oban.
"""
@spec setup() :: :ok
def setup(opts \\ []) do
trace = Keyword.get(opts, :trace, [:jobs, :plugins])
trace = Keyword.get(opts, :trace, [:jobs, :plugins, :internal])

if Enum.member?(trace, :jobs) do
OpentelemetryOban.JobHandler.attach()
Expand All @@ -48,6 +46,10 @@ defmodule OpentelemetryOban do
OpentelemetryOban.PluginHandler.attach()
end

if Enum.member?(trace, :internal) do
OpentelemetryOban.InternalHandler.attach()
end

:ok
end

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
defmodule OpentelemetryOban.InternalHandler do
alias OpenTelemetry.Span
alias OpenTelemetry.SemanticConventions.Trace

require Trace

@tracer_id __MODULE__

def attach() do
:telemetry.attach_many(
{__MODULE__, :internal},
Enum.flat_map(
[
[:engine, :init],
[:engine, :refresh],
[:engine, :put_meta],
[:engine, :check_available],
[:engine, :cancel_all_jobs],
[:engine, :fetch_jobs],
[:engine, :insert_all_jobs],
[:engine, :prune_all_jobs],
[:engine, :stage_jobs],
[:engine, :cancel_job],
[:engine, :complete_job],
[:engine, :discard_job],
[:engine, :error_job],
[:engine, :insert_job],
[:engine, :snooze_job],
[:notifier, :notify],
[:peer, :election]
],
fn event ->
[
[:oban | event ++ [:start]],
[:oban | event ++ [:stop]],
[:oban | event ++ [:exception]]
]
end
),
&__MODULE__.handle_oban_event/4,
[]
)
end

def handle_oban_event(event, _measurements, metadata, _config) do
[op | rest] = Enum.reverse(event)

case op do
:start ->
OpentelemetryTelemetry.start_telemetry_span(__MODULE__, Enum.join(Enum.reverse(rest), "."), metadata, %{kind: :consumer})
Copy link
Author

@danschultzer danschultzer Dec 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably should add messaging.system attribute here to make it easy to filter?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same goes for the plugin handler.


:stop ->
OpentelemetryTelemetry.end_telemetry_span(__MODULE__, metadata)

:exception ->
ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, metadata)

Span.record_exception(ctx, metadata.reason, metadata.stacktrace)
Span.set_status(ctx, OpenTelemetry.status(:error, Exception.message(metadata.reason)))

OpentelemetryTelemetry.end_telemetry_span(__MODULE__, metadata)
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ defmodule OpentelemetryOban.JobHandler do

defp attach_job_start_handler() do
:telemetry.attach(
"#{__MODULE__}.job_start",
{__MODULE__, [:job, :start]},
[:oban, :job, :start],
&__MODULE__.handle_job_start/4,
[]
Expand All @@ -23,7 +23,7 @@ defmodule OpentelemetryOban.JobHandler do

defp attach_job_stop_handler() do
:telemetry.attach(
"#{__MODULE__}.job_stop",
{__MODULE__, [:job, :stop]},
[:oban, :job, :stop],
&__MODULE__.handle_job_stop/4,
[]
Expand All @@ -32,7 +32,7 @@ defmodule OpentelemetryOban.JobHandler do

defp attach_job_exception_handler() do
:telemetry.attach(
"#{__MODULE__}.job_exception",
{__MODULE__, [:job, :exception]},
[:oban, :job, :exception],
&__MODULE__.handle_job_exception/4,
[]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ defmodule OpentelemetryOban.PluginHandler do

defp attach_plugin_start_handler() do
:telemetry.attach(
"#{__MODULE__}.plugin_start",
{__MODULE__, [:plugin, :start]},
[:oban, :plugin, :start],
&__MODULE__.handle_plugin_start/4,
[]
Expand All @@ -21,7 +21,7 @@ defmodule OpentelemetryOban.PluginHandler do

defp attach_plugin_stop_handler() do
:telemetry.attach(
"#{__MODULE__}.plugin_stop",
{__MODULE__, [:plugin, :stop]},
[:oban, :plugin, :stop],
&__MODULE__.handle_plugin_stop/4,
[]
Expand All @@ -30,7 +30,7 @@ defmodule OpentelemetryOban.PluginHandler do

defp attach_plugin_exception_handler() do
:telemetry.attach(
"#{__MODULE__}.plugin_exception",
{__MODULE__, [:plugin, :exception]},
[:oban, :plugin, :exception],
&__MODULE__.handle_plugin_exception/4,
[]
Expand Down
6 changes: 3 additions & 3 deletions instrumentation/opentelemetry_oban/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ defmodule OpentelemetryOban.MixProject do
defp deps do
[
{:oban, "~> 2.0"},
{:opentelemetry_api, "~> 1.2"},
{:opentelemetry_api, "~> 1.4"},
{:opentelemetry_telemetry, "~> 1.1"},
{:opentelemetry_semantic_conventions, "~> 0.2"},
{:opentelemetry, "~> 1.0", only: [:test]},
{:opentelemetry_semantic_conventions, "~> 1.27"},
{:opentelemetry, "~> 1.4", only: [:test]},
{:opentelemetry_exporter, "~> 1.0", only: [:test]},
{:telemetry, "~> 0.4 or ~> 1.0"},
{:ex_doc, "~> 0.35", only: [:dev], runtime: false},
Expand Down
6 changes: 3 additions & 3 deletions instrumentation/opentelemetry_oban/mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
"makeup_erlang": {:hex, :makeup_erlang, "1.0.1", "c7f58c120b2b5aa5fd80d540a89fdf866ed42f1f3994e4fe189abebeab610839", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "8a89a1eeccc2d798d6ea15496a6e4870b75e014d1af514b1b71fa33134f57814"},
"nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"},
"oban": {:hex, :oban, "2.17.4", "3ebe79dc0cad16f23e5feea418f9bc5b07d453b8fb7caf376d812be96157a5c5", [:mix], [{:ecto_sql, "~> 3.6", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:ecto_sqlite3, "~> 0.9", [hex: :ecto_sqlite3, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.16", [hex: :postgrex, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "71a804abea3bb7e104782a5b5337cbab76c1a56b9689a6d5159a3873c93898b6"},
"opentelemetry": {:hex, :opentelemetry, "1.3.1", "f0a342a74379e3540a634e7047967733da4bc8b873ec9026e224b2bd7369b1fc", [:rebar3], [{:opentelemetry_api, "~> 1.2.2", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:opentelemetry_semantic_conventions, "~> 0.2", [hex: :opentelemetry_semantic_conventions, repo: "hexpm", optional: false]}], "hexpm", "de476b2ac4faad3e3fe3d6e18b35dec9cb338c3b9910c2ce9317836dacad3483"},
"opentelemetry_api": {:hex, :opentelemetry_api, "1.2.2", "693f47b0d8c76da2095fe858204cfd6350c27fe85d00e4b763deecc9588cf27a", [:mix, :rebar3], [{:opentelemetry_semantic_conventions, "~> 0.2", [hex: :opentelemetry_semantic_conventions, repo: "hexpm", optional: false]}], "hexpm", "dc77b9a00f137a858e60a852f14007bb66eda1ffbeb6c05d5fe6c9e678b05e9d"},
"opentelemetry": {:hex, :opentelemetry, "1.5.0", "7dda6551edfc3050ea4b0b40c0d2570423d6372b97e9c60793263ef62c53c3c2", [:rebar3], [{:opentelemetry_api, "~> 1.4", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}], "hexpm", "cdf4f51d17b592fc592b9a75f86a6f808c23044ba7cf7b9534debbcc5c23b0ee"},
"opentelemetry_api": {:hex, :opentelemetry_api, "1.4.0", "63ca1742f92f00059298f478048dfb826f4b20d49534493d6919a0db39b6db04", [:mix, :rebar3], [], "hexpm", "3dfbbfaa2c2ed3121c5c483162836c4f9027def469c41578af5ef32589fcfc58"},
"opentelemetry_exporter": {:hex, :opentelemetry_exporter, "1.6.0", "f4fbf69aa9f1541b253813221b82b48a9863bc1570d8ecc517bc510c0d1d3d8c", [:rebar3], [{:grpcbox, ">= 0.0.0", [hex: :grpcbox, repo: "hexpm", optional: false]}, {:opentelemetry, "~> 1.3", [hex: :opentelemetry, repo: "hexpm", optional: false]}, {:opentelemetry_api, "~> 1.2", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:tls_certificate_check, "~> 1.18", [hex: :tls_certificate_check, repo: "hexpm", optional: false]}], "hexpm", "1802d1dca297e46f21e5832ecf843c451121e875f73f04db87355a6cb2ba1710"},
"opentelemetry_semantic_conventions": {:hex, :opentelemetry_semantic_conventions, "0.2.0", "b67fe459c2938fcab341cb0951c44860c62347c005ace1b50f8402576f241435", [:mix, :rebar3], [], "hexpm", "d61fa1f5639ee8668d74b527e6806e0503efc55a42db7b5f39939d84c07d6895"},
"opentelemetry_semantic_conventions": {:hex, :opentelemetry_semantic_conventions, "1.27.0", "acd0194a94a1e57d63da982ee9f4a9f88834ae0b31b0bd850815fe9be4bbb45f", [:mix, :rebar3], [], "hexpm", "9681ccaa24fd3d810b4461581717661fd85ff7019b082c2dff89c7d5b1fc2864"},
"opentelemetry_telemetry": {:hex, :opentelemetry_telemetry, "1.1.1", "4a73bfa29d7780ffe33db345465919cef875034854649c37ac789eb8e8f38b21", [:mix, :rebar3], [{:opentelemetry_api, "~> 1.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "ee43b14e6866123a3ee1344e3c0d3d7591f4537542c2a925fcdbf46249c9b50b"},
"postgrex": {:hex, :postgrex, "0.17.4", "5777781f80f53b7c431a001c8dad83ee167bcebcf3a793e3906efff680ab62b3", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "6458f7d5b70652bc81c3ea759f91736c16a31be000f306d3c64bcdfe9a18b3cc"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
defmodule OpentelemetryOban.InternalHandlerTest do
use DataCase

require OpenTelemetry.Tracer
require OpenTelemetry.Span
require Record

for {name, spec} <- Record.extract_all(from_lib: "opentelemetry/include/otel_span.hrl") do
Record.defrecord(name, spec)
end

for {name, spec} <- Record.extract_all(from_lib: "opentelemetry_api/include/opentelemetry.hrl") do
Record.defrecord(name, spec)
end

@events [
[:engine, :init],
[:engine, :refresh],
[:engine, :put_meta],
[:engine, :check_available],
[:engine, :cancel_all_jobs],
[:engine, :fetch_jobs],
[:engine, :insert_all_jobs],
[:engine, :prune_all_jobs],
[:engine, :stage_jobs],
[:engine, :cancel_job],
[:engine, :complete_job],
[:engine, :discard_job],
[:engine, :error_job],
[:engine, :insert_job],
[:engine, :snooze_job],
[:notifier, :notify],
[:peer, :election]
]

setup do
:application.stop(:opentelemetry)
:application.set_env(:opentelemetry, :tracer, :otel_tracer_default)

:application.set_env(:opentelemetry, :processors, [
{:otel_batch_processor, %{scheduled_delay_ms: 1, exporter: {:otel_exporter_pid, self()}}}
])

:application.start(:opentelemetry)

TestHelpers.remove_oban_handlers()
OpentelemetryOban.setup(trace: [:internal])

:ok
end

test "does not create spans when internal tracing is disabled" do
TestHelpers.remove_oban_handlers()
OpentelemetryOban.setup(trace: [])

execute_internal_event([:peer, :election])

refute_receive {:span, span(name: "oban.peer.election")}
end

test "records span on internal execution" do
execute_internal_event([:peer, :election])

assert_receive {:span, span(name: "oban.peer.election")}
end

test "records span on error" do
:telemetry.execute(
[:oban, :peer, :election, :start],
%{system_time: System.system_time()},
%{}
)

exception = %UndefinedFunctionError{
arity: 0,
function: :error,
message: nil,
module: Some,
reason: nil
}

:telemetry.execute(
[:oban, :peer, :election, :exception],
%{duration: 444},
%{
kind: :error,
stacktrace: [
{Some, :error, [], []}
],
reason: exception
}
)

expected_status = OpenTelemetry.status(:error, Exception.message(exception))

assert_receive {:span,
span(
name: "oban.peer.election",
events: events,
status: ^expected_status
)}

[
event(
name: :exception,
attributes: event_attributes
)
] = :otel_events.list(events)

assert [:"exception.message", :"exception.stacktrace", :"exception.type"] ==
Enum.sort(Map.keys(:otel_attributes.map(event_attributes)))
end

for event <- @events do
test "#{inspect([:oban | event])} spans" do
execute_internal_event(unquote(event))

assert_receive {:span, span(name: "oban.#{unquote(Enum.join(event, "."))}")}

:ok
end
end

defp execute_internal_event(event) do
:telemetry.execute(
[:oban | event ++ [:start]],
%{system_time: System.system_time()},
%{}
)

:telemetry.execute(
[:oban | event ++ [:stop]],
%{duration: 42069},
%{}
)
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ defmodule OpentelemetryOban.PluginHandlerTest do

[
event(
name: "exception",
name: :exception,
attributes: event_attributes
)
] = :otel_events.list(events)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ defmodule OpentelemetryObanTest do

[
event(
name: "exception",
name: :exception,
attributes: event_attributes
)
] = :otel_events.list(events)
Expand Down Expand Up @@ -268,7 +268,7 @@ defmodule OpentelemetryObanTest do

[
event(
name: "exception",
name: :exception,
attributes: event_attributes
)
] = :otel_events.list(events)
Expand Down Expand Up @@ -324,7 +324,7 @@ defmodule OpentelemetryObanTest do

[
event(
name: "exception",
name: :exception,
attributes: event_attributes
)
] = :otel_events.list(events)
Expand Down
Loading