Skip to content

Commit

Permalink
fix(http-ratelimiting): global rate limit penalties are now applied c…
Browse files Browse the repository at this point in the history
…orrectly
  • Loading branch information
greaka committed Feb 26, 2023
1 parent 7b42fff commit cd8ad2c
Showing 1 changed file with 31 additions and 11 deletions.
42 changes: 31 additions & 11 deletions twilight-http-ratelimiting/src/in_memory/global_bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use crate::ticket::TicketNotifier;
use crate::RatelimitHeaders;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Mutex, Semaphore};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{mpsc, Mutex, Semaphore};
use tokio::time::Instant;

/// seconds per period
Expand Down Expand Up @@ -73,19 +74,31 @@ impl InnerGlobalBucket {
async fn run_global_queue_task(bucket: Arc<InnerGlobalBucket>, period: u64, requests: u32) {
let mut time = Instant::now();
let semaphore = Arc::new(Semaphore::new(requests as usize));
let (penalty_tx, mut penalty_rx) = mpsc::channel(requests as usize);

while let Some(queue_tx) = bucket.queue.pop().await {
wait_if_needed(bucket.as_ref(), &mut time, period, requests).await;

tokio::spawn(process_request(bucket.clone(), semaphore.clone(), queue_tx));
wait_if_needed(
bucket.as_ref(),
&mut time,
period,
requests,
&mut penalty_rx,
)
.await;

tokio::spawn(process_request(
semaphore.clone(),
queue_tx,
penalty_tx.clone(),
));
}
}

#[tracing::instrument(name = "process request", skip_all)]
async fn process_request(
bucket: Arc<InnerGlobalBucket>,
semaphore: Arc<Semaphore>,
queue_tx: TicketNotifier,
penalties: Sender<Instant>,
) {
// This error should never occur, but if it does, do not lock up
let _permit = semaphore.acquire().await;
Expand All @@ -99,8 +112,8 @@ async fn process_request(
if let Ok(Some(RatelimitHeaders::Global(headers))) = ticket_headers.await {
tracing::debug!(seconds = headers.retry_after(), "globally ratelimited");

let _guard = bucket.is_locked.lock().await;
tokio::time::sleep(Duration::from_secs(headers.retry_after())).await;
let deadline = Instant::now() + Duration::from_secs(headers.retry_after());
penalties.send(deadline).await.ok();
}
}

Expand All @@ -110,24 +123,31 @@ async fn wait_if_needed(
time: &mut Instant,
period: u64,
requests: u32,
penalties: &mut Receiver<Instant>,
) {
let period = Duration::from_secs(period);
let fill_rate = period / requests;

let now = Instant::now();
// base contingent of 1 period worth of requests
// maximum requests at once is 1 period worth of requests
let base = now - period;
// reset to base if no request came in for long enough
// if the bucket currently holds more requests than maximum, set to maximum
if base > *time {
*time = base;
}

// we request one request worth of rate limit consumption
// deduct one request from current capacity
*time += fill_rate;

// if time > now, wait until there is capacity available again
// if time > now, then the bucket is exhausted. wait until a request is available again
if *time > now {
let _guard = bucket.is_locked.lock().await;
tokio::time::sleep_until(*time).await;
}

// wait for penalties
while let Ok(deadline) = penalties.try_recv() {
let _guard = bucket.is_locked.lock().await;
tokio::time::sleep_until(deadline).await;
}
}

0 comments on commit cd8ad2c

Please sign in to comment.