Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom bucket increments #87

Open
Trevoke opened this issue Sep 9, 2023 · 2 comments
Open

Custom bucket increments #87

Trevoke opened this issue Sep 9, 2023 · 2 comments

Comments

@Trevoke
Copy link

Trevoke commented Sep 9, 2023

In our new and fancy world of LLMs and OpenAI, we have rate limits by number of message-tokens per minute, which means the bucket needs to be increased by a somewhat arbitrary number. Obviously, I could just call "check_rate" sequentially in a loop a few hundred times, but that seems... A little silly.

Would it be possible to expand the public API a little, and allow us to pass in an optional number-by-which-to-increase-the-number-of-tokens-in-the-bucket?

@garretttaco
Copy link

Just following up with @Trevoke's question above. I also think this would be valuable 🙂

@ruslandoga
Copy link

ruslandoga commented Nov 13, 2024

👋 Hi all! For anyone looking for a zero-dependency rate limiter,:ets has everything you need.

Here’s a minimal and efficient example (adapted from plug_attack) that uses the same fixed window counter algorithm as ExRated.
defmodule MyApp.RateLimit do
  @moduledoc """
  Thin wrapper around `:ets.update_counter/4` and a clean-up process to act as a fixed window rate limiter.
  """

  use GenServer

  @doc """
  Starts the process that creates and cleans the ETS table.

  Accepts the following options:
  - `:clean_period` for how often to perform garbage collection
  """
  @spec start_link([{:clean_period, timeout}]) :: GenServer.on_start()
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts)
  end

  @table __MODULE__

  @type key :: any
  @type scale :: pos_integer
  @type limit :: pos_integer
  @type increment :: pos_integer
  @type count :: non_neg_integer

  @doc "Checks the rate-limit for a key"
  @spec check_rate(key, scale, limit, increment) :: {:allow, count} | {:deny, limit}
  def check_rate(key, scale, limit, increment \\ 1) do
    count = hit(key, scale, increment)
    if count <= limit, do: {:allow, count}, else: {:deny, limit}
  end

  @doc "Increments the current count for the key and returns the new count"
  @spec hit(key, scale, increment) :: count
  def hit(key, scale, increment \\ 1) do
    window = window(scale)
    full_key = {key, window}
    expires_at = (window + 1) * scale
    :ets.update_counter(@table, full_key, increment, {full_key, 0, expires_at})
  end

  # The rest of the API is optional so it's commented out.

  # @doc "Sets the new count for the key"
  # @spec set(key, scale, count) :: count
  # def set(key, scale, count) do
  #   window = window(scale)
  #   full_key = {key, window}
  #   expires_at = (window + 1) * scale
  #   :ets.update_counter(@table, full_key, {2, 1, 0, count}, {full_key, count, expires_at})
  # end

  # @doc "Gets the current count for the key"
  # @spec get(key, scale, limit) :: count
  # def get(key, scale, limit) do
  #   window = window(scale)
  #   full_key = {key, window}

  #   case :ets.lookup(@table, full_key) do
  #     [{_full_key, count, _expires_at}] -> max(count, limit)
  #     [] -> 0
  #   end
  # end

  # @doc "Sets the current count for the key to zero"
  # @spec reset(key, scale) :: count
  # def reset(key, scale), do: set(key, scale, 0)

  @impl true
  def init(opts) do
    clean_period = Keyword.fetch!(opts, :clean_period)

    :ets.new(@table, [
      :named_table,
      :set,
      :public,
      {:read_concurrency, true},
      {:write_concurrency, true},
      {:decentralized_counters, true}
    ])

    schedule(clean_period)
    {:ok, %{clean_period: clean_period}}
  end

  @impl true
  def handle_info(:clean, state) do
    :ets.select_delete(@table, [{{{:_, :_}, :_, :"$1"}, [], [{:<, :"$1", {:const, now()}}]}])
    schedule(state.clean_period)
    {:noreply, state}
  end

  defp schedule(clean_period) do
    Process.send_after(self(), :clean, clean_period)
  end

  @compile inline: [now: 0]
  defp now do
    System.system_time(:millisecond)
  end

  @compile inline: [window: 1]
  defp window(scale) do
    div(now(), scale)
  end
end
And a demo.
iex> MyApp.RateLimit.start_link(clean_period: :timer.minutes(10))
#==> {:ok, #PID<0.114.0>}

iex> MyApp.RateLimit.check_rate("openai:123", :timer.minutes(1), _limit = 100000, _tokens = 1000)
#==> {:allow, 1000}

iex> MyApp.RateLimit.check_rate("openai:123", :timer.minutes(1), _limit = 100000, _tokens = 1000)
#==> {:allow, 2000}

iex> MyApp.RateLimit.check_rate("openai:123", :timer.minutes(1), _limit = 100000, _tokens = 1000)
#==> {:allow, 3000}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants