diff --git a/VERSION b/VERSION index 470abefa..03f7611d 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.1.67 +1.1.68 diff --git a/lib/supavisor/application.ex b/lib/supavisor/application.ex index 0a90c5e5..11ed773c 100644 --- a/lib/supavisor/application.ex +++ b/lib/supavisor/application.ex @@ -34,19 +34,23 @@ defmodule Supavisor.Application do ] for {key, port, mode} <- proxy_ports do - :ranch.start_listener( - key, - :ranch_tcp, - %{ - max_connections: String.to_integer(System.get_env("MAX_CONNECTIONS") || "25000"), - num_acceptors: String.to_integer(System.get_env("NUM_ACCEPTORS") || "100"), - socket_opts: [port: port, keepalive: true] - }, - Supavisor.ClientHandler, - %{mode: mode} - ) - |> then(&"Proxy started #{mode} on port #{port}, result: #{inspect(&1)}") - |> Logger.warning() + case :ranch.start_listener( + key, + :ranch_tcp, + %{ + max_connections: String.to_integer(System.get_env("MAX_CONNECTIONS") || "25000"), + num_acceptors: String.to_integer(System.get_env("NUM_ACCEPTORS") || "100"), + socket_opts: [inet_backend: :socket, port: port, keepalive: true] + }, + Supavisor.ClientHandler, + %{mode: mode} + ) do + {:ok, _pid} -> + Logger.notice("Proxy started #{mode} on port #{port}") + + error -> + Logger.error("Proxy on #{port} not started because of #{inspect(error)}") + end end :syn.set_event_handler(Supavisor.SynHandler) diff --git a/lib/supavisor/client_handler.ex b/lib/supavisor/client_handler.ex index 4c46f5fe..1ce30ec7 100644 --- a/lib/supavisor/client_handler.ex +++ b/lib/supavisor/client_handler.ex @@ -11,6 +11,31 @@ defmodule Supavisor.ClientHandler do @behaviour :ranch_protocol @behaviour :partisan_gen_statem + defstruct [ + :id, + :sock, + :trans, + :db_pid, + :tenant, + :user, + :pool, + :manager, + :query_start, + :timeout, + :ps, + :ssl, + :auth_secrets, + :proxy_type, + :mode, + :stats, + :idle_timeout, + :db_name, + :last_query, + :heartbeat_interval, + :connection_start, + :log_level + ] + alias Supavisor, as: S alias Supavisor.DbHandler, as: Db alias Supavisor.Helpers, as: H @@ -18,7 +43,7 @@ defmodule Supavisor.ClientHandler do alias Supavisor.{Tenants, Monitoring.Telem, Protocol.Client, Protocol.Server} @impl true - def start_link(ref, _sock, transport, opts) do + def start_link(ref, transport, opts) do pid = :proc_lib.spawn_link(__MODULE__, :init, [ref, transport, opts]) {:ok, pid} end @@ -45,7 +70,7 @@ defmodule Supavisor.ClientHandler do :ok = trans.setopts(sock, active: true) Logger.debug("ClientHandler is: #{inspect(self())}") - data = %{ + data = %__MODULE__{ id: nil, sock: {:gen_tcp, sock}, trans: trans, @@ -74,7 +99,7 @@ defmodule Supavisor.ClientHandler do end @impl true - def handle_event(:info, {_proto, _, <<"GET", _::binary>>}, :exchange, data) do + def handle_event(:info, {_proto, _, <<"GET", _::binary>>}, :exchange, %__MODULE__{} = data) do Logger.debug("ClientHandler: Client is trying to request HTTP") HH.sock_send( @@ -93,7 +118,7 @@ defmodule Supavisor.ClientHandler do end # send cancel request to db - def handle_event(:info, :cancel_query, :busy, data) do + def handle_event(:info, :cancel_query, :busy, %__MODULE__{} = data) do key = {data.tenant, data.db_pid} Logger.debug("ClientHandler: Cancel query for #{inspect(key)}") {_pool, db_pid} = data.db_pid @@ -111,7 +136,7 @@ defmodule Supavisor.ClientHandler do :keep_state_and_data end - def handle_event(:info, {:tcp, _, <<_::64>>}, :exchange, %{sock: sock} = data) do + def handle_event(:info, {:tcp, _, <<_::64>>}, :exchange, %__MODULE__{sock: sock} = data) do Logger.debug("ClientHandler: Client is trying to connect with SSL") downstream_cert = H.downstream_cert() @@ -131,7 +156,7 @@ defmodule Supavisor.ClientHandler do {:ok, ssl_sock} -> socket = {:ssl, ssl_sock} :ok = HH.setopts(socket, active: true) - {:keep_state, %{data | sock: socket, ssl: true}} + {:keep_state, %__MODULE__{data | sock: socket, ssl: true}} error -> Logger.error("ClientHandler: SSL handshake error: #{inspect(error)}") @@ -139,7 +164,7 @@ defmodule Supavisor.ClientHandler do {:stop, {:shutdown, :ssl_handshake_error}} end else - Logger.error( + Logger.warning( "ClientHandler: User requested SSL connection but no downstream cert/key found" ) @@ -148,7 +173,7 @@ defmodule Supavisor.ClientHandler do end end - def handle_event(:info, {_, _, bin}, :exchange, data) do + def handle_event(:info, {_, _, bin}, :exchange, %__MODULE__{} = data) do case Server.decode_startup_packet(bin) do {:ok, hello} -> Logger.debug("ClientHandler: Client startup message: #{inspect(hello)}") @@ -186,7 +211,7 @@ defmodule Supavisor.ClientHandler do :internal, {:hello, {type, {user, tenant_or_alias, db_name}}}, :exchange, - %{sock: sock} = data + %__MODULE__{sock: sock} = data ) do sni_hostname = HH.try_get_sni(sock) @@ -274,7 +299,7 @@ defmodule Supavisor.ClientHandler do :internal, {:handle, {method, secrets}, info}, _, - %{sock: sock} = data + %__MODULE__{sock: sock} = data ) do Logger.debug("ClientHandler: Handle exchange, auth method: #{inspect(method)}") @@ -341,7 +366,7 @@ defmodule Supavisor.ClientHandler do end end - def handle_event(:internal, :subscribe, _, data) do + def handle_event(:internal, :subscribe, _, %__MODULE__{} = data) do Logger.debug("ClientHandler: Subscribe to tenant #{inspect(data.id)}") with {:ok, sup} <- @@ -374,7 +399,7 @@ defmodule Supavisor.ClientHandler do end end - def handle_event(:internal, {:greetings, ps}, _, %{sock: sock} = data) do + def handle_event(:internal, {:greetings, ps}, _, %__MODULE__{sock: sock} = data) do {header, <> = payload} = Server.backend_key_data() msg = [ps, [header, payload], Server.ready_for_query()] :ok = HH.listen_cancel_query(pid, key) @@ -387,7 +412,7 @@ defmodule Supavisor.ClientHandler do {:keep_state_and_data, {:next_event, :internal, :subscribe}} end - def handle_event(:timeout, :wait_ps, _, data) do + def handle_event(:timeout, :wait_ps, _, %__MODULE__{} = data) do Logger.error( "ClientHandler: Wait parameter status timeout, send default #{inspect(data.ps)}}" ) @@ -396,12 +421,12 @@ defmodule Supavisor.ClientHandler do {:keep_state_and_data, {:next_event, :internal, {:greetings, ps}}} end - def handle_event(:timeout, :idle_terminate, _, data) do + def handle_event(:timeout, :idle_terminate, _, %__MODULE__{} = data) do Logger.warning("ClientHandler: Terminate an idle connection by #{data.idle_timeout} timeout") {:stop, {:shutdown, :idle_terminate}} end - def handle_event(:timeout, :heartbeat_check, _, data) do + def handle_event(:timeout, :heartbeat_check, _, %__MODULE__{} = data) do Logger.debug("ClientHandler: Send heartbeat to client") HH.sock_send(data.sock, Server.application_name()) {:keep_state_and_data, {:timeout, data.heartbeat_interval, :heartbeat_check}} @@ -415,14 +440,14 @@ defmodule Supavisor.ClientHandler do end # handle Sync message - def handle_event(:info, {proto, _, <>}, :idle, data) + def handle_event(:info, {proto, _, <>}, :idle, %__MODULE__{} = data) when proto in [:tcp, :ssl] do Logger.debug("ClientHandler: Receive sync") :ok = HH.sock_send(data.sock, Server.ready_for_query()) {:keep_state_and_data, handle_actions(data)} end - def handle_event(:info, {proto, _, <> = msg}, _, data) + def handle_event(:info, {proto, _, <> = msg}, _, %__MODULE__{} = data) when proto in [:tcp, :ssl] do Logger.debug("ClientHandler: Receive sync while not idle") {_, db_pid} = data.db_pid @@ -430,7 +455,7 @@ defmodule Supavisor.ClientHandler do :keep_state_and_data end - def handle_event(:info, {proto, _, <> = msg}, _, data) + def handle_event(:info, {proto, _, <> = msg}, _, %__MODULE__{} = data) when proto in [:tcp, :ssl] do Logger.debug("ClientHandler: Receive flush while not idle") {_, db_pid} = data.db_pid @@ -439,7 +464,7 @@ defmodule Supavisor.ClientHandler do end # incoming query with a single pool - def handle_event(:info, {proto, _, bin}, :idle, %{pool: pid} = data) + def handle_event(:info, {proto, _, bin}, :idle, %__MODULE__{pool: pid} = data) when is_binary(bin) and is_pid(pid) do ts = System.monotonic_time() db_pid = db_checkout(:both, :on_query, data) @@ -450,7 +475,7 @@ defmodule Supavisor.ClientHandler do end # incoming query with read/write pools - def handle_event(:info, {proto, _, bin}, :idle, data) do + def handle_event(:info, {proto, _, bin}, :idle, %__MODULE__{} = data) do query_type = with {:ok, payload} <- Client.get_payload(bin), {:ok, statements} <- Supavisor.PgParser.statements(payload) do @@ -477,7 +502,7 @@ defmodule Supavisor.ClientHandler do end # forward query to db - def handle_event(_, {proto, _, bin}, :busy, data) + def handle_event(_, {proto, _, bin}, :busy, %__MODULE__{} = data) when proto in [:tcp, :ssl] do {_, db_pid} = data.db_pid @@ -517,21 +542,21 @@ defmodule Supavisor.ClientHandler do end # client closed connection - def handle_event(_, {closed, _}, _, data) + def handle_event(_, {closed, _}, _, %__MODULE__{} = data) when closed in [:tcp_closed, :ssl_closed] do Logger.debug("ClientHandler: #{closed} socket closed for #{inspect(data.tenant)}") {:stop, {:shutdown, :socket_closed}} end # linked DbHandler went down - def handle_event(:info, {:EXIT, db_pid, reason}, _, data) do + def handle_event(:info, {:EXIT, db_pid, reason}, _, %__MODULE__{} = data) do Logger.error("ClientHandler: DbHandler #{inspect(db_pid)} exited #{inspect(reason)}") HH.sock_send(data.sock, Server.error_message("XX000", "DbHandler exited")) {:stop, {:shutdown, :db_handler_exit}} end # pool's manager went down - def handle_event(:info, {:DOWN, _, _, _, reason}, state, data) do + def handle_event(:info, {:DOWN, _, _, _, reason}, state, %__MODULE__{} = data) do Logger.error( "ClientHandler: Manager #{inspect(data.manager)} went down #{inspect(reason)} state #{inspect(state)}" ) @@ -554,8 +579,8 @@ defmodule Supavisor.ClientHandler do end # emulate handle_cast - def handle_event(:cast, {:client_cast, bin, status}, _, data) do - Logger.debug("ClientHandler: --> --> bin #{inspect(byte_size(bin))} bytes") + def handle_event(:cast, {:client_cast, bin, status}, _, %__MODULE__{} = data) do + Logger.debug("ClientHandler: --> --> bin #{byte_size(bin)} bytes") case status do :ready_for_query -> @@ -563,12 +588,12 @@ defmodule Supavisor.ClientHandler do db_pid = handle_db_pid(data.mode, data.pool, data.db_pid) - {_, stats} = Telem.network_usage(:client, data.sock, data.id, data.stats) + # {_, stats} = Telem.network_usage(:client, data.sock, data.id, data.stats) Telem.client_query_time(data.query_start, data.id) :ok = HH.sock_send(data.sock, bin) actions = handle_actions(data) - {:next_state, :idle, %{data | db_pid: db_pid, stats: stats}, actions} + {:next_state, :idle, %__MODULE__{data | db_pid: db_pid}, actions} :continue -> Logger.debug("ClientHandler: Client is not ready") @@ -590,8 +615,8 @@ defmodule Supavisor.ClientHandler do end # emulate handle_call - def handle_event({:call, from}, {:client_call, bin, _}, _, data) do - Logger.debug("ClientHandler: --> --> bin call #{inspect(byte_size(bin))} bytes") + def handle_event({:call, from}, {:client_call, bin, _}, _, %__MODULE__{} = data) do + Logger.debug("ClientHandler: --> --> bin call #{byte_size(bin)} bytes") {:keep_state_and_data, {:reply, from, HH.sock_send(data.sock, bin)}} end @@ -612,7 +637,7 @@ defmodule Supavisor.ClientHandler do def terminate( {:timeout, {_, _, [_, {:checkout, _, _}, _]}}, _, - data + %__MODULE__{} = data ) do msg = case data.mode do @@ -789,7 +814,7 @@ defmodule Supavisor.ClientHandler do defp handle_db_pid(:session, _, db_pid), do: db_pid - defp update_user_data(data, info, user, id, db_name, mode) do + defp update_user_data(%__MODULE__{} = data, info, user, id, db_name, mode) do proxy_type = if info.tenant.require_user do :password @@ -797,7 +822,7 @@ defmodule Supavisor.ClientHandler do :auth_query end - %{ + %__MODULE__{ data | tenant: info.tenant.external_id, user: user, @@ -931,11 +956,6 @@ defmodule Supavisor.ClientHandler do def try_get_sni(_), do: nil - @spec timeout_check(atom, non_neg_integer) :: {:timeout, non_neg_integer, atom} - defp timeout_check(key, timeout) do - {:timeout, timeout, key} - end - defp db_pid_meta({_, {_, pid}} = _key) do rkey = Supavisor.Registry.PoolPids fnode = node(pid) @@ -948,10 +968,10 @@ defmodule Supavisor.ClientHandler do end @spec handle_prepared_statements({pid, pid}, binary, map) :: :ok | nil - defp handle_prepared_statements({_, pid}, bin, %{mode: :transaction} = data) do + defp handle_prepared_statements({_, pid}, bin, %__MODULE__{mode: :transaction} = data) do with {:ok, payload} <- Client.get_payload(bin), - {:ok, statamets} <- Supavisor.PgParser.statements(payload), - true <- Enum.member?([["PrepareStmt"], ["DeallocateStmt"]], statamets) do + {:ok, statements} when statements in [["PrepareStmt"], ["DeallocateStmt"]] <- + Supavisor.PgParser.statements(payload) do Logger.info("ClientHandler: Handle prepared statement #{inspect(payload)}") GenServer.call(data.pool, :get_all_workers) @@ -976,18 +996,15 @@ defmodule Supavisor.ClientHandler do defp handle_prepared_statements(_, _, _), do: nil @spec handle_actions(map) :: [{:timeout, non_neg_integer, atom}] - defp handle_actions(data) do - Enum.flat_map(data, fn - {:heartbeat_interval, v} = t when v > 0 -> - Logger.debug("ClientHandler: Call timeout #{inspect(t)}") - [timeout_check(:heartbeat_check, v)] - - {:idle_timeout, v} = t when v > 0 -> - Logger.debug("ClientHandler: Call timeout #{inspect(t)}") - [timeout_check(:idle_terminate, v)] - - _ -> - [] - end) + defp handle_actions(%__MODULE__{} = data) do + heartbeat = + if data.heartbeat_interval > 0, + do: [{:timeout, data.heartbeat_interval, :heartbeat_check}], + else: [] + + idle = + if data.idle_timeout > 0, do: [{:timeout, data.idle_timeout, :idle_timeout}], else: [] + + heartbeat ++ idle end end diff --git a/lib/supavisor/native_handler.ex b/lib/supavisor/native_handler.ex index a02255ef..a3de4d0a 100644 --- a/lib/supavisor/native_handler.ex +++ b/lib/supavisor/native_handler.ex @@ -10,7 +10,7 @@ defmodule Supavisor.NativeHandler do alias Supavisor.{Protocol.Server, Tenants} @impl true - def start_link(ref, _sock, transport, opts) do + def start_link(ref, transport, opts) do pid = :proc_lib.spawn_link(__MODULE__, :init, [ref, transport, opts]) {:ok, pid} end diff --git a/mix.exs b/mix.exs index 111b3ab1..f8d103b9 100644 --- a/mix.exs +++ b/mix.exs @@ -73,8 +73,9 @@ defmodule Supavisor.MixProject do {:partisan, git: "https://github.com/lasp-lang/partisan.git", tag: "v5.0.0-rc.12"}, {:syn, "~> 3.3"}, {:pgo, "~> 0.13"}, - {:rustler, "~> 0.29.1"} + {:rustler, "~> 0.29.1"}, # TODO: add ranch deps + {:ranch, "~> 2.0", override: true} ] end diff --git a/mix.lock b/mix.lock index 6bc3bbe7..88338e12 100644 --- a/mix.lock +++ b/mix.lock @@ -64,7 +64,7 @@ "postgrex": {:hex, :postgrex, "0.17.3", "c92cda8de2033a7585dae8c61b1d420a1a1322421df84da9a82a6764580c503d", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "946cf46935a4fdca7a81448be76ba3503cff082df42c6ec1ff16a4bdfbfb098d"}, "prom_ex": {:hex, :prom_ex, "1.8.0", "662615e1d2f2ab3e0dc13a51c92ad0ccfcab24336a90cb9b114ee1bce9ef88aa", [:mix], [{:absinthe, ">= 1.6.0", [hex: :absinthe, repo: "hexpm", optional: true]}, {:broadway, ">= 1.0.2", [hex: :broadway, repo: "hexpm", optional: true]}, {:ecto, ">= 3.5.0", [hex: :ecto, repo: "hexpm", optional: true]}, {:finch, "~> 0.15", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: false]}, {:oban, ">= 2.4.0", [hex: :oban, repo: "hexpm", optional: true]}, {:octo_fetch, "~> 0.3", [hex: :octo_fetch, repo: "hexpm", optional: false]}, {:phoenix, ">= 1.5.0", [hex: :phoenix, repo: "hexpm", optional: true]}, {:phoenix_live_view, ">= 0.14.0", [hex: :phoenix_live_view, repo: "hexpm", optional: true]}, {:plug, ">= 1.12.1", [hex: :plug, repo: "hexpm", optional: true]}, {:plug_cowboy, "~> 2.5", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:telemetry, ">= 1.0.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}, {:telemetry_metrics_prometheus_core, "~> 1.0", [hex: :telemetry_metrics_prometheus_core, repo: "hexpm", optional: false]}, {:telemetry_poller, "~> 1.0", [hex: :telemetry_poller, repo: "hexpm", optional: false]}], "hexpm", "3eea763dfa941e25de50decbf17a6a94dbd2270e7b32f88279aa6e9bbb8e23e7"}, "quickrand": {:hex, :quickrand, "2.0.7", "d2bd76676a446e6a058d678444b7fda1387b813710d1af6d6e29bb92186c8820", [:rebar3], [], "hexpm", "b8acbf89a224bc217c3070ca8bebc6eb236dbe7f9767993b274084ea044d35f0"}, - "ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"}, + "ranch": {:hex, :ranch, "2.1.0", "2261f9ed9574dcfcc444106b9f6da155e6e540b2f82ba3d42b339b93673b72a3", [:make, :rebar3], [], "hexpm", "244ee3fa2a6175270d8e1fc59024fd9dbc76294a321057de8f803b1479e76916"}, "recon": {:hex, :recon, "2.5.4", "05dd52a119ee4059fa9daa1ab7ce81bc7a8161a2f12e9d42e9d551ffd2ba901c", [:mix, :rebar3], [], "hexpm", "e9ab01ac7fc8572e41eb59385efeb3fb0ff5bf02103816535bacaedf327d0263"}, "req": {:hex, :req, "0.3.12", "f84c2f9e7cc71c81d7cbeacf7c61e763e53ab5f3065703792a4ab264b4f22672", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.9", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 1.6 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "c91103d4d1c8edeba90c84e0ba223a59865b673eaab217bfd17da3aa54ab136c"}, "rustler": {:hex, :rustler, "0.29.1", "880f20ae3027bd7945def6cea767f5257bc926f33ff50c0d5d5a5315883c084d", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:toml, "~> 0.6", [hex: :toml, repo: "hexpm", optional: false]}], "hexpm", "109497d701861bfcd26eb8f5801fe327a8eef304f56a5b63ef61151ff44ac9b6"}, diff --git a/test/integration/proxy_test.exs b/test/integration/proxy_test.exs index a3720f2b..aaee5ff7 100644 --- a/test/integration/proxy_test.exs +++ b/test/integration/proxy_test.exs @@ -75,6 +75,8 @@ defmodule Supavisor.Integration.ProxyTest do end test "query via another node", %{proxy: proxy, user: user} do + {:ok, _pid, node2} = Supavisor.Support.Cluster.start_node() + sup = Enum.reduce_while(1..30, nil, fn _, acc -> case Supavisor.get_global_sup({@tenant, user, :transaction}) do @@ -89,7 +91,7 @@ defmodule Supavisor.Integration.ProxyTest do assert sup == :erpc.call( - :"secondary@127.0.0.1", + node2, Supavisor, :get_global_sup, [{@tenant, user, :transaction}], @@ -114,7 +116,7 @@ defmodule Supavisor.Integration.ProxyTest do assert sup == :erpc.call( - :"secondary@127.0.0.1", + node2, Supavisor, :get_global_sup, [{@tenant, user, :transaction}], @@ -188,10 +190,11 @@ defmodule Supavisor.Integration.ProxyTest do [{_, client_pid, _}] = Supavisor.get_local_manager({{:single, @tenant}, "transaction", :transaction, "postgres"}) |> :sys.get_state() - |> then(& &1[:tid]) + |> Access.get(:tid) |> :ets.tab2list() - {state, %{db_pid: db_pid}} = :sys.get_state(client_pid) + assert {state, map} = :sys.get_state(client_pid) + assert %{db_pid: db_pid} = map assert {:idle, nil} = {state, db_pid} :gen_statem.stop(pid) diff --git a/test/supavisor/prom_ex_test.exs b/test/supavisor/prom_ex_test.exs index d29dc23c..246f1417 100644 --- a/test/supavisor/prom_ex_test.exs +++ b/test/supavisor/prom_ex_test.exs @@ -28,10 +28,10 @@ defmodule Supavisor.PromExTest do test "remove tenant tag upon termination", %{proxy: proxy, user: user, db_name: db_name} do assert PromEx.get_metrics() =~ "tenant=\"#{@tenant}\"" - GenServer.stop(proxy) - Supavisor.stop({{:single, @tenant}, user, :transaction, db_name}) + :ok = GenServer.stop(proxy) + :ok = Supavisor.stop({{:single, @tenant}, user, :transaction, db_name}) - Process.sleep(500) + Process.sleep(1000) refute PromEx.get_metrics() =~ "tenant=\"#{@tenant}\"" end diff --git a/test/supavisor/syn_handler_test.exs b/test/supavisor/syn_handler_test.exs index edfa4f5a..2404e2c1 100644 --- a/test/supavisor/syn_handler_test.exs +++ b/test/supavisor/syn_handler_test.exs @@ -7,7 +7,7 @@ defmodule Supavisor.SynHandlerTest do @id {{:single, "syn_tenant"}, "postgres", :session, "postgres"} test "resolving conflict" do - node2 = :"secondary@127.0.0.1" + {:ok, _pid, node2} = Supavisor.Support.Cluster.start_node() secret = %{alias: "postgres"} auth_secret = {:password, fn -> secret end} @@ -16,7 +16,7 @@ defmodule Supavisor.SynHandlerTest do assert pid2 == Supavisor.get_global_sup(@id) assert node(pid2) == node2 true = Node.disconnect(node2) - Process.sleep(500) + Process.sleep(1000) assert nil == Supavisor.get_global_sup(@id) {:ok, pid1} = Supavisor.start(@id, auth_secret) @@ -28,7 +28,7 @@ defmodule Supavisor.SynHandlerTest do msg = "Resolving syn_tenant conflict, stop local pid" - assert capture_log(fn -> Logger.warn(msg) end) =~ + assert capture_log(fn -> Logger.warning(msg) end) =~ msg assert pid2 == Supavisor.get_global_sup(@id) diff --git a/test/supavisor_web/controllers/metrics_controller_test.exs b/test/supavisor_web/controllers/metrics_controller_test.exs index 5bbd3407..9bb0963f 100644 --- a/test/supavisor_web/controllers/metrics_controller_test.exs +++ b/test/supavisor_web/controllers/metrics_controller_test.exs @@ -13,6 +13,10 @@ defmodule SupavisorWeb.MetricsControllerTest do end test "exporting metrics", %{conn: conn} do + {:ok, _pid, node2} = Supavisor.Support.Cluster.start_node() + + Node.connect(node2) + :meck.expect(Supavisor.Jwt, :authorize, fn _token, _secret -> {:ok, %{}} end) conn = get(conn, Routes.metrics_path(conn, :index)) assert conn.status == 200 diff --git a/test/support/cluster.ex b/test/support/cluster.ex index 9243eaf6..0b63c258 100644 --- a/test/support/cluster.ex +++ b/test/support/cluster.ex @@ -3,7 +3,24 @@ defmodule Supavisor.Support.Cluster do This module provides functionality to help handle distributive mode for testing. """ - def apply_config(node) do + def start_node(name \\ :peer.random_name()) do + {:ok, pid, node} = + :peer.start_link(%{ + name: name, + host: ~c"127.0.0.1", + longnames: true, + connection: :standard_io + }) + + :peer.call(pid, :logger, :set_primary_config, [:level, :none]) + true = :peer.call(pid, :code, :set_path, [:code.get_path()]) + apply_config(pid) + :peer.call(pid, Application, :ensure_all_started, [:supavisor]) + + {:ok, pid, node} + end + + defp apply_config(pid) do for {app_name, _, _} <- Application.loaded_applications() do for {key, val} <- Application.get_all_env(app_name) do val = @@ -27,9 +44,10 @@ defmodule Supavisor.Support.Cluster do val end - :rpc.call(node, Application, :put_env, [app_name, key, val, [persistent: true]]) - :rpc.call(node, Supavisor.Monitoring.PromEx, :set_metrics_tags, []) + :peer.call(pid, Application, :put_env, [app_name, key, val]) end end + + :peer.call(pid, Supavisor.Monitoring.PromEx, :set_metrics_tags, []) end end diff --git a/test/test_helper.exs b/test/test_helper.exs index a3a81c8b..92ecaa63 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -1,11 +1,4 @@ {:ok, _} = Node.start(:"primary@127.0.0.1", :longnames) -node2 = :"secondary@127.0.0.1" -:ct_slave.start(node2) -true = :erpc.call(node2, :code, :set_path, [:code.get_path()]) - -Supavisor.Support.Cluster.apply_config(node2) - -{:ok, _} = :erpc.call(node2, :application, :ensure_all_started, [:supavisor]) Cachex.start_link(name: Supavisor.Cache)