From dc7962dc5c795fadebeaf0eb0d2a0ce7a7226d7a Mon Sep 17 00:00:00 2001 From: Tim Vilgot Mikael Fredenberg Date: Sun, 25 Jun 2023 13:29:43 +0200 Subject: [PATCH] refactor(gateway): rework `queue`, remove `Queue` trait --- .github/labeler.yml | 5 - Cargo.toml | 1 - README.md | 7 - book/src/SUMMARY.md | 1 - .../section_5_gateway_queue.md | 15 -- book/src/chapter_2_multi-serviced_approach.md | 21 +- twilight-gateway-queue/CHANGELOG.md | 227 ------------------ twilight-gateway-queue/Cargo.toml | 30 --- twilight-gateway-queue/README.md | 42 ---- twilight-gateway-queue/src/day_limiter.rs | 124 ---------- twilight-gateway-queue/src/large_bot_queue.rs | 113 --------- twilight-gateway-queue/src/lib.rs | 153 ------------ twilight-gateway/Cargo.toml | 3 +- twilight-gateway/src/config.rs | 22 +- twilight-gateway/src/lib.rs | 4 +- twilight-gateway/src/queue.rs | 187 +++++++++++++++ twilight-gateway/src/shard.rs | 58 ++--- twilight-gateway/src/stream.rs | 7 +- twilight/Cargo.toml | 1 - 19 files changed, 227 insertions(+), 794 deletions(-) delete mode 100644 book/src/chapter_1_crates/section_7_first_party/section_5_gateway_queue.md delete mode 100644 twilight-gateway-queue/CHANGELOG.md delete mode 100644 twilight-gateway-queue/Cargo.toml delete mode 100644 twilight-gateway-queue/README.md delete mode 100644 twilight-gateway-queue/src/day_limiter.rs delete mode 100644 twilight-gateway-queue/src/large_bot_queue.rs delete mode 100644 twilight-gateway-queue/src/lib.rs create mode 100644 twilight-gateway/src/queue.rs diff --git a/.github/labeler.yml b/.github/labeler.yml index d139fc43fe7..df47f5ccc08 100644 --- a/.github/labeler.yml +++ b/.github/labeler.yml @@ -20,11 +20,6 @@ labels: matcher: files: "twilight-gateway/**/*" - - label: "c-gateway-queue" - sync: true - matcher: - files: "twilight-gateway-queue/**/*" - - label: "c-http" sync: true matcher: diff --git a/Cargo.toml b/Cargo.toml index 68329409a06..f8ff8ed694b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,6 @@ members = [ "twilight", "twilight-cache-inmemory", "twilight-gateway", - "twilight-gateway-queue", "twilight-http", "twilight-http-ratelimiting", "twilight-lavalink", diff --git a/README.md b/README.md index e24e76ec182..23bcf1f299c 100644 --- a/README.md +++ b/README.md @@ -102,12 +102,6 @@ easier; - A calculator to calculate the permissions of a member in a guild or channel. -### [`twilight-gateway-queue`] - -A trait and some implementations that are used by the gateway to ratelimit -identify calls. Developers should prefer to use the re-exports of these -crates through the gateway. - ## Examples The following example is a template for bootstrapping a new bot using @@ -212,7 +206,6 @@ All first-party crates are licensed under [ISC][LICENSE.md] [logo]: https://raw.githubusercontent.com/twilight-rs/twilight/main/logo.png [rust badge]: https://img.shields.io/badge/rust-1.67+-93450a.svg?style=for-the-badge&logo=rust [`twilight-cache-inmemory`]: https://twilight.rs/chapter_1_crates/section_4_cache_inmemory.html -[`twilight-gateway-queue`]: https://twilight.rs/chapter_1_crates/section_7_first_party/section_5_gateway_queue.html [`twilight-gateway`]: https://twilight.rs/chapter_1_crates/section_3_gateway.html [`twilight-http`]: https://twilight.rs/chapter_1_crates/section_2_http.html [`twilight-lavalink`]: https://twilight.rs/chapter_1_crates/section_7_first_party/section_3_lavalink.html diff --git a/book/src/SUMMARY.md b/book/src/SUMMARY.md index f9631ec58fb..e3e726dc1b0 100644 --- a/book/src/SUMMARY.md +++ b/book/src/SUMMARY.md @@ -12,7 +12,6 @@ - [Mention](./chapter_1_crates/section_7_first_party/section_2_mention.md) - [Lavalink](./chapter_1_crates/section_7_first_party/section_3_lavalink.md) - [Util](./chapter_1_crates/section_7_first_party/section_4_util.md) - - [Gateway Queue](./chapter_1_crates/section_7_first_party/section_5_gateway_queue.md) - [Third-party](./chapter_1_crates/section_8_third_party.md) - [Multi-Serviced Approach](./chapter_2_multi-serviced_approach.md) - [Bots Using Twilight](./chapter_3_bots_using_twilight.md) diff --git a/book/src/chapter_1_crates/section_7_first_party/section_5_gateway_queue.md b/book/src/chapter_1_crates/section_7_first_party/section_5_gateway_queue.md deleted file mode 100644 index 942877f2d51..00000000000 --- a/book/src/chapter_1_crates/section_7_first_party/section_5_gateway_queue.md +++ /dev/null @@ -1,15 +0,0 @@ -# Gateway queue - -`twilight-gateway-queue` is a trait and some implementations that are used by -the [gateway] to ratelimit `identify` calls. Developers should prefer to use the -re-exports of these crates through the [gateway]. - -## Links - -*source*: - -*docs*: - -*crates.io*: - -[gateway]: ../section_3_gateway.html diff --git a/book/src/chapter_2_multi-serviced_approach.md b/book/src/chapter_2_multi-serviced_approach.md index dcd1ff39a67..daad4449bbf 100644 --- a/book/src/chapter_2_multi-serviced_approach.md +++ b/book/src/chapter_2_multi-serviced_approach.md @@ -8,25 +8,13 @@ you have a small bot and just want to get it going in a monolithic application, then it's also a good choice. It's easy to split off parts of your application into other services as your application grows. -## Gateway groups +## Gateway One of the popular design choices when creating a multi-serviced application is to have a service that only connects shards to the gateway and sends the events -to a broker to be processed. As bots grow into hundreds or thousands of shards, -multiple instances of the application can be created and groups of shards can be -managed by each. Twilight is an excellent choice for this use case: you can -receive either events that come in in a loop and send the payloads to the -appropriate broker stream, or you can loop over received payloads' bytes to send -off. - -## Gateway session ratelimiting - -If multiple shard groups are used, then they need to be queued and their session -initialization ratelimited. The Gateway includes a Queue trait which can be -implemented; the gateway will submit a request to the queue before starting a -session. Twilight comes with a queue that supports sharding and Large Bot -sharding, but when multiple shard groups are in use then a custom queue will -need to be implemented. Refer to [gateway-queue] for an example of this. +to a broker to be processed. Twilight is an excellent choice for this use case: +just receive and send the payloads to the appropriate broker stream. Twilight +shards need only partially deserialize payloads to function. ## HTTP proxy ratelimiting @@ -42,5 +30,4 @@ it's flexible enough to do anything you need, and if you find something it can't then we'll fix it. The goal is to remove all limitations on designs and allow you to do what you need. -[gateway-queue]: https://github.com/twilight-rs/gateway-queue [http-proxy]: https://github.com/twilight-rs/http-proxy diff --git a/twilight-gateway-queue/CHANGELOG.md b/twilight-gateway-queue/CHANGELOG.md deleted file mode 100644 index 6187e850093..00000000000 --- a/twilight-gateway-queue/CHANGELOG.md +++ /dev/null @@ -1,227 +0,0 @@ -# Changelog - -## [0.15.2] - 2023-04-27 - -### Bug Fixes - -- fix update of DayLimiter's next_reset field ([#2175](https://github.com/twilight-rs/twilight/issues/2175)) -- Fix clippy up to 1.69.0 ([#2198](https://github.com/twilight-rs/twilight/issues/2198)) - -## [0.15.1] - 2023-02-26 - -### Refactor - -- change deny lints to warn ([#2144](https://github.com/twilight-rs/twilight/issues/2144)) - -## [0.15.0] - 2023-02-05 - -This major version bump of the Gateway Queue is done to match all of the other -crates in the ecosystem receiving a major version bump. There are no changes. - -## [0.14.1] - 2023-02-05 - -### Bug Fixes - -- skip sleep on failed messages ([#2113](https://github.com/twilight-rs/twilight/issues/2113)) - -## [0.14.0] - 2022-11-14 - -MSRV has been bumped to 1.64 ([#1897] - [@vilgotf]). - -### Refactor - -[**breaking**] drop TLS features ([#1842] - [@vilgotf]). `LargeBotQueue` -requires a `Client` from twilight-http, meaning that users must also depend on -twilight-http to use it. TLS requirements should therefore only be specified in -twilight-http, with twilight-gateway-queue inheriting said requirements. - -[#1842]: https://github.com/twilight-rs/twilight/issues/1842 -[#1897]: https://github.com/twilight-rs/twilight/issues/1897 - -## [0.13.1] - 2022-09-11 - -### Documentation - -- clarify `buckets` for large bot queue ([#1895](https://github.com/twilight-rs/twilight/issues/1895)) - -## [0.11.1] - 2022-07-07 - -### Documentation - -- fix sharding hyperlink ([#1741](https://github.com/twilight-rs/twilight/issues/1741)) - -### Refactor - -- standardize clippy lints ([#1779](https://github.com/twilight-rs/twilight/issues/1779)) - -Changelog for `twilight-gateway-queue`. - -## [0.11.0] - 2022-05-15 - -MSRV has been bumped to 1.60. - -### Changes - -`tracing` is no longer an optional dependency and is always enabled -([#1684], [#1730] - [@vilgotf], [@zeylahellyer]). - -[#1730]: https://github.com/twilight-rs/twilight/pull/1730 -[#1684]: https://github.com/twilight-rs/twilight/pull/1684 - -## [0.10.1] - 2022-04-15 - -### Changes - -Link to `tracing` in the README ([#1652] - [@zeylahellyer]). - -[#1652]: https://github.com/twilight-rs/twilight/pull/1652 - -## [0.10.0] - 2022-03-10 - -### Changes - -`twilight-http` is now an optional feature, and enabled by default ([#1489] - -[@Gelbpunkt]). - -[#1489]: https://github.com/twilight-rs/twilight/pull/1489 - -## [0.9.1] - 2022-02-12 - -### Additions - -Add `NoOpQueue`, which does not ratelimit any requests ([#1490] - [@Gelbpunkt]). - -[#1490]: https://github.com/twilight-rs/twilight/pull/1490 - -## [0.9.0] - 2022-01-22 - -### Changes - -The `rustls` feature has been removed ([#1314] - [@Gelbpunkt]). Users must -manually select one of `rustls-native-roots` or `rustls-webpki-roots`. - -The MSRV has been updated to 1.57 ([#1402] - [@zeylahellyer]). - -The Rust edition has been updated to 2021 ([#1412] - [@vilgotf]). - -[#1314]: https://github.com/twilight-rs/twilight/pull/1314 -[#1402]: https://github.com/twilight-rs/twilight/pull/1402 -[#1412]: https://github.com/twilight-rs/twilight/pull/1412 - -## [0.8.1] - 2022-01-21 - -### Changes - -Remove two unneeded calls to `clone` ([#1440] - [@vilgotf]). - -[#1440]: https://github.com/twilight-rs/twilight/pull/1440 - -## [0.8.0] - 2021-12-03 - -### Changes - -`tracing` is now an optional feature, and enabled by default ([#1203] - -[@Gelbpunkt]). - -[#1203]: https://github.com/twilight-rs/twilight/pull/1203 - -## [0.7.0] - 2021-10-21 - -### Changes - -The MSRV has been updated to 1.53 ([#1161] - [@7596ff]). - -[#1161]: https://github.com/twilight-rs/twilight/pull/1147 - -## [0.6.0] - 2021-07-31 - -This major version bump of the Gateway Queue is done to match all of the other -crates in the ecosystem receiving a major version bump. There are no changes. - -## [0.5.1] - 2021-07-23 - -### Changes - -`#![deny(unsafe_code)]` has been added, ensuring no unsafe code exists in the -crate ([#1042] - [@zeylahellyer]). - -[#1042]: https://github.com/twilight-rs/twilight/pull/1042 - -## [0.5.0] - 2021-06-13 - -This major version bump of the Gateway Queue is done to match all of the other -crates in the ecosystem receiving a major version bump. There are no changes. - -## [0.4.0] - 2021-05-12 - -### Upgrade Path - -The MSRV is now Rust 1.49. - -Errors are no longer enums and don't expose their concrete underlying error -source. You can access the underlying error via the implemented -`std::error::Error::source` method or the `into_parts` or `into_source` methods -on each error struct, which will return a boxed `std::error::Error`. To access -the reason for the error use the `kind` or `into_parts` method on error structs; -the returned error type is an enum with variants for each potential reason the -error occurred. - -### Enhancements - -The `futures-channel` and `futures-util` dependencies have been removed -([#785] - [@Gelbpunkt]). - -[#785]: https://github.com/twilight-rs/twilight/pull/785 - -## [0.3.0] - 2021-01-08 - -This major version bump of the Gateway Queue is done to match all of the other -crates in the ecosystem receiving a major version bump. There are no changes. - -## [0.2.1] - 2020-11-11 - -### Enhancements - -Remove broken cross-crate links ([#595] - [@vivian]). - -## [0.2.0] - 2020-10-30 - -This major version bump of the Gateway Queue crate is done to match all of the -other crates in the ecosystem receiving a major version bump. There are no -changes. - -## [0.2.0-beta.0] - 2020-10-10 - -This major version bump of the Gateway Queue crate is done to match all of the -other crates in the ecosystem receiving a major version bump. There are no -changes. - -## [0.1.0] - 2020-10-07 - -Initial release. - -[@7596ff]: https://github.com/7596ff -[@Gelbpunkt]: https://github.com/Gelbpunkt -[@vilgotf]: https://github.com/vilgotf -[@vivian]: https://github.com/vivian -[@zeylahellyer]: https://github.com/zeylahellyer - -[#595]: https://github.com/twilight-rs/twilight/pull/595 - -[0.11.0]: https://github.com/twilight-rs/twilight/releases/tag/gateway-queue-0.11.0 -[0.10.1]: https://github.com/twilight-rs/twilight/releases/tag/gateway-queue-0.10.1 -[0.10.0]: https://github.com/twilight-rs/twilight/releases/tag/gateway-queue-0.10.0 -[0.9.1]: https://github.com/twilight-rs/twilight/releases/tag/gateway-queue-0.9.1 -[0.9.0]: https://github.com/twilight-rs/twilight/releases/tag/gateway-queue-0.9.0 -[0.8.1]: https://github.com/twilight-rs/twilight/releases/tag/gateway-queue-0.8.1 -[0.8.0]: https://github.com/twilight-rs/twilight/releases/tag/gateway-queue-0.8.0 -[0.7.0]: https://github.com/twilight-rs/twilight/releases/tag/gateway-queue-0.7.0 -[0.6.0]: https://github.com/twilight-rs/twilight/releases/tag/gateway-queue-0.6.0 -[0.5.1]: https://github.com/twilight-rs/twilight/releases/tag/gateway-queue-0.5.1 -[0.5.0]: https://github.com/twilight-rs/twilight/releases/tag/gateway-queue-0.5.0 -[0.4.0]: https://github.com/twilight-rs/twilight/releases/tag/gateway-queue-0.4.0 -[0.3.0]: https://github.com/twilight-rs/twilight/releases/tag/gateway-queue-v0.3.0 -[0.2.1]: https://github.com/twilight-rs/twilight/releases/tag/gateway-queue-v0.2.1 -[0.2.0]: https://github.com/twilight-rs/twilight/releases/tag/gateway-queue-v0.2.0 -[0.2.0-beta.0]: https://github.com/twilight-rs/twilight/releases/tag/gateway-queue-v0.2.0-beta.0 -[0.1.0]: https://github.com/twilight-rs/twilight/releases/tag/gateway-queue-v0.1.0 diff --git a/twilight-gateway-queue/Cargo.toml b/twilight-gateway-queue/Cargo.toml deleted file mode 100644 index 513d60f7023..00000000000 --- a/twilight-gateway-queue/Cargo.toml +++ /dev/null @@ -1,30 +0,0 @@ -[package] -authors.workspace = true -description = "Discord Gateway connection queue implementation for the Twilight ecosystem." -edition.workspace = true -homepage = "https://twilight.rs/" -include.workspace = true -keywords = ["discord", "discord-api", "twilight"] -license.workspace = true -name = "twilight-gateway-queue" -publish = true -repository.workspace = true -rust-version.workspace = true -version = "0.15.2" - -[dependencies] -tokio = { default-features = false, features = ["rt", "sync", "time"], version = "1.0" } -tracing = { default-features = false, features = ["std", "attributes"], version = "0.1" } - -# Optional dependencies. -twilight-http = { default-features = false, optional = true, path = "../twilight-http", version = "0.15.2" } - -[dev-dependencies] -static_assertions = { default-features = false, version = "1" } - -[features] -default = ["twilight-http"] - -[package.metadata.docs.rs] -all-features = true -rustdoc-args = ["--cfg", "docsrs"] diff --git a/twilight-gateway-queue/README.md b/twilight-gateway-queue/README.md deleted file mode 100644 index edd8fc2bb6c..00000000000 --- a/twilight-gateway-queue/README.md +++ /dev/null @@ -1,42 +0,0 @@ -# twilight-gateway-queue - -Ratelimiting functionality for queueing new gateway sessions. - -The gateway ratelimits how often clients can initialize new sessions. -Instances of a queue are given to shards so that they can request to -initialize a session. - -Queue implementations must point to the same broker so that all shards -across all shard groups, processes, and other forms of multi-serviced -applications, can work together and use the same ratelimiting source. That is, -if two shard groups are in two different processes, then the the two processes -must use some unified form of ratelimiting: this can either mean using IPC to -communicate ratelimiting or a broker. - -## Provided queues - -Most users only need the [`LocalQueue`]: it's a single-process queue for -smaller bots. Larger bots need the [`LargeBotQueue`], which supports -single-process [Sharding for Large Bots] through the use of bucket -releasing. - -By default, the gateway's `stream` module and `Shard`s use the [`LocalQueue`]. -This can be overridden via the `ShardBuilder::queue` configuration method. - -## Advanced use cases - -Large bots, and smaller bots out of design, may need to implement their own -queue. The most common reason to need this is if you have shard groups in -multiple processes. A broker to manage ratelimiting across shard groups is -required, so a [`Queue`] trait is provided that shards can use to make requests -to create sessions. - -## Features - -### Twilight-HTTP - -The `twilight-http` feature brings in support for [`LargeBotQueue`]. - -This is enabled by default. - -[Sharding for Large Bots]: https://discord.com/developers/docs/topics/gateway#sharding-for-large-bots diff --git a/twilight-gateway-queue/src/day_limiter.rs b/twilight-gateway-queue/src/day_limiter.rs deleted file mode 100644 index f9fd8f11e02..00000000000 --- a/twilight-gateway-queue/src/day_limiter.rs +++ /dev/null @@ -1,124 +0,0 @@ -use std::{ - error::Error, - fmt::{Display, Formatter, Result as FmtResult}, - sync::Arc, - time::Duration, -}; -use tokio::{ - sync::Mutex, - time::{self, Instant}, -}; -use twilight_http::Client; - -/// Creating a day limiter queue failed. -#[derive(Debug)] -pub struct DayLimiterError { - kind: DayLimiterErrorType, - source: Option>, -} - -impl Display for DayLimiterError { - fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { - match &self.kind { - DayLimiterErrorType::RetrievingSessionAvailability { .. } => { - f.write_str("retrieving the bot's gateway session availability failed") - } - } - } -} - -impl Error for DayLimiterError { - fn source(&self) -> Option<&(dyn Error + 'static)> { - self.source - .as_ref() - .map(|source| &**source as &(dyn Error + 'static)) - } -} - -/// Type of [`DayLimiterError`] that occurred. -#[derive(Debug)] -#[non_exhaustive] -pub enum DayLimiterErrorType { - /// Retrieving the bot's available gateway session initiation information - /// via the HTTP API failed. - RetrievingSessionAvailability, -} - -#[derive(Debug)] -pub(crate) struct DayLimiter(pub(crate) Mutex); - -#[derive(Debug)] -pub(crate) struct DayLimiterInner { - pub http: Arc, - pub last_check: Instant, - pub next_reset: Duration, - pub total: u64, - pub current: u64, -} - -impl DayLimiter { - pub async fn new(http: Arc) -> Result { - let info = http - .gateway() - .authed() - .await - .map_err(|source| DayLimiterError { - kind: DayLimiterErrorType::RetrievingSessionAvailability, - source: Some(Box::new(source)), - })? - .model() - .await - .map_err(|source| DayLimiterError { - kind: DayLimiterErrorType::RetrievingSessionAvailability, - source: Some(Box::new(source)), - })?; - - let last_check = Instant::now(); - - let next_reset = Duration::from_millis(info.session_start_limit.reset_after); - let total = info.session_start_limit.total; - let remaining = info.session_start_limit.remaining; - debug_assert!(total >= remaining); - let current = total - remaining; - Ok(DayLimiter(Mutex::new(DayLimiterInner { - http, - last_check, - next_reset, - total: info.session_start_limit.total, - current, - }))) - } - - pub async fn get(&self) { - let mut lock = self.0.lock().await; - if lock.current < lock.total { - lock.current += 1; - } else { - let wait = lock.last_check + lock.next_reset; - time::sleep_until(wait).await; - if let Ok(res) = lock.http.gateway().authed().await { - if let Ok(info) = res.model().await { - let last_check = Instant::now(); - let next_reset = Duration::from_millis(info.session_start_limit.reset_after); - - tracing::info!("next session start limit reset in: {next_reset:.2?}"); - - let total = info.session_start_limit.total; - let remaining = info.session_start_limit.remaining; - assert!(total >= remaining); - let current = total - remaining; - lock.last_check = last_check; - lock.next_reset = next_reset; - lock.total = total; - lock.current = current + 1; - - return; - } - } - - tracing::warn!( - "unable to get new session limits, skipping (this may cause bad things)" - ); - } - } -} diff --git a/twilight-gateway-queue/src/large_bot_queue.rs b/twilight-gateway-queue/src/large_bot_queue.rs deleted file mode 100644 index 1cc66952b05..00000000000 --- a/twilight-gateway-queue/src/large_bot_queue.rs +++ /dev/null @@ -1,113 +0,0 @@ -use super::{day_limiter::DayLimiter, Queue}; -use std::{fmt::Debug, future::Future, pin::Pin, sync::Arc, time::Duration}; -use tokio::{ - sync::{ - mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, - oneshot::{self, Sender}, - }, - time::sleep, -}; -use twilight_http::Client; - -/// Queue built for single-process groups of shards that require identifying via -/// [Sharding for Large Bots]. -/// -/// Usage with other processes will cause inconsistencies between each process's -/// ratelimit buckets. If using multiple processes for shard groups, then refer -/// to the [module-level] documentation. -/// -/// [Sharding for Large Bots]: https://discord.com/developers/docs/topics/gateway#sharding-for-very-large-bots -/// [module-level]: crate -#[derive(Debug)] -pub struct LargeBotQueue { - buckets: Vec>>, - limiter: DayLimiter, -} - -impl LargeBotQueue { - /// Create a new large bot queue. - /// - /// You must provide the number of buckets Discord requires your bot to - /// connect with. - /// - /// The number of buckets is provided via Discord as `max_concurrency` - /// which can be fetched with [`Client::gateway`]. - pub async fn new(buckets: usize, http: Arc) -> Self { - let mut queues = Vec::with_capacity(buckets); - for _ in 0..buckets { - let (tx, rx) = unbounded_channel(); - - tokio::spawn(waiter(rx)); - - queues.push(tx); - } - - let limiter = DayLimiter::new(http).await.expect( - "Getting the first session limits failed, \ - Is network connection available?", - ); - - // The level_enabled macro does not turn off with the dynamic - // tracing levels. It is made for the static_max_level_xxx features - // And will return false if you do not use those features of if - // You use the feature but then dynamically set a lower feature. - if tracing::level_enabled!(tracing::Level::INFO) { - let lock = limiter.0.lock().await; - - tracing::info!( - "{}/{} identifies used before next reset in {:.2?}", - lock.current, - lock.total, - lock.next_reset - ); - } - - Self { - buckets: queues, - limiter, - } - } -} - -async fn waiter(mut rx: UnboundedReceiver>) { - const DUR: Duration = Duration::from_secs(6); - while let Some(req) = rx.recv().await { - if let Err(source) = req.send(()) { - tracing::warn!("skipping, send failed with: {source:?}"); - } else { - sleep(DUR).await; - } - } -} - -impl Queue for LargeBotQueue { - /// Request to be able to identify with the gateway. This will place this - /// request behind all other requests, and the returned future will resolve - /// once the request has been completed. - fn request(&'_ self, shard_id: [u64; 2]) -> Pin + Send + '_>> { - #[allow(clippy::cast_possible_truncation)] - let bucket = (shard_id[0] % (self.buckets.len() as u64)) as usize; - let (tx, rx) = oneshot::channel(); - - Box::pin(async move { - self.limiter.get().await; - if let Err(source) = self.buckets[bucket].send(tx) { - tracing::warn!("skipping, send failed with: {source:?}"); - return; - } - - tracing::info!("waiting for allowance on shard {}", shard_id[0]); - - _ = rx.await; - }) - } -} - -#[cfg(test)] -mod tests { - use super::{LargeBotQueue, Queue}; - use static_assertions::assert_impl_all; - use std::fmt::Debug; - - assert_impl_all!(LargeBotQueue: Debug, Queue, Send, Sync); -} diff --git a/twilight-gateway-queue/src/lib.rs b/twilight-gateway-queue/src/lib.rs deleted file mode 100644 index 65db5f79f46..00000000000 --- a/twilight-gateway-queue/src/lib.rs +++ /dev/null @@ -1,153 +0,0 @@ -#![cfg_attr(docsrs, feature(doc_auto_cfg))] -#![doc = include_str!("../README.md")] -#![warn( - clippy::missing_const_for_fn, - clippy::pedantic, - missing_docs, - unsafe_code -)] -#![allow( - clippy::module_name_repetitions, - clippy::must_use_candidate, - clippy::unnecessary_wraps -)] - -#[cfg(feature = "twilight-http")] -mod day_limiter; -#[cfg(feature = "twilight-http")] -mod large_bot_queue; - -#[cfg(feature = "twilight-http")] -pub use large_bot_queue::LargeBotQueue; - -use std::{ - fmt::Debug, - future::{self, Future}, - pin::Pin, - time::Duration, -}; -use tokio::{ - sync::{ - mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, - oneshot::{self, Sender}, - }, - time::sleep, -}; - -/// Queue for shards to request the ability to initialize new sessions with the -/// gateway. -/// -/// This will usually only need to be implemented when you have a multi-process -/// sharding setup. Refer to the [module-level] documentation for more -/// information. -/// -/// [module-level]: crate -pub trait Queue: Debug + Send + Sync { - /// A shard has requested the ability to request a session initialization - /// with the gateway. - /// - /// The returned future must resolve only when the shard can initiate the - /// session. - fn request<'a>(&'a self, shard_id: [u64; 2]) -> Pin + Send + 'a>>; -} - -/// A local, in-process implementation of a [`Queue`] which manages the -/// connection attempts of one or more shards. -/// -/// The queue will take incoming requests and then queue them, releasing one of -/// the requests every 6 seconds. The queue is necessary because there's a -/// ratelimit on how often shards can initiate sessions. -/// -/// Handling shard queues usually won't need to be manually handled due to the -/// gateway having built-in queueing when managing multiple shards. -/// -/// # When not to use this -/// -/// This queue implementation is "local", meaning it's intended to be used if -/// you manage shards only in this process. If you run shards in multiple -/// different processes (do you utilize microservices a lot?), then you **must -/// not** use this implementation. Shards across multiple processes may -/// create new sessions at the same time, which is bad. -/// -/// It should also not be used for very large sharding, for that the -/// [`LargeBotQueue`] can be used. -/// -/// If you can't use this, look into an alternative implementation of the -/// [`Queue`], such as the [`gateway-queue`] broker. -/// -/// [`gateway-queue`]: https://github.com/twilight-rs/gateway-queue -#[derive(Clone, Debug)] -pub struct LocalQueue(UnboundedSender>); - -impl Default for LocalQueue { - fn default() -> Self { - Self::new() - } -} - -impl LocalQueue { - /// Creates a new local queue. - pub fn new() -> Self { - let (tx, rx) = unbounded_channel(); - - tokio::spawn(waiter(rx)); - - Self(tx) - } -} - -async fn waiter(mut rx: UnboundedReceiver>) { - const DUR: Duration = Duration::from_secs(6); - while let Some(req) = rx.recv().await { - if let Err(source) = req.send(()) { - tracing::warn!("skipping, send failed: {source:?}"); - } else { - sleep(DUR).await; - } - } -} - -impl Queue for LocalQueue { - /// Request to be able to identify with the gateway. This will place this - /// request behind all other requests, and the returned future will resolve - /// once the request has been completed. - fn request(&'_ self, [id, total]: [u64; 2]) -> Pin + Send + '_>> { - Box::pin(async move { - let (tx, rx) = oneshot::channel(); - - if let Err(source) = self.0.send(tx) { - tracing::warn!("skipping, send failed: {source:?}"); - return; - } - - tracing::info!("shard {id}/{total} waiting for allowance"); - - _ = rx.await; - }) - } -} - -/// An implementation of [`Queue`] that instantly allows requests. -/// -/// Useful when running behind a proxy gateway. Running without a -/// functional queue **will** get you ratelimited. -#[derive(Debug)] -pub struct NoOpQueue; - -impl Queue for NoOpQueue { - fn request(&'_ self, [_id, _total]: [u64; 2]) -> Pin + Send + '_>> { - Box::pin(future::ready(())) - } -} - -#[cfg(test)] -mod tests { - use super::{LocalQueue, NoOpQueue, Queue}; - use static_assertions::{assert_impl_all, assert_obj_safe}; - use std::fmt::Debug; - - assert_impl_all!(LocalQueue: Clone, Debug, Queue, Send, Sync); - assert_impl_all!(NoOpQueue: Debug, Queue, Send, Sync); - assert_impl_all!(dyn Queue: Debug, Send, Sync); - assert_obj_safe!(Queue); -} diff --git a/twilight-gateway/Cargo.toml b/twilight-gateway/Cargo.toml index 8c2b81aadcf..425268f397a 100644 --- a/twilight-gateway/Cargo.toml +++ b/twilight-gateway/Cargo.toml @@ -19,10 +19,9 @@ futures-util = { default-features = false, features = ["std"], version = "0.3" } rand = { default-features = false, features = ["std", "std_rng"], version = "0.8" } serde = { default-features = false, features = ["derive"], version = "1" } serde_json = { default-features = false, features = ["std"], version = "1" } -tokio = { default-features = false, features = ["net", "rt", "sync", "time"], version = "1.8" } +tokio = { default-features = false, features = ["macros", "net", "rt", "sync", "time"], version = "1.8" } tokio-tungstenite = { default-features = false, features = ["connect"], version = "0.18" } tracing = { default-features = false, features = ["std", "attributes"], version = "0.1" } -twilight-gateway-queue = { default-features = false, path = "../twilight-gateway-queue", version = "0.15.2" } twilight-model = { default-features = false, path = "../twilight-model", version = "0.15.2" } # Optional diff --git a/twilight-gateway/src/config.rs b/twilight-gateway/src/config.rs index 2fe7e17f60c..ffc8e623291 100644 --- a/twilight-gateway/src/config.rs +++ b/twilight-gateway/src/config.rs @@ -1,11 +1,7 @@ //! User configuration for shards. -use crate::{tls::TlsContainer, EventTypeFlags, Session}; -use std::{ - fmt::{Debug, Formatter, Result as FmtResult}, - sync::Arc, -}; -use twilight_gateway_queue::{LocalQueue, Queue}; +use crate::{tls::TlsContainer, EventTypeFlags, Queue, Session}; +use std::fmt::{Debug, Formatter, Result as FmtResult}; use twilight_model::gateway::{ payload::outgoing::{identify::IdentifyProperties, update_presence::UpdatePresencePayload}, Intents, @@ -57,7 +53,7 @@ pub struct Config { /// Gateway proxy URL. proxy_url: Option>, /// Queue in use by the shard. - queue: Arc, + queue: Queue, /// Whether [outgoing message] ratelimiting is enabled. /// /// [outgoing message]: crate::Shard::send @@ -133,7 +129,7 @@ impl Config { } /// Immutable reference to the queue in use by the shard. - pub fn queue(&self) -> &Arc { + pub fn queue(&self) -> &Queue { &self.queue } @@ -190,7 +186,7 @@ impl ConfigBuilder { large_threshold: 50, presence: None, proxy_url: None, - queue: Arc::new(LocalQueue::new()), + queue: Queue::default(), ratelimit_messages: true, session: None, tls: TlsContainer::new().unwrap(), @@ -347,13 +343,7 @@ impl ConfigBuilder { } /// Set the queue to use for queueing shard sessions. - /// - /// Defaults to a [`LocalQueue`]. - /// - /// Refer to the [`queue`] module for more information. - /// - /// [`queue`]: crate::queue - pub fn queue(mut self, queue: Arc) -> Self { + pub fn queue(mut self, queue: Queue) -> Self { self.inner.queue = queue; self diff --git a/twilight-gateway/src/lib.rs b/twilight-gateway/src/lib.rs index c64e1e87338..2081746bac8 100644 --- a/twilight-gateway/src/lib.rs +++ b/twilight-gateway/src/lib.rs @@ -26,6 +26,7 @@ mod inflater; mod json; mod latency; mod message; +mod queue; mod ratelimiter; mod session; mod shard; @@ -41,14 +42,13 @@ pub use self::{ json::parse, latency::Latency, message::Message, + queue::Queue, ratelimiter::CommandRatelimiter, session::Session, shard::{ConnectionStatus, Shard}, }; pub use twilight_model::gateway::{CloseFrame, Intents, ShardId}; -#[doc(no_inline)] -pub use twilight_gateway_queue as queue; #[doc(no_inline)] pub use twilight_model::gateway::event::{Event, EventType}; diff --git a/twilight-gateway/src/queue.rs b/twilight-gateway/src/queue.rs new file mode 100644 index 00000000000..9e6fda724d5 --- /dev/null +++ b/twilight-gateway/src/queue.rs @@ -0,0 +1,187 @@ +//! Ratelimiter for gateway `IDENTIFY` commands. +//! +//! Discord limits how often shards can send `IDENTIFY` commands to once every 5 +//! seconds per bucket, with a global daily limit. + +use std::{ + collections::BTreeMap, + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use tokio::{ + sync::{mpsc, oneshot}, + time::{self, Duration, Instant}, +}; + +/// Delay between `IDENTIFY` commands. +const IDENTIFY_DELAY: Duration = Duration::from_secs(5); + +#[derive(Debug)] +enum Message { + Request { shard: u64, tx: oneshot::Sender<()> }, + Update(Update), +} + +#[derive(Debug)] +struct Update { + max_concurrency: u8, + remaining: u16, + reset_at: Instant, + total: u16, +} + +/// Readiess tokens each [`IDENTIFY_DELAY`]. +async fn waiter( + mut rx: mpsc::UnboundedReceiver, + mut max_concurrency: u8, + mut remaining: u16, + reset_at: Instant, + mut total: u16, +) { + let sleep = time::sleep(Duration::ZERO); + let reset_at = time::sleep_until(reset_at); + tokio::pin! { + sleep, + reset_at + }; + let mut requests = BTreeMap::new(); + + 'outer: loop { + tokio::select! { + biased; + _ = &mut reset_at => { + remaining = total; + reset_at.as_mut().reset(Instant::now() + Duration::from_secs(60 * 60 * 24)); + } + message = rx.recv() => { + match message { + Some(Message::Request{shard, tx}) => { + if max_concurrency == 0 { + _ = tx.send(()); + } else { + requests.insert(shard, tx); + } + } + Some(Message::Update(update)) => { + let deadline; + Update {max_concurrency, remaining, reset_at: deadline, total} = update; + reset_at.as_mut().reset(deadline); + } + None => break, + } + } + _ = &mut sleep, if !requests.is_empty() => { + let mut removed = 0; + while removed < max_concurrency { + if remaining == 0 { + (&mut reset_at).await; + remaining = total; + reset_at.as_mut().reset(Instant::now() + Duration::from_secs(60 * 60 * 24)); + continue 'outer; + } + if let Some((_, tx)) = requests.pop_first() { + if tx.is_closed() { + continue; + } + _ = tx.send(()); + remaining -= 1; + removed += 1; + } + } + sleep.as_mut().reset(Instant::now() + IDENTIFY_DELAY); + } + } + } +} + +/// Queue for shards to request the ability to send `IDENTIFY` commands to +/// initialize new gateway sessions. +/// +/// The queue resets the `remaining` amout to `total` after `reset_after` and +/// then every 24 hours. [`Queue::update`] can override these values after the +/// queue's creation. +/// +/// Cloning a queue is cheap and just increments a reference counter. +#[derive(Clone, Debug)] +pub struct Queue { + tx: mpsc::UnboundedSender, +} + +impl Queue { + /// Creates a new [`Queue`] with custom settings. + pub fn new(max_concurrency: u8, remaining: u16, reset_after: Duration, total: u16) -> Self { + let (tx, rx) = mpsc::unbounded_channel(); + + tokio::spawn(waiter( + rx, + max_concurrency, + remaining, + Instant::now() + reset_after, + total, + )); + + Self { tx } + } + + /// Update the queue with new info from the [Get Gateway Bot] endpoint. + /// + /// May be regularly called as the bot joins/leaves guilds. + /// + /// [Get Gateway Bot]: https://discord.com/developers/docs/topics/gateway#get-gateway-bot + pub fn update(&self, max_concurrency: u8, remaining: u16, reset_after: Duration, total: u16) { + self.tx + .send(Message::Update(Update { + max_concurrency, + remaining, + reset_at: Instant::now() + reset_after, + total, + })) + .expect("receiver dropped after sender"); + } + + /// Reserve a token from the queue. + /// + /// Duplicate requests drop the previous request. + pub(crate) fn request(&self, shard: u64) -> Token { + let (tx, rx) = oneshot::channel(); + self.tx + .send(Message::Request { shard, tx }) + .expect("receiver dropped after sender"); + + Token { rx } + } +} + +impl Default for Queue { + /// Creates a new queue with the default settings of: + /// + /// * `max_concurrency`: 1 + /// * `remaining`: 1000 + /// * `reset_after`: 24 hours + /// * `total`: 1000. + fn default() -> Self { + Self::new(1, 1000, Duration::from_secs(60 * 60 * 24), 1000) + } +} + +/// Holds a queue reservation. +#[derive(Debug)] +pub(crate) struct Token { + rx: oneshot::Receiver<()>, +} + +impl Token { + /// Polls for readiness. + /// + /// # Return value + /// + /// The function returns: + /// + /// * `Poll::Pending` if the queue is waiting to allow this token + /// * `Poll::Ready(true)` if the queue allows this token. + /// * `Poll::Ready(false)` if the queue canceled this token. + pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut self.rx).poll(cx).map(|r| r.is_ok()) + } +} diff --git a/twilight-gateway/src/shard.rs b/twilight-gateway/src/shard.rs index 04f6a122541..48fa56beda7 100644 --- a/twilight-gateway/src/shard.rs +++ b/twilight-gateway/src/shard.rs @@ -30,8 +30,8 @@ //! which is then forwarded via [`Shard::close`]; or //! b. the interval for the shard to send the next heartbeat occurs, in which //! case [`Shard::heartbeat`] is called; or -//! c. the background identify queue task finishes, in which case -//! [`Shard::send`] is called with the identify payload; or +//! c. the identify token is ready, in which case [`Shard::send`] is called +//! with the identify payload; or //! d. the shard receives a command from the user over the [user channel], //! which is then forwarded via [`Shard::send`]; or //! e. the shard receives a message from Discord via the websocket connection. @@ -65,6 +65,7 @@ use crate::{ }, json::{self, UnknownEventError}, latency::Latency, + queue::Token, ratelimiter::CommandRatelimiter, session::Session, Config, Message, ShardId, @@ -85,10 +86,7 @@ use std::{ str, task::{Context, Poll}, }; -use tokio::{ - task::JoinHandle, - time::{self, Duration, Instant, Interval, MissedTickBehavior}, -}; +use tokio::time::{self, Duration, Instant, Interval, MissedTickBehavior}; use tokio_tungstenite::tungstenite::{Error as TungsteniteError, Message as TungsteniteMessage}; use twilight_model::gateway::{ event::{Event, GatewayEventDeserializer}, @@ -250,7 +248,7 @@ struct MinimalReady { /// update its internal state. Note that the [`next_event`] method internally /// calls [`next_message`]. /// -/// Shards go through an [identify queue][`queue`] that ratelimits the amount of +/// Shards go through an [`Queue`] that ratelimits the amount of /// concurrent identifies (across all shards) per 5 seconds. Exceeding this /// limit invalidates the shard's session and it is therefore very important to /// reuse the same queue when running multiple shards. Note that shards must be @@ -328,7 +326,7 @@ struct MinimalReady { /// [`next_event`]: Shard::next_event /// [`next_message`]: Shard::next_message /// [`stream`]: crate::stream -/// [`queue`]: crate::queue +/// [`Queue`]: crate::Queue #[derive(Debug)] pub struct Shard { /// User provided configuration. @@ -354,8 +352,8 @@ pub struct Shard { heartbeat_interval_event: bool, /// ID of the shard. id: ShardId, - /// Identify queue background task handle. - identify_handle: Option>, + /// Identify token. + identify_token: Option, /// Zlib decompressor. #[cfg(any(feature = "zlib-stock", feature = "zlib-simd"))] inflater: Inflater, @@ -401,7 +399,7 @@ impl Shard { heartbeat_interval: None, heartbeat_interval_event: false, id: shard_id, - identify_handle: None, + identify_token: None, #[cfg(any(feature = "zlib-stock", feature = "zlib-simd"))] inflater: Inflater::new(), latency: Latency::new(), @@ -556,7 +554,7 @@ impl Shard { /// /// Returns a [`ReceiveMessageErrorType::SendingMessage`] error type if the /// shard failed to send a message to the gateway, such as a heartbeat. - #[tracing::instrument(fields(id = %self.id()), name = "shard", skip(self))] + #[tracing::instrument(fields(id = %self.id), name = "shard", skip(self))] pub async fn next_message(&mut self) -> Result { /// Actions the shard might take. enum Action { @@ -611,13 +609,16 @@ impl Shard { ratelimiter.poll_available(cx).is_pending() }); - if !ratelimited - && self - .identify_handle - .as_mut() - .map_or(false, |handle| Pin::new(handle).poll(cx).is_ready()) - { - return Poll::Ready(Action::Identify); + if !ratelimited { + if let Some(Poll::Ready(canceled)) = + self.identify_token.as_mut().map(|token| token.poll(cx)) + { + if !canceled { + self.identify_token = None; + return Poll::Ready(Action::Identify); + } + self.identify_token = Some(self.config.queue().request(self.id.number())); + } } if !ratelimited && self.status.is_identified() { @@ -726,8 +727,6 @@ impl Shard { continue; } Action::Identify => { - self.identify_handle = None; - tracing::debug!("sending identify"); let identify = Identify::new(IdentifyInfo { compress: false, @@ -739,7 +738,7 @@ impl Shard { .identify_properties() .cloned() .unwrap_or_else(default_identify_properties), - shard: Some(self.id()), + shard: Some(self.id), token: self.config.token().to_owned(), }); let json = @@ -1107,7 +1106,7 @@ impl Shard { let jitter = heartbeat_interval.mul_f64(rand::random()); tracing::debug!(?heartbeat_interval, ?jitter, "received hello"); - if self.config().ratelimit_messages() { + if self.config.ratelimit_messages() { self.ratelimiter = Some(CommandRatelimiter::new(heartbeat_interval).await); } @@ -1123,21 +1122,12 @@ impl Shard { Some(session) => { tracing::debug!(sequence = session.sequence(), "sending resume"); let resume = - Resume::new(session.sequence(), session.id(), self.config().token()); + Resume::new(session.sequence(), session.id(), self.config.token()); let json = command::prepare(&resume).map_err(ProcessError::from_send)?; self.send(json).await.map_err(ProcessError::from_send)?; } None => { - // Can not use `MessageSender` since it is only polled - // after the shard is identified. - self.identify_handle = Some(tokio::spawn({ - let shard_id = self.id(); - let queue = self.config().queue().clone(); - - async move { - queue.request([shard_id.number(), shard_id.total()]).await; - } - })); + self.identify_token = Some(self.config.queue().request(self.id.number())); } } } diff --git a/twilight-gateway/src/stream.rs b/twilight-gateway/src/stream.rs index 6711fd1024b..ed7f200ed45 100644 --- a/twilight-gateway/src/stream.rs +++ b/twilight-gateway/src/stream.rs @@ -3,7 +3,7 @@ //! Multiple shards may easily be created at once, with a per shard config //! created from a `Fn(ShardId, ConfigBuilder) -> Config` closure, with the help //! of the `create_` set of functions. These functions will reuse shards' -//! TLS context and [session queue], something otherwise achieved by cloning an +//! TLS context and [`Queue`], something otherwise achieved by cloning an //! existing [`Config`]. //! //! # Concurrency @@ -27,9 +27,8 @@ //! //! See the [gateway-parallel] example for how to implement this. //! -//! [`ConfigBuilder::queue`]: crate::ConfigBuilder::queue //! [gateway-parallel]: https://github.com/twilight-rs/twilight/blob/main/examples/gateway-parallel.rs -//! [session queue]: crate::queue +//! [`Queue`]: crate::Queue use crate::{error::ReceiveMessageError, message::Message, Config, ConfigBuilder, Shard, ShardId}; use futures_util::{ @@ -419,7 +418,7 @@ pub fn create_bucket Config>( /// /// ```no_run /// use std::{collections::HashMap, env, sync::Arc}; -/// use twilight_gateway::{queue::LocalQueue, stream, Config, Intents}; +/// use twilight_gateway::{stream, Config, Intents}; /// /// # #[tokio::main] /// # async fn main() -> Result<(), Box> { diff --git a/twilight/Cargo.toml b/twilight/Cargo.toml index 4bd453174fe..3dc63de7b30 100644 --- a/twilight/Cargo.toml +++ b/twilight/Cargo.toml @@ -20,7 +20,6 @@ tracing = "0.1" tracing-subscriber = { default-features = false, features = ["fmt", "tracing-log"], version = "0.3" } twilight-cache-inmemory = { default-features = false, path = "../twilight-cache-inmemory", version = "0.15.2" } twilight-gateway = { default-features = false, features = ["rustls-native-roots"], path = "../twilight-gateway", version = "0.15.2" } -twilight-gateway-queue = { default-features = false, path = "../twilight-gateway-queue", version = "0.15.2" } twilight-http = { default-features = false, features = ["rustls-native-roots"], path = "../twilight-http", version = "0.15.2" } twilight-http-ratelimiting = { default-features = false, path = "../twilight-http-ratelimiting", version = "0.15.1" } twilight-lavalink = { default-features = false, path = "../twilight-lavalink", version = "0.15.1" }