From 472fb9eb40bc39f4b038e342cad2860135972b17 Mon Sep 17 00:00:00 2001 From: Sergey Snozyk Date: Mon, 31 Jul 2023 12:22:16 +0200 Subject: [PATCH] fix conn --- lib/minch/client.ex | 38 ++++++++-------- lib/minch/conn.ex | 88 ++++++++++++++++++++++---------------- lib/minch/simple_client.ex | 20 ++++----- mix.exs | 2 +- 4 files changed, 82 insertions(+), 66 deletions(-) diff --git a/lib/minch/client.ex b/lib/minch/client.ex index b2c6790..1ffe539 100644 --- a/lib/minch/client.ex +++ b/lib/minch/client.ex @@ -54,7 +54,7 @@ defmodule Minch.Client do {:noreply, %{state | conn: conn}} {:error, error} -> - handle_disconnect(error, state) + {:noreply, handle_disconnect(state, error)} end end @@ -69,27 +69,27 @@ defmodule Minch.Client do def handle_info(message, state) do case Minch.Conn.stream(state.conn, message) do - {:ok, conn, []} -> - handle_connect(%{state | conn: conn}) + {:ok, conn, response} -> + {:noreply, %{state | conn: conn} |> handle_connect(response) |> handle_frames(response)} - {:ok, conn, frames} -> - {:noreply, handle_frames(%{state | conn: conn}, frames)} - - {:error, error} -> - handle_disconnect(error, state) + {:error, conn, error} -> + Minch.Conn.close(conn) + {:noreply, handle_disconnect(%{state | conn: nil}, error)} :unknown -> do_handle_info(message, state) end end + defp handle_frames(state, %{frames: frames}) do + handle_frames(state, frames) + end + defp handle_frames(state, [frame | rest]) do - frame - |> handle_frame(state) - |> handle_frames(rest) + frame |> handle_frame(state) |> handle_frames(rest) end - defp handle_frames(state, []), do: state + defp handle_frames(state, _), do: state defp handle_frame(frame, state) do case state.callback.handle_frame(frame, state.callback_state) do @@ -113,27 +113,27 @@ defmodule Minch.Client do end end - defp handle_connect(state) do + defp handle_connect(state, %{status: _}) do case state.callback.handle_connect(state.callback_state) do {:ok, callback_state} -> - {:noreply, %{state | callback_state: callback_state}} + %{state | callback_state: callback_state} {:reply, frame, callback_state} -> {:ok, state} = send_reply(state, frame) - {:noreply, %{state | callback_state: callback_state}} + %{state | callback_state: callback_state} end end - defp handle_disconnect(error, state) do - state = %{state | conn: nil} + defp handle_connect(state, _response), do: state + defp handle_disconnect(state, error) do case state.callback.handle_disconnect(error, state.callback_state) do {:reconnect, backoff, callback_state} -> timer = Process.send_after(self(), {:"$minch", :reconnect}, backoff) - {:noreply, %{state | callback_state: callback_state, timer: timer}} + %{state | callback_state: callback_state, timer: timer} {:ok, callback_state} -> - {:noreply, %{state | callback_state: callback_state}} + %{state | callback_state: callback_state} end end diff --git a/lib/minch/conn.ex b/lib/minch/conn.ex index 91438ff..90de183 100644 --- a/lib/minch/conn.ex +++ b/lib/minch/conn.ex @@ -1,17 +1,25 @@ defmodule Minch.Conn do + @moduledoc false + @type t :: %__MODULE__{ conn: Mint.HTTP.t(), request_ref: Mint.Types.request_ref(), - status: non_neg_integer() | nil, - headers: Mint.Types.headers() | nil, websocket: Mint.WebSocket.t() | nil } - defstruct [:conn, :request_ref, :websocket, :status, :headers] + defstruct [:conn, :request_ref, :websocket] + + @type response :: %{ + optional(:status) => Mint.Types.status(), + optional(:headers) => Mint.Types.headers(), + optional(:data) => binary(), + optional(:error) => term(), + optional(:frames) => [Mint.WebSocket.frame()] + } @spec open?(t()) :: boolean() def open?(state) do - Mint.HTTP.open?(state.conn) and not is_nil(state.websocket) + Mint.HTTP.open?(state.conn) end @spec open(String.t() | URI.t(), Mint.Types.headers(), Keyword.t()) :: @@ -58,8 +66,8 @@ defmodule Minch.Conn do @spec close(t()) :: t() def close(c) do - if open?(c) do - send_frame(c, :close) + if Mint.HTTP.open?(c.conn) do + unless is_nil(c.websocket), do: send_frame(c, :close) {:ok, conn} = Mint.HTTP.close(c.conn) %{c | conn: conn} else @@ -69,8 +77,8 @@ defmodule Minch.Conn do @spec send_frame(t(), Mint.WebSocket.frame() | Mint.WebSocket.shorthand_frame()) :: {:ok, t()} | {:error, t(), term()} - def send_frame(c, frame) when c.websocket != nil do - case Mint.WebSocket.encode(c.websocket, frame) do + def send_frame(%{websocket: websocket} = c, frame) when websocket != nil do + case Mint.WebSocket.encode(websocket, frame) do {:ok, websocket, data} -> case Mint.WebSocket.stream_request_body(c.conn, c.request_ref, data) do {:ok, conn} -> @@ -86,50 +94,58 @@ defmodule Minch.Conn do end @spec stream(t(), term()) :: - {:ok, t(), [Mint.WebSocket.frame()]} | {:error, Mint.WebSocket.error()} | :unknown + {:ok, t(), response()} + | {:error, t(), Mint.WebSocket.error()} + | :unknown def stream(c, http_reply) do case Mint.WebSocket.stream(c.conn, http_reply) do {:ok, conn, responses} -> - handle_responses(%{c | conn: conn}, responses) + responses + |> build_response(c.request_ref) + |> handle_response(%{c | conn: conn}) {:error, conn, error, _responses} -> - Mint.HTTP.close(conn) - {:error, error} + {:error, %{c | conn: conn}, error} :unknown -> :unknown end end - # upgrade response - defp handle_responses(%{conn: conn, request_ref: ref, websocket: nil} = c, responses) do - resp = - Enum.reduce(responses, %{status: nil, headers: nil, data: nil}, fn - {:status, ^ref, status}, resp -> %{resp | status: status} - {:headers, ^ref, headers}, resp -> %{resp | headers: headers} - {:data, ^ref, data}, resp -> %{resp | data: data} - {:done, ^ref}, resp -> resp - end) - - case Mint.WebSocket.new(conn, ref, resp.status, resp.headers) do - {:ok, conn, websocket} -> - {:ok, %{c | conn: conn, websocket: websocket}, []} + defp build_response(responses, ref, response \\ %{}) - {:error, conn, error} -> - Mint.HTTP.close(conn) - {:error, error} - end + defp build_response([{key, ref, val} | rest], ref, response) do + build_response(rest, ref, Map.put(response, key, val)) end - # websocket frames - defp handle_responses(%{request_ref: ref} = c, [{:data, ref, data}]) do - case Mint.WebSocket.decode(c.websocket, data) do + defp build_response([{:done, ref}], ref, response), do: response + defp build_response([], _ref, response), do: response + + # WebSocket frames + defp handle_response(%{data: data} = response, %{websocket: websocket} = c) + when websocket != nil do + case Mint.WebSocket.decode(websocket, data) do {:ok, websocket, frames} -> - {:ok, %{c | websocket: websocket}, frames} + {:ok, %{c | websocket: websocket}, Map.put(response, :frames, frames)} - {:error, _websocket, error} -> - Mint.HTTP.close(c.conn) - {:error, error} + {:error, websocket, error} -> + {:error, %{c | websocket: websocket}, error} + end + end + + # upgrade response + defp handle_response(%{status: status, headers: headers} = response, c) do + case Mint.WebSocket.new(c.conn, c.request_ref, status, headers) do + {:ok, conn, websocket} -> + c = %{c | conn: conn, websocket: websocket} + + case response do + %{data: _} -> handle_response(response, c) + _no_frames -> {:ok, c, response} + end + + {:error, conn, error} -> + {:error, %{c | conn: conn}, error} end end end diff --git a/lib/minch/simple_client.ex b/lib/minch/simple_client.ex index 8b4d10e..b739ae2 100644 --- a/lib/minch/simple_client.ex +++ b/lib/minch/simple_client.ex @@ -62,27 +62,27 @@ defmodule Minch.SimpleClient do def handle_info(message, state) do case Minch.Conn.stream(state.conn, message) do - {:ok, conn, []} -> - {:noreply, reply(%{state | conn: conn}, {:ok, conn.request_ref})} + {:ok, conn, response} -> + {:noreply, + %{state | conn: conn} + |> reply({:ok, conn.request_ref}) + |> handle_frames(Map.get(response, :frames, []))} - {:ok, conn, frames} -> - {:noreply, handle_frames(%{state | conn: conn}, frames)} - - {:error, error} -> - {:stop, {:shutdown, error}, reply(state, {:error, error})} + {:error, conn, error} -> + {:stop, {:shutdown, error}, reply(%{state | conn: conn}, {:error, error})} :unknown -> {:noreply, state} end end - defp reply(%{caller: nil} = state, _message), do: state - - defp reply(%{caller: caller} = state, message) do + defp reply(%{caller: caller} = state, message) when caller != nil do GenServer.reply(caller, message) %{state | caller: nil} end + defp reply(state, _), do: state + defp handle_frames(state, [frame | rest]) do frame |> handle_frame(state) diff --git a/mix.exs b/mix.exs index 2901931..17c8319 100644 --- a/mix.exs +++ b/mix.exs @@ -21,7 +21,7 @@ defmodule Minch.MixProject do # Run "mix help deps" to learn about dependencies. defp deps do [ - {:cowboy, "~> 2.9", optional: true}, + {:cowboy, "~> 2.9", only: :test}, {:ex_doc, "~> 0.27", only: :dev, runtime: false}, {:mint_web_socket, "~> 1.0"} ]