diff --git a/book/src/chapter_2_multi-serviced_approach.md b/book/src/chapter_2_multi-serviced_approach.md index dcd1ff39a67..0a305bdecd1 100644 --- a/book/src/chapter_2_multi-serviced_approach.md +++ b/book/src/chapter_2_multi-serviced_approach.md @@ -14,19 +14,18 @@ One of the popular design choices when creating a multi-serviced application is to have a service that only connects shards to the gateway and sends the events to a broker to be processed. As bots grow into hundreds or thousands of shards, multiple instances of the application can be created and groups of shards can be -managed by each. Twilight is an excellent choice for this use case: you can -receive either events that come in in a loop and send the payloads to the -appropriate broker stream, or you can loop over received payloads' bytes to send -off. +managed by each. Twilight is an excellent choice for this use case: just receive +and send the payloads to the appropriate broker stream. Twilight shards need +only partially deserialize payloads to function. ## Gateway session ratelimiting If multiple shard groups are used, then they need to be queued and their session -initialization ratelimited. The Gateway includes a Queue trait which can be +initialization ratelimited. The gateway includes a Queue trait which can be implemented; the gateway will submit a request to the queue before starting a -session. Twilight comes with a queue that supports sharding and Large Bot -sharding, but when multiple shard groups are in use then a custom queue will -need to be implemented. Refer to [gateway-queue] for an example of this. +session. Twilight comes with a queue that supports Large Bot sharding, but when +multiple shard groups are in use then a custom queue will need to be implemented. +Refer to [gateway-queue] for an example of this. ## HTTP proxy ratelimiting diff --git a/examples/Cargo.toml b/examples/Cargo.toml index a0a7502cf02..21bd07d475f 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -28,6 +28,10 @@ twilight-standby = { path = "../twilight-standby" } name = "gateway-parallel" path = "gateway-parallel.rs" +[[example]] +name = "gateway-queue-http" +path = "gateway-queue-http.rs" + [[example]] name = "gateway-reshard" path = "gateway-reshard.rs" diff --git a/examples/gateway-queue-http.rs b/examples/gateway-queue-http.rs new file mode 100644 index 00000000000..8db24819eef --- /dev/null +++ b/examples/gateway-queue-http.rs @@ -0,0 +1,62 @@ +use hyper::client::{Client, HttpConnector}; +use std::{env, sync::Arc}; +use tokio::sync::oneshot; +use twilight_gateway::{queue::Queue, Config, Intents, Shard, ShardId}; + +#[derive(Debug)] +struct HttpQueue(Client); + +impl Queue for HttpQueue { + fn enqueue(&self, id: u32) -> oneshot::Receiver<()> { + let (mut tx, rx) = oneshot::channel(); + let uri = format!("http://127.0.0.1:8000/?shard={id}"); + let req = self.0.get(uri.parse().unwrap()); + + tokio::spawn(async move { + tokio::select! { + _ = tx.closed() => {} + res = req => { + match res { + Ok(_) => _ = tx.send(()), + Err(source) => tracing::info!("error sending request: {source}"), + } + } + } + }); + + rx + } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + + let token = env::var("DISCORD_TOKEN")?; + let intents = Intents::GUILDS | Intents::GUILD_VOICE_STATES; + + let config = Config::builder(token, intents) + .queue(Arc::new(HttpQueue(Client::new()))) + .build(); + + let mut shard = Shard::with_config(ShardId::ONE, config); + + loop { + let event = match shard.next_event().await { + Ok(event) => event, + Err(source) => { + tracing::warn!(?source, "error receiving event"); + + if source.is_fatal() { + break; + } + + continue; + } + }; + + tracing::debug!(?event, "received event"); + } + + Ok(()) +} diff --git a/twilight-gateway-queue/Cargo.toml b/twilight-gateway-queue/Cargo.toml index 67049acfc27..c1daf688c71 100644 --- a/twilight-gateway-queue/Cargo.toml +++ b/twilight-gateway-queue/Cargo.toml @@ -13,18 +13,10 @@ rust-version.workspace = true version = "0.15.4" [dependencies] -tokio = { default-features = false, features = ["rt", "sync", "time"], version = "1.0" } -tracing = { default-features = false, features = ["std", "attributes"], version = "0.1" } - -# Optional dependencies. -twilight-http = { default-features = false, optional = true, path = "../twilight-http", version = "0.15.4" } +tokio = { default-features = false, features = ["macros", "rt", "sync", "time"], version = "1.15" } +tracing = { default-features = false, features = ["std"], version = "0.1" } [dev-dependencies] static_assertions = { default-features = false, version = "1" } - -[features] -default = ["twilight-http"] - -[package.metadata.docs.rs] -all-features = true -rustdoc-args = ["--cfg", "docsrs"] +tokio = { default-features = false, features = ["test-util"], version = "1.0" } +twilight-http = { default-features = false, path = "../twilight-http", version = "0.15.2" } diff --git a/twilight-gateway-queue/README.md b/twilight-gateway-queue/README.md index edd8fc2bb6c..4d604ef8065 100644 --- a/twilight-gateway-queue/README.md +++ b/twilight-gateway-queue/README.md @@ -1,42 +1,31 @@ # twilight-gateway-queue -Ratelimiting functionality for queueing new gateway sessions. - -The gateway ratelimits how often clients can initialize new sessions. -Instances of a queue are given to shards so that they can request to -initialize a session. - -Queue implementations must point to the same broker so that all shards -across all shard groups, processes, and other forms of multi-serviced -applications, can work together and use the same ratelimiting source. That is, -if two shard groups are in two different processes, then the the two processes -must use some unified form of ratelimiting: this can either mean using IPC to -communicate ratelimiting or a broker. - -## Provided queues - -Most users only need the [`LocalQueue`]: it's a single-process queue for -smaller bots. Larger bots need the [`LargeBotQueue`], which supports -single-process [Sharding for Large Bots] through the use of bucket -releasing. - -By default, the gateway's `stream` module and `Shard`s use the [`LocalQueue`]. -This can be overridden via the `ShardBuilder::queue` configuration method. - -## Advanced use cases - -Large bots, and smaller bots out of design, may need to implement their own -queue. The most common reason to need this is if you have shard groups in -multiple processes. A broker to manage ratelimiting across shard groups is -required, so a [`Queue`] trait is provided that shards can use to make requests -to create sessions. - -## Features - -### Twilight-HTTP - -The `twilight-http` feature brings in support for [`LargeBotQueue`]. - -This is enabled by default. - -[Sharding for Large Bots]: https://discord.com/developers/docs/topics/gateway#sharding-for-large-bots +[![codecov badge][]][codecov link] [![discord badge][]][discord link] [![github badge][]][github link] [![license badge][]][license link] ![rust badge] + +Rate limiting functionality for gateway `IDENTIFY` commands. + +Discord allows bot's shards to send a limited amount of `IDENTIFY` commands +every 5 seconds, with a daily limit from 1000 to 2000 commands, and invalidates +*all* shard sessions upon exceeding it. Each identify interval may be filled by +shards' IDs modulo `max_concurrency` and such a set of shards is called a +bucket. See [Discord Docs/Sharding]. + +To coordinate this, a [`Queue`] should process each identify request and shards +should wait for its signal to proceed before continuing and otherwise retry. The +provided [`InMemoryQueue`] never fails or cancels requests and is therefore a +good starting point for custom implementations. It can also be composed to +support multiple processes; see [`gateway-queue-http`] and [`gateway-queue`] for +a HTTP client and server implementation, respectively. + +[codecov badge]: https://img.shields.io/codecov/c/gh/twilight-rs/twilight?logo=codecov&style=for-the-badge&token=E9ERLJL0L2 +[codecov link]: https://app.codecov.io/gh/twilight-rs/twilight/ +[discord badge]: https://img.shields.io/discord/745809834183753828?color=%237289DA&label=discord%20server&logo=discord&style=for-the-badge +[Discord Docs/Sharding]: https://discord.com/developers/docs/topics/gateway#sharding +[discord link]: https://discord.gg/twilight-rs +[`gateway-queue`]: https://github.com/twilight-rs/gateway-queue +[`gateway-queue-http`]: https://github.com/twilight-rs/twilight/blob/main/examples/gateway-queue-http.rs +[github badge]: https://img.shields.io/badge/github-twilight-6f42c1.svg?style=for-the-badge&logo=github +[github link]: https://github.com/twilight-rs/twilight +[license badge]: https://img.shields.io/badge/license-ISC-blue.svg?style=for-the-badge&logo=pastebin +[license link]: https://github.com/twilight-rs/twilight/blob/main/LICENSE.md +[rust badge]: https://img.shields.io/badge/rust-1.67+-93450a.svg?style=for-the-badge&logo=rust diff --git a/twilight-gateway-queue/src/day_limiter.rs b/twilight-gateway-queue/src/day_limiter.rs deleted file mode 100644 index 92b75108841..00000000000 --- a/twilight-gateway-queue/src/day_limiter.rs +++ /dev/null @@ -1,124 +0,0 @@ -use std::{ - error::Error, - fmt::{Display, Formatter, Result as FmtResult}, - sync::Arc, - time::Duration, -}; -use tokio::{ - sync::Mutex, - time::{self, Instant}, -}; -use twilight_http::Client; - -/// Creating a day limiter queue failed. -#[derive(Debug)] -pub struct DayLimiterError { - kind: DayLimiterErrorType, - source: Option>, -} - -impl Display for DayLimiterError { - fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { - match &self.kind { - DayLimiterErrorType::RetrievingSessionAvailability { .. } => { - f.write_str("retrieving the bot's gateway session availability failed") - } - } - } -} - -impl Error for DayLimiterError { - fn source(&self) -> Option<&(dyn Error + 'static)> { - self.source - .as_ref() - .map(|source| &**source as &(dyn Error + 'static)) - } -} - -/// Type of [`DayLimiterError`] that occurred. -#[derive(Debug)] -#[non_exhaustive] -pub enum DayLimiterErrorType { - /// Retrieving the bot's available gateway session initiation information - /// via the HTTP API failed. - RetrievingSessionAvailability, -} - -#[derive(Debug)] -pub(crate) struct DayLimiter(pub(crate) Mutex); - -#[derive(Debug)] -pub(crate) struct DayLimiterInner { - pub http: Arc, - pub last_check: Instant, - pub next_reset: Duration, - pub total: u16, - pub current: u16, -} - -impl DayLimiter { - pub async fn new(http: Arc) -> Result { - let info = http - .gateway() - .authed() - .await - .map_err(|source| DayLimiterError { - kind: DayLimiterErrorType::RetrievingSessionAvailability, - source: Some(Box::new(source)), - })? - .model() - .await - .map_err(|source| DayLimiterError { - kind: DayLimiterErrorType::RetrievingSessionAvailability, - source: Some(Box::new(source)), - })?; - - let last_check = Instant::now(); - - let next_reset = Duration::from_millis(info.session_start_limit.reset_after); - let total = info.session_start_limit.total; - let remaining = info.session_start_limit.remaining; - debug_assert!(total >= remaining); - let current = total - remaining; - Ok(DayLimiter(Mutex::new(DayLimiterInner { - http, - last_check, - next_reset, - total: info.session_start_limit.total, - current, - }))) - } - - pub async fn get(&self) { - let mut lock = self.0.lock().await; - if lock.current < lock.total { - lock.current += 1; - } else { - let wait = lock.last_check + lock.next_reset; - time::sleep_until(wait).await; - if let Ok(res) = lock.http.gateway().authed().await { - if let Ok(info) = res.model().await { - let last_check = Instant::now(); - let next_reset = Duration::from_millis(info.session_start_limit.reset_after); - - tracing::info!("next session start limit reset in: {next_reset:.2?}"); - - let total = info.session_start_limit.total; - let remaining = info.session_start_limit.remaining; - assert!(total >= remaining); - let current = total - remaining; - lock.last_check = last_check; - lock.next_reset = next_reset; - lock.total = total; - lock.current = current + 1; - - return; - } - } - - tracing::warn!( - "unable to get new session limits, skipping (this may cause bad things)" - ); - } - } -} diff --git a/twilight-gateway-queue/src/in_memory.rs b/twilight-gateway-queue/src/in_memory.rs new file mode 100644 index 00000000000..4f80d771849 --- /dev/null +++ b/twilight-gateway-queue/src/in_memory.rs @@ -0,0 +1,272 @@ +//! Memory based [`Queue`] implementation and supporting items. + +use super::{Queue, IDENTIFY_DELAY, LIMIT_PERIOD}; +use std::{collections::VecDeque, fmt::Debug, iter}; +use tokio::{ + sync::{mpsc, oneshot}, + task::yield_now, + time::{sleep_until, Duration, Instant}, +}; + +/// Possible messages from the [`InMemoryQueue`] to the [`runner`]. +#[derive(Debug)] +enum Message { + /// Request a permit. + Request { + /// For this shard. + shard: u32, + /// Indicate readiness through this sender. + tx: oneshot::Sender<()>, + }, + /// Update the runner's settings. + Update(Settings), +} + +/// [`runner`]'s settings. +#[derive(Debug)] +struct Settings { + /// The maximum number of concurrent permits to grant. `0` instantly grants + /// all permits. + max_concurrency: u8, + /// Remaining daily permits. + remaining: u16, + /// Time until the daily permits reset. + reset_after: Duration, + /// The number of permits to reset to. + total: u16, +} + +/// [`InMemoryQueue`]'s background task runner. +/// +/// Buckets requests such that only one timer is necessary. +async fn runner( + mut rx: mpsc::UnboundedReceiver, + Settings { + max_concurrency, + mut remaining, + reset_after, + mut total, + }: Settings, +) { + let (interval, reset_at) = { + let now = Instant::now(); + (sleep_until(now), sleep_until(now + reset_after)) + }; + tokio::pin!(interval, reset_at); + + let mut queues = iter::repeat_with(VecDeque::new) + .take(max_concurrency.into()) + .collect::>(); + + loop { + tokio::select! { + biased; + _ = &mut reset_at, if remaining != total => { + remaining = total; + } + message = rx.recv() => { + match message { + Some(Message::Request { shard, tx }) => { + if queues.is_empty() { + _ = tx.send(()); + } else { + queues[shard as usize % queues.len()] + .push_back((shard, tx)); + } + } + Some(Message::Update(update)) => { + let (max_concurrency, reset_after); + Settings { + max_concurrency, + remaining, + reset_after, + total, + } = update; + + if remaining != total { + reset_at.as_mut().reset(Instant::now() + reset_after); + } + + if max_concurrency as usize != queues.len() { + let unbalanced = queues.into_vec().into_iter().flatten(); + queues = iter::repeat_with(VecDeque::new) + .take(max_concurrency.into()) + .collect(); + for (shard, tx) in unbalanced { + queues[(shard % u32::from(max_concurrency)) as usize] + .push_back((shard, tx)); + } + } + } + None => break, + } + } + _ = &mut interval, if queues.iter().any(|queue| !queue.is_empty()) => { + let span = tracing::info_span!("bucket", capacity = %queues.len()); + let now = Instant::now(); + interval.as_mut().reset(now + IDENTIFY_DELAY); + + if remaining == total { + reset_at.as_mut().reset(now + LIMIT_PERIOD); + } + + for (ratelimit_key, queue) in queues.iter_mut().enumerate() { + if remaining == 0 { + let duration = reset_at.deadline().saturating_duration_since(now); + tracing::debug!(?duration, "sleeping until remaining count refills"); + (&mut reset_at).await; + remaining = total; + + break; + } + + while let Some((id, tx)) = queue.pop_front() { + let calculated_ratelimit_key = (id % u32::from(max_concurrency)) as usize; + debug_assert_eq!(ratelimit_key, calculated_ratelimit_key); + + if tx.send(()).is_err() { + continue; + } + tracing::debug!(parent: &span, ratelimit_key, "allowing shard {id}"); + remaining -= 1; + + // Reschedule behind shard for ordering correctness. + yield_now().await; + break; + } + } + } + } + } +} + +/// Memory based [`Queue`] implementation backed by an efficient background task. +/// +/// [`InMemoryQueue::update`] allows for dynamically changing the queue's +/// settings. +/// +/// Cloning the queue is cheap and just increments a reference counter. +/// +/// **Note:** A `max_concurrency` of `0` processes all requests instantly, +/// effectively disabling the queue. +#[derive(Clone, Debug)] +pub struct InMemoryQueue { + /// Sender to communicate with the background [task runner]. + /// + /// [task runner]: runner + tx: mpsc::UnboundedSender, +} + +impl InMemoryQueue { + /// Creates a new `InMemoryQueue` with custom settings. + /// + /// # Panics + /// + /// Panics if `total` < `remaining`. + pub fn new(max_concurrency: u8, remaining: u16, reset_after: Duration, total: u16) -> Self { + assert!(total >= remaining); + let (tx, rx) = mpsc::unbounded_channel(); + + tokio::spawn(runner( + rx, + Settings { + max_concurrency, + remaining, + reset_after, + total, + }, + )); + + Self { tx } + } + + /// Update the queue with new info from the [Get Gateway Bot] endpoint. + /// + /// May be regularly called as the bot joins/leaves guilds. + /// + /// # Example + /// + /// ```no_run + /// # use twilight_gateway_queue::InMemoryQueue; + /// # let rt = tokio::runtime::Builder::new_current_thread() + /// # .enable_time() + /// # .build() + /// # .unwrap(); + /// use std::time::Duration; + /// use twilight_http::Client; + /// + /// # rt.block_on(async { + /// # let queue = InMemoryQueue::default(); + /// # let token = String::new(); + /// let client = Client::new(token); + /// let session = client + /// .gateway() + /// .authed() + /// .await? + /// .model() + /// .await? + /// .session_start_limit; + /// queue.update( + /// session.max_concurrency, + /// session.remaining, + /// Duration::from_millis(session.reset_after), + /// session.total, + /// ); + /// # Ok::<(), Box>(()) + /// # }); + /// ``` + /// + /// # Panics + /// + /// Panics if `total` < `remaining`. + /// + /// [Get Gateway Bot]: https://discord.com/developers/docs/topics/gateway#get-gateway-bot + pub fn update(&self, max_concurrency: u8, remaining: u16, reset_after: Duration, total: u16) { + assert!(total >= remaining); + + self.tx + .send(Message::Update(Settings { + max_concurrency, + remaining, + reset_after, + total, + })) + .expect("receiver dropped after sender"); + } +} + +impl Default for InMemoryQueue { + /// Creates a new `InMemoryQueue` with Discord's default settings. + /// + /// Currently these are: + /// + /// * `max_concurrency`: 1 + /// * `remaining`: 1000 + /// * `reset_after`: [`LIMIT_PERIOD`] + /// * `total`: 1000. + fn default() -> Self { + Self::new(1, 1000, LIMIT_PERIOD, 1000) + } +} + +impl Queue for InMemoryQueue { + fn enqueue(&self, shard: u32) -> oneshot::Receiver<()> { + let (tx, rx) = oneshot::channel(); + + self.tx + .send(Message::Request { shard, tx }) + .expect("receiver dropped after sender"); + + rx + } +} + +#[cfg(test)] +mod tests { + use super::InMemoryQueue; + use crate::Queue; + use static_assertions::assert_impl_all; + use std::fmt::Debug; + + assert_impl_all!(InMemoryQueue: Clone, Debug, Default, Send, Sync, Queue); +} diff --git a/twilight-gateway-queue/src/large_bot_queue.rs b/twilight-gateway-queue/src/large_bot_queue.rs deleted file mode 100644 index 5b995838364..00000000000 --- a/twilight-gateway-queue/src/large_bot_queue.rs +++ /dev/null @@ -1,117 +0,0 @@ -use super::{day_limiter::DayLimiter, Queue}; -use std::{fmt::Debug, future::Future, pin::Pin, sync::Arc, time::Duration}; -use tokio::{ - sync::{ - mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, - oneshot::{self, Sender}, - }, - time::sleep, -}; -use twilight_http::Client; - -/// Queue built for single-process groups of shards that require identifying via -/// [Sharding for Large Bots]. -/// -/// Usage with other processes will cause inconsistencies between each process's -/// ratelimit buckets. If using multiple processes for shard groups, then refer -/// to the [module-level] documentation. -/// -/// [Sharding for Large Bots]: https://discord.com/developers/docs/topics/gateway#sharding-for-very-large-bots -/// [module-level]: crate -#[derive(Debug)] -pub struct LargeBotQueue { - buckets: Vec>>, - limiter: DayLimiter, -} - -impl LargeBotQueue { - /// Create a new large bot queue. - /// - /// You must provide the number of buckets Discord requires your bot to - /// connect with. - /// - /// The number of buckets is provided via Discord as `max_concurrency` - /// which can be fetched with [`Client::gateway`]. - /// - /// # Panics - /// - /// Panics if retrieving the session start limints fails. - pub async fn new(buckets: usize, http: Arc) -> Self { - let mut queues = Vec::with_capacity(buckets); - for _ in 0..buckets { - let (tx, rx) = unbounded_channel(); - - tokio::spawn(waiter(rx)); - - queues.push(tx); - } - - let limiter = DayLimiter::new(http).await.expect( - "Getting the first session limits failed, \ - Is network connection available?", - ); - - // The level_enabled macro does not turn off with the dynamic - // tracing levels. It is made for the static_max_level_xxx features - // And will return false if you do not use those features of if - // You use the feature but then dynamically set a lower feature. - if tracing::level_enabled!(tracing::Level::INFO) { - let lock = limiter.0.lock().await; - - tracing::info!( - "{}/{} identifies used before next reset in {:.2?}", - lock.current, - lock.total, - lock.next_reset - ); - } - - Self { - buckets: queues, - limiter, - } - } -} - -async fn waiter(mut rx: UnboundedReceiver>) { - const DUR: Duration = Duration::from_secs(6); - while let Some(req) = rx.recv().await { - if let Err(source) = req.send(()) { - tracing::warn!("skipping, send failed with: {source:?}"); - } else { - sleep(DUR).await; - } - } -} - -impl Queue for LargeBotQueue { - /// Request to be able to identify with the gateway. This will place this - /// request behind all other requests, and the returned future will resolve - /// once the request has been completed. - fn request(&'_ self, shard_id: [u32; 2]) -> Pin + Send + '_>> { - #[allow(clippy::cast_possible_truncation)] - let bucket = (shard_id[0] % (self.buckets.len() as u32)) as usize; - let (tx, rx) = oneshot::channel(); - - Box::pin(async move { - self.limiter.get().await; - if let Err(source) = self.buckets[bucket].send(tx) { - tracing::warn!("skipping, send failed with: {source:?}"); - return; - } - - tracing::info!("waiting for allowance on shard {}", shard_id[0]); - - _ = rx.await; - }) - } -} - -#[cfg(test)] -mod tests { - use super::{LargeBotQueue, Queue}; - use static_assertions::assert_impl_all; - use std::fmt::Debug; - - assert_impl_all!(LargeBotQueue: Debug, Queue, Send, Sync); -} diff --git a/twilight-gateway-queue/src/lib.rs b/twilight-gateway-queue/src/lib.rs index 83a3549915a..d7b43d8375f 100644 --- a/twilight-gateway-queue/src/lib.rs +++ b/twilight-gateway-queue/src/lib.rs @@ -1,153 +1,56 @@ -#![cfg_attr(docsrs, feature(doc_auto_cfg))] #![doc = include_str!("../README.md")] #![warn( clippy::missing_const_for_fn, + clippy::missing_docs_in_private_items, clippy::pedantic, missing_docs, unsafe_code )] -#![allow( - clippy::module_name_repetitions, - clippy::must_use_candidate, - clippy::unnecessary_wraps -)] - -#[cfg(feature = "twilight-http")] -mod day_limiter; -#[cfg(feature = "twilight-http")] -mod large_bot_queue; - -#[cfg(feature = "twilight-http")] -pub use large_bot_queue::LargeBotQueue; - -use std::{ - fmt::Debug, - future::{self, Future}, - pin::Pin, - time::Duration, -}; -use tokio::{ - sync::{ - mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, - oneshot::{self, Sender}, - }, - time::sleep, -}; - -/// Queue for shards to request the ability to initialize new sessions with the -/// gateway. -/// -/// This will usually only need to be implemented when you have a multi-process -/// sharding setup. Refer to the [module-level] documentation for more -/// information. -/// -/// [module-level]: crate -pub trait Queue: Debug + Send + Sync { - /// A shard has requested the ability to request a session initialization - /// with the gateway. - /// - /// The returned future must resolve only when the shard can initiate the - /// session. - fn request<'a>(&'a self, shard_id: [u32; 2]) -> Pin + Send + 'a>>; -} - -/// A local, in-process implementation of a [`Queue`] which manages the -/// connection attempts of one or more shards. -/// -/// The queue will take incoming requests and then queue them, releasing one of -/// the requests every 6 seconds. The queue is necessary because there's a -/// ratelimit on how often shards can initiate sessions. -/// -/// Handling shard queues usually won't need to be manually handled due to the -/// gateway having built-in queueing when managing multiple shards. -/// -/// # When not to use this -/// -/// This queue implementation is "local", meaning it's intended to be used if -/// you manage shards only in this process. If you run shards in multiple -/// different processes (do you utilize microservices a lot?), then you **must -/// not** use this implementation. Shards across multiple processes may -/// create new sessions at the same time, which is bad. -/// -/// It should also not be used for very large sharding, for that the -/// [`LargeBotQueue`] can be used. -/// -/// If you can't use this, look into an alternative implementation of the -/// [`Queue`], such as the [`gateway-queue`] broker. -/// -/// [`gateway-queue`]: https://github.com/twilight-rs/gateway-queue -#[derive(Clone, Debug)] -pub struct LocalQueue(UnboundedSender>); - -impl Default for LocalQueue { - fn default() -> Self { - Self::new() - } -} - -impl LocalQueue { - /// Creates a new local queue. - pub fn new() -> Self { - let (tx, rx) = unbounded_channel(); - - tokio::spawn(waiter(rx)); - - Self(tx) - } -} +#![allow(clippy::module_name_repetitions, clippy::must_use_candidate)] -async fn waiter(mut rx: UnboundedReceiver>) { - const DUR: Duration = Duration::from_secs(6); - while let Some(req) = rx.recv().await { - if let Err(source) = req.send(()) { - tracing::warn!("skipping, send failed: {source:?}"); - } else { - sleep(DUR).await; - } - } -} +mod in_memory; -impl Queue for LocalQueue { - /// Request to be able to identify with the gateway. This will place this - /// request behind all other requests, and the returned future will resolve - /// once the request has been completed. - fn request(&'_ self, [id, total]: [u32; 2]) -> Pin + Send + '_>> { - Box::pin(async move { - let (tx, rx) = oneshot::channel(); +pub use in_memory::InMemoryQueue; - if let Err(source) = self.0.send(tx) { - tracing::warn!("skipping, send failed: {source:?}"); - return; - } +use std::fmt::Debug; +use tokio::{sync::oneshot, time::Duration}; - tracing::info!("shard {id}/{total} waiting for allowance"); +/// Period between buckets. +pub const IDENTIFY_DELAY: Duration = Duration::from_secs(5); - _ = rx.await; - }) - } -} +/// Duration from the first identify until the remaining count resets to the +/// total count. +pub const LIMIT_PERIOD: Duration = Duration::from_secs(60 * 60 * 24); -/// An implementation of [`Queue`] that instantly allows requests. +/// Abstraction for types processing gateway identify requests. /// -/// Useful when running behind a proxy gateway. Running without a -/// functional queue **will** get you ratelimited. -#[derive(Debug)] -pub struct NoOpQueue; +/// For convenience in twilight-gateway, implementors must also implement +/// [`Debug`]. +pub trait Queue: Debug { + /// Enqueue a shard with this ID. + /// + /// Send `()` to signal the shard to proceed. Note that shards may have + /// dropped the receiver prior. + /// + /// Closing the channel should causes the shard to requeue. + fn enqueue(&self, id: u32) -> oneshot::Receiver<()>; +} -impl Queue for NoOpQueue { - fn request(&'_ self, [_id, _total]: [u32; 2]) -> Pin + Send + '_>> { - Box::pin(future::ready(())) +impl Queue for &T +where + T: Queue, +{ + fn enqueue(&self, shard: u32) -> oneshot::Receiver<()> { + (**self).enqueue(shard) } } #[cfg(test)] mod tests { - use super::{LocalQueue, NoOpQueue, Queue}; + use super::Queue; use static_assertions::{assert_impl_all, assert_obj_safe}; use std::fmt::Debug; - assert_impl_all!(LocalQueue: Clone, Debug, Queue, Send, Sync); - assert_impl_all!(NoOpQueue: Debug, Queue, Send, Sync); - assert_impl_all!(dyn Queue: Debug, Send, Sync); + assert_impl_all!(dyn Queue: Debug); assert_obj_safe!(Queue); } diff --git a/twilight-gateway-queue/tests/common/mod.rs b/twilight-gateway-queue/tests/common/mod.rs new file mode 100644 index 00000000000..02e8a27bccb --- /dev/null +++ b/twilight-gateway-queue/tests/common/mod.rs @@ -0,0 +1,84 @@ +use tokio::time::{advance, Duration, Instant}; +use twilight_gateway_queue::{Queue, IDENTIFY_DELAY, LIMIT_PERIOD}; + +pub async fn same_id_is_serial(queue: impl Queue) { + let now = Instant::now(); + + let t1 = queue.enqueue(0); + let t2 = queue.enqueue(0); + + _ = t1.await; + _ = t2.await; + + assert!(now.elapsed() >= IDENTIFY_DELAY, "ran concurrently"); +} + +/// Requires a queue with `max_concurrency` > 1. +pub async fn different_id_is_parallel(queue: impl Queue) { + let now = Instant::now(); + + let mut t1 = queue.enqueue(1); + let t2 = queue.enqueue(0); + + tokio::select! { + biased; + _ = &mut t1 => panic!("not started in order"), + _ = t2 => { + _ = t1.await; + assert!(now.elapsed() < IDENTIFY_DELAY, "ran serially"); + } + } +} + +/// Requires a queue with `remaining` of 0. +pub async fn reset_after_refills(queue: impl Queue, reset_after: Duration) { + let now = Instant::now(); + + let t1 = queue.enqueue(0); + + _ = t1.await; + + assert!( + (now.elapsed().as_secs_f64() - reset_after.as_secs_f64()).abs() <= 1e-2, + "did not refill in time" + ); +} + +/// Requires a fresh queue with `remaining` of 1. +pub async fn reset_after_started(queue: impl Queue) { + advance(LIMIT_PERIOD / 2).await; + + let t1 = queue.enqueue(0); + let t2 = queue.enqueue(0); + + _ = t1.await; + + let now = Instant::now(); + + _ = t2.await; + + assert!( + (now.elapsed().as_secs_f64() - LIMIT_PERIOD.as_secs_f64()).abs() <= 1e-2, + "queue misstimed remaining refill" + ); +} + +/// Requires a queue with `max_concurrency` >= 4. +pub async fn multi_bucket(queue: impl Queue) { + let now = Instant::now(); + + let t1 = queue.enqueue(0); + let t2 = queue.enqueue(1); + let t3 = queue.enqueue(3); + let t4 = queue.enqueue(3); + + _ = t1.await; + _ = t2.await; + _ = t3.await; + + assert!(now.elapsed() < IDENTIFY_DELAY, "ran serially"); + + _ = t4.await; + + assert!(now.elapsed() >= IDENTIFY_DELAY, "ran concurrently"); +} diff --git a/twilight-gateway-queue/tests/in_memory.rs b/twilight-gateway-queue/tests/in_memory.rs new file mode 100644 index 00000000000..5714992fab9 --- /dev/null +++ b/twilight-gateway-queue/tests/in_memory.rs @@ -0,0 +1,53 @@ +mod common; + +use common::*; +use tokio::time::{Duration, Instant}; +use twilight_gateway_queue::{InMemoryQueue, Queue, IDENTIFY_DELAY}; + +#[tokio::test] +async fn disabled_is_instant() { + let queue = InMemoryQueue::new(0, 0, Duration::ZERO, 0); + let now = Instant::now(); + + let t1 = queue.enqueue(0); + let t2 = queue.enqueue(0); + + _ = t1.await; + _ = t2.await; + + assert!(now.elapsed() < IDENTIFY_DELAY, "did not run instantly"); +} + +#[tokio::test] +async fn update_fills_bucket() { + let queue = InMemoryQueue::new(1, 10, Duration::from_secs(60), 10); + let now = Instant::now(); + + // Background task not run due to single-threaded runtime. + let t1 = queue.enqueue(0); + let t2 = queue.enqueue(1); + queue.update(2, 10, Duration::from_secs(60), 10); + + _ = t1.await; + _ = t2.await; + + assert!(now.elapsed() < IDENTIFY_DELAY, "ran serially"); +} + +#[tokio::test(start_paused = true)] +async fn integration() { + let mut queue = InMemoryQueue::new(1, 1000, Duration::ZERO, 1000); + same_id_is_serial(&queue).await; + + queue = InMemoryQueue::new(2, 1000, Duration::ZERO, 1000); + different_id_is_parallel(&queue).await; + + queue = InMemoryQueue::new(1, 0, Duration::from_secs(60), 1); + reset_after_refills(&queue, Duration::from_secs(60)).await; + + queue = InMemoryQueue::new(1, 1, Duration::ZERO, 1); + reset_after_started(&queue).await; + + queue = InMemoryQueue::new(4, 1000, Duration::ZERO, 1000); + multi_bucket(queue).await; +} diff --git a/twilight-gateway/src/config.rs b/twilight-gateway/src/config.rs index 7db1acff5a8..8c14e6bba40 100644 --- a/twilight-gateway/src/config.rs +++ b/twilight-gateway/src/config.rs @@ -1,11 +1,14 @@ //! User configuration for shards. -use crate::{tls::TlsContainer, EventTypeFlags, Session}; +use crate::{ + queue::{InMemoryQueue, Queue}, + tls::TlsContainer, + EventTypeFlags, Session, +}; use std::{ fmt::{Debug, Formatter, Result as FmtResult}, sync::Arc, }; -use twilight_gateway_queue::{LocalQueue, Queue}; use twilight_model::gateway::{ payload::outgoing::{identify::IdentifyProperties, update_presence::UpdatePresencePayload}, Intents, @@ -56,7 +59,7 @@ pub struct Config { /// Gateway proxy URL. proxy_url: Option>, /// Queue in use by the shard. - queue: Arc, + queue: Arc, /// Whether [outgoing message] ratelimiting is enabled. /// /// [outgoing message]: crate::Shard::send @@ -132,7 +135,7 @@ impl Config { } /// Immutable reference to the queue in use by the shard. - pub fn queue(&self) -> &Arc { + pub fn queue(&self) -> &Arc { &self.queue } @@ -189,7 +192,7 @@ impl ConfigBuilder { large_threshold: 50, presence: None, proxy_url: None, - queue: Arc::new(LocalQueue::new()), + queue: Arc::new(InMemoryQueue::default()), ratelimit_messages: true, session: None, tls: TlsContainer::new().unwrap(), @@ -348,12 +351,11 @@ impl ConfigBuilder { /// Set the queue to use for queueing shard sessions. /// - /// Defaults to a [`LocalQueue`]. - /// - /// Refer to the [`queue`] module for more information. + /// Defaults to [`InMemoryQueue`] with its default settings. /// - /// [`queue`]: crate::queue - pub fn queue(mut self, queue: Arc) -> Self { + /// Note that [`InMemoryQueue`] with a `max_concurrency` of `0` effectively + /// turns itself into a no-op. + pub fn queue(mut self, queue: Arc) -> Self { self.inner.queue = queue; self diff --git a/twilight-gateway/src/shard.rs b/twilight-gateway/src/shard.rs index 57d28df06e3..6dfc5f143a6 100644 --- a/twilight-gateway/src/shard.rs +++ b/twilight-gateway/src/shard.rs @@ -30,8 +30,8 @@ //! which is then forwarded via [`Shard::close`]; or //! b. the interval for the shard to send the next heartbeat occurs, in which //! case [`Shard::heartbeat`] is called; or -//! c. the background identify queue task finishes, in which case -//! [`Shard::send`] is called with the identify payload; or +//! c. the identify receiver is ready, in which case [`Shard::send`] is called +//! with the identify payload; or //! d. the shard receives a command from the user over the [user channel], //! which is then forwarded via [`Shard::send`]; or //! e. the shard receives a message from Discord via the websocket connection. @@ -86,7 +86,7 @@ use std::{ task::{Context, Poll}, }; use tokio::{ - task::JoinHandle, + sync::oneshot, time::{self, Duration, Instant, Interval, MissedTickBehavior}, }; use tokio_tungstenite::tungstenite::{Error as TungsteniteError, Message as TungsteniteMessage}; @@ -367,8 +367,8 @@ pub struct Shard { heartbeat_interval_event: bool, /// ID of the shard. id: ShardId, - /// Identify queue background task handle. - identify_handle: Option>, + /// Identify queue receiver. + identify_rx: Option>, /// Zlib decompressor. #[cfg(any(feature = "zlib-stock", feature = "zlib-simd"))] inflater: Inflater, @@ -416,7 +416,7 @@ impl Shard { heartbeat_interval: None, heartbeat_interval_event: false, id: shard_id, - identify_handle: None, + identify_rx: None, #[cfg(any(feature = "zlib-stock", feature = "zlib-simd"))] inflater: Inflater::new(), next_action: None, @@ -555,7 +555,7 @@ impl Shard { /// /// Returns a [`ReceiveMessageErrorType::SendingMessage`] error type if the /// shard failed to send a message to the gateway, such as a heartbeat. - #[tracing::instrument(fields(id = %self.id()), name = "shard", skip(self))] + #[tracing::instrument(fields(id = %self.id), name = "shard", skip(self))] pub async fn next_message(&mut self) -> Result { /// Actions the shard might take. enum Action { @@ -640,13 +640,18 @@ impl Shard { .as_mut() .map_or(false, |ratelimiter| ratelimiter.poll_ready(cx).is_pending()); - if !ratelimited - && self - .identify_handle + if !ratelimited { + if let Some(Poll::Ready(canceled)) = self + .identify_rx .as_mut() - .map_or(false, |handle| Pin::new(handle).poll(cx).is_ready()) - { - return Poll::Ready(Action::Identify); + .map(|rx| Pin::new(rx).poll(cx).map(|r| r.is_err())) + { + if !canceled { + self.identify_rx = None; + return Poll::Ready(Action::Identify); + } + self.identify_rx = Some(self.config.queue().enqueue(self.id.number())); + } } if !ratelimited && self.status.is_identified() { @@ -755,8 +760,6 @@ impl Shard { continue; } Action::Identify => { - self.identify_handle = None; - tracing::debug!("sending identify"); let identify = Identify::new(IdentifyInfo { compress: false, @@ -768,7 +771,7 @@ impl Shard { .identify_properties() .cloned() .unwrap_or_else(default_identify_properties), - shard: Some(self.id()), + shard: Some(self.id), token: self.config.token().to_owned(), }); let json = @@ -1002,13 +1005,14 @@ impl Shard { // May not send any additional WebSocket messages. self.heartbeat_interval = None; self.ratelimiter = None; + // Abort identify. + self.identify_rx = None; // Not resuming, drop session and resume URL. // https://discord.com/developers/docs/topics/gateway#initiating-a-disconnect if matches!(initiator, CloseInitiator::Shard(1000 | 1001)) { self.resume_gateway_url = None; self.session = None; } - // Avoid setting the status to FatallyClosed should it match for Shard initiated disconnect. self.status = match initiator { CloseInitiator::Gateway(close_code) => ConnectionStatus::from_close_code(close_code), _ => ConnectionStatus::Disconnected { @@ -1130,24 +1134,7 @@ impl Shard { if self.session.is_some() { self.next_action = Some(NextAction::Resume); } else { - // Can not use `MessageSender` since it is only polled after - // the shard is identified. - - // If the JoinHandle is finished, or there is none (def: true), we create a new one - if self - .identify_handle - .as_ref() - .map_or(true, JoinHandle::is_finished) - { - self.identify_handle = Some(tokio::spawn({ - let shard_id = self.id(); - let queue = self.config().queue().clone(); - - async move { - queue.request([shard_id.number(), shard_id.total()]).await; - } - })); - } + self.identify_rx = Some(self.config.queue().enqueue(self.id.number())); } } Some(OpCode::InvalidSession) => { diff --git a/twilight-gateway/src/stream.rs b/twilight-gateway/src/stream.rs index 229c0e52962..4ba58a654d9 100644 --- a/twilight-gateway/src/stream.rs +++ b/twilight-gateway/src/stream.rs @@ -27,7 +27,6 @@ //! //! See the [gateway-parallel] example for how to implement this. //! -//! [`ConfigBuilder::queue`]: crate::ConfigBuilder::queue //! [gateway-parallel]: https://github.com/twilight-rs/twilight/blob/main/examples/gateway-parallel.rs //! [session queue]: crate::queue @@ -419,7 +418,7 @@ pub fn create_bucket Config>( /// /// ```no_run /// use std::{collections::HashMap, env, sync::Arc}; -/// use twilight_gateway::{queue::LocalQueue, stream, Config, Intents}; +/// use twilight_gateway::{stream, Config, Intents}; /// /// # #[tokio::main] /// # async fn main() -> Result<(), Box> {