Skip to content

Commit

Permalink
Merge pull request #252 from arthurnovak/add_finch_instance_to_req_te…
Browse files Browse the repository at this point in the history
…lemetry_meta

Add finch instance name to telemetry metadata
  • Loading branch information
sneako authored Jan 9, 2024
2 parents 603058b + 279ced6 commit 1a46841
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 37 deletions.
11 changes: 5 additions & 6 deletions lib/finch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -360,10 +360,9 @@ defmodule Finch do
{:ok, acc} | {:error, Exception.t()}
when acc: term()
def stream(%Request{} = req, name, acc, fun, opts \\ []) when is_function(fun, 2) do
fun =
fn entry, acc ->
{:cont, fun.(entry, acc)}
end
fun = fn entry, acc ->
{:cont, fun.(entry, acc)}
end

stream_while(req, name, acc, fun, opts)
end
Expand Down Expand Up @@ -427,7 +426,7 @@ defmodule Finch do

defp __stream__(%Request{} = req, name, acc, fun, opts) do
{pool, pool_mod} = get_pool(req, name)
pool_mod.request(pool, req, acc, fun, opts)
pool_mod.request(pool, req, acc, fun, name, opts)
end

@doc """
Expand Down Expand Up @@ -560,7 +559,7 @@ defmodule Finch do
@spec async_request(Request.t(), name(), request_opts()) :: request_ref()
def async_request(%Request{} = req, name, opts \\ []) do
{pool, pool_mod} = get_pool(req, name)
pool_mod.async_request(pool, req, opts)
pool_mod.async_request(pool, req, name, opts)
end

@doc """
Expand Down
16 changes: 9 additions & 7 deletions lib/finch/http1/conn.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,24 @@ defmodule Finch.HTTP1.Conn do
}
end

def connect(%{mint: mint} = conn) when not is_nil(mint) do
def connect(%{mint: mint} = conn, name) when not is_nil(mint) do
meta = %{
scheme: conn.scheme,
host: conn.host,
port: conn.port
port: conn.port,
name: name
}

Telemetry.event(:reused_connection, %{}, meta)
{:ok, conn}
end

def connect(%{mint: nil} = conn) do
def connect(%{mint: nil} = conn, name) do
meta = %{
scheme: conn.scheme,
host: conn.host,
port: conn.port
port: conn.port,
name: name
}

start_time = Telemetry.start(:connect, meta)
Expand Down Expand Up @@ -98,12 +100,12 @@ defmodule Finch.HTTP1.Conn do
end
end

def request(%{mint: nil} = conn, _, _, _, _, _, _), do: {:error, conn, "Could not connect"}
def request(%{mint: nil} = conn, _, _, _, _, _, _, _), do: {:error, conn, "Could not connect"}

def request(conn, req, acc, fun, receive_timeout, request_timeout, idle_time) do
def request(conn, req, acc, fun, name, receive_timeout, request_timeout, idle_time) do
full_path = Finch.Request.request_path(req)

metadata = %{request: req}
metadata = %{request: req, name: name}

extra_measurements = %{idle_time: idle_time}

Expand Down
24 changes: 17 additions & 7 deletions lib/finch/http1/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ defmodule Finch.HTTP1.Pool do
pool_max_idle_time,
_start_pool_metrics?,
_pool_idx
} =
opts
} = opts

%{
id: __MODULE__,
Expand All @@ -40,12 +39,13 @@ defmodule Finch.HTTP1.Pool do
end

@impl Finch.Pool
def request(pool, req, acc, fun, opts) do
def request(pool, req, acc, fun, name, opts) do
pool_timeout = Keyword.get(opts, :pool_timeout, 5_000)
receive_timeout = Keyword.get(opts, :receive_timeout, 15_000)
request_timeout = Keyword.get(opts, :request_timeout, :infinity)

metadata = %{request: req, pool: pool}
metadata = %{request: req, pool: pool, name: name}

start_time = Telemetry.start(:queue, metadata)

try do
Expand All @@ -55,9 +55,18 @@ defmodule Finch.HTTP1.Pool do
fn from, {state, conn, idle_time} ->
Telemetry.stop(:queue, start_time, metadata, %{idle_time: idle_time})

with {:ok, conn} <- Conn.connect(conn),
with {:ok, conn} <- Conn.connect(conn, name),
{:ok, conn, acc} <-
Conn.request(conn, req, acc, fun, receive_timeout, request_timeout, idle_time) do
Conn.request(
conn,
req,
acc,
fun,
name,
receive_timeout,
request_timeout,
idle_time
) do
{{:ok, acc}, transfer_if_open(conn, state, from)}
else
{:error, conn, error} ->
Expand Down Expand Up @@ -90,7 +99,7 @@ defmodule Finch.HTTP1.Pool do
end

@impl Finch.Pool
def async_request(pool, req, opts) do
def async_request(pool, req, name, opts) do
owner = self()

pid =
Expand All @@ -103,6 +112,7 @@ defmodule Finch.HTTP1.Pool do
req,
{owner, monitor, request_ref},
&send_async_response/2,
name,
opts
) do
{:ok, _} -> send(owner, {request_ref, :done})
Expand Down
12 changes: 7 additions & 5 deletions lib/finch/http2/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ defmodule Finch.HTTP2.Pool do
# Call the pool with the request. The pool will multiplex multiple requests
# and stream the result set back to the calling process using `send`
@impl Finch.Pool
def request(pool, request, acc, fun, opts) do
def request(pool, request, acc, fun, name, opts) do
opts = Keyword.put_new(opts, :receive_timeout, @default_receive_timeout)
timeout = opts[:receive_timeout]
request_ref = make_request_ref(pool)
Expand All @@ -48,7 +48,8 @@ defmodule Finch.HTTP2.Pool do
response_waiting_loop(acc, fun, request_ref, monitor, fail_safe_timeout, :headers)
catch
kind, error ->
Telemetry.exception(:recv, recv_start, kind, error, __STACKTRACE__, %{request: request})
metadata = %{request: request, name: name}
Telemetry.exception(:recv, recv_start, kind, error, __STACKTRACE__, metadata)

:ok = :gen_statem.call(pool, {:cancel, request_ref})
clean_responses(request_ref)
Expand All @@ -60,7 +61,7 @@ defmodule Finch.HTTP2.Pool do
end

@impl Finch.Pool
def async_request(pool, req, opts) do
def async_request(pool, req, _name, opts) do
opts = Keyword.put_new(opts, :receive_timeout, @default_receive_timeout)
request_ref = make_request_ref(pool)

Expand Down Expand Up @@ -264,7 +265,8 @@ defmodule Finch.HTTP2.Pool do
metadata = %{
scheme: data.scheme,
host: data.host,
port: data.port
port: data.port,
name: data.finch_name
}

start = Telemetry.start(:connect, metadata)
Expand Down Expand Up @@ -529,7 +531,7 @@ defmodule Finch.HTTP2.Pool do
end

defp send_request(from, from_pid, request_ref, req, opts, data) do
telemetry_metadata = %{request: req}
telemetry_metadata = %{request: req, name: data.finch_name}

request = %{
stream: RequestStream.new(req.body),
Expand Down
11 changes: 8 additions & 3 deletions lib/finch/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,17 @@ defmodule Finch.Pool do
Finch.Request.t(),
acc,
Finch.stream(acc),
Finch.name(),
list()
) ::
{:ok, acc} | {:error, term()}
) :: {:ok, acc} | {:error, term()}
when acc: term()

@callback async_request(pid(), Finch.Request.t(), list()) :: request_ref()
@callback async_request(
pid(),
Finch.Request.t(),
Finch.name(),
list()
) :: request_ref()

@callback cancel_async_request(request_ref()) :: :ok

Expand Down
11 changes: 11 additions & 0 deletions lib/finch/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ defmodule Finch.Telemetry do
#### Metadata
* `:name` - The name of the Finch instance.
* `:pool` - The pool's PID.
* `:request` - The request (`Finch.Request`).
Expand All @@ -77,6 +78,7 @@ defmodule Finch.Telemetry do
#### Metadata
* `:name` - The name of the Finch instance.
* `:pool` - The pool's PID.
* `:request` - The request (`Finch.Request`).
Expand All @@ -90,6 +92,7 @@ defmodule Finch.Telemetry do
#### Metadata
* `:name` - The name of the Finch instance.
* `:request` - The request (`Finch.Request`).
* `:kind` - The type of exception.
* `:reason` - Error description or error data.
Expand All @@ -106,6 +109,7 @@ defmodule Finch.Telemetry do
#### Metadata
* `:name` - The name of the Finch instance.
* `:scheme` - The scheme used in the connection. either `http` or `https`.
* `:host` - The host address.
* `:port` - The port to connect on.
Expand All @@ -120,6 +124,7 @@ defmodule Finch.Telemetry do
#### Metadata
* `:name` - The name of the Finch instance.
* `:scheme` - The scheme used in the connection. either `http` or `https`.
* `:host` - The host address.
* `:port` - The port to connect on.
Expand All @@ -131,6 +136,7 @@ defmodule Finch.Telemetry do
#### Measurements
* `:name` - The name of the Finch instance.
* `:system_time` - The system time.
* `:idle_time` - Elapsed time since the connection was last checked in or initialized.
Expand All @@ -144,6 +150,7 @@ defmodule Finch.Telemetry do
#### Measurements
* `:name` - The name of the Finch instance.
* `:duration` - Time taken to make the request.
* `:idle_time` - Elapsed time since the connection was last checked in or initialized.
Expand All @@ -163,6 +170,7 @@ defmodule Finch.Telemetry do
#### Metadata
* `:name` - The name of the Finch instance.
* `:request` - The request (`Finch.Request`).
### Receive Stop
Expand All @@ -176,6 +184,7 @@ defmodule Finch.Telemetry do
#### Metadata
* `:name` - The name of the Finch instance.
* `:request` - The request (`Finch.Request`).
* `:status` - The response status (`Mint.Types.status()`).
* `:headers` - The response headers (`Mint.Types.headers()`).
Expand All @@ -192,6 +201,7 @@ defmodule Finch.Telemetry do
#### Metadata
* `:name` - The name of the Finch instance.
* `:request` - The request (`Finch.Request`).
* `:kind` - The type of exception.
* `:reason` - Error description or error data.
Expand All @@ -203,6 +213,7 @@ defmodule Finch.Telemetry do
#### Metadata
* `:name` - The name of the Finch instance.
* `:scheme` - The scheme used in the connection. either `http` or `https`.
* `:host` - The host address.
* `:port` - The port to connect on.
Expand Down
12 changes: 6 additions & 6 deletions test/finch/http2/pool_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ defmodule Finch.HTTP2.PoolTest do
start_pool(port)
end)

ref = Pool.async_request(pool, req, [])
ref = Pool.async_request(pool, req, nil, [])

assert_recv_frames([headers(stream_id: stream_id)])

Expand All @@ -391,7 +391,7 @@ defmodule Finch.HTTP2.PoolTest do
:timer.sleep(10)

# We can't send any more requests since the connection is closed for writing.
ref2 = Pool.async_request(pool, req, [])
ref2 = Pool.async_request(pool, req, nil, [])
assert_receive {^ref2, {:error, %Finch.Error{reason: :read_only}}}

server_send_frames([
Expand All @@ -409,7 +409,7 @@ defmodule Finch.HTTP2.PoolTest do
Process.sleep(50)

# If we try to make a request now that the server shut down, we get an error.
ref3 = Pool.async_request(pool, req, [])
ref3 = Pool.async_request(pool, req, nil, [])
assert_receive {^ref3, {:error, %Finch.Error{reason: :disconnected}}}
end

Expand All @@ -421,7 +421,7 @@ defmodule Finch.HTTP2.PoolTest do
start_pool(port)
end)

ref = Pool.async_request(pool, req, [])
ref = Pool.async_request(pool, req, nil, [])

assert_recv_frames([headers(stream_id: stream_id)])

Expand Down Expand Up @@ -449,7 +449,7 @@ defmodule Finch.HTTP2.PoolTest do
start_pool(port)
end)

ref = Pool.async_request(pool, %{req | headers: [{"foo", "bar"}]}, [])
ref = Pool.async_request(pool, %{req | headers: [{"foo", "bar"}]}, nil, [])

assert_receive {^ref, {:error, %{reason: {:max_header_list_size_exceeded, _, _}}}}
end
Expand Down Expand Up @@ -488,7 +488,7 @@ defmodule Finch.HTTP2.PoolTest do
{:data, value}, {status, headers, body} -> {:cont, {status, headers, body <> value}}
end

Pool.request(pool, req, acc, fun, opts)
Pool.request(pool, req, acc, fun, nil, opts)
end

defp start_server_and_connect_with(opts \\ [], fun) do
Expand Down
Loading

0 comments on commit 1a46841

Please sign in to comment.