Skip to content

Commit

Permalink
Fail safe timeout implementation and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
hissssst committed Aug 4, 2024
1 parent 0dd28a1 commit 6b18ebf
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 17 deletions.
13 changes: 12 additions & 1 deletion lib/finch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,18 @@ defmodule Finch do

@doc """
Like `stream/5`, but returns valid `Stream` structure, safe for passing between processes, but working __only__
on local node. Works only for HTTP1, HTTP2 not supported currently
on local node. Works only for HTTP1, HTTP2 not supported currently.
## Options
Same as `request/3` plus
* `:fail_safe_timeout` (`t:timeout/0`) (optional) (default is 15 minutes) - timeout in milliseconds.
Since this function returns an Enumerable which is lazily executed, it makes sense to
have a timeout which will close the connection in case it's never read from in erroneous situations.
* `:stop_notify` (`{t:GenServer.name/0 | t:pid/0, t:any/0}`) (optional) - destination and message which
will be notified once connection is returned to pool or closed.
"""
@spec actual_stream(Request.t(), name(), request_opts()) ::
{:ok, Enumerable.t()} | {:error, Exception.t()}
Expand Down
61 changes: 45 additions & 16 deletions lib/finch/http1/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ defmodule Finch.HTTP1.Pool do
receive_timeout = Keyword.get(opts, :receive_timeout, 15_000)
request_timeout = Keyword.get(opts, :request_timeout, 30_000)
fail_safe_timeout = Keyword.get(opts, :fail_safe_timeout, 15 * 60_000)
stop_notify = Keyword.get(opts, :stop_notify, nil)

metadata = %{request: req, pool: pool, name: name}

Expand All @@ -58,17 +59,37 @@ defmodule Finch.HTTP1.Pool do
pool,
:checkout,
fn from, {state, conn, idle_time} ->
if fail_safe_timeout != :infinity do
Process.send_after(self(), :fail_safe_timeout, fail_safe_timeout)
end

Telemetry.stop(:queue, start_time, metadata, %{idle_time: idle_time})
send(owner, {ref, :ok, {conn, idle_time}})

receive do
{^ref, :stop, conn} ->
transfer_if_open(conn, state, from)
after
fail_safe_timeout ->
{_, state} = transfer_if_open(conn, state, from)
{:fail_safe_timeout, state}

return =
case Conn.connect(conn, name) do
{:ok, conn} ->
send(owner, {ref, :ok, {conn, idle_time}})

receive do
{^ref, :stop, conn} ->
with :closed <- transfer_if_open(conn, state, from) do
{:ok, :closed}
end

:fail_safe_timeout ->
Conn.close(conn)
{:fail_safe_timeout, :closed}
end

{:error, conn, error} ->
{{:error, error}, transfer_if_open(conn, state, from)}
end

with {to, message} <- stop_notify do
send(to, message)
end

return
end,
pool_timeout
)
Expand All @@ -92,12 +113,18 @@ defmodule Finch.HTTP1.Pool do
stream =
fn
{:cont, acc}, function ->
Process.link(holder)
case Process.alive?(holder) do
true ->
Process.link(holder)

false ->
raise "Process holding the connection in pool died" <>
" before the stream enumeration started." <>
" Most likely fail_safe_timeout occured"
end

try do
with {:ok, conn} <- Conn.connect(conn, name),
{:ok, conn} <- Conn.transfer(conn, self()),
{:ok, conn, acc} <-
with {:ok, conn, acc} <-
Conn.request(
conn,
req,
Expand All @@ -109,19 +136,21 @@ defmodule Finch.HTTP1.Pool do
idle_time
) do
send(holder, {ref, :stop, conn})
{:cont, acc}
{:done, acc}
else
{:error, conn, _error} ->
{:error, conn, error} ->
send(holder, {ref, :stop, conn})
raise error
end
catch
class, reason ->
send(holder, {ref, :stop, conn})
:erlang.raise(class, reason, __STACKTRACE__)
end

{_, _acc}, _function ->
{_, acc}, _function ->
send(holder, {ref, :stop, conn})
{:done, acc}
end

{:ok, stream}
Expand Down
72 changes: 72 additions & 0 deletions test/finch_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,78 @@ defmodule FinchTest do
:continue -> :ok
end
end

test "Stream can be halted", %{
bypass: bypass,
finch_name: finch_name
} do
start_supervised!({Finch, name: finch_name})

Bypass.expect_once(bypass, "GET", "/", fn conn ->
Plug.Conn.send_resp(conn, 200, "OK")
end)

request = Finch.build("GET", endpoint(bypass))

assert {:ok, stream} =
Finch.actual_stream(request, finch_name, stop_notify: {self(), :stopped})

assert [status: 200] = Enum.take_while(stream, &(not match?({:headers, _}, &1)))

assert_receive :stopped
end

test "Raising in stream closes the connection", %{
bypass: bypass,
finch_name: finch_name
} do
start_supervised!({Finch, name: finch_name})

Bypass.expect_once(bypass, "GET", "/", fn conn ->
Plug.Conn.send_resp(conn, 200, "OK")
end)

request = Finch.build("GET", endpoint(bypass))

assert {:ok, stream} =
Finch.actual_stream(request, finch_name, stop_notify: {self(), :stopped})

assert_raise RuntimeError, fn ->
Enum.map(stream, fn _ -> raise "oops" end)
end

assert_receive :stopped
end

test "Fail safe timeout works", %{
bypass: bypass,
finch_name: finch_name
} do
start_supervised!({Finch, name: finch_name})

Process.flag(:trap_exit, true)

Bypass.stub(bypass, "GET", "/", fn conn ->
Process.sleep(1_000)
Plug.Conn.send_resp(conn, 200, "OK")
end)

request = Finch.build("GET", endpoint(bypass))

assert {:ok, stream} =
Finch.actual_stream(request, finch_name,
stop_notify: {self(), :stopped},
fail_safe_timeout: 100
)

Process.sleep(200)

assert_raise RuntimeError, fn ->
Enum.to_list(stream)
end

assert_receive :stopped
end
end

describe "stream/5" do
Expand Down

0 comments on commit 6b18ebf

Please sign in to comment.