Skip to content

Commit

Permalink
fix: prevent client <-> db locking (#195)
Browse files Browse the repository at this point in the history
  • Loading branch information
abc3 authored Oct 25, 2023
1 parent c6e7335 commit da8de63
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 24 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.9.22
0.9.23
18 changes: 9 additions & 9 deletions lib/supavisor/client_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ defmodule Supavisor.ClientHandler do
@impl true
def callback_mode, do: [:handle_event_function]

def client_call(pid, bin, ready?) do
:gen_statem.call(pid, {:client_call, bin, ready?}, 5000)
def client_cast(pid, bin, ready?) do
:gen_statem.cast(pid, {:client_cast, bin, ready?})
end

@impl true
Expand Down Expand Up @@ -303,7 +303,7 @@ defmodule Supavisor.ClientHandler do
end

def handle_event(_, {proto, _, bin}, :busy, data) when proto in [:tcp, :ssl] do
case Db.call(data.db_pid, bin) do
case Db.call(data.db_pid, self(), bin) do
:ok ->
Logger.debug("DB call success")
:keep_state_and_data
Expand Down Expand Up @@ -366,11 +366,11 @@ defmodule Supavisor.ClientHandler do
end
end

# emulate handle_call
def handle_event({:call, from}, {:client_call, bin, ready?}, _, data) do
# emulate handle_cast
def handle_event(:cast, {:client_cast, bin, ready?}, _, data) do
Logger.debug("--> --> bin #{inspect(byte_size(bin))} bytes")

reply = {:reply, from, HH.sock_send(data.sock, bin)}
:ok = HH.sock_send(data.sock, bin)

if ready? do
Logger.debug("Client is ready")
Expand All @@ -382,15 +382,15 @@ defmodule Supavisor.ClientHandler do

actions =
if data.idle_timeout > 0 do
[reply, idle_check(data.idle_timeout)]
idle_check(data.idle_timeout)
else
reply
[]
end

{:next_state, :idle, %{data | db_pid: db_pid, stats: stats}, actions}
else
Logger.debug("Client is not ready")
{:keep_state_and_data, reply}
:keep_state_and_data
end
end

Expand Down
23 changes: 12 additions & 11 deletions lib/supavisor/db_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ defmodule Supavisor.DbHandler do
:gen_statem.start_link(__MODULE__, config, hibernate_after: 5_000)
end

@spec call(pid(), binary()) :: :ok | {:error, any()} | {:buffering, non_neg_integer()}
def call(pid, msg) do
:gen_statem.call(pid, {:db_call, msg})
@spec call(pid(), pid(), binary()) :: :ok | {:error, any()} | {:buffering, non_neg_integer()}
def call(pid, caller, msg) do
:gen_statem.call(pid, {:db_call, caller, msg}, 15_000)
end

@impl true
Expand Down Expand Up @@ -240,32 +240,33 @@ defmodule Supavisor.DbHandler do
{:keep_state, %{data | buffer: []}}
end

def handle_event(:info, {_proto, _, bin}, _, data) do
def handle_event(:info, {_proto, _, bin}, _, %{caller: caller} = data) when is_pid(caller) do
# check if the response ends with "ready for query"
ready = String.ends_with?(bin, Server.ready_for_query())
:ok = Client.client_call(data.caller, bin, ready)
Logger.debug("Db ready #{inspect(ready)}")
:ok = Client.client_cast(caller, bin, ready)

if ready do
{_, stats} = Telem.network_usage(:db, data.sock, data.id, data.stats)
{:keep_state, %{data | stats: stats}}
{:keep_state, %{data | stats: stats, caller: nil}}
else
:keep_state_and_data
end
end

def handle_event({:call, {pid, _} = from}, {:db_call, bin}, :idle, %{sock: sock} = data) do
def handle_event({:call, from}, {:db_call, caller, bin}, :idle, %{sock: sock} = data) do
reply = {:reply, from, sock_send(sock, bin)}
{:keep_state, %{data | caller: pid}, reply}
{:keep_state, %{data | caller: caller}, reply}
end

def handle_event({:call, {pid, _} = from}, {:db_call, bin}, state, %{buffer: buff} = data) do
def handle_event({:call, from}, {:db_call, caller, bin}, state, %{buffer: buff} = data) do
Logger.debug(
"state #{state} <-- <-- bin #{inspect(byte_size(bin))} bytes, caller: #{inspect(pid)}"
"state #{state} <-- <-- bin #{inspect(byte_size(bin))} bytes, caller: #{inspect(caller)}"
)

new_buff = [bin | buff]
reply = {:reply, from, {:buffering, IO.iodata_length(new_buff)}}
{:keep_state, %{data | caller: pid, buffer: new_buff}, reply}
{:keep_state, %{data | caller: caller, buffer: new_buff}, reply}
end

def handle_event(:info, {:tcp_closed, sock}, state, %{sock: sock} = data) do
Expand Down
6 changes: 3 additions & 3 deletions test/supavisor/db_handler_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ defmodule Supavisor.DbHandlerTest do
data = %{sock: {:gen_tcp, sock}, caller: nil, buffer: []}
from = {self(), :test_ref}
event = {:call, from}
payload = {:db_call, "test_data"}
payload = {:db_call, self(), "test_data"}

{:keep_state, new_data, reply} = Db.handle_event(event, payload, :idle, data)

Expand All @@ -107,10 +107,10 @@ defmodule Supavisor.DbHandlerTest do
end

test "handle_event/4 with non-idle state" do
data = %{sock: nil, caller: nil, buffer: []}
data = %{sock: nil, caller: self(), buffer: []}
from = {self(), :test_ref}
event = {:call, from}
payload = {:db_call, "test_data"}
payload = {:db_call, self(), "test_data"}
state = :non_idle

{:keep_state, new_data, reply} = Db.handle_event(event, payload, state, data)
Expand Down

0 comments on commit da8de63

Please sign in to comment.