Skip to content

Commit

Permalink
fix conn
Browse files Browse the repository at this point in the history
  • Loading branch information
nmbrone committed Jul 31, 2023
1 parent 7b70e7e commit 472fb9e
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 66 deletions.
38 changes: 19 additions & 19 deletions lib/minch/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ defmodule Minch.Client do
{:noreply, %{state | conn: conn}}

{:error, error} ->
handle_disconnect(error, state)
{:noreply, handle_disconnect(state, error)}
end
end

Expand All @@ -69,27 +69,27 @@ defmodule Minch.Client do

def handle_info(message, state) do
case Minch.Conn.stream(state.conn, message) do
{:ok, conn, []} ->
handle_connect(%{state | conn: conn})
{:ok, conn, response} ->
{:noreply, %{state | conn: conn} |> handle_connect(response) |> handle_frames(response)}

{:ok, conn, frames} ->
{:noreply, handle_frames(%{state | conn: conn}, frames)}

{:error, error} ->
handle_disconnect(error, state)
{:error, conn, error} ->
Minch.Conn.close(conn)
{:noreply, handle_disconnect(%{state | conn: nil}, error)}

:unknown ->
do_handle_info(message, state)
end
end

defp handle_frames(state, %{frames: frames}) do
handle_frames(state, frames)
end

defp handle_frames(state, [frame | rest]) do
frame
|> handle_frame(state)
|> handle_frames(rest)
frame |> handle_frame(state) |> handle_frames(rest)
end

defp handle_frames(state, []), do: state
defp handle_frames(state, _), do: state

defp handle_frame(frame, state) do
case state.callback.handle_frame(frame, state.callback_state) do
Expand All @@ -113,27 +113,27 @@ defmodule Minch.Client do
end
end

defp handle_connect(state) do
defp handle_connect(state, %{status: _}) do
case state.callback.handle_connect(state.callback_state) do
{:ok, callback_state} ->
{:noreply, %{state | callback_state: callback_state}}
%{state | callback_state: callback_state}

{:reply, frame, callback_state} ->
{:ok, state} = send_reply(state, frame)
{:noreply, %{state | callback_state: callback_state}}
%{state | callback_state: callback_state}
end
end

defp handle_disconnect(error, state) do
state = %{state | conn: nil}
defp handle_connect(state, _response), do: state

defp handle_disconnect(state, error) do
case state.callback.handle_disconnect(error, state.callback_state) do
{:reconnect, backoff, callback_state} ->
timer = Process.send_after(self(), {:"$minch", :reconnect}, backoff)
{:noreply, %{state | callback_state: callback_state, timer: timer}}
%{state | callback_state: callback_state, timer: timer}

{:ok, callback_state} ->
{:noreply, %{state | callback_state: callback_state}}
%{state | callback_state: callback_state}
end
end

Expand Down
88 changes: 52 additions & 36 deletions lib/minch/conn.ex
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
defmodule Minch.Conn do
@moduledoc false

@type t :: %__MODULE__{
conn: Mint.HTTP.t(),
request_ref: Mint.Types.request_ref(),
status: non_neg_integer() | nil,
headers: Mint.Types.headers() | nil,
websocket: Mint.WebSocket.t() | nil
}

defstruct [:conn, :request_ref, :websocket, :status, :headers]
defstruct [:conn, :request_ref, :websocket]

@type response :: %{
optional(:status) => Mint.Types.status(),
optional(:headers) => Mint.Types.headers(),
optional(:data) => binary(),
optional(:error) => term(),
optional(:frames) => [Mint.WebSocket.frame()]
}

@spec open?(t()) :: boolean()
def open?(state) do
Mint.HTTP.open?(state.conn) and not is_nil(state.websocket)
Mint.HTTP.open?(state.conn)
end

@spec open(String.t() | URI.t(), Mint.Types.headers(), Keyword.t()) ::
Expand Down Expand Up @@ -58,8 +66,8 @@ defmodule Minch.Conn do

@spec close(t()) :: t()
def close(c) do
if open?(c) do
send_frame(c, :close)
if Mint.HTTP.open?(c.conn) do
unless is_nil(c.websocket), do: send_frame(c, :close)
{:ok, conn} = Mint.HTTP.close(c.conn)
%{c | conn: conn}
else
Expand All @@ -69,8 +77,8 @@ defmodule Minch.Conn do

@spec send_frame(t(), Mint.WebSocket.frame() | Mint.WebSocket.shorthand_frame()) ::
{:ok, t()} | {:error, t(), term()}
def send_frame(c, frame) when c.websocket != nil do
case Mint.WebSocket.encode(c.websocket, frame) do
def send_frame(%{websocket: websocket} = c, frame) when websocket != nil do
case Mint.WebSocket.encode(websocket, frame) do
{:ok, websocket, data} ->
case Mint.WebSocket.stream_request_body(c.conn, c.request_ref, data) do
{:ok, conn} ->
Expand All @@ -86,50 +94,58 @@ defmodule Minch.Conn do
end

@spec stream(t(), term()) ::
{:ok, t(), [Mint.WebSocket.frame()]} | {:error, Mint.WebSocket.error()} | :unknown
{:ok, t(), response()}
| {:error, t(), Mint.WebSocket.error()}
| :unknown
def stream(c, http_reply) do
case Mint.WebSocket.stream(c.conn, http_reply) do
{:ok, conn, responses} ->
handle_responses(%{c | conn: conn}, responses)
responses
|> build_response(c.request_ref)
|> handle_response(%{c | conn: conn})

{:error, conn, error, _responses} ->
Mint.HTTP.close(conn)
{:error, error}
{:error, %{c | conn: conn}, error}

:unknown ->
:unknown
end
end

# upgrade response
defp handle_responses(%{conn: conn, request_ref: ref, websocket: nil} = c, responses) do
resp =
Enum.reduce(responses, %{status: nil, headers: nil, data: nil}, fn
{:status, ^ref, status}, resp -> %{resp | status: status}
{:headers, ^ref, headers}, resp -> %{resp | headers: headers}
{:data, ^ref, data}, resp -> %{resp | data: data}
{:done, ^ref}, resp -> resp
end)

case Mint.WebSocket.new(conn, ref, resp.status, resp.headers) do
{:ok, conn, websocket} ->
{:ok, %{c | conn: conn, websocket: websocket}, []}
defp build_response(responses, ref, response \\ %{})

{:error, conn, error} ->
Mint.HTTP.close(conn)
{:error, error}
end
defp build_response([{key, ref, val} | rest], ref, response) do
build_response(rest, ref, Map.put(response, key, val))
end

# websocket frames
defp handle_responses(%{request_ref: ref} = c, [{:data, ref, data}]) do
case Mint.WebSocket.decode(c.websocket, data) do
defp build_response([{:done, ref}], ref, response), do: response
defp build_response([], _ref, response), do: response

# WebSocket frames
defp handle_response(%{data: data} = response, %{websocket: websocket} = c)
when websocket != nil do
case Mint.WebSocket.decode(websocket, data) do
{:ok, websocket, frames} ->
{:ok, %{c | websocket: websocket}, frames}
{:ok, %{c | websocket: websocket}, Map.put(response, :frames, frames)}

{:error, _websocket, error} ->
Mint.HTTP.close(c.conn)
{:error, error}
{:error, websocket, error} ->
{:error, %{c | websocket: websocket}, error}
end
end

# upgrade response
defp handle_response(%{status: status, headers: headers} = response, c) do
case Mint.WebSocket.new(c.conn, c.request_ref, status, headers) do
{:ok, conn, websocket} ->
c = %{c | conn: conn, websocket: websocket}

case response do
%{data: _} -> handle_response(response, c)
_no_frames -> {:ok, c, response}
end

{:error, conn, error} ->
{:error, %{c | conn: conn}, error}
end
end
end
20 changes: 10 additions & 10 deletions lib/minch/simple_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -62,27 +62,27 @@ defmodule Minch.SimpleClient do

def handle_info(message, state) do
case Minch.Conn.stream(state.conn, message) do
{:ok, conn, []} ->
{:noreply, reply(%{state | conn: conn}, {:ok, conn.request_ref})}
{:ok, conn, response} ->
{:noreply,
%{state | conn: conn}
|> reply({:ok, conn.request_ref})
|> handle_frames(Map.get(response, :frames, []))}

{:ok, conn, frames} ->
{:noreply, handle_frames(%{state | conn: conn}, frames)}

{:error, error} ->
{:stop, {:shutdown, error}, reply(state, {:error, error})}
{:error, conn, error} ->
{:stop, {:shutdown, error}, reply(%{state | conn: conn}, {:error, error})}

:unknown ->
{:noreply, state}
end
end

defp reply(%{caller: nil} = state, _message), do: state

defp reply(%{caller: caller} = state, message) do
defp reply(%{caller: caller} = state, message) when caller != nil do
GenServer.reply(caller, message)
%{state | caller: nil}
end

defp reply(state, _), do: state

defp handle_frames(state, [frame | rest]) do
frame
|> handle_frame(state)
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ defmodule Minch.MixProject do
# Run "mix help deps" to learn about dependencies.
defp deps do
[
{:cowboy, "~> 2.9", optional: true},
{:cowboy, "~> 2.9", only: :test},
{:ex_doc, "~> 0.27", only: :dev, runtime: false},
{:mint_web_socket, "~> 1.0"}
]
Expand Down

0 comments on commit 472fb9e

Please sign in to comment.