From cd8ad2cbd41f6ca97c5833c754f713c0244c8a07 Mon Sep 17 00:00:00 2001 From: Greaka Date: Sun, 26 Feb 2023 16:38:43 +0100 Subject: [PATCH] fix(http-ratelimiting): global rate limit penalties are now applied correctly --- .../src/in_memory/global_bucket.rs | 42 ++++++++++++++----- 1 file changed, 31 insertions(+), 11 deletions(-) diff --git a/twilight-http-ratelimiting/src/in_memory/global_bucket.rs b/twilight-http-ratelimiting/src/in_memory/global_bucket.rs index b8ce7d41b61..cdd02ff8c7b 100644 --- a/twilight-http-ratelimiting/src/in_memory/global_bucket.rs +++ b/twilight-http-ratelimiting/src/in_memory/global_bucket.rs @@ -5,7 +5,8 @@ use crate::ticket::TicketNotifier; use crate::RatelimitHeaders; use std::sync::Arc; use std::time::Duration; -use tokio::sync::{Mutex, Semaphore}; +use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::sync::{mpsc, Mutex, Semaphore}; use tokio::time::Instant; /// seconds per period @@ -73,19 +74,31 @@ impl InnerGlobalBucket { async fn run_global_queue_task(bucket: Arc, period: u64, requests: u32) { let mut time = Instant::now(); let semaphore = Arc::new(Semaphore::new(requests as usize)); + let (penalty_tx, mut penalty_rx) = mpsc::channel(requests as usize); while let Some(queue_tx) = bucket.queue.pop().await { - wait_if_needed(bucket.as_ref(), &mut time, period, requests).await; - - tokio::spawn(process_request(bucket.clone(), semaphore.clone(), queue_tx)); + wait_if_needed( + bucket.as_ref(), + &mut time, + period, + requests, + &mut penalty_rx, + ) + .await; + + tokio::spawn(process_request( + semaphore.clone(), + queue_tx, + penalty_tx.clone(), + )); } } #[tracing::instrument(name = "process request", skip_all)] async fn process_request( - bucket: Arc, semaphore: Arc, queue_tx: TicketNotifier, + penalties: Sender, ) { // This error should never occur, but if it does, do not lock up let _permit = semaphore.acquire().await; @@ -99,8 +112,8 @@ async fn process_request( if let Ok(Some(RatelimitHeaders::Global(headers))) = ticket_headers.await { tracing::debug!(seconds = headers.retry_after(), "globally ratelimited"); - let _guard = bucket.is_locked.lock().await; - tokio::time::sleep(Duration::from_secs(headers.retry_after())).await; + let deadline = Instant::now() + Duration::from_secs(headers.retry_after()); + penalties.send(deadline).await.ok(); } } @@ -110,24 +123,31 @@ async fn wait_if_needed( time: &mut Instant, period: u64, requests: u32, + penalties: &mut Receiver, ) { let period = Duration::from_secs(period); let fill_rate = period / requests; let now = Instant::now(); - // base contingent of 1 period worth of requests + // maximum requests at once is 1 period worth of requests let base = now - period; - // reset to base if no request came in for long enough + // if the bucket currently holds more requests than maximum, set to maximum if base > *time { *time = base; } - // we request one request worth of rate limit consumption + // deduct one request from current capacity *time += fill_rate; - // if time > now, wait until there is capacity available again + // if time > now, then the bucket is exhausted. wait until a request is available again if *time > now { let _guard = bucket.is_locked.lock().await; tokio::time::sleep_until(*time).await; } + + // wait for penalties + while let Ok(deadline) = penalties.try_recv() { + let _guard = bucket.is_locked.lock().await; + tokio::time::sleep_until(deadline).await; + } }