From 6b18ebffd7316b19e899ae7b0260fc3deacfa38a Mon Sep 17 00:00:00 2001 From: hissssst Date: Sun, 4 Aug 2024 16:18:19 +0200 Subject: [PATCH] Fail safe timeout implementation and tests --- lib/finch.ex | 13 +++++++- lib/finch/http1/pool.ex | 61 +++++++++++++++++++++++++--------- test/finch_test.exs | 72 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 129 insertions(+), 17 deletions(-) diff --git a/lib/finch.ex b/lib/finch.ex index e29fd79..046354b 100644 --- a/lib/finch.ex +++ b/lib/finch.ex @@ -309,7 +309,18 @@ defmodule Finch do @doc """ Like `stream/5`, but returns valid `Stream` structure, safe for passing between processes, but working __only__ - on local node. Works only for HTTP1, HTTP2 not supported currently + on local node. Works only for HTTP1, HTTP2 not supported currently. + + ## Options + + Same as `request/3` plus + + * `:fail_safe_timeout` (`t:timeout/0`) (optional) (default is 15 minutes) - timeout in milliseconds. + Since this function returns an Enumerable which is lazily executed, it makes sense to + have a timeout which will close the connection in case it's never read from in erroneous situations. + + * `:stop_notify` (`{t:GenServer.name/0 | t:pid/0, t:any/0}`) (optional) - destination and message which + will be notified once connection is returned to pool or closed. """ @spec actual_stream(Request.t(), name(), request_opts()) :: {:ok, Enumerable.t()} | {:error, Exception.t()} diff --git a/lib/finch/http1/pool.ex b/lib/finch/http1/pool.ex index 076b492..bb992b4 100644 --- a/lib/finch/http1/pool.ex +++ b/lib/finch/http1/pool.ex @@ -44,6 +44,7 @@ defmodule Finch.HTTP1.Pool do receive_timeout = Keyword.get(opts, :receive_timeout, 15_000) request_timeout = Keyword.get(opts, :request_timeout, 30_000) fail_safe_timeout = Keyword.get(opts, :fail_safe_timeout, 15 * 60_000) + stop_notify = Keyword.get(opts, :stop_notify, nil) metadata = %{request: req, pool: pool, name: name} @@ -58,17 +59,37 @@ defmodule Finch.HTTP1.Pool do pool, :checkout, fn from, {state, conn, idle_time} -> + if fail_safe_timeout != :infinity do + Process.send_after(self(), :fail_safe_timeout, fail_safe_timeout) + end + Telemetry.stop(:queue, start_time, metadata, %{idle_time: idle_time}) - send(owner, {ref, :ok, {conn, idle_time}}) - - receive do - {^ref, :stop, conn} -> - transfer_if_open(conn, state, from) - after - fail_safe_timeout -> - {_, state} = transfer_if_open(conn, state, from) - {:fail_safe_timeout, state} + + return = + case Conn.connect(conn, name) do + {:ok, conn} -> + send(owner, {ref, :ok, {conn, idle_time}}) + + receive do + {^ref, :stop, conn} -> + with :closed <- transfer_if_open(conn, state, from) do + {:ok, :closed} + end + + :fail_safe_timeout -> + Conn.close(conn) + {:fail_safe_timeout, :closed} + end + + {:error, conn, error} -> + {{:error, error}, transfer_if_open(conn, state, from)} + end + + with {to, message} <- stop_notify do + send(to, message) end + + return end, pool_timeout ) @@ -92,12 +113,18 @@ defmodule Finch.HTTP1.Pool do stream = fn {:cont, acc}, function -> - Process.link(holder) + case Process.alive?(holder) do + true -> + Process.link(holder) + + false -> + raise "Process holding the connection in pool died" <> + " before the stream enumeration started." <> + " Most likely fail_safe_timeout occured" + end try do - with {:ok, conn} <- Conn.connect(conn, name), - {:ok, conn} <- Conn.transfer(conn, self()), - {:ok, conn, acc} <- + with {:ok, conn, acc} <- Conn.request( conn, req, @@ -109,10 +136,11 @@ defmodule Finch.HTTP1.Pool do idle_time ) do send(holder, {ref, :stop, conn}) - {:cont, acc} + {:done, acc} else - {:error, conn, _error} -> + {:error, conn, error} -> send(holder, {ref, :stop, conn}) + raise error end catch class, reason -> @@ -120,8 +148,9 @@ defmodule Finch.HTTP1.Pool do :erlang.raise(class, reason, __STACKTRACE__) end - {_, _acc}, _function -> + {_, acc}, _function -> send(holder, {ref, :stop, conn}) + {:done, acc} end {:ok, stream} diff --git a/test/finch_test.exs b/test/finch_test.exs index e84950f..012bc91 100644 --- a/test/finch_test.exs +++ b/test/finch_test.exs @@ -697,6 +697,78 @@ defmodule FinchTest do :continue -> :ok end end + + test "Stream can be halted", %{ + bypass: bypass, + finch_name: finch_name + } do + start_supervised!({Finch, name: finch_name}) + + Bypass.expect_once(bypass, "GET", "/", fn conn -> + Plug.Conn.send_resp(conn, 200, "OK") + end) + + request = Finch.build("GET", endpoint(bypass)) + + assert {:ok, stream} = + Finch.actual_stream(request, finch_name, stop_notify: {self(), :stopped}) + + assert [status: 200] = Enum.take_while(stream, &(not match?({:headers, _}, &1))) + + assert_receive :stopped + end + + test "Raising in stream closes the connection", %{ + bypass: bypass, + finch_name: finch_name + } do + start_supervised!({Finch, name: finch_name}) + + Bypass.expect_once(bypass, "GET", "/", fn conn -> + Plug.Conn.send_resp(conn, 200, "OK") + end) + + request = Finch.build("GET", endpoint(bypass)) + + assert {:ok, stream} = + Finch.actual_stream(request, finch_name, stop_notify: {self(), :stopped}) + + assert_raise RuntimeError, fn -> + Enum.map(stream, fn _ -> raise "oops" end) + end + + assert_receive :stopped + end + + test "Fail safe timeout works", %{ + bypass: bypass, + finch_name: finch_name + } do + start_supervised!({Finch, name: finch_name}) + + Process.flag(:trap_exit, true) + + Bypass.stub(bypass, "GET", "/", fn conn -> + Process.sleep(1_000) + Plug.Conn.send_resp(conn, 200, "OK") + end) + + request = Finch.build("GET", endpoint(bypass)) + + assert {:ok, stream} = + Finch.actual_stream(request, finch_name, + stop_notify: {self(), :stopped}, + fail_safe_timeout: 100 + ) + + Process.sleep(200) + + assert_raise RuntimeError, fn -> + Enum.to_list(stream) + end + + assert_receive :stopped + end end describe "stream/5" do