-
Notifications
You must be signed in to change notification settings - Fork 20
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: concurrent connections limiter #696
Changes from 24 commits
3985aab
2955855
ac71814
e5cb674
eefd8f6
f7ce456
9b1aac1
110bfc3
1643d61
2bd050d
2c62892
1233437
9c0353f
c8cc498
07c8f6f
2a6baeb
d843c2f
d7eb8db
d13d6a0
800a67e
4277bac
0bf580a
9e55e82
8cf4dac
68b45cd
cffa3d4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,7 @@ defmodule ApiWeb.EventStream do | |
import Plug.Conn | ||
alias __MODULE__.Supervisor | ||
alias ApiWeb.Plugs.CheckForShutdown | ||
alias ApiWeb.RateLimiter.RateLimiterConcurrent | ||
require Logger | ||
|
||
@enforce_keys [:conn, :pid, :timeout] | ||
|
@@ -53,6 +54,13 @@ defmodule ApiWeb.EventStream do | |
|
||
@spec hibernate_loop(state) :: Plug.Conn.t() | ||
def hibernate_loop(state) do | ||
if Map.has_key?(state.conn.assigns, :api_user) do | ||
# Update the concurrent rate limit cache to ensure any flushing doesn't impact long-running connections: | ||
RateLimiterConcurrent.add_lock(state.conn.assigns.api_user, self(), true) | ||
else | ||
Logger.warn("#{__MODULE__} missing_api_user - cannot rate limit!") | ||
end | ||
|
||
case receive_result(state) do | ||
{:continue, state} -> | ||
:proc_lib.hibernate(__MODULE__, :hibernate_loop, [state]) | ||
|
@@ -130,6 +138,13 @@ defmodule ApiWeb.EventStream do | |
end | ||
|
||
defp unsubscribe(state) do | ||
if Map.has_key?(state.conn.assigns, :api_user) do | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. question: is it possible to get here without having logged a warning already? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I feel it is worth having the condition and warning given that this is a separate function but I also don't feel super strongly - if you want it removed, I trust your judgment 🙂 |
||
# clean up our concurrent connections lock: | ||
RateLimiterConcurrent.remove_lock(state.conn.assigns.api_user, self(), true) | ||
else | ||
Logger.warn("#{__MODULE__} missing_api_user - cannot rate limit!") | ||
end | ||
|
||
# consume any extra messages received after unsubscribing | ||
receive do | ||
{:events, _} -> | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
defmodule ApiWeb.Plugs.RateLimiterConcurrent do | ||
@moduledoc """ | ||
Plug to invoke the concurrent rate limiter. | ||
""" | ||
|
||
import Plug.Conn | ||
import Phoenix.Controller, only: [render: 3, put_view: 2] | ||
|
||
require Logger | ||
|
||
alias ApiWeb.RateLimiter.RateLimiterConcurrent | ||
|
||
@rate_limit_concurrent_config Application.compile_env!(:api_web, :rate_limiter_concurrent) | ||
|
||
def init(opts), do: opts | ||
|
||
def call(conn, _opts) do | ||
if enabled?() do | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. suggestion: can we change this to check the rate limit and log the result if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have added two additional feature flags: |
||
event_stream? = Plug.Conn.get_req_header(conn, "accept") == ["text/event-stream"] | ||
|
||
{at_limit?, remaining, limit} = | ||
RateLimiterConcurrent.check_concurrent_rate_limit(conn.assigns.api_user, event_stream?) | ||
|
||
if log_statistics?() do | ||
Logger.info( | ||
"ApiWeb.Plugs.RateLimiterConcurrent event=request_statistics api_user=#{conn.assigns.api_user.id} at_limit=#{at_limit?} remaining=#{remaining - 1} limit=#{limit} event_stream=#{event_stream?}" | ||
) | ||
end | ||
|
||
# Allow negative limits to allow unlimited use: | ||
if limit_users?() and limit >= 0 and at_limit? do | ||
conn | ||
|> put_concurrent_rate_limit_headers(limit, remaining) | ||
|> put_status(429) | ||
|> put_view(ApiWeb.ErrorView) | ||
|> render("429.json-api", []) | ||
|> halt() | ||
else | ||
RateLimiterConcurrent.add_lock(conn.assigns.api_user, self(), event_stream?) | ||
|
||
conn | ||
|> put_concurrent_rate_limit_headers(limit, remaining - 1) | ||
|> register_before_send(fn conn -> | ||
RateLimiterConcurrent.remove_lock(conn.assigns.api_user, self(), event_stream?) | ||
conn | ||
end) | ||
end | ||
else | ||
conn | ||
end | ||
end | ||
|
||
defp put_concurrent_rate_limit_headers(conn, limit, remaining) do | ||
conn | ||
|> put_resp_header("x-concurrent-ratelimit-limit", "#{limit}") | ||
|> put_resp_header("x-concurrent-ratelimit-remaining", "#{remaining}") | ||
end | ||
|
||
def enabled? do | ||
Keyword.fetch!(@rate_limit_concurrent_config, :enabled) | ||
end | ||
|
||
def limit_users? do | ||
Keyword.fetch!(@rate_limit_concurrent_config, :limit_users) | ||
end | ||
|
||
def log_statistics? do | ||
Keyword.fetch!(@rate_limit_concurrent_config, :log_statistics) | ||
end | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: enable the rate limiter by default. Then we can get rid of the calls enabling it in
dev
andprod
, simplifying configuration and ensuring that we default to rate limiting.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My only concern with that is it would then require memcached running locally - this feature was built with the intent of it being a global limiter, so there's no local-only implementation to fall back on.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm confused — when running locally (i.e.,
mix run --no-halt
),dev.exs
is used for config.enabled
andmemcache
both appear to betrue
indev.exs
:api/apps/api_web/config/dev.exs
Line 32 in c8cc498
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be fixed now, they should be disabled in
dev
.