Skip to content

Commit

Permalink
Implement a smarter task flushing scheduler
Browse files Browse the repository at this point in the history
Instead of just waiting on the oldest N tasks to finish, we continuously try to find already completed tasks.

Signed-off-by: Alex Saveau <[email protected]>
  • Loading branch information
SUPERCILEX committed Jan 28, 2024
1 parent cf59a96 commit fbc87d1
Showing 1 changed file with 16 additions and 2 deletions.
18 changes: 16 additions & 2 deletions src/core/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{
cmp::max,
collections::VecDeque,
io,
io, mem,
num::{NonZeroU64, NonZeroUsize},
ops::AddAssign,
path::PathBuf,
Expand Down Expand Up @@ -220,7 +220,19 @@ async fn flush_tasks(
) -> Result<(), Error> {
#[cfg(feature = "tracing")]
tracing::event!(tracing::Level::TRACE, "Flushing pending task queue");
for task in tasks.drain(..tasks.len() / 2) {

let mut drain = tasks.len() / 2;
while drain > 0 {
let task = {
let mut task = tasks.pop_front().unwrap();
if !task.is_finished()
&& let Some(h) = tasks.iter_mut().find(|h| h.is_finished())
{
mem::swap(&mut task, h);
}
task
};

#[cfg(not(feature = "dry_run"))]
let outcome = handle_task_result(task.await, stats)?;
#[cfg(feature = "dry_run")]
Expand All @@ -231,6 +243,8 @@ async fn flush_tasks(
vec.clear();
byte_counts_pool.push(vec);
}

drain -= 1;
}
Ok(())
}
Expand Down

0 comments on commit fbc87d1

Please sign in to comment.