Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Telemetry #296

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions lib/event_store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ defmodule EventStore do
quote bind_quoted: [opts: opts] do
@behaviour EventStore

alias EventStore.{Config, EventData, PubSub, Subscriptions}
alias EventStore.{Config, EventData, PubSub, Subscriptions, Telemetry}
alias EventStore.Snapshots.{SnapshotData, Snapshotter}
alias EventStore.Subscriptions.Subscription
alias EventStore.Streams.Stream
Expand Down Expand Up @@ -306,7 +306,15 @@ defmodule EventStore do
{conn, opts} = parse_opts(opts)
opts = Keyword.merge(opts, overrides)

Stream.append_to_stream(conn, stream_uuid, expected_version, events, opts)
telemetry_metadata = %{
stream_uuid: stream_uuid,
expected_version: expected_version,
event_count: length(events)
}

Telemetry.measure_span(:append_to_stream, telemetry_metadata, fn ->
Stream.append_to_stream(conn, stream_uuid, expected_version, events, opts)
end)
end

def link_to_stream(
Expand Down Expand Up @@ -335,7 +343,15 @@ defmodule EventStore do
def read_stream_forward(stream_uuid, start_version, count, opts) do
{conn, opts} = parse_opts(opts)

Stream.read_stream_forward(conn, stream_uuid, start_version, count, opts)
telemetry_metadata = %{
stream_uuid: stream_uuid,
start_version: start_version,
count: count
}

Telemetry.measure_span(:read_stream_forward, telemetry_metadata, fn ->
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❓ Do we want the metrics to mimic the function names? Or should the metrics be lower level?

Stream.read_stream_forward(conn, stream_uuid, start_version, count, opts)
end)
end

def read_all_streams_forward(
Expand Down
6 changes: 4 additions & 2 deletions lib/event_store/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ defmodule EventStore.Supervisor do
Notifications,
PubSub,
Serializer,
Subscriptions
Subscriptions,
Telemetry
}

@doc """
Expand Down Expand Up @@ -90,7 +91,8 @@ defmodule EventStore.Supervisor do
Supervisor.child_spec({Registry, keys: :unique, name: subscriptions_registry_name},
id: subscriptions_registry_name
),
{Notifications.Supervisor, {name, config}}
{Notifications.Supervisor, {name, config}},
Telemetry.poller_child_spec(config)
] ++ PubSub.child_spec(name)

:ok = Config.associate(name, self(), event_store, config)
Expand Down
63 changes: 63 additions & 0 deletions lib/event_store/telemetry.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
defmodule EventStore.Telemetry do
@moduledoc false
alias EventStore.Storage

require Logger

def poller_child_spec(opts) do
conn = Keyword.fetch!(opts, :conn)
schema = Keyword.fetch!(opts, :schema)
period = Keyword.get(opts, :telemetry_poller_period) || :timer.seconds(15)
init_delay = Keyword.get(opts, :telemetry_poller_init_delay) || :timer.seconds(5)

{:telemetry_poller,
period: period,
init_delay: init_delay,
measurements: [
{__MODULE__, :subscriptions, [conn, schema]}
]}
end

def subscriptions(conn, schema) do
with {:ok, stream_info} <- Storage.stream_info(conn, "$all", schema: schema),
{:ok, subscriptions} <- Storage.subscriptions(conn, schema: schema) do
Enum.each(subscriptions, fn subscription ->
measurements = %{
last_seen: subscription.last_seen,
lag: stream_info.stream_version - subscription.last_seen
}

metadata = %{
stream_uuid: subscription.stream_uuid,
subscription_name: subscription.subscription_name
}

execute(:subscription, measurements, metadata)
end)
else
_ ->
Logger.warning("Failed to emit subscription telemetry")
end
end

def execute(event_name, measurements, metadata) do
:telemetry.execute([event_name_prefix(), event_name], measurements, metadata)
end

def measure_span(event_name, metadata, func) do
:telemetry.span([event_name_prefix(), event_name], metadata, fn ->
case func.() do
:ok = result ->
{result, Map.put(metadata, :result, result)}

{:ok, _result} = result ->
{result, Map.put(metadata, :result, result)}

{:error, error} = result ->
{result, Map.merge(metadata, %{error: error, result: nil})}
end
end)
end

defp event_name_prefix, do: :event_store
TylerPachal marked this conversation as resolved.
Show resolved Hide resolved
end
2 changes: 2 additions & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ defmodule EventStore.Mixfile do
{:fsm, "~> 0.3"},
{:gen_stage, "~> 1.2"},
{:postgrex, "~> 0.17"},
{:telemetry, "~> 1.0"},
{:telemetry_poller, "~> 1.0"},

# Optional dependencies
{:jason, "~> 1.4", optional: true},
Expand Down
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@
"poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm", "dad79704ce5440f3d5a3681c8590b9dc25d1a561e8f5a9c995281012860901e3"},
"postgrex": {:hex, :postgrex, "0.17.4", "5777781f80f53b7c431a001c8dad83ee167bcebcf3a793e3906efff680ab62b3", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "6458f7d5b70652bc81c3ea759f91736c16a31be000f306d3c64bcdfe9a18b3cc"},
"telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},
"telemetry_poller": {:hex, :telemetry_poller, "1.1.0", "58fa7c216257291caaf8d05678c8d01bd45f4bdbc1286838a28c4bb62ef32999", [:rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "9eb9d9cbfd81cbd7cdd24682f8711b6e2b691289a0de6826e58452f28c103c8f"},
}