Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs(gateway): cleanup parallel example #2223

Merged
merged 3 commits into from
Jul 19, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 15 additions & 37 deletions examples/gateway-parallel.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,8 @@
//! Run the recommended number of shards in `available_parallelism` tasks. Note
//! that neither [`ShardEventStream`] nor [`ShardMessageStream`] is required for
//! parallelism; each shard can run on an independent task.
//!
//! [`ShardMessageStream`]: twilight_gateway::stream::ShardMessageStream

use futures_util::{future::join_all, StreamExt};
use std::{env, iter, thread};
//! Run the recommended number of shards, each in their own task.

use std::env;
use tokio::{signal, sync::watch, task::JoinSet};
use twilight_gateway::{
stream::{self, ShardEventStream},
CloseFrame, Config, Intents, Shard,
};
use twilight_gateway::{stream, CloseFrame, Config, Intents, Shard};
use twilight_http::Client;

#[tokio::main]
Expand All @@ -19,37 +11,25 @@ async fn main() -> anyhow::Result<()> {

let token = env::var("DISCORD_TOKEN")?;
let client = Client::new(token.clone());
let config = Config::new(token.clone(), Intents::GUILDS);

let tasks = thread::available_parallelism()?.get();
let config = Config::new(token, Intents::GUILDS);

// Split shards into a vec of `tasks` vecs of shards.
let init = iter::repeat_with(Vec::new)
.take(tasks)
.collect::<Vec<Vec<_>>>();
let shards = stream::create_recommended(&client, config, |_, builder| builder.build())
.await?
.enumerate()
.fold(init, |mut fold, (idx, shard)| {
fold[idx % tasks].push(shard);
fold
});
.collect::<Vec<_>>();
vilgotf marked this conversation as resolved.
Show resolved Hide resolved

let (tx, rx) = watch::channel(false);

let mut set = JoinSet::new();

for mut shards in shards {
for mut shard in shards {
let mut rx = rx.clone();
set.spawn(async move {
// Run `process` and `rx.changed()` concurrently, returning when
// the first branch completes, cancelling the other one.
// Run the two futures concurrently, returning when the first branch
// completes and cancels the other one.
tokio::select! {
_ = process(shards.iter_mut()) => {},
_ = runner(&mut shard) => {},
_ = rx.changed() => {
join_all(shards.iter_mut().map(|shard| async move {
shard.close(CloseFrame::NORMAL).await
})).await;
_ = shard.close(CloseFrame::NORMAL).await;
}
}
});
Expand All @@ -68,12 +48,11 @@ async fn main() -> anyhow::Result<()> {
Ok(())
}

async fn process(shards: impl Iterator<Item = &mut Shard>) {
let mut stream = ShardEventStream::new(shards);
async fn runner(shard: &mut Shard) {
loop {
let (shard, event) = match stream.next().await {
Some((shard, Ok(event))) => (shard, event),
Some((_, Err(source))) => {
let event = match shard.next_event().await {
Ok(event) => event,
Err(source) => {
tracing::warn!(?source, "error receiving event");

if source.is_fatal() {
Expand All @@ -82,7 +61,6 @@ async fn process(shards: impl Iterator<Item = &mut Shard>) {

continue;
}
None => break,
};

tracing::debug!(?event, shard = ?shard.id(), "received event");
Expand Down