From 3985aab9abb618a07d4d88210851377b430f232a Mon Sep 17 00:00:00 2001 From: Nick Stein Date: Tue, 23 Jan 2024 09:56:53 -0500 Subject: [PATCH 01/22] feat: global concurrent connections limiter --- apps/api_accounts/lib/api_accounts/key.ex | 4 +- apps/api_web/config/config.exs | 19 ++ apps/api_web/config/dev.exs | 2 + apps/api_web/config/prod.exs | 9 + apps/api_web/config/test.exs | 4 + apps/api_web/lib/api_web.ex | 1 + .../lib/api_web/api_controller_helpers.ex | 1 + apps/api_web/lib/api_web/event_stream.ex | 15 ++ .../api_web/plugs/rate_limiter_concurrent.ex | 54 +++++ .../rate_limiter/rate_limiter_concurrent.ex | 190 ++++++++++++++++++ .../admin/accounts/key/form.html.heex | 12 ++ apps/api_web/lib/api_web/user.ex | 16 ++ apps/api_web/mix.exs | 3 +- .../api_web/rate_limiter_concurrent_test.exs | 50 +++++ 14 files changed, 378 insertions(+), 2 deletions(-) create mode 100644 apps/api_web/lib/api_web/plugs/rate_limiter_concurrent.ex create mode 100644 apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex create mode 100644 apps/api_web/test/api_web/rate_limiter_concurrent_test.exs diff --git a/apps/api_accounts/lib/api_accounts/key.ex b/apps/api_accounts/lib/api_accounts/key.ex index 8ff20452..b0559d73 100644 --- a/apps/api_accounts/lib/api_accounts/key.ex +++ b/apps/api_accounts/lib/api_accounts/key.ex @@ -18,6 +18,8 @@ defmodule ApiAccounts.Key do field(:requested_date, :datetime) field(:approved, :boolean, default: false) field(:locked, :boolean, default: false) + field(:static_concurrent_limit, :integer) + field(:streaming_concurrent_limit, :integer) field(:daily_limit, :integer) field(:rate_request_pending, :boolean, default: false) field(:api_version, :string) @@ -28,7 +30,7 @@ defmodule ApiAccounts.Key do @doc false def changeset(struct, params \\ %{}) do fields = ~w( - created requested_date approved locked daily_limit rate_request_pending api_version description allowed_domains + created requested_date approved locked static_concurrent_limit streaming_concurrent_limit daily_limit rate_request_pending api_version description allowed_domains )a cast(struct, params, fields) end diff --git a/apps/api_web/config/config.exs b/apps/api_web/config/config.exs index 0758942f..b5bd40e5 100644 --- a/apps/api_web/config/config.exs +++ b/apps/api_web/config/config.exs @@ -30,6 +30,25 @@ config :api_web, :rate_limiter, max_registered_per_interval: 100_000, wait_time_ms: 0 +config :api_web, :rate_limiter_concurrent, + enabled: false, + memcache: false, + # How many seconds tolerated when calculating whether a connection is still open + # 45 - 30 (see ApiWeb.EventStream.Initialize's timeout value) gives us a buffer of 15 seconds: + heartbeat_tolerance: 45, + connection_opts: [ + namespace: "api_concurrent_rate_limit_dev", + # The total lifetime (s) of an API key + request type (static or streaming) key: + ttl: 60, + # Allows us to hold a nested payload per key, this is necessary because memcached does not offer + # a great way to group keys: + coder: Memcache.Coder.JSON + ], + # Default concurrent connections - these can be overridden on a per-key basis in the admin UI: + max_anon_static: 5, + max_registered_streaming: 10, + max_registered_static: 20 + config :api_web, ApiWeb.Plugs.ModifiedSinceHandler, check_caller: false config :api_web, :api_pipeline, diff --git a/apps/api_web/config/dev.exs b/apps/api_web/config/dev.exs index 98a75d8e..fd21f5d7 100644 --- a/apps/api_web/config/dev.exs +++ b/apps/api_web/config/dev.exs @@ -28,3 +28,5 @@ config :logger, :console, format: "[$level] $message\n", level: :debug # Do not configure such in production as keeping # and calculating stacktraces is usually expensive. config :phoenix, :stacktrace_depth, 20 + +config :api_web, :rate_limiter_concurrent, enabled: true, memcache: true diff --git a/apps/api_web/config/prod.exs b/apps/api_web/config/prod.exs index 3b4948d1..9b08b8c9 100644 --- a/apps/api_web/config/prod.exs +++ b/apps/api_web/config/prod.exs @@ -54,6 +54,15 @@ config :ehmon, :report_mf, {:ehmon, :info_report} config :logster, :filter_parameters, ~w(password password_confirm) +config :api_web, :rate_limiter_concurrent, + enabled: true, + memcache: true, + connection_opts: [ + namespace: "api_concurrent_rate_limit", + ttl: 60, + coder: Memcache.Coder.JSON + ] + config :api_web, :rate_limiter, clear_interval: 60_000, max_anon_per_interval: 20, diff --git a/apps/api_web/config/test.exs b/apps/api_web/config/test.exs index e1ceb7c1..8d7c30d9 100644 --- a/apps/api_web/config/test.exs +++ b/apps/api_web/config/test.exs @@ -26,3 +26,7 @@ config :recaptcha, # Print only warnings and errors during test config :logger, level: :warn + +config :api_web, :rate_limiter_concurrent, + enabled: true, + memcache: false diff --git a/apps/api_web/lib/api_web.ex b/apps/api_web/lib/api_web.ex index f2e48450..00662e99 100644 --- a/apps/api_web/lib/api_web.ex +++ b/apps/api_web/lib/api_web.ex @@ -16,6 +16,7 @@ defmodule ApiWeb do children = [ # Start the endpoint when the application starts ApiWeb.RateLimiter, + ApiWeb.RateLimiter.RateLimiterConcurrent, {RequestTrack, [name: ApiWeb.RequestTrack]}, ApiWeb.EventStream.Supervisor, ApiWeb.Endpoint, diff --git a/apps/api_web/lib/api_web/api_controller_helpers.ex b/apps/api_web/lib/api_web/api_controller_helpers.ex index 874fc8dc..52085c55 100644 --- a/apps/api_web/lib/api_web/api_controller_helpers.ex +++ b/apps/api_web/lib/api_web/api_controller_helpers.ex @@ -27,6 +27,7 @@ defmodule ApiWeb.ApiControllerHelpers do plug(:split_include) plug(ApiWeb.Plugs.ModifiedSinceHandler, caller: __MODULE__) plug(ApiWeb.Plugs.RateLimiter) + plug(ApiWeb.Plugs.RateLimiterConcurrent) def index(conn, params), do: ApiControllerHelpers.index(__MODULE__, conn, params) diff --git a/apps/api_web/lib/api_web/event_stream.ex b/apps/api_web/lib/api_web/event_stream.ex index 5dd85de2..b317578d 100644 --- a/apps/api_web/lib/api_web/event_stream.ex +++ b/apps/api_web/lib/api_web/event_stream.ex @@ -7,6 +7,7 @@ defmodule ApiWeb.EventStream do import Plug.Conn alias __MODULE__.Supervisor alias ApiWeb.Plugs.CheckForShutdown + alias ApiWeb.RateLimiter.RateLimiterConcurrent require Logger @enforce_keys [:conn, :pid, :timeout] @@ -48,6 +49,13 @@ defmodule ApiWeb.EventStream do @spec hibernate_loop(state) :: Plug.Conn.t() def hibernate_loop(state) do + if Map.has_key?(state.conn.assigns, :api_user) do + # Update the concurrent rate limit cache to ensure any flushing doesn't impact long-running connections: + RateLimiterConcurrent.add_lock(state.conn.assigns.api_user, self(), true) + else + Logger.warn("#{__MODULE__} missing_api_user - cannot rate limit!") + end + case receive_result(state) do {:continue, state} -> :proc_lib.hibernate(__MODULE__, :hibernate_loop, [state]) @@ -125,6 +133,13 @@ defmodule ApiWeb.EventStream do end defp unsubscribe(state) do + if Map.has_key?(state.conn.assigns, :api_user) do + # clean up our concurrent connections lock: + RateLimiterConcurrent.remove_lock(state.conn.assigns.api_user, self(), true) + else + Logger.warn("#{__MODULE__} missing_api_user - cannot rate limit!") + end + # consume any extra messages received after unsubscribing receive do {:events, _} -> diff --git a/apps/api_web/lib/api_web/plugs/rate_limiter_concurrent.ex b/apps/api_web/lib/api_web/plugs/rate_limiter_concurrent.ex new file mode 100644 index 00000000..e25d9b79 --- /dev/null +++ b/apps/api_web/lib/api_web/plugs/rate_limiter_concurrent.ex @@ -0,0 +1,54 @@ +defmodule ApiWeb.Plugs.RateLimiterConcurrent do + @moduledoc """ + Plug to invoke the concurrent rate limiter. + """ + + import Plug.Conn + import Phoenix.Controller, only: [render: 3, put_view: 2] + + alias ApiWeb.RateLimiter.RateLimiterConcurrent + + @rate_limit_concurrent_config Application.compile_env!(:api_web, :rate_limiter_concurrent) + + def init(opts), do: opts + + def call(conn, _opts) do + if enabled?() do + event_stream? = Plug.Conn.get_req_header(conn, "accept") == ["text/event-stream"] + + {at_limit?, remaining, limit} = + RateLimiterConcurrent.check_concurrent_rate_limit(conn.assigns.api_user, event_stream?) + + # Allow negative limits to allow unlimited use: + if limit >= 0 and at_limit? do + conn + |> put_concurrent_rate_limit_headers(limit, remaining) + |> put_status(429) + |> put_view(ApiWeb.ErrorView) + |> render("429.json-api", []) + |> halt() + else + RateLimiterConcurrent.add_lock(conn.assigns.api_user, self(), event_stream?) + + conn + |> put_concurrent_rate_limit_headers(limit, remaining - 1) + |> register_before_send(fn conn -> + RateLimiterConcurrent.remove_lock(conn.assigns.api_user, self(), event_stream?) + conn + end) + end + else + conn + end + end + + defp put_concurrent_rate_limit_headers(conn, limit, remaining) do + conn + |> put_resp_header("x-concurrent-ratelimit-limit", "#{limit}") + |> put_resp_header("x-concurrent-ratelimit-remaining", "#{remaining}") + end + + def enabled? do + Keyword.fetch!(@rate_limit_concurrent_config, :enabled) == true + end +end diff --git a/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex b/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex new file mode 100644 index 00000000..ba4eccb2 --- /dev/null +++ b/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex @@ -0,0 +1,190 @@ +defmodule ApiWeb.RateLimiter.RateLimiterConcurrent do + @moduledoc """ + Rate limits a user's concurrent connections based on their API key or by their IP address if no + API key is provided. Split by static and event-stream requests. + """ + + use GenServer + require Logger + + @rate_limit_concurrent_config Application.compile_env!(:api_web, :rate_limiter_concurrent) + + def start_link([]), do: GenServer.start_link(__MODULE__, nil, name: __MODULE__) + + def init(_) do + connection_opts = Keyword.fetch!(@rate_limit_concurrent_config, :connection_opts) + + {:ok, pid} = if memcache?(), do: Memcache.start_link(connection_opts), else: {:ok, nil} + {:ok, %{memcache_pid: pid, uuid: UUID.uuid1()}} + end + + defp lookup(%ApiWeb.User{} = user, event_stream?) do + type = if event_stream?, do: "event_stream", else: "static" + key = "concurrent_#{user.id}_#{type}" + + { + type, + key + } + end + + def get_pid_key(pid) do + sub_key = pid |> :erlang.pid_to_list() |> to_string + get_uuid() <> sub_key + end + + defp get_uuid do + {:ok, uuid} = GenServer.call(__MODULE__, :get_uuid) + uuid + end + + defp get_current_unix_ts do + System.system_time(:second) + end + + defp get_heartbeat_tolerance do + Keyword.fetch!(@rate_limit_concurrent_config, :heartbeat_tolerance) + end + + def get_locks(%ApiWeb.User{} = user, event_stream?) do + if enabled?() do + current_timestamp = get_current_unix_ts() + heartbeat_tolerance = get_heartbeat_tolerance() + {_type, key} = lookup(user, event_stream?) + {:ok, locks} = GenServer.call(__MODULE__, {:memcache_get, key, %{}}) + # Check if any expired, and remove: + valid_locks = + :maps.filter( + fn _, timestamp -> + timestamp + heartbeat_tolerance >= current_timestamp + end, + locks + ) + + if valid_locks != locks, do: GenServer.call(__MODULE__, {:memcache_set, key, valid_locks}) + valid_locks + else + %{} + end + end + + def check_concurrent_rate_limit(user, event_stream?) do + active_connections = user |> get_locks(event_stream?) |> Map.keys() |> length + + limit = + case {event_stream?, user.type} do + {true, :registered} -> + if user.streaming_concurrent_limit >= 0, + do: + max( + user.streaming_concurrent_limit || 0, + Keyword.fetch!( + @rate_limit_concurrent_config, + :max_registered_streaming + ) + ), + else: user.streaming_concurrent_limit + + {false, :registered} -> + if user.static_concurrent_limit >= 0, + do: + max( + user.static_concurrent_limit || 0, + Keyword.fetch!( + @rate_limit_concurrent_config, + :max_registered_static + ) + ), + else: user.static_concurrent_limit + + {true, :anon} -> + Keyword.fetch!( + @rate_limit_concurrent_config, + :max_anon_streaming + ) + + {false, :anon} -> + Keyword.fetch!( + @rate_limit_concurrent_config, + :max_anon_static + ) + end + + remaining = limit - active_connections + at_limit? = remaining <= 0 + {at_limit?, remaining, limit} + end + + def add_lock(%ApiWeb.User{} = user, pid, event_stream?) do + if enabled?() do + {_type, key} = lookup(user, event_stream?) + pid_key = get_pid_key(pid) + timestamp = get_current_unix_ts() + + Logger.info( + "#{__MODULE__} event=add_lock user=#{inspect(user)} pid_key=#{pid_key} key=#{key} timestamp=#{timestamp}" + ) + + locks = user |> get_locks(event_stream?) |> Map.put(pid_key, timestamp) + + Logger.info( + "#{__MODULE__} event=add_lock_after user=#{inspect(user)} pid_key=#{pid_key} key=#{key} timestamp=#{timestamp} locks=#{inspect(locks)}" + ) + + GenServer.call(__MODULE__, {:memcache_set, key, locks}) + end + + nil + end + + def remove_lock( + %ApiWeb.User{} = user, + pid, + event_stream?, + pid_key \\ nil + ) do + if enabled?() do + {_type, key} = lookup(user, event_stream?) + pid_key = if pid_key, do: pid_key, else: get_pid_key(pid) + locks_before = get_locks(user, event_stream?) + locks = locks_before |> Map.delete(pid_key) + + Logger.info( + "#{__MODULE__} event=remove_lock user_id=#{user.id} pid_key=#{pid_key} key=#{key}" + ) + + GenServer.call(__MODULE__, {:memcache_set, key, locks}) + + Logger.info( + "#{__MODULE__} event=remove_lock_after user_id=#{user.id} pid_key=#{pid_key} key=#{key} locks=#{inspect(locks)}" + ) + end + + nil + end + + def enabled? do + Keyword.fetch!(@rate_limit_concurrent_config, :enabled) == True + end + + def memcache? do + Keyword.fetch!(@rate_limit_concurrent_config, :memcache) == True + end + + def handle_call(:get_uuid, _from, state) do + {:reply, {:ok, state.uuid}, state} + end + + def handle_call({:memcache_get, key, default_value}, _from, state) do + {:reply, + {:ok, + case Memcache.get(state.memcache_pid, key) do + {:ok, result} -> result + _ -> default_value + end}, state} + end + + def handle_call({:memcache_set, key, value}, _from, state) do + {:reply, {:ok, Memcache.set(state.memcache_pid, key, value)}, state} + end +end diff --git a/apps/api_web/lib/api_web/templates/admin/accounts/key/form.html.heex b/apps/api_web/lib/api_web/templates/admin/accounts/key/form.html.heex index 74f5562d..ea2700b0 100644 --- a/apps/api_web/lib/api_web/templates/admin/accounts/key/form.html.heex +++ b/apps/api_web/lib/api_web/templates/admin/accounts/key/form.html.heex @@ -18,6 +18,18 @@ <%= error_tag(f, :daily_limit) %> +
+ <%= label(f, :static_concurrent_limit, class: "control-label") %> + <%= text_input(f, :static_concurrent_limit, class: "form-control") %> + <%= error_tag(f, :static_concurrent_limit) %> +
+ +
+ <%= label(f, :streaming_concurrent_limit, class: "control-label") %> + <%= text_input(f, :streaming_concurrent_limit, class: "form-control") %> + <%= error_tag(f, :streaming_concurrent_limit) %> +
+
<%= label(f, :description, class: "control-label") %> <%= text_input(f, :description, class: "form-control") %> diff --git a/apps/api_web/lib/api_web/user.ex b/apps/api_web/lib/api_web/user.ex index 4db1183c..46c620f9 100644 --- a/apps/api_web/lib/api_web/user.ex +++ b/apps/api_web/lib/api_web/user.ex @@ -9,6 +9,8 @@ defmodule ApiWeb.User do :id, :type, :limit, + :static_concurrent_limit, + :streaming_concurrent_limit, :version, :allowed_domains ] @@ -36,6 +38,16 @@ defmodule ApiWeb.User do """ @type requests_per_day :: integer + @typedoc """ + The max number of event-stream requests that the user can make at once. + """ + @type streaming_concurrent_limit :: integer + + @typedoc """ + The max number of static requests that the user can make at once. + """ + @type static_concurrent_limit :: integer + @typedoc """ Whether the user is an anonymous user or a registered user. @@ -103,6 +115,8 @@ defmodule ApiWeb.User do def from_key(%ApiAccounts.Key{ key: key, daily_limit: limit, + static_concurrent_limit: static_concurrent_limit, + streaming_concurrent_limit: streaming_concurrent_limit, api_version: version, allowed_domains: allowed_domains }) do @@ -111,6 +125,8 @@ defmodule ApiWeb.User do %__MODULE__{ id: key, limit: limit, + static_concurrent_limit: static_concurrent_limit, + streaming_concurrent_limit: streaming_concurrent_limit, type: :registered, version: version, allowed_domains: nil_or_allowed_domains(allowed_domains) diff --git a/apps/api_web/mix.exs b/apps/api_web/mix.exs index dd893d50..a7ee47dc 100644 --- a/apps/api_web/mix.exs +++ b/apps/api_web/mix.exs @@ -85,7 +85,8 @@ defmodule ApiWeb.Mixfile do {:recaptcha, git: "https://github.com/samueljseay/recaptcha.git", tag: "71cd746"}, {:sentry, "~> 8.0"}, {:qr_code, "~> 3.0"}, - {:nimble_totp, "~> 1.0"} + {:nimble_totp, "~> 1.0"}, + {:uuid, "~> 1.1"} ] end end diff --git a/apps/api_web/test/api_web/rate_limiter_concurrent_test.exs b/apps/api_web/test/api_web/rate_limiter_concurrent_test.exs new file mode 100644 index 00000000..607ca8ce --- /dev/null +++ b/apps/api_web/test/api_web/rate_limiter_concurrent_test.exs @@ -0,0 +1,50 @@ +defmodule ApiWeb.RateLimiterConcurrentTest do + @moduledoc false + use ExUnit.Case, async: false + use Plug.Test + alias ApiWeb.RateLimiter.RateLimiterConcurrent + + test "start_link/1" do + Application.stop(:api_web) + + on_exit(fn -> + Application.start(:api_web) + end) + + assert {:ok, _pid} = RateLimiterConcurrent.start_link([]) + end + + test "check_concurrent_rate_limit/1" do + {anon_streaming_at_limit?, anon_streaming_remaining, anon_streaming_limit} = + RateLimiterConcurrent.check_concurrent_rate_limit(%ApiWeb.User{type: :anon}, true) + + assert anon_streaming_limit == ApiWeb.config(:rate_limiter_concurrent, :max_anon_streaming) + assert anon_streaming_remaining == anon_streaming_limit + assert anon_streaming_at_limit? == false + + {anon_static_at_limit?, anon_static_remaining, anon_static_limit} = + RateLimiterConcurrent.check_concurrent_rate_limit(%ApiWeb.User{type: :anon}, false) + + assert anon_static_limit == ApiWeb.config(:rate_limiter_concurrent, :max_anon_static) + assert anon_static_remaining == anon_static_limit + assert anon_static_at_limit? == false + + {registered_streaming_at_limit?, registered_streaming_remaining, registered_streaming_limit} = + RateLimiterConcurrent.check_concurrent_rate_limit(%ApiWeb.User{type: :registered}, true) + + assert registered_streaming_limit == + ApiWeb.config(:rate_limiter_concurrent, :max_registered_streaming) + + assert registered_streaming_remaining == registered_streaming_limit + assert registered_streaming_at_limit? == false + + {registered_static_at_limit?, registered_static_remaining, registered_static_limit} = + RateLimiterConcurrent.check_concurrent_rate_limit(%ApiWeb.User{type: :registered}, false) + + assert registered_static_limit == + ApiWeb.config(:rate_limiter_concurrent, :max_registered_static) + + assert registered_static_remaining == registered_static_limit + assert registered_static_at_limit? == false + end +end From 2955855a2f501664da32242c6211830acb164fe0 Mon Sep 17 00:00:00 2001 From: Nick Stein Date: Tue, 23 Jan 2024 10:04:34 -0500 Subject: [PATCH 02/22] fix: use memcache pool from base rate limiter --- .../rate_limiter/memcache/supervisor.ex | 2 +- .../rate_limiter/rate_limiter_concurrent.ex | 25 ++++++++++--------- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/apps/api_web/lib/api_web/rate_limiter/memcache/supervisor.ex b/apps/api_web/lib/api_web/rate_limiter/memcache/supervisor.ex index 36da061a..e5b3bae5 100644 --- a/apps/api_web/lib/api_web/rate_limiter/memcache/supervisor.ex +++ b/apps/api_web/lib/api_web/rate_limiter/memcache/supervisor.ex @@ -31,7 +31,7 @@ defmodule ApiWeb.RateLimiter.Memcache.Supervisor do {:via, Registry, {@registry_name, index}} end - defp random_child do + def random_child do worker_name(:rand.uniform(@worker_count)) end end diff --git a/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex b/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex index ba4eccb2..d0d8549c 100644 --- a/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex +++ b/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex @@ -51,7 +51,7 @@ defmodule ApiWeb.RateLimiter.RateLimiterConcurrent do current_timestamp = get_current_unix_ts() heartbeat_tolerance = get_heartbeat_tolerance() {_type, key} = lookup(user, event_stream?) - {:ok, locks} = GenServer.call(__MODULE__, {:memcache_get, key, %{}}) + {:ok, locks} = memcache_get(key, %{}) # Check if any expired, and remove: valid_locks = :maps.filter( @@ -61,13 +61,15 @@ defmodule ApiWeb.RateLimiter.RateLimiterConcurrent do locks ) - if valid_locks != locks, do: GenServer.call(__MODULE__, {:memcache_set, key, valid_locks}) + if valid_locks != locks, do: memcache_set(key, valid_locks) valid_locks else %{} end end + @spec check_concurrent_rate_limit(ApiWeb.User.t(), boolean()) :: + {false, number(), number()} | {true, number(), number()} def check_concurrent_rate_limit(user, event_stream?) do active_connections = user |> get_locks(event_stream?) |> Map.keys() |> length @@ -153,7 +155,7 @@ defmodule ApiWeb.RateLimiter.RateLimiterConcurrent do "#{__MODULE__} event=remove_lock user_id=#{user.id} pid_key=#{pid_key} key=#{key}" ) - GenServer.call(__MODULE__, {:memcache_set, key, locks}) + memcache_set(key, locks) Logger.info( "#{__MODULE__} event=remove_lock_after user_id=#{user.id} pid_key=#{pid_key} key=#{key} locks=#{inspect(locks)}" @@ -175,16 +177,15 @@ defmodule ApiWeb.RateLimiter.RateLimiterConcurrent do {:reply, {:ok, state.uuid}, state} end - def handle_call({:memcache_get, key, default_value}, _from, state) do - {:reply, - {:ok, - case Memcache.get(state.memcache_pid, key) do - {:ok, result} -> result - _ -> default_value - end}, state} + def memcache_set(key, value) do + Memcache.set(ApiWeb.RateLimiter.Memcache.Supervisor.random_child(), key, value) end - def handle_call({:memcache_set, key, value}, _from, state) do - {:reply, {:ok, Memcache.set(state.memcache_pid, key, value)}, state} + def memcache_get(key, default_value) do + {:ok, + case Memcache.get(ApiWeb.RateLimiter.Memcache.Supervisor.random_child(), key) do + {:ok, result} -> result + _ -> default_value + end} end end From ac718147ab89da8b0b3429cd801fc2c9cc164b21 Mon Sep 17 00:00:00 2001 From: Nick Stein Date: Tue, 23 Jan 2024 10:06:25 -0500 Subject: [PATCH 03/22] fix: use boolean true --- .../lib/api_web/rate_limiter/rate_limiter_concurrent.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex b/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex index d0d8549c..e0fa3c11 100644 --- a/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex +++ b/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex @@ -166,11 +166,11 @@ defmodule ApiWeb.RateLimiter.RateLimiterConcurrent do end def enabled? do - Keyword.fetch!(@rate_limit_concurrent_config, :enabled) == True + Keyword.fetch!(@rate_limit_concurrent_config, :enabled) == true end def memcache? do - Keyword.fetch!(@rate_limit_concurrent_config, :memcache) == True + Keyword.fetch!(@rate_limit_concurrent_config, :memcache) == true end def handle_call(:get_uuid, _from, state) do From f7ce4567783e1186c581043ed3386c7bdd8e789b Mon Sep 17 00:00:00 2001 From: Nick Stein Date: Wed, 24 Jan 2024 00:34:14 -0500 Subject: [PATCH 04/22] fix: use cas instead of get/set --- .../rate_limiter/rate_limiter_concurrent.ex | 61 +++++++------------ 1 file changed, 22 insertions(+), 39 deletions(-) diff --git a/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex b/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex index e0fa3c11..2c8011fd 100644 --- a/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex +++ b/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex @@ -12,10 +12,7 @@ defmodule ApiWeb.RateLimiter.RateLimiterConcurrent do def start_link([]), do: GenServer.start_link(__MODULE__, nil, name: __MODULE__) def init(_) do - connection_opts = Keyword.fetch!(@rate_limit_concurrent_config, :connection_opts) - - {:ok, pid} = if memcache?(), do: Memcache.start_link(connection_opts), else: {:ok, nil} - {:ok, %{memcache_pid: pid, uuid: UUID.uuid1()}} + {:ok, %{uuid: UUID.uuid1()}} end defp lookup(%ApiWeb.User{} = user, event_stream?) do @@ -46,23 +43,23 @@ defmodule ApiWeb.RateLimiter.RateLimiterConcurrent do Keyword.fetch!(@rate_limit_concurrent_config, :heartbeat_tolerance) end - def get_locks(%ApiWeb.User{} = user, event_stream?) do + def mutate_locks(%ApiWeb.User{} = user, event_stream?, before_commit \\ fn value -> value end) do if enabled?() do current_timestamp = get_current_unix_ts() heartbeat_tolerance = get_heartbeat_tolerance() {_type, key} = lookup(user, event_stream?) - {:ok, locks} = memcache_get(key, %{}) - # Check if any expired, and remove: - valid_locks = - :maps.filter( - fn _, timestamp -> - timestamp + heartbeat_tolerance >= current_timestamp - end, - locks - ) - - if valid_locks != locks, do: memcache_set(key, valid_locks) - valid_locks + + memcache_update(key, %{}, fn locks -> + valid_locks = + :maps.filter( + fn _, timestamp -> + timestamp + heartbeat_tolerance >= current_timestamp + end, + locks + ) + + before_commit.(valid_locks) + end) else %{} end @@ -71,7 +68,7 @@ defmodule ApiWeb.RateLimiter.RateLimiterConcurrent do @spec check_concurrent_rate_limit(ApiWeb.User.t(), boolean()) :: {false, number(), number()} | {true, number(), number()} def check_concurrent_rate_limit(user, event_stream?) do - active_connections = user |> get_locks(event_stream?) |> Map.keys() |> length + active_connections = user |> mutate_locks(event_stream?) |> Map.keys() |> length limit = case {event_stream?, user.type} do @@ -127,13 +124,12 @@ defmodule ApiWeb.RateLimiter.RateLimiterConcurrent do "#{__MODULE__} event=add_lock user=#{inspect(user)} pid_key=#{pid_key} key=#{key} timestamp=#{timestamp}" ) - locks = user |> get_locks(event_stream?) |> Map.put(pid_key, timestamp) + locks = + user |> mutate_locks(event_stream?, fn locks -> Map.put(locks, pid_key, timestamp) end) Logger.info( "#{__MODULE__} event=add_lock_after user=#{inspect(user)} pid_key=#{pid_key} key=#{key} timestamp=#{timestamp} locks=#{inspect(locks)}" ) - - GenServer.call(__MODULE__, {:memcache_set, key, locks}) end nil @@ -148,18 +144,11 @@ defmodule ApiWeb.RateLimiter.RateLimiterConcurrent do if enabled?() do {_type, key} = lookup(user, event_stream?) pid_key = if pid_key, do: pid_key, else: get_pid_key(pid) - locks_before = get_locks(user, event_stream?) - locks = locks_before |> Map.delete(pid_key) + mutate_locks(user, event_stream?, fn locks -> Map.delete(locks, pid_key) end) Logger.info( "#{__MODULE__} event=remove_lock user_id=#{user.id} pid_key=#{pid_key} key=#{key}" ) - - memcache_set(key, locks) - - Logger.info( - "#{__MODULE__} event=remove_lock_after user_id=#{user.id} pid_key=#{pid_key} key=#{key} locks=#{inspect(locks)}" - ) end nil @@ -177,15 +166,9 @@ defmodule ApiWeb.RateLimiter.RateLimiterConcurrent do {:reply, {:ok, state.uuid}, state} end - def memcache_set(key, value) do - Memcache.set(ApiWeb.RateLimiter.Memcache.Supervisor.random_child(), key, value) - end - - def memcache_get(key, default_value) do - {:ok, - case Memcache.get(ApiWeb.RateLimiter.Memcache.Supervisor.random_child(), key) do - {:ok, result} -> result - _ -> default_value - end} + def memcache_update(key, default_value, update_fn) do + Memcache.cas(ApiWeb.RateLimiter.Memcache.Supervisor.random_child(), key, update_fn, + default: default_value + ) end end From 110bfc3d43415425f97ecff5c10d3dd95bb65dcb Mon Sep 17 00:00:00 2001 From: Nick Stein Date: Wed, 24 Jan 2024 00:46:54 -0500 Subject: [PATCH 05/22] chore: fix dialyzer complaints --- .../api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex b/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex index 2c8011fd..a64e08fc 100644 --- a/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex +++ b/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex @@ -144,7 +144,7 @@ defmodule ApiWeb.RateLimiter.RateLimiterConcurrent do if enabled?() do {_type, key} = lookup(user, event_stream?) pid_key = if pid_key, do: pid_key, else: get_pid_key(pid) - mutate_locks(user, event_stream?, fn locks -> Map.delete(locks, pid_key) end) + {:ok, _locks} = mutate_locks(user, event_stream?, fn (locks) -> Map.delete(locks, pid_key) end) Logger.info( "#{__MODULE__} event=remove_lock user_id=#{user.id} pid_key=#{pid_key} key=#{key}" From 1643d61f28cc9dcac9248c70ede0881ef9feaa82 Mon Sep 17 00:00:00 2001 From: Nick Stein Date: Wed, 24 Jan 2024 00:49:04 -0500 Subject: [PATCH 06/22] chore: apply formatter --- .../lib/api_web/rate_limiter/rate_limiter_concurrent.ex | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex b/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex index a64e08fc..2092759d 100644 --- a/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex +++ b/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex @@ -144,7 +144,9 @@ defmodule ApiWeb.RateLimiter.RateLimiterConcurrent do if enabled?() do {_type, key} = lookup(user, event_stream?) pid_key = if pid_key, do: pid_key, else: get_pid_key(pid) - {:ok, _locks} = mutate_locks(user, event_stream?, fn (locks) -> Map.delete(locks, pid_key) end) + + {:ok, _locks} = + mutate_locks(user, event_stream?, fn locks -> Map.delete(locks, pid_key) end) Logger.info( "#{__MODULE__} event=remove_lock user_id=#{user.id} pid_key=#{pid_key} key=#{key}" From 2bd050d8dea2c45cac72ebc0742e2e1b7b443de9 Mon Sep 17 00:00:00 2001 From: Nick Stein Date: Wed, 24 Jan 2024 01:59:14 -0500 Subject: [PATCH 07/22] fix: initialize memcache supervisor independently of rate_limiter, concurrent_rate_limiter --- apps/api_web/config/config.exs | 13 ++----- apps/api_web/config/prod.exs | 7 +--- apps/api_web/lib/api_web.ex | 1 + .../lib/api_web/rate_limiter/memcache.ex | 12 ++---- .../rate_limiter/memcache/supervisor.ex | 37 +++++++++++++------ .../rate_limiter/rate_limiter_concurrent.ex | 8 ++-- 6 files changed, 39 insertions(+), 39 deletions(-) diff --git a/apps/api_web/config/config.exs b/apps/api_web/config/config.exs index b5bd40e5..667f4946 100644 --- a/apps/api_web/config/config.exs +++ b/apps/api_web/config/config.exs @@ -28,7 +28,10 @@ config :api_web, :rate_limiter, limiter: ApiWeb.RateLimiter.ETS, max_anon_per_interval: 5_000, max_registered_per_interval: 100_000, - wait_time_ms: 0 + wait_time_ms: 0, + connection_opts: [ + coder: Memcache.Coder.JSON + ] config :api_web, :rate_limiter_concurrent, enabled: false, @@ -36,14 +39,6 @@ config :api_web, :rate_limiter_concurrent, # How many seconds tolerated when calculating whether a connection is still open # 45 - 30 (see ApiWeb.EventStream.Initialize's timeout value) gives us a buffer of 15 seconds: heartbeat_tolerance: 45, - connection_opts: [ - namespace: "api_concurrent_rate_limit_dev", - # The total lifetime (s) of an API key + request type (static or streaming) key: - ttl: 60, - # Allows us to hold a nested payload per key, this is necessary because memcached does not offer - # a great way to group keys: - coder: Memcache.Coder.JSON - ], # Default concurrent connections - these can be overridden on a per-key basis in the admin UI: max_anon_static: 5, max_registered_streaming: 10, diff --git a/apps/api_web/config/prod.exs b/apps/api_web/config/prod.exs index 9b08b8c9..44f0f16d 100644 --- a/apps/api_web/config/prod.exs +++ b/apps/api_web/config/prod.exs @@ -56,12 +56,7 @@ config :logster, :filter_parameters, ~w(password password_confirm) config :api_web, :rate_limiter_concurrent, enabled: true, - memcache: true, - connection_opts: [ - namespace: "api_concurrent_rate_limit", - ttl: 60, - coder: Memcache.Coder.JSON - ] + memcache: true config :api_web, :rate_limiter, clear_interval: 60_000, diff --git a/apps/api_web/lib/api_web.ex b/apps/api_web/lib/api_web.ex index 00662e99..54ec13a8 100644 --- a/apps/api_web/lib/api_web.ex +++ b/apps/api_web/lib/api_web.ex @@ -15,6 +15,7 @@ defmodule ApiWeb do # no cover children = [ # Start the endpoint when the application starts + ApiWeb.RateLimiter.Memcache.Supervisor, ApiWeb.RateLimiter, ApiWeb.RateLimiter.RateLimiterConcurrent, {RequestTrack, [name: ApiWeb.RequestTrack]}, diff --git a/apps/api_web/lib/api_web/rate_limiter/memcache.ex b/apps/api_web/lib/api_web/rate_limiter/memcache.ex index 86278bc2..c6768cb1 100644 --- a/apps/api_web/lib/api_web/rate_limiter/memcache.ex +++ b/apps/api_web/lib/api_web/rate_limiter/memcache.ex @@ -5,15 +5,9 @@ defmodule ApiWeb.RateLimiter.Memcache do @behaviour ApiWeb.RateLimiter.Limiter alias ApiWeb.RateLimiter.Memcache.Supervisor - @impl ApiWeb.RateLimiter.Limiter - def start_link(opts) do - clear_interval_ms = Keyword.fetch!(opts, :clear_interval) - clear_interval = div(clear_interval_ms, 1000) - - connection_opts = - [ttl: clear_interval * 2] ++ ApiWeb.config(RateLimiter.Memcache, :connection_opts) - - Supervisor.start_link(connection_opts) + @impl true + def start_link(_) do + {:ok, self()} end @impl ApiWeb.RateLimiter.Limiter diff --git a/apps/api_web/lib/api_web/rate_limiter/memcache/supervisor.ex b/apps/api_web/lib/api_web/rate_limiter/memcache/supervisor.ex index e5b3bae5..8622b3f7 100644 --- a/apps/api_web/lib/api_web/rate_limiter/memcache/supervisor.ex +++ b/apps/api_web/lib/api_web/rate_limiter/memcache/supervisor.ex @@ -4,22 +4,32 @@ defmodule ApiWeb.RateLimiter.Memcache.Supervisor do """ @worker_count 5 @registry_name __MODULE__.Registry + @rate_limit_config Application.compile_env!(:api_web, :rate_limiter) + + use Agent + + def start_link(_) do + clear_interval_ms = Keyword.fetch!(@rate_limit_config, :clear_interval) + clear_interval = div(clear_interval_ms, 1000) + connection_opts_config = Keyword.fetch!(@rate_limit_config, :connection_opts) + connection_opts = [ttl: clear_interval * 2] ++ connection_opts_config - def start_link(connection_opts) do registry = {Registry, keys: :unique, name: @registry_name} - workers = - for i <- 1..@worker_count do - Supervisor.child_spec({Memcache, [connection_opts, [name: worker_name(i)]]}, id: i) - end + if memcache_required?() do + workers = + for i <- 1..@worker_count do + Supervisor.child_spec({Memcache, [connection_opts, [name: worker_name(i)]]}, id: i) + end - children = [registry | workers] + children = [registry | workers] - Supervisor.start_link( - children, - strategy: :rest_for_one, - name: __MODULE__ - ) + Supervisor.start_link( + children, + strategy: :rest_for_one, + name: __MODULE__ + ) + end end @doc "Decrement a given key, using a random child." @@ -31,6 +41,11 @@ defmodule ApiWeb.RateLimiter.Memcache.Supervisor do {:via, Registry, {@registry_name, index}} end + defp memcache_required? do + ApiWeb.RateLimiter.RateLimiterConcurrent.enabled?() or + ApiWeb.config(:rate_limiter, :limiter) == ApiWeb.RateLimiter.Memcache + end + def random_child do worker_name(:rand.uniform(@worker_count)) end diff --git a/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex b/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex index 2092759d..ee038d75 100644 --- a/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex +++ b/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex @@ -6,6 +6,7 @@ defmodule ApiWeb.RateLimiter.RateLimiterConcurrent do use GenServer require Logger + alias ApiWeb.RateLimiter.Memcache.Supervisor @rate_limit_concurrent_config Application.compile_env!(:api_web, :rate_limiter_concurrent) @@ -68,7 +69,8 @@ defmodule ApiWeb.RateLimiter.RateLimiterConcurrent do @spec check_concurrent_rate_limit(ApiWeb.User.t(), boolean()) :: {false, number(), number()} | {true, number(), number()} def check_concurrent_rate_limit(user, event_stream?) do - active_connections = user |> mutate_locks(event_stream?) |> Map.keys() |> length + {:ok, locks} = user |> mutate_locks(event_stream?) + active_connections = locks |> Map.keys() |> length limit = case {event_stream?, user.type} do @@ -169,8 +171,6 @@ defmodule ApiWeb.RateLimiter.RateLimiterConcurrent do end def memcache_update(key, default_value, update_fn) do - Memcache.cas(ApiWeb.RateLimiter.Memcache.Supervisor.random_child(), key, update_fn, - default: default_value - ) + Memcache.cas(Supervisor.random_child(), key, update_fn, default: default_value) end end From 2c62892b6b816173d69371c4eeb63f1b0fb47533 Mon Sep 17 00:00:00 2001 From: Nick Stein Date: Wed, 24 Jan 2024 02:06:23 -0500 Subject: [PATCH 08/22] fix: better logic for determining whether memcache supervisor should be booted --- apps/api_web/lib/api_web/rate_limiter/memcache/supervisor.ex | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/api_web/lib/api_web/rate_limiter/memcache/supervisor.ex b/apps/api_web/lib/api_web/rate_limiter/memcache/supervisor.ex index 8622b3f7..ab7b0200 100644 --- a/apps/api_web/lib/api_web/rate_limiter/memcache/supervisor.ex +++ b/apps/api_web/lib/api_web/rate_limiter/memcache/supervisor.ex @@ -42,7 +42,8 @@ defmodule ApiWeb.RateLimiter.Memcache.Supervisor do end defp memcache_required? do - ApiWeb.RateLimiter.RateLimiterConcurrent.enabled?() or + (ApiWeb.RateLimiter.RateLimiterConcurrent.enabled?() and + ApiWeb.RateLimiter.RateLimiterConcurrent.memcache?()) or ApiWeb.config(:rate_limiter, :limiter) == ApiWeb.RateLimiter.Memcache end From 123343781a0d1536f0a459e3136238882559a7fd Mon Sep 17 00:00:00 2001 From: Nick Stein Date: Wed, 24 Jan 2024 02:16:27 -0500 Subject: [PATCH 09/22] fix: better logic for determining whether memcache supervisor should be booted --- .../rate_limiter/memcache/supervisor.ex | 31 ++++++++++--------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/apps/api_web/lib/api_web/rate_limiter/memcache/supervisor.ex b/apps/api_web/lib/api_web/rate_limiter/memcache/supervisor.ex index ab7b0200..8059ecfa 100644 --- a/apps/api_web/lib/api_web/rate_limiter/memcache/supervisor.ex +++ b/apps/api_web/lib/api_web/rate_limiter/memcache/supervisor.ex @@ -16,20 +16,23 @@ defmodule ApiWeb.RateLimiter.Memcache.Supervisor do registry = {Registry, keys: :unique, name: @registry_name} - if memcache_required?() do - workers = - for i <- 1..@worker_count do - Supervisor.child_spec({Memcache, [connection_opts, [name: worker_name(i)]]}, id: i) - end - - children = [registry | workers] - - Supervisor.start_link( - children, - strategy: :rest_for_one, - name: __MODULE__ - ) - end + children = + if memcache_required?() do + workers = + for i <- 1..@worker_count do + Supervisor.child_spec({Memcache, [connection_opts, [name: worker_name(i)]]}, id: i) + end + + [registry | workers] + else + [registry] + end + + Supervisor.start_link( + children, + strategy: :rest_for_one, + name: __MODULE__ + ) end @doc "Decrement a given key, using a random child." From 9c0353f022d3ee833a67ed5cb19b2e44a74d76b9 Mon Sep 17 00:00:00 2001 From: Nick Stein Date: Wed, 24 Jan 2024 02:52:48 -0500 Subject: [PATCH 10/22] test: fix unit tests for concurrent connections --- apps/api_web/config/test.exs | 2 +- .../rate_limiter/rate_limiter_concurrent.ex | 14 +++++--------- .../test/api_web/rate_limiter_concurrent_test.exs | 7 ------- 3 files changed, 6 insertions(+), 17 deletions(-) diff --git a/apps/api_web/config/test.exs b/apps/api_web/config/test.exs index 8d7c30d9..08474ac5 100644 --- a/apps/api_web/config/test.exs +++ b/apps/api_web/config/test.exs @@ -28,5 +28,5 @@ config :recaptcha, config :logger, level: :warn config :api_web, :rate_limiter_concurrent, - enabled: true, + enabled: false, memcache: false diff --git a/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex b/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex index ee038d75..3f574b86 100644 --- a/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex +++ b/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex @@ -62,7 +62,7 @@ defmodule ApiWeb.RateLimiter.RateLimiterConcurrent do before_commit.(valid_locks) end) else - %{} + {:ok, %{}} end end @@ -98,12 +98,6 @@ defmodule ApiWeb.RateLimiter.RateLimiterConcurrent do ), else: user.static_concurrent_limit - {true, :anon} -> - Keyword.fetch!( - @rate_limit_concurrent_config, - :max_anon_streaming - ) - {false, :anon} -> Keyword.fetch!( @rate_limit_concurrent_config, @@ -147,8 +141,10 @@ defmodule ApiWeb.RateLimiter.RateLimiterConcurrent do {_type, key} = lookup(user, event_stream?) pid_key = if pid_key, do: pid_key, else: get_pid_key(pid) - {:ok, _locks} = - mutate_locks(user, event_stream?, fn locks -> Map.delete(locks, pid_key) end) + if memcache?() do + {:ok, _locks} = + mutate_locks(user, event_stream?, fn locks -> Map.delete(locks, pid_key) end) + end Logger.info( "#{__MODULE__} event=remove_lock user_id=#{user.id} pid_key=#{pid_key} key=#{key}" diff --git a/apps/api_web/test/api_web/rate_limiter_concurrent_test.exs b/apps/api_web/test/api_web/rate_limiter_concurrent_test.exs index 607ca8ce..8b786405 100644 --- a/apps/api_web/test/api_web/rate_limiter_concurrent_test.exs +++ b/apps/api_web/test/api_web/rate_limiter_concurrent_test.exs @@ -15,13 +15,6 @@ defmodule ApiWeb.RateLimiterConcurrentTest do end test "check_concurrent_rate_limit/1" do - {anon_streaming_at_limit?, anon_streaming_remaining, anon_streaming_limit} = - RateLimiterConcurrent.check_concurrent_rate_limit(%ApiWeb.User{type: :anon}, true) - - assert anon_streaming_limit == ApiWeb.config(:rate_limiter_concurrent, :max_anon_streaming) - assert anon_streaming_remaining == anon_streaming_limit - assert anon_streaming_at_limit? == false - {anon_static_at_limit?, anon_static_remaining, anon_static_limit} = RateLimiterConcurrent.check_concurrent_rate_limit(%ApiWeb.User{type: :anon}, false) From c8cc4988b55a0344710231ba4a0d350941c1cb4b Mon Sep 17 00:00:00 2001 From: Nick Stein Date: Wed, 24 Jan 2024 03:19:30 -0500 Subject: [PATCH 11/22] test: fix unit tests for concurrent connections --- .../lib/api_web/rate_limiter/rate_limiter_concurrent.ex | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex b/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex index 3f574b86..c7ca5e14 100644 --- a/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex +++ b/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex @@ -137,14 +137,12 @@ defmodule ApiWeb.RateLimiter.RateLimiterConcurrent do event_stream?, pid_key \\ nil ) do - if enabled?() do + if enabled?() and memcache?() do {_type, key} = lookup(user, event_stream?) pid_key = if pid_key, do: pid_key, else: get_pid_key(pid) - if memcache?() do - {:ok, _locks} = - mutate_locks(user, event_stream?, fn locks -> Map.delete(locks, pid_key) end) - end + {:ok, _locks} = + mutate_locks(user, event_stream?, fn locks -> Map.delete(locks, pid_key) end) Logger.info( "#{__MODULE__} event=remove_lock user_id=#{user.id} pid_key=#{pid_key} key=#{key}" From 07c8f6f607e6ec54bf3ff7765c0355312bd5222e Mon Sep 17 00:00:00 2001 From: Nick Stein Date: Wed, 7 Feb 2024 08:24:13 -0500 Subject: [PATCH 12/22] chore: remove start_link/1 from behaviour ApiWeb.RateLimiter.Limiter --- apps/api_web/lib/api_web/rate_limiter/ets.ex | 1 - apps/api_web/lib/api_web/rate_limiter/limiter.ex | 5 ----- apps/api_web/lib/api_web/rate_limiter/memcache.ex | 5 ----- 3 files changed, 11 deletions(-) diff --git a/apps/api_web/lib/api_web/rate_limiter/ets.ex b/apps/api_web/lib/api_web/rate_limiter/ets.ex index 1acf9d45..5c149568 100644 --- a/apps/api_web/lib/api_web/rate_limiter/ets.ex +++ b/apps/api_web/lib/api_web/rate_limiter/ets.ex @@ -7,7 +7,6 @@ defmodule ApiWeb.RateLimiter.ETS do @tab :mbta_api_rate_limiter - @impl ApiWeb.RateLimiter.Limiter def start_link(opts) do GenServer.start_link(__MODULE__, opts, name: __MODULE__) end diff --git a/apps/api_web/lib/api_web/rate_limiter/limiter.ex b/apps/api_web/lib/api_web/rate_limiter/limiter.ex index 7ccf1900..5c2a2007 100644 --- a/apps/api_web/lib/api_web/rate_limiter/limiter.ex +++ b/apps/api_web/lib/api_web/rate_limiter/limiter.ex @@ -2,16 +2,11 @@ defmodule ApiWeb.RateLimiter.Limiter do @moduledoc """ Behavior for backends to the V3 API rate limiter. - - `start_link(opts)` is called to start the backend by the supervisor. - `rate_limited?(user_id, max_requests)` returns :rate_limited if the user_id has used too many requests, or else {:remaining, N} where N is the number of requests remaining for the user_id in this time period. - The main option passed to `start_link/1` is `clear_interval` which is a - number of milliseconds to bucket the requests into. - """ - @callback start_link(Keyword.t()) :: {:ok, pid} @callback rate_limited?(String.t(), non_neg_integer) :: {:remaining, non_neg_integer} | :rate_limited @callback clear() :: :ok diff --git a/apps/api_web/lib/api_web/rate_limiter/memcache.ex b/apps/api_web/lib/api_web/rate_limiter/memcache.ex index c6768cb1..550bf0a6 100644 --- a/apps/api_web/lib/api_web/rate_limiter/memcache.ex +++ b/apps/api_web/lib/api_web/rate_limiter/memcache.ex @@ -5,11 +5,6 @@ defmodule ApiWeb.RateLimiter.Memcache do @behaviour ApiWeb.RateLimiter.Limiter alias ApiWeb.RateLimiter.Memcache.Supervisor - @impl true - def start_link(_) do - {:ok, self()} - end - @impl ApiWeb.RateLimiter.Limiter def rate_limited?(key, max_requests) do case Supervisor.decr(key, default: max_requests) do From 2a6baeb03f4d9d24b22eb45af39de3c4d3151ea1 Mon Sep 17 00:00:00 2001 From: Nick Stein Date: Wed, 7 Feb 2024 08:34:09 -0500 Subject: [PATCH 13/22] chore: only call start_link/1 on limiter if it exists --- apps/api_web/lib/api_web/rate_limiter.ex | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/apps/api_web/lib/api_web/rate_limiter.ex b/apps/api_web/lib/api_web/rate_limiter.ex index 28a1fb8c..8f328e42 100644 --- a/apps/api_web/lib/api_web/rate_limiter.ex +++ b/apps/api_web/lib/api_web/rate_limiter.ex @@ -30,7 +30,9 @@ defmodule ApiWeb.RateLimiter do ## Client def start_link(_opts \\ []) do - @limiter.start_link(clear_interval: @clear_interval) + if Kernel.function_exported?(@limiter, :start_link, 1) do + @limiter.start_link(clear_interval: @clear_interval) + end end @doc """ From d843c2f0fe0b7cd351d2f8bd8a75a7758effb95a Mon Sep 17 00:00:00 2001 From: Nick Stein Date: Wed, 7 Feb 2024 08:52:58 -0500 Subject: [PATCH 14/22] Revert "chore: remove start_link/1 from behaviour ApiWeb.RateLimiter.Limiter" This reverts commit 07c8f6f607e6ec54bf3ff7765c0355312bd5222e. --- apps/api_web/lib/api_web/rate_limiter/ets.ex | 1 + apps/api_web/lib/api_web/rate_limiter/limiter.ex | 5 +++++ apps/api_web/lib/api_web/rate_limiter/memcache.ex | 5 +++++ 3 files changed, 11 insertions(+) diff --git a/apps/api_web/lib/api_web/rate_limiter/ets.ex b/apps/api_web/lib/api_web/rate_limiter/ets.ex index 5c149568..1acf9d45 100644 --- a/apps/api_web/lib/api_web/rate_limiter/ets.ex +++ b/apps/api_web/lib/api_web/rate_limiter/ets.ex @@ -7,6 +7,7 @@ defmodule ApiWeb.RateLimiter.ETS do @tab :mbta_api_rate_limiter + @impl ApiWeb.RateLimiter.Limiter def start_link(opts) do GenServer.start_link(__MODULE__, opts, name: __MODULE__) end diff --git a/apps/api_web/lib/api_web/rate_limiter/limiter.ex b/apps/api_web/lib/api_web/rate_limiter/limiter.ex index 5c2a2007..7ccf1900 100644 --- a/apps/api_web/lib/api_web/rate_limiter/limiter.ex +++ b/apps/api_web/lib/api_web/rate_limiter/limiter.ex @@ -2,11 +2,16 @@ defmodule ApiWeb.RateLimiter.Limiter do @moduledoc """ Behavior for backends to the V3 API rate limiter. + - `start_link(opts)` is called to start the backend by the supervisor. - `rate_limited?(user_id, max_requests)` returns :rate_limited if the user_id has used too many requests, or else {:remaining, N} where N is the number of requests remaining for the user_id in this time period. + The main option passed to `start_link/1` is `clear_interval` which is a + number of milliseconds to bucket the requests into. + """ + @callback start_link(Keyword.t()) :: {:ok, pid} @callback rate_limited?(String.t(), non_neg_integer) :: {:remaining, non_neg_integer} | :rate_limited @callback clear() :: :ok diff --git a/apps/api_web/lib/api_web/rate_limiter/memcache.ex b/apps/api_web/lib/api_web/rate_limiter/memcache.ex index 550bf0a6..c6768cb1 100644 --- a/apps/api_web/lib/api_web/rate_limiter/memcache.ex +++ b/apps/api_web/lib/api_web/rate_limiter/memcache.ex @@ -5,6 +5,11 @@ defmodule ApiWeb.RateLimiter.Memcache do @behaviour ApiWeb.RateLimiter.Limiter alias ApiWeb.RateLimiter.Memcache.Supervisor + @impl true + def start_link(_) do + {:ok, self()} + end + @impl ApiWeb.RateLimiter.Limiter def rate_limited?(key, max_requests) do case Supervisor.decr(key, default: max_requests) do From d7eb8db3bbf1ee11d9c62b74d443e3408323db44 Mon Sep 17 00:00:00 2001 From: Nick Stein Date: Wed, 7 Feb 2024 08:54:38 -0500 Subject: [PATCH 15/22] chore: switch to using a genserver to make code simpler --- apps/api_web/lib/api_web/rate_limiter.ex | 4 +--- apps/api_web/lib/api_web/rate_limiter/memcache.ex | 7 ++++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/apps/api_web/lib/api_web/rate_limiter.ex b/apps/api_web/lib/api_web/rate_limiter.ex index 8f328e42..28a1fb8c 100644 --- a/apps/api_web/lib/api_web/rate_limiter.ex +++ b/apps/api_web/lib/api_web/rate_limiter.ex @@ -30,9 +30,7 @@ defmodule ApiWeb.RateLimiter do ## Client def start_link(_opts \\ []) do - if Kernel.function_exported?(@limiter, :start_link, 1) do - @limiter.start_link(clear_interval: @clear_interval) - end + @limiter.start_link(clear_interval: @clear_interval) end @doc """ diff --git a/apps/api_web/lib/api_web/rate_limiter/memcache.ex b/apps/api_web/lib/api_web/rate_limiter/memcache.ex index c6768cb1..b55a5d11 100644 --- a/apps/api_web/lib/api_web/rate_limiter/memcache.ex +++ b/apps/api_web/lib/api_web/rate_limiter/memcache.ex @@ -4,10 +4,11 @@ defmodule ApiWeb.RateLimiter.Memcache do """ @behaviour ApiWeb.RateLimiter.Limiter alias ApiWeb.RateLimiter.Memcache.Supervisor + use GenServer - @impl true - def start_link(_) do - {:ok, self()} + @impl ApiWeb.RateLimiter.Limiter + def start_link(opts) do + GenServer.start_link(__MODULE__, opts, name: __MODULE__) end @impl ApiWeb.RateLimiter.Limiter From d13d6a0fbd8ca41800c04377b702e0f8bcc6a6a9 Mon Sep 17 00:00:00 2001 From: Nick Stein Date: Wed, 7 Feb 2024 09:02:21 -0500 Subject: [PATCH 16/22] chore: add missing placeholder init function --- apps/api_web/lib/api_web/rate_limiter/memcache.ex | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/apps/api_web/lib/api_web/rate_limiter/memcache.ex b/apps/api_web/lib/api_web/rate_limiter/memcache.ex index b55a5d11..71903177 100644 --- a/apps/api_web/lib/api_web/rate_limiter/memcache.ex +++ b/apps/api_web/lib/api_web/rate_limiter/memcache.ex @@ -4,13 +4,17 @@ defmodule ApiWeb.RateLimiter.Memcache do """ @behaviour ApiWeb.RateLimiter.Limiter alias ApiWeb.RateLimiter.Memcache.Supervisor - use GenServer + use Agent @impl ApiWeb.RateLimiter.Limiter def start_link(opts) do GenServer.start_link(__MODULE__, opts, name: __MODULE__) end + def init(opts) do + {:ok, opts} + end + @impl ApiWeb.RateLimiter.Limiter def rate_limited?(key, max_requests) do case Supervisor.decr(key, default: max_requests) do From 800a67ee736c58e7d272fc97fbaaaedf28b2ef6f Mon Sep 17 00:00:00 2001 From: Nick Stein Date: Wed, 7 Feb 2024 09:22:24 -0500 Subject: [PATCH 17/22] chore: switch to persistent_term over GenServer state to hold server-wide UUID --- .../rate_limiter/rate_limiter_concurrent.ex | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex b/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex index c7ca5e14..b714110c 100644 --- a/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex +++ b/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex @@ -9,11 +9,13 @@ defmodule ApiWeb.RateLimiter.RateLimiterConcurrent do alias ApiWeb.RateLimiter.Memcache.Supervisor @rate_limit_concurrent_config Application.compile_env!(:api_web, :rate_limiter_concurrent) - + @uuid_key "ApiWeb.RateLimiter.RateLimiterConcurrent_uuid" def start_link([]), do: GenServer.start_link(__MODULE__, nil, name: __MODULE__) def init(_) do - {:ok, %{uuid: UUID.uuid1()}} + uuid = UUID.uuid1() + :persistent_term.put(@uuid_key, uuid) + {:ok, %{uuid: uuid}} end defp lookup(%ApiWeb.User{} = user, event_stream?) do @@ -32,8 +34,7 @@ defmodule ApiWeb.RateLimiter.RateLimiterConcurrent do end defp get_uuid do - {:ok, uuid} = GenServer.call(__MODULE__, :get_uuid) - uuid + :persistent_term.get(@uuid_key) end defp get_current_unix_ts do @@ -153,15 +154,11 @@ defmodule ApiWeb.RateLimiter.RateLimiterConcurrent do end def enabled? do - Keyword.fetch!(@rate_limit_concurrent_config, :enabled) == true + Keyword.fetch!(@rate_limit_concurrent_config, :enabled) end def memcache? do - Keyword.fetch!(@rate_limit_concurrent_config, :memcache) == true - end - - def handle_call(:get_uuid, _from, state) do - {:reply, {:ok, state.uuid}, state} + Keyword.fetch!(@rate_limit_concurrent_config, :memcache) end def memcache_update(key, default_value, update_fn) do From 4277bac1838baf1dc58ef2dbfd88b136ce2a507c Mon Sep 17 00:00:00 2001 From: Nick Stein Date: Wed, 7 Feb 2024 09:30:01 -0500 Subject: [PATCH 18/22] chore: update config to include new logging / actual enforcement toggles --- apps/api_web/config/config.exs | 2 ++ apps/api_web/config/dev.exs | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/apps/api_web/config/config.exs b/apps/api_web/config/config.exs index 667f4946..6c904cd9 100644 --- a/apps/api_web/config/config.exs +++ b/apps/api_web/config/config.exs @@ -90,6 +90,8 @@ config :api_web, :phoenix_swagger, "priv/static/swagger.json" => [router: ApiWeb.Router, endpoint: ApiWeb.Endpoint] } +config :api_web, :rate_limiter_concurrent, log_statistics: true, limit_users: false + # Import environment specific config. This must remain at the bottom # of this file so it overrides the configuration defined above. import_config "#{config_env()}.exs" diff --git a/apps/api_web/config/dev.exs b/apps/api_web/config/dev.exs index fd21f5d7..39d6dbe7 100644 --- a/apps/api_web/config/dev.exs +++ b/apps/api_web/config/dev.exs @@ -29,4 +29,4 @@ config :logger, :console, format: "[$level] $message\n", level: :debug # and calculating stacktraces is usually expensive. config :phoenix, :stacktrace_depth, 20 -config :api_web, :rate_limiter_concurrent, enabled: true, memcache: true +config :api_web, :rate_limiter_concurrent, enabled: false, memcache: false From 0bf580af0b1fe15ef26b8e5b949434e8eb4367c7 Mon Sep 17 00:00:00 2001 From: Nick Stein Date: Wed, 7 Feb 2024 09:52:20 -0500 Subject: [PATCH 19/22] chore: add independent controls for a statistic logging mode and actual blocking when limit reached mode --- .../api_web/plugs/rate_limiter_concurrent.ex | 20 +++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/apps/api_web/lib/api_web/plugs/rate_limiter_concurrent.ex b/apps/api_web/lib/api_web/plugs/rate_limiter_concurrent.ex index e25d9b79..f99e89d0 100644 --- a/apps/api_web/lib/api_web/plugs/rate_limiter_concurrent.ex +++ b/apps/api_web/lib/api_web/plugs/rate_limiter_concurrent.ex @@ -6,6 +6,8 @@ defmodule ApiWeb.Plugs.RateLimiterConcurrent do import Plug.Conn import Phoenix.Controller, only: [render: 3, put_view: 2] + require Logger + alias ApiWeb.RateLimiter.RateLimiterConcurrent @rate_limit_concurrent_config Application.compile_env!(:api_web, :rate_limiter_concurrent) @@ -19,8 +21,14 @@ defmodule ApiWeb.Plugs.RateLimiterConcurrent do {at_limit?, remaining, limit} = RateLimiterConcurrent.check_concurrent_rate_limit(conn.assigns.api_user, event_stream?) + if log_statistics?() do + Logger.info( + "ApiWeb.Plugs.RateLimiterConcurrent event=request_statistics api_user=#{conn.assigns.api_user.id} at_limit=#{at_limit?} remaining=#{remaining - 1} limit=#{limit} event_stream=#{event_stream?}" + ) + end + # Allow negative limits to allow unlimited use: - if limit >= 0 and at_limit? do + if limit_users?() and limit >= 0 and at_limit? do conn |> put_concurrent_rate_limit_headers(limit, remaining) |> put_status(429) @@ -49,6 +57,14 @@ defmodule ApiWeb.Plugs.RateLimiterConcurrent do end def enabled? do - Keyword.fetch!(@rate_limit_concurrent_config, :enabled) == true + Keyword.fetch!(@rate_limit_concurrent_config, :enabled) + end + + def limit_users? do + Keyword.fetch!(@rate_limit_concurrent_config, :limit_users) + end + + def log_statistics? do + Keyword.fetch!(@rate_limit_concurrent_config, :log_statistics) end end From 9e55e82c772dbaee511c645ad46ac2772cf9a976 Mon Sep 17 00:00:00 2001 From: Nick Stein Date: Wed, 7 Feb 2024 09:59:21 -0500 Subject: [PATCH 20/22] chore: use GenServer instead of Agent for consistency --- apps/api_web/lib/api_web/rate_limiter/memcache.ex | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/api_web/lib/api_web/rate_limiter/memcache.ex b/apps/api_web/lib/api_web/rate_limiter/memcache.ex index 71903177..9413f70b 100644 --- a/apps/api_web/lib/api_web/rate_limiter/memcache.ex +++ b/apps/api_web/lib/api_web/rate_limiter/memcache.ex @@ -4,13 +4,14 @@ defmodule ApiWeb.RateLimiter.Memcache do """ @behaviour ApiWeb.RateLimiter.Limiter alias ApiWeb.RateLimiter.Memcache.Supervisor - use Agent + use GenServer @impl ApiWeb.RateLimiter.Limiter def start_link(opts) do GenServer.start_link(__MODULE__, opts, name: __MODULE__) end + @impl true def init(opts) do {:ok, opts} end From 8cf4dacaf98e05ad7749d5db8a601c336a648c1d Mon Sep 17 00:00:00 2001 From: Nick Stein Date: Wed, 7 Feb 2024 10:40:52 -0500 Subject: [PATCH 21/22] chore: put log_statistics, limit_users with main rate_limit_concurrent config block --- apps/api_web/config/config.exs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/api_web/config/config.exs b/apps/api_web/config/config.exs index 6c904cd9..38a62532 100644 --- a/apps/api_web/config/config.exs +++ b/apps/api_web/config/config.exs @@ -36,6 +36,8 @@ config :api_web, :rate_limiter, config :api_web, :rate_limiter_concurrent, enabled: false, memcache: false, + log_statistics: true, + limit_users: false, # How many seconds tolerated when calculating whether a connection is still open # 45 - 30 (see ApiWeb.EventStream.Initialize's timeout value) gives us a buffer of 15 seconds: heartbeat_tolerance: 45, @@ -90,8 +92,6 @@ config :api_web, :phoenix_swagger, "priv/static/swagger.json" => [router: ApiWeb.Router, endpoint: ApiWeb.Endpoint] } -config :api_web, :rate_limiter_concurrent, log_statistics: true, limit_users: false - # Import environment specific config. This must remain at the bottom # of this file so it overrides the configuration defined above. import_config "#{config_env()}.exs" From 68b45cdad6a18b08c74f61dec5591a45975b26b5 Mon Sep 17 00:00:00 2001 From: Nick Stein Date: Fri, 9 Feb 2024 08:40:31 -0500 Subject: [PATCH 22/22] chore: remove unused return value --- .../api_web/rate_limiter/rate_limiter_concurrent.ex | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex b/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex index b714110c..42bbffad 100644 --- a/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex +++ b/apps/api_web/lib/api_web/rate_limiter/rate_limiter_concurrent.ex @@ -20,12 +20,7 @@ defmodule ApiWeb.RateLimiter.RateLimiterConcurrent do defp lookup(%ApiWeb.User{} = user, event_stream?) do type = if event_stream?, do: "event_stream", else: "static" - key = "concurrent_#{user.id}_#{type}" - - { - type, - key - } + "concurrent_#{user.id}_#{type}" end def get_pid_key(pid) do @@ -49,7 +44,7 @@ defmodule ApiWeb.RateLimiter.RateLimiterConcurrent do if enabled?() do current_timestamp = get_current_unix_ts() heartbeat_tolerance = get_heartbeat_tolerance() - {_type, key} = lookup(user, event_stream?) + key = lookup(user, event_stream?) memcache_update(key, %{}, fn locks -> valid_locks = @@ -113,7 +108,7 @@ defmodule ApiWeb.RateLimiter.RateLimiterConcurrent do def add_lock(%ApiWeb.User{} = user, pid, event_stream?) do if enabled?() do - {_type, key} = lookup(user, event_stream?) + key = lookup(user, event_stream?) pid_key = get_pid_key(pid) timestamp = get_current_unix_ts() @@ -139,7 +134,7 @@ defmodule ApiWeb.RateLimiter.RateLimiterConcurrent do pid_key \\ nil ) do if enabled?() and memcache?() do - {_type, key} = lookup(user, event_stream?) + key = lookup(user, event_stream?) pid_key = if pid_key, do: pid_key, else: get_pid_key(pid) {:ok, _locks} =