Skip to content

Commit

Permalink
refactor(gateway-queue)!: rewrite crate (#2228)
Browse files Browse the repository at this point in the history
Superceds and is a full rewrite of the proposed rewrite in #2172.
Primarily motivated by removing the use of a background task `Shard`
when waiting to identify. By returning a concrete `Future` type
(implementing `Debug`) it becomes much easier to store it inside of
`Shard`. Of course, I did not stop there, and you will find the
gateway-queue's documentation much improved in addition to code
reduction and an (initial) integration test suit. The provided
`InMemoryQueue` should be more flexible for advanced use cases than
`LargeBotQueue` whilst offering more features without added complexity
than `LocalQueue`.

Refer to #2172 for motivation on changing the identify interval from 6
seconds to 5 seconds.
  • Loading branch information
vilgotf committed Sep 10, 2023
1 parent 78002c3 commit cbe1d10
Show file tree
Hide file tree
Showing 14 changed files with 581 additions and 476 deletions.
15 changes: 7 additions & 8 deletions book/src/chapter_2_multi-serviced_approach.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,18 @@ One of the popular design choices when creating a multi-serviced application is
to have a service that only connects shards to the gateway and sends the events
to a broker to be processed. As bots grow into hundreds or thousands of shards,
multiple instances of the application can be created and groups of shards can be
managed by each. Twilight is an excellent choice for this use case: you can
receive either events that come in in a loop and send the payloads to the
appropriate broker stream, or you can loop over received payloads' bytes to send
off.
managed by each. Twilight is an excellent choice for this use case: just receive
and send the payloads to the appropriate broker stream. Twilight shards need
only partially deserialize payloads to function.

## Gateway session ratelimiting

If multiple shard groups are used, then they need to be queued and their session
initialization ratelimited. The Gateway includes a Queue trait which can be
initialization ratelimited. The gateway includes a Queue trait which can be
implemented; the gateway will submit a request to the queue before starting a
session. Twilight comes with a queue that supports sharding and Large Bot
sharding, but when multiple shard groups are in use then a custom queue will
need to be implemented. Refer to [gateway-queue] for an example of this.
session. Twilight comes with a queue that supports Large Bot sharding, but when
multiple shard groups are in use then a custom queue will need to be implemented.
Refer to [gateway-queue] for an example of this.

## HTTP proxy ratelimiting

Expand Down
4 changes: 4 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ twilight-standby = { path = "../twilight-standby" }
name = "gateway-parallel"
path = "gateway-parallel.rs"

[[example]]
name = "gateway-queue-http"
path = "gateway-queue-http.rs"

[[example]]
name = "gateway-reshard"
path = "gateway-reshard.rs"
Expand Down
62 changes: 62 additions & 0 deletions examples/gateway-queue-http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use hyper::client::{Client, HttpConnector};
use std::{env, sync::Arc};
use tokio::sync::oneshot;
use twilight_gateway::{queue::Queue, Config, Intents, Shard, ShardId};

#[derive(Debug)]
struct HttpQueue(Client<HttpConnector>);

impl Queue for HttpQueue {
fn enqueue(&self, id: u32) -> oneshot::Receiver<()> {
let (mut tx, rx) = oneshot::channel();
let uri = format!("http://127.0.0.1:8000/?shard={id}");
let req = self.0.get(uri.parse().unwrap());

tokio::spawn(async move {
tokio::select! {
_ = tx.closed() => {}
res = req => {
match res {
Ok(_) => _ = tx.send(()),
Err(source) => tracing::info!("error sending request: {source}"),
}
}
}
});

rx
}
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();

let token = env::var("DISCORD_TOKEN")?;
let intents = Intents::GUILDS | Intents::GUILD_VOICE_STATES;

let config = Config::builder(token, intents)
.queue(Arc::new(HttpQueue(Client::new())))
.build();

let mut shard = Shard::with_config(ShardId::ONE, config);

loop {
let event = match shard.next_event().await {
Ok(event) => event,
Err(source) => {
tracing::warn!(?source, "error receiving event");

if source.is_fatal() {
break;
}

continue;
}
};

tracing::debug!(?event, "received event");
}

Ok(())
}
16 changes: 4 additions & 12 deletions twilight-gateway-queue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,10 @@ rust-version.workspace = true
version = "0.15.4"

[dependencies]
tokio = { default-features = false, features = ["rt", "sync", "time"], version = "1.0" }
tracing = { default-features = false, features = ["std", "attributes"], version = "0.1" }

# Optional dependencies.
twilight-http = { default-features = false, optional = true, path = "../twilight-http", version = "0.15.4" }
tokio = { default-features = false, features = ["macros", "rt", "sync", "time"], version = "1.15" }
tracing = { default-features = false, features = ["std"], version = "0.1" }

[dev-dependencies]
static_assertions = { default-features = false, version = "1" }

[features]
default = ["twilight-http"]

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
tokio = { default-features = false, features = ["test-util"], version = "1.0" }
twilight-http = { default-features = false, path = "../twilight-http", version = "0.15.2" }
69 changes: 29 additions & 40 deletions twilight-gateway-queue/README.md
Original file line number Diff line number Diff line change
@@ -1,42 +1,31 @@
# twilight-gateway-queue

Ratelimiting functionality for queueing new gateway sessions.

The gateway ratelimits how often clients can initialize new sessions.
Instances of a queue are given to shards so that they can request to
initialize a session.

Queue implementations must point to the same broker so that all shards
across all shard groups, processes, and other forms of multi-serviced
applications, can work together and use the same ratelimiting source. That is,
if two shard groups are in two different processes, then the the two processes
must use some unified form of ratelimiting: this can either mean using IPC to
communicate ratelimiting or a broker.

## Provided queues

Most users only need the [`LocalQueue`]: it's a single-process queue for
smaller bots. Larger bots need the [`LargeBotQueue`], which supports
single-process [Sharding for Large Bots] through the use of bucket
releasing.

By default, the gateway's `stream` module and `Shard`s use the [`LocalQueue`].
This can be overridden via the `ShardBuilder::queue` configuration method.

## Advanced use cases

Large bots, and smaller bots out of design, may need to implement their own
queue. The most common reason to need this is if you have shard groups in
multiple processes. A broker to manage ratelimiting across shard groups is
required, so a [`Queue`] trait is provided that shards can use to make requests
to create sessions.

## Features

### Twilight-HTTP

The `twilight-http` feature brings in support for [`LargeBotQueue`].

This is enabled by default.

[Sharding for Large Bots]: https://discord.com/developers/docs/topics/gateway#sharding-for-large-bots
[![codecov badge][]][codecov link] [![discord badge][]][discord link] [![github badge][]][github link] [![license badge][]][license link] ![rust badge]

Rate limiting functionality for gateway `IDENTIFY` commands.

Discord allows bot's shards to send a limited amount of `IDENTIFY` commands
every 5 seconds, with a daily limit from 1000 to 2000 commands, and invalidates
*all* shard sessions upon exceeding it. Each identify interval may be filled by
shards' IDs modulo `max_concurrency` and such a set of shards is called a
bucket. See [Discord Docs/Sharding].

To coordinate this, a [`Queue`] should process each identify request and shards
should wait for its signal to proceed before continuing and otherwise retry. The
provided [`InMemoryQueue`] never fails or cancels requests and is therefore a
good starting point for custom implementations. It can also be composed to
support multiple processes; see [`gateway-queue-http`] and [`gateway-queue`] for
a HTTP client and server implementation, respectively.

[codecov badge]: https://img.shields.io/codecov/c/gh/twilight-rs/twilight?logo=codecov&style=for-the-badge&token=E9ERLJL0L2
[codecov link]: https://app.codecov.io/gh/twilight-rs/twilight/
[discord badge]: https://img.shields.io/discord/745809834183753828?color=%237289DA&label=discord%20server&logo=discord&style=for-the-badge
[Discord Docs/Sharding]: https://discord.com/developers/docs/topics/gateway#sharding
[discord link]: https://discord.gg/twilight-rs
[`gateway-queue`]: https://github.com/twilight-rs/gateway-queue
[`gateway-queue-http`]: https://github.com/twilight-rs/twilight/blob/main/examples/gateway-queue-http.rs
[github badge]: https://img.shields.io/badge/github-twilight-6f42c1.svg?style=for-the-badge&logo=github
[github link]: https://github.com/twilight-rs/twilight
[license badge]: https://img.shields.io/badge/license-ISC-blue.svg?style=for-the-badge&logo=pastebin
[license link]: https://github.com/twilight-rs/twilight/blob/main/LICENSE.md
[rust badge]: https://img.shields.io/badge/rust-1.67+-93450a.svg?style=for-the-badge&logo=rust
124 changes: 0 additions & 124 deletions twilight-gateway-queue/src/day_limiter.rs

This file was deleted.

Loading

0 comments on commit cbe1d10

Please sign in to comment.