
feat: concurrent connections limiter #696

Merged · 26 commits · Feb 9, 2024
3985aab
feat: global concurrent connections limiter
nlwstein Jan 23, 2024
2955855
fix: use memcache pool from base rate limiter
nlwstein Jan 23, 2024
ac71814
fix: use boolean true
nlwstein Jan 23, 2024
e5cb674
Merge branch 'master' into nlws-concurrent-limiter-rebuild
nlwstein Jan 23, 2024
eefd8f6
Merge branch 'master' into nlws-concurrent-limiter-rebuild
nlwstein Jan 24, 2024
f7ce456
fix: use cas instead of get/set
nlwstein Jan 24, 2024
9b1aac1
Merge branch 'nlws-concurrent-limiter-rebuild' of github.com:mbta/api…
nlwstein Jan 24, 2024
110bfc3
chore: fix dialyzer complaints
nlwstein Jan 24, 2024
1643d61
chore: apply formatter
nlwstein Jan 24, 2024
2bd050d
fix: initialize memcache supervisor independently of rate_limiter, co…
nlwstein Jan 24, 2024
2c62892
fix: better logic for determining whether memcache supervisor should …
nlwstein Jan 24, 2024
1233437
fix: better logic for determining whether memcache supervisor should …
nlwstein Jan 24, 2024
9c0353f
test: fix unit tests for concurrent connections
nlwstein Jan 24, 2024
c8cc498
test: fix unit tests for concurrent connections
nlwstein Jan 24, 2024
07c8f6f
chore: remove start_link/1 from behaviour ApiWeb.RateLimiter.Limiter
nlwstein Feb 7, 2024
2a6baeb
chore: only call start_link/1 on limiter if it exists
nlwstein Feb 7, 2024
d843c2f
Revert "chore: remove start_link/1 from behaviour ApiWeb.RateLimiter.…
nlwstein Feb 7, 2024
d7eb8db
chore: switch to using a genserver to make code simpler
nlwstein Feb 7, 2024
d13d6a0
chore: add missing placeholder init function
nlwstein Feb 7, 2024
800a67e
chore: switch to persistent_term over GenServer state to hold server-…
nlwstein Feb 7, 2024
4277bac
chore: update config to include new logging / actual enforcement toggles
nlwstein Feb 7, 2024
0bf580a
chore: add independent controls for a statistic logging mode and actu…
nlwstein Feb 7, 2024
9e55e82
chore: use GenServer instead of Agent for consistency
nlwstein Feb 7, 2024
8cf4dac
chore: put log_statistics, limit_users with main rate_limit_concurren…
nlwstein Feb 7, 2024
68b45cd
chore: remove unused return value
nlwstein Feb 9, 2024
cffa3d4
Merge branch 'master' into nlws-concurrent-limiter-rebuild
nlwstein Feb 9, 2024
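One of the commits above, `fix: use cas instead of get/set`, replaces a racy read-modify-write with a compare-and-swap loop. A minimal Python model of that pattern (the store and function names here are illustrative, not the PR's actual Memcache API):

```python
class FakeCasStore:
    """In-memory stand-in for a memcache client with gets/cas semantics."""

    def __init__(self):
        self._data = {}  # key -> (value, version)

    def gets(self, key):
        # Returns the current value and its version token.
        return self._data.get(key, (None, 0))

    def cas(self, key, value, expected_version):
        # The write succeeds only if no one has written since our gets.
        _, version = self._data.get(key, (None, 0))
        if version != expected_version:
            return False
        self._data[key] = (value, version + 1)
        return True


def cas_add(store, key, member, max_retries=10):
    """Append `member` to the list at `key` without losing concurrent writes."""
    for _ in range(max_retries):
        value, version = store.gets(key)
        if store.cas(key, (value or []) + [member], version):
            return True
    return False  # too much contention; caller decides how to degrade
```

With plain get/set, two processes adding locks at the same time could each read the same list and overwrite each other's entry; the version check makes the loser retry instead.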
4 changes: 3 additions & 1 deletion apps/api_accounts/lib/api_accounts/key.ex
@@ -18,6 +18,8 @@ defmodule ApiAccounts.Key do
field(:requested_date, :datetime)
field(:approved, :boolean, default: false)
field(:locked, :boolean, default: false)
field(:static_concurrent_limit, :integer)
field(:streaming_concurrent_limit, :integer)
field(:daily_limit, :integer)
field(:rate_request_pending, :boolean, default: false)
field(:api_version, :string)
@@ -28,7 +30,7 @@ defmodule ApiAccounts.Key do
@doc false
def changeset(struct, params \\ %{}) do
fields = ~w(
created requested_date approved locked daily_limit rate_request_pending api_version description allowed_domains
created requested_date approved locked static_concurrent_limit streaming_concurrent_limit daily_limit rate_request_pending api_version description allowed_domains
)a
cast(struct, params, fields)
end
18 changes: 17 additions & 1 deletion apps/api_web/config/config.exs
@@ -28,7 +28,23 @@ config :api_web, :rate_limiter,
limiter: ApiWeb.RateLimiter.ETS,
max_anon_per_interval: 5_000,
max_registered_per_interval: 100_000,
wait_time_ms: 0
wait_time_ms: 0,
connection_opts: [
coder: Memcache.Coder.JSON
]

config :api_web, :rate_limiter_concurrent,
enabled: false,
memcache: false,
Comment on lines +37 to +38

Contributor: suggestion: enable the rate limiter by default. Then we can get rid of the calls enabling it in dev and prod, simplifying configuration and ensuring that we default to rate limiting.

Contributor (author): My only concern with that is it would then require memcached running locally - this feature was built with the intent of it being a global limiter, so there's no local-only implementation to fall back on.

Contributor: > My only concern with that is it would then require memcached running locally

I'm confused — when running locally (i.e., mix run --no-halt), dev.exs is used for config. enabled and memcache both appear to be true in dev.exs:

config :api_web, :rate_limiter_concurrent, enabled: true, memcache: true

Contributor (author): This should be fixed now; they are disabled in dev.


log_statistics: true,
limit_users: false,
# How many seconds tolerated when calculating whether a connection is still open
# 45 - 30 (see ApiWeb.EventStream.Initialize's timeout value) gives us a buffer of 15 seconds:
heartbeat_tolerance: 45,
# Default concurrent connections - these can be overridden on a per-key basis in the admin UI:
max_anon_static: 5,
max_registered_streaming: 10,
max_registered_static: 20

config :api_web, ApiWeb.Plugs.ModifiedSinceHandler, check_caller: false

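The `heartbeat_tolerance` comment above works out as follows: event-stream connections refresh their locks roughly every 30 seconds (the `ApiWeb.EventStream.Initialize` timeout), so a 45-second tolerance leaves about 15 seconds of slack for a late heartbeat. A sketch of that liveness check in Python (the constant and function names are assumptions, not the PR's identifiers):

```python
HEARTBEAT_INTERVAL_S = 30   # how often a stream refreshes its lock
HEARTBEAT_TOLERANCE_S = 45  # configured tolerance: 45 - 30 = 15 s of slack


def connection_open(last_heartbeat_s, now_s, tolerance_s=HEARTBEAT_TOLERANCE_S):
    """A lock counts toward the concurrent limit only while it is fresh."""
    return (now_s - last_heartbeat_s) <= tolerance_s
```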
2 changes: 2 additions & 0 deletions apps/api_web/config/dev.exs
@@ -28,3 +28,5 @@ config :logger, :console, format: "[$level] $message\n", level: :debug
# Do not configure such in production as keeping
# and calculating stacktraces is usually expensive.
config :phoenix, :stacktrace_depth, 20

config :api_web, :rate_limiter_concurrent, enabled: false, memcache: false
4 changes: 4 additions & 0 deletions apps/api_web/config/prod.exs
@@ -54,6 +54,10 @@ config :ehmon, :report_mf, {:ehmon, :info_report}

config :logster, :filter_parameters, ~w(password password_confirm)

config :api_web, :rate_limiter_concurrent,
enabled: true,
memcache: true

config :api_web, :rate_limiter,
clear_interval: 60_000,
max_anon_per_interval: 20,
4 changes: 4 additions & 0 deletions apps/api_web/config/test.exs
@@ -26,3 +26,7 @@ config :recaptcha,

# Print only warnings and errors during test
config :logger, level: :warn

config :api_web, :rate_limiter_concurrent,
enabled: false,
memcache: false
2 changes: 2 additions & 0 deletions apps/api_web/lib/api_web.ex
@@ -15,7 +15,9 @@ defmodule ApiWeb do
# no cover
children = [
# Start the endpoint when the application starts
ApiWeb.RateLimiter.Memcache.Supervisor,
ApiWeb.RateLimiter,
ApiWeb.RateLimiter.RateLimiterConcurrent,
{RequestTrack, [name: ApiWeb.RequestTrack]},
ApiWeb.EventStream.Supervisor,
ApiWeb.Endpoint,
1 change: 1 addition & 0 deletions apps/api_web/lib/api_web/api_controller_helpers.ex
@@ -27,6 +27,7 @@ defmodule ApiWeb.ApiControllerHelpers do
plug(:split_include)
plug(ApiWeb.Plugs.ModifiedSinceHandler, caller: __MODULE__)
plug(ApiWeb.Plugs.RateLimiter)
plug(ApiWeb.Plugs.RateLimiterConcurrent)

def index(conn, params), do: ApiControllerHelpers.index(__MODULE__, conn, params)

15 changes: 15 additions & 0 deletions apps/api_web/lib/api_web/event_stream.ex
@@ -7,6 +7,7 @@ defmodule ApiWeb.EventStream do
import Plug.Conn
alias __MODULE__.Supervisor
alias ApiWeb.Plugs.CheckForShutdown
alias ApiWeb.RateLimiter.RateLimiterConcurrent
require Logger

@enforce_keys [:conn, :pid, :timeout]
@@ -53,6 +54,13 @@

@spec hibernate_loop(state) :: Plug.Conn.t()
def hibernate_loop(state) do
if Map.has_key?(state.conn.assigns, :api_user) do
# Update the concurrent rate limit cache to ensure any flushing doesn't impact long-running connections:
RateLimiterConcurrent.add_lock(state.conn.assigns.api_user, self(), true)
else
Logger.warn("#{__MODULE__} missing_api_user - cannot rate limit!")
end

case receive_result(state) do
{:continue, state} ->
:proc_lib.hibernate(__MODULE__, :hibernate_loop, [state])
@@ -130,6 +138,13 @@
end

defp unsubscribe(state) do
if Map.has_key?(state.conn.assigns, :api_user) do
Member: question: is it possible to get here without having logged a warning already?

Contributor (author): I feel it is worth having the condition and warning given that this is a separate function, but I don't feel strongly about it - if you want it removed, I trust your judgment 🙂

# clean up our concurrent connections lock:
RateLimiterConcurrent.remove_lock(state.conn.assigns.api_user, self(), true)
else
Logger.warn("#{__MODULE__} missing_api_user - cannot rate limit!")
end

# consume any extra messages received after unsubscribing
receive do
{:events, _} ->
70 changes: 70 additions & 0 deletions apps/api_web/lib/api_web/plugs/rate_limiter_concurrent.ex
@@ -0,0 +1,70 @@
defmodule ApiWeb.Plugs.RateLimiterConcurrent do
@moduledoc """
Plug to invoke the concurrent rate limiter.
"""

import Plug.Conn
import Phoenix.Controller, only: [render: 3, put_view: 2]

require Logger

alias ApiWeb.RateLimiter.RateLimiterConcurrent

@rate_limit_concurrent_config Application.compile_env!(:api_web, :rate_limiter_concurrent)

def init(opts), do: opts

def call(conn, _opts) do
if enabled?() do
Member: suggestion: can we change this to check the rate limit and log the result if memcache is true, but only enforce the rate limit if enabled is true? I'm thinking about how we roll this out, and I don't think there's a good way to understand the expected values for some of our higher-throughput applications (dotcom, screens) without getting some benchmark data first.

Contributor (author): I have added two additional feature flags, :log_statistics and :limit_users, to support these behaviors: a log statement with the API key and current state, and a :limit_users == true check on the condition that actually blocks the user from accessing until their cooldown.

event_stream? = Plug.Conn.get_req_header(conn, "accept") == ["text/event-stream"]

{at_limit?, remaining, limit} =
RateLimiterConcurrent.check_concurrent_rate_limit(conn.assigns.api_user, event_stream?)

if log_statistics?() do
Logger.info(
"ApiWeb.Plugs.RateLimiterConcurrent event=request_statistics api_user=#{conn.assigns.api_user.id} at_limit=#{at_limit?} remaining=#{remaining - 1} limit=#{limit} event_stream=#{event_stream?}"
)
end

# Allow negative limits to allow unlimited use:
if limit_users?() and limit >= 0 and at_limit? do
conn
|> put_concurrent_rate_limit_headers(limit, remaining)
|> put_status(429)
|> put_view(ApiWeb.ErrorView)
|> render("429.json-api", [])
|> halt()
else
RateLimiterConcurrent.add_lock(conn.assigns.api_user, self(), event_stream?)

conn
|> put_concurrent_rate_limit_headers(limit, remaining - 1)
|> register_before_send(fn conn ->
RateLimiterConcurrent.remove_lock(conn.assigns.api_user, self(), event_stream?)
conn
end)
end
else
conn
end
end

defp put_concurrent_rate_limit_headers(conn, limit, remaining) do
conn
|> put_resp_header("x-concurrent-ratelimit-limit", "#{limit}")
|> put_resp_header("x-concurrent-ratelimit-remaining", "#{remaining}")
end

def enabled? do
Keyword.fetch!(@rate_limit_concurrent_config, :enabled)
end

def limit_users? do
Keyword.fetch!(@rate_limit_concurrent_config, :limit_users)
end

def log_statistics? do
Keyword.fetch!(@rate_limit_concurrent_config, :log_statistics)
end
end
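The plug's branches combine three independent controls: statistics logging, actual enforcement (`limit_users`), and a negative limit meaning unlimited use. A small Python model of the reject/allow decision (assumed semantics, mirroring the conditions in the plug above):

```python
def decide(at_limit, limit, limit_users):
    """Return "reject" (HTTP 429) or "allow", per the plug's conditions."""
    # Negative limits mean unlimited use, so they never reject;
    # with limit_users off we only observe and log, never block.
    if limit_users and limit >= 0 and at_limit:
        return "reject"
    return "allow"
```

This is what lets the limiter run in log-only mode during rollout: `at_limit` can be computed and logged for every request while `limit_users` stays false.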
12 changes: 6 additions & 6 deletions apps/api_web/lib/api_web/rate_limiter/memcache.ex
@@ -4,16 +4,16 @@ defmodule ApiWeb.RateLimiter.Memcache do
"""
@behaviour ApiWeb.RateLimiter.Limiter
alias ApiWeb.RateLimiter.Memcache.Supervisor
use GenServer

@impl ApiWeb.RateLimiter.Limiter
def start_link(opts) do
clear_interval_ms = Keyword.fetch!(opts, :clear_interval)
clear_interval = div(clear_interval_ms, 1000)

connection_opts =
[ttl: clear_interval * 2] ++ ApiWeb.config(RateLimiter.Memcache, :connection_opts)
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end

Supervisor.start_link(connection_opts)
@impl true
def init(opts) do
{:ok, opts}
end

@impl ApiWeb.RateLimiter.Limiter
33 changes: 26 additions & 7 deletions apps/api_web/lib/api_web/rate_limiter/memcache/supervisor.ex
@@ -4,16 +4,29 @@ defmodule ApiWeb.RateLimiter.Memcache.Supervisor do
"""
@worker_count 5
@registry_name __MODULE__.Registry
@rate_limit_config Application.compile_env!(:api_web, :rate_limiter)

use Agent

def start_link(_) do
clear_interval_ms = Keyword.fetch!(@rate_limit_config, :clear_interval)
clear_interval = div(clear_interval_ms, 1000)
connection_opts_config = Keyword.fetch!(@rate_limit_config, :connection_opts)
connection_opts = [ttl: clear_interval * 2] ++ connection_opts_config

def start_link(connection_opts) do
registry = {Registry, keys: :unique, name: @registry_name}

workers =
for i <- 1..@worker_count do
Supervisor.child_spec({Memcache, [connection_opts, [name: worker_name(i)]]}, id: i)
end
children =
if memcache_required?() do
workers =
for i <- 1..@worker_count do
Supervisor.child_spec({Memcache, [connection_opts, [name: worker_name(i)]]}, id: i)
end

children = [registry | workers]
[registry | workers]
else
[registry]
end

Supervisor.start_link(
children,
@@ -31,7 +44,13 @@ defmodule ApiWeb.RateLimiter.Memcache.Supervisor do
{:via, Registry, {@registry_name, index}}
end

defp random_child do
defp memcache_required? do
(ApiWeb.RateLimiter.RateLimiterConcurrent.enabled?() and
ApiWeb.RateLimiter.RateLimiterConcurrent.memcache?()) or
ApiWeb.config(:rate_limiter, :limiter) == ApiWeb.RateLimiter.Memcache
end

def random_child do
worker_name(:rand.uniform(@worker_count))
end
end