From dad6139f95321b103e461f176351a971a43b835e Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Fri, 20 Dec 2024 13:56:12 +0100 Subject: [PATCH] pageserver: reorder upload queue when possible --- .../src/tenant/remote_timeline_client.rs | 96 +- pageserver/src/tenant/upload_queue.rs | 906 +++++++++++++++++- 2 files changed, 913 insertions(+), 89 deletions(-) diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index eb85f1f7d27b..8f395794c216 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -63,18 +63,22 @@ //! The contract between client and its user is that the user is responsible of //! scheduling operations in an order that keeps the remote consistent as //! described above. +//! //! From the user's perspective, the operations are executed sequentially. //! Internally, the client knows which operations can be performed in parallel, //! and which operations act like a "barrier" that require preceding operations //! to finish. The calling code just needs to call the schedule-functions in the //! correct order, and the client will parallelize the operations in a way that -//! is safe. +//! is safe. For more details, see `UploadOp::can_bypass`. //! //! The caller should be careful with deletion, though. They should not delete //! local files that have been scheduled for upload but not yet finished uploading. //! Otherwise the upload will fail. To wait for an upload to finish, use //! the 'wait_completion' function (more on that later.) //! +//! (TODO: the above isn't accurate with the introduction of `UploadOp::can_bypass`, +//! consider removing it as well as `wait_completion`) +//! //! All of this relies on the following invariants: //! //! - We rely on read-after write consistency in the remote storage. 
@@ -1797,57 +1801,16 @@ impl RemoteTimelineClient { Ok(()) } - /// /// Pick next tasks from the queue, and start as many of them as possible without violating /// the ordering constraints. /// - /// The caller needs to already hold the `upload_queue` lock. + /// TODO: consider limiting the number of in-progress tasks, beyond what remote_storage does, + /// to avoid tenants starving other tenants. fn launch_queued_tasks(self: &Arc, upload_queue: &mut UploadQueueInitialized) { - while let Some(next_op) = upload_queue.queued_operations.front() { - // Can we run this task now? - let can_run_now = match next_op { - UploadOp::UploadLayer(..) => { - // Can always be scheduled. - true - } - UploadOp::UploadMetadata { .. } => { - // These can only be performed after all the preceding operations - // have finished. - upload_queue.inprogress_tasks.is_empty() - } - UploadOp::Delete(..) => { - // Wait for preceding uploads to finish. Concurrent deletions are OK, though. - upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len() - } - - UploadOp::Barrier(_) | UploadOp::Shutdown => { - upload_queue.inprogress_tasks.is_empty() - } - }; - - // If we cannot launch this task, don't look any further. - // - // In some cases, we could let some non-frontmost tasks to "jump the queue" and launch - // them now, but we don't try to do that currently. For example, if the frontmost task - // is an index-file upload that cannot proceed until preceding uploads have finished, we - // could still start layer uploads that were scheduled later. - if !can_run_now { - break; - } + while let Some(mut next_op) = upload_queue.next_ready() { + debug!("starting op: {next_op}"); - if let UploadOp::Shutdown = next_op { - // leave the op in the queue but do not start more tasks; it will be dropped when - // the stop is called. - upload_queue.shutdown_ready.close(); - break; - } - - // We can launch this task. Remove it from the queue first. 
- let mut next_op = upload_queue.queued_operations.pop_front().unwrap(); - - debug!("starting op: {}", next_op); - - // Update the counters and prepare + // Prepare upload. match &mut next_op { UploadOp::UploadLayer(layer, meta, mode) => { if upload_queue @@ -1858,18 +1821,14 @@ impl RemoteTimelineClient { } else { *mode = Some(OpType::MayReorder) } - upload_queue.num_inprogress_layer_uploads += 1; - } - UploadOp::UploadMetadata { .. } => { - upload_queue.num_inprogress_metadata_uploads += 1; } + UploadOp::UploadMetadata { .. } => {} UploadOp::Delete(Delete { layers }) => { for (name, meta) in layers { upload_queue .recently_deleted .insert((name.clone(), meta.generation)); } - upload_queue.num_inprogress_deletions += 1; } UploadOp::Barrier(sender) => { sender.send_replace(()); @@ -1967,6 +1926,8 @@ impl RemoteTimelineClient { let upload_result: anyhow::Result<()> = match &task.op { UploadOp::UploadLayer(ref layer, ref layer_metadata, mode) => { + // TODO: check if this mechanism can be removed now that can_bypass() performs + // conflict checks during scheduling. if let Some(OpType::FlushDeletion) = mode { if self.config.read().unwrap().block_deletions { // Of course, this is not efficient... but usually the queue should be empty. @@ -2189,13 +2150,8 @@ impl RemoteTimelineClient { upload_queue.inprogress_tasks.remove(&task.task_id); let lsn_update = match task.op { - UploadOp::UploadLayer(_, _, _) => { - upload_queue.num_inprogress_layer_uploads -= 1; - None - } + UploadOp::UploadLayer(_, _, _) => None, UploadOp::UploadMetadata { ref uploaded } => { - upload_queue.num_inprogress_metadata_uploads -= 1; - // the task id is reused as a monotonicity check for storing the "clean" // IndexPart. let last_updater = upload_queue.clean.1; @@ -2229,10 +2185,7 @@ impl RemoteTimelineClient { None } } - UploadOp::Delete(_) => { - upload_queue.num_inprogress_deletions -= 1; - None - } + UploadOp::Delete(_) => None, UploadOp::Barrier(..) 
| UploadOp::Shutdown => unreachable!(), }; @@ -2356,9 +2309,6 @@ impl RemoteTimelineClient { visible_remote_consistent_lsn: initialized .visible_remote_consistent_lsn .clone(), - num_inprogress_layer_uploads: 0, - num_inprogress_metadata_uploads: 0, - num_inprogress_deletions: 0, inprogress_tasks: HashMap::default(), queued_operations: VecDeque::default(), #[cfg(feature = "testing")] @@ -2385,14 +2335,6 @@ impl RemoteTimelineClient { } }; - // consistency check - assert_eq!( - qi.num_inprogress_layer_uploads - + qi.num_inprogress_metadata_uploads - + qi.num_inprogress_deletions, - qi.inprogress_tasks.len() - ); - // We don't need to do anything here for in-progress tasks. They will finish // on their own, decrement the unfinished-task counter themselves, and observe // that the queue is Stopped. @@ -2852,8 +2794,8 @@ mod tests { let mut guard = client.upload_queue.lock().unwrap(); let upload_queue = guard.initialized_mut().unwrap(); assert!(upload_queue.queued_operations.is_empty()); - assert!(upload_queue.inprogress_tasks.len() == 2); - assert!(upload_queue.num_inprogress_layer_uploads == 2); + assert_eq!(upload_queue.inprogress_tasks.len(), 2); + assert_eq!(upload_queue.num_inprogress_layer_uploads(), 2); // also check that `latest_file_changes` was updated assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 2); @@ -2923,8 +2865,8 @@ mod tests { // Deletion schedules upload of the index file, and the file deletion itself assert_eq!(upload_queue.queued_operations.len(), 2); assert_eq!(upload_queue.inprogress_tasks.len(), 1); - assert_eq!(upload_queue.num_inprogress_layer_uploads, 1); - assert_eq!(upload_queue.num_inprogress_deletions, 0); + assert_eq!(upload_queue.num_inprogress_layer_uploads(), 1); + assert_eq!(upload_queue.num_inprogress_deletions(), 0); assert_eq!( upload_queue.latest_files_changes_since_metadata_upload_scheduled, 0 diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs index 
ef3aa759f303..a6c56ff20349 100644 --- a/pageserver/src/tenant/upload_queue.rs +++ b/pageserver/src/tenant/upload_queue.rs @@ -1,3 +1,5 @@ +use super::remote_timeline_client::is_same_remote_layer_path; +use super::storage_layer::AsLayerDesc as _; use super::storage_layer::LayerName; use super::storage_layer::ResidentLayer; use crate::tenant::metadata::TimelineMetadata; @@ -8,6 +10,7 @@ use std::collections::{HashMap, VecDeque}; use std::fmt::Debug; use chrono::NaiveDateTime; +use once_cell::sync::Lazy; use std::sync::Arc; use tracing::info; use utils::lsn::AtomicLsn; @@ -17,6 +20,11 @@ use utils::lsn::Lsn; use utils::generation::Generation; +/// Kill switch for upload queue reordering in case it causes problems. +/// TODO: remove this once we have confidence in it. +static DISABLE_UPLOAD_QUEUE_REORDERING: Lazy = + Lazy::new(|| std::env::var("DISABLE_UPLOAD_QUEUE_REORDERING").as_deref() == Ok("true")); + // clippy warns that Uninitialized is much smaller than Initialized, which wastes // memory for Uninitialized variants. Doesn't matter in practice, there are not // that many upload queues in a running pageserver, and most of them are initialized @@ -70,11 +78,6 @@ pub(crate) struct UploadQueueInitialized { /// we skip validation) pub(crate) visible_remote_consistent_lsn: Arc, - // Breakdown of different kinds of tasks currently in-progress - pub(crate) num_inprogress_layer_uploads: usize, - pub(crate) num_inprogress_metadata_uploads: usize, - pub(crate) num_inprogress_deletions: usize, - /// Tasks that are currently in-progress. In-progress means that a tokio Task /// has been launched for it. An in-progress task can be busy uploading, but it can /// also be waiting on the `concurrency_limiter` Semaphore in S3Bucket, or it can @@ -122,6 +125,129 @@ impl UploadQueueInitialized { let lsn = self.clean.0.metadata.disk_consistent_lsn(); self.clean.1.map(|_| lsn) } + + /// Returns and removes the next ready operation from the queue, if any. 
This isn't necessarily + /// the first operation in the queue, to avoid head-of-line blocking -- an operation can jump + /// the queue if it doesn't conflict with operations ahead of it. + /// + /// None may be returned even if the queue isn't empty, if no operations are ready yet. + pub(crate) fn next_ready(&mut self) -> Option { + // NB: this is quadratic, but queues are expected to be small. + for (i, candidate) in self.queued_operations.iter().enumerate() { + // If this candidate is ready, go for it. Otherwise, try the next one. + if self.is_ready(i) { + // Shutdown operations are left at the head of the queue, to prevent further + // operations from starting. Signal that we're ready to shut down. + if matches!(candidate, UploadOp::Shutdown) { + assert!(self.inprogress_tasks.is_empty(), "shutdown with tasks"); + assert_eq!(i, 0, "shutdown not at head of queue"); + self.shutdown_ready.close(); + return None; + } + + return self.queued_operations.remove(i); + } + + // Nothing can bypass a barrier or shutdown. If it wasn't scheduled above, give up. + if matches!(candidate, UploadOp::Barrier(_) | UploadOp::Shutdown) { + return None; + } + + // If upload queue reordering is disabled, bail out after the first operation. + if *DISABLE_UPLOAD_QUEUE_REORDERING { + return None; + } + } + None + } + + /// Returns true if the queued operation at the given position is ready to be uploaded, i.e. if + /// it doesn't conflict with any in-progress or queued operations ahead of it. Operations are + /// allowed to skip the queue when it's safe to do so, to increase parallelism. + /// + /// The position must be valid for the queue size. + fn is_ready(&self, pos: usize) -> bool { + let candidate = self.queued_operations.get(pos).expect("invalid position"); + self + // Look at in-progress operations, in random order. + .inprogress_tasks + .values() + .map(|task| &task.op) + // Then queued operations ahead of the candidate, front-to-back. 
+ .chain(self.queued_operations.iter().take(pos)) + // Keep track of the active index ahead of each operation. This is used to ensure that + // an upload doesn't skip the queue too far, such that it modifies a layer that's + // referenced by an active index. + // + // It's okay that in-progress operations are emitted in random order above, since at + // most one of them can be an index upload (enforced by can_bypass). + .scan(&self.clean.0, |next_active_index, op| { + let active_index = *next_active_index; + if let UploadOp::UploadMetadata { ref uploaded } = op { + *next_active_index = uploaded; // stash index for next operation after this + } + Some((op, active_index)) + }) + // Check if the candidate can bypass all of them. + .all(|(op, active_index)| candidate.can_bypass(op, active_index)) + } + + /// Returns the number of in-progress deletion operations. + #[cfg(test)] + pub(crate) fn num_inprogress_deletions(&self) -> usize { + self.inprogress_tasks + .iter() + .filter(|(_, t)| matches!(t.op, UploadOp::Delete(_))) + .count() + } + + /// Returns the number of in-progress layer uploads. + #[cfg(test)] + pub(crate) fn num_inprogress_layer_uploads(&self) -> usize { + self.inprogress_tasks + .iter() + .filter(|(_, t)| matches!(t.op, UploadOp::UploadLayer(_, _, _))) + .count() + } + + /// Test helper that schedules all ready operations into inprogress_tasks, and returns + /// references to them. + /// + /// TODO: the corresponding production logic should be moved from RemoteTimelineClient into + /// UploadQueue, so we can use the same code path. + #[cfg(test)] + fn schedule_ready(&mut self) -> Vec> { + let mut tasks = Vec::new(); + // NB: schedule operations one by one, to handle conflicts with inprogress_tasks. 
+ while let Some(op) = self.next_ready() { + self.task_counter += 1; + let task = Arc::new(UploadTask { + task_id: self.task_counter, + op, + retries: 0.into(), + }); + self.inprogress_tasks.insert(task.task_id, task.clone()); + tasks.push(task); + } + tasks + } + + /// Test helper that marks an operation as completed, removing it from inprogress_tasks. + /// + /// TODO: the corresponding production logic should be moved from RemoteTimelineClient into + /// UploadQueue, so we can use the same code path. + #[cfg(test)] + fn complete(&mut self, task_id: u64) { + let Some(task) = self.inprogress_tasks.remove(&task_id) else { + return; + }; + // Update the clean index on uploads. + if let UploadOp::UploadMetadata { ref uploaded } = task.op { + if task.task_id > self.clean.1.unwrap_or_default() { + self.clean = (*uploaded.clone(), Some(task.task_id)); + } + } + } } #[derive(Clone, Copy)] @@ -185,9 +311,6 @@ impl UploadQueue { visible_remote_consistent_lsn: Arc::new(AtomicLsn::new(0)), // what follows are boring default initializations task_counter: 0, - num_inprogress_layer_uploads: 0, - num_inprogress_metadata_uploads: 0, - num_inprogress_deletions: 0, inprogress_tasks: HashMap::new(), queued_operations: VecDeque::new(), #[cfg(feature = "testing")] @@ -227,9 +350,6 @@ impl UploadQueue { ), // what follows are boring default initializations task_counter: 0, - num_inprogress_layer_uploads: 0, - num_inprogress_metadata_uploads: 0, - num_inprogress_deletions: 0, inprogress_tasks: HashMap::new(), queued_operations: VecDeque::new(), #[cfg(feature = "testing")] @@ -291,7 +411,7 @@ pub(crate) struct Delete { pub(crate) layers: Vec<(LayerName, LayerFileMetadata)>, } -#[derive(Debug)] +#[derive(Clone, Debug)] pub(crate) enum UploadOp { /// Upload a layer file. The last field indicates the last operation for thie file. 
UploadLayer(ResidentLayer, LayerFileMetadata, Option), @@ -338,3 +458,765 @@ impl std::fmt::Display for UploadOp { } } } + +impl UploadOp { + /// Returns true if self can bypass other, i.e. if the operations don't conflict. index is the + /// active index when other would be uploaded -- if we allow self to bypass other, this would + /// be the active index when self is uploaded. + pub fn can_bypass(&self, other: &UploadOp, index: &IndexPart) -> bool { + match (self, other) { + // Nothing can bypass a barrier or shutdown, and it can't bypass anything. + (UploadOp::Barrier(_), _) | (_, UploadOp::Barrier(_)) => false, + (UploadOp::Shutdown, _) | (_, UploadOp::Shutdown) => false, + + // Uploads and deletes can bypass each other unless they're for the same file. + (UploadOp::UploadLayer(a, ameta, _), UploadOp::UploadLayer(b, bmeta, _)) => { + let aname = &a.layer_desc().layer_name(); + let bname = &b.layer_desc().layer_name(); + !is_same_remote_layer_path(aname, ameta, bname, bmeta) + } + (UploadOp::UploadLayer(u, umeta, _), UploadOp::Delete(d)) + | (UploadOp::Delete(d), UploadOp::UploadLayer(u, umeta, _)) => { + d.layers.iter().all(|(dname, dmeta)| { + !is_same_remote_layer_path(&u.layer_desc().layer_name(), umeta, dname, dmeta) + }) + } + + // Deletes are idempotent and can always bypass each other. + (UploadOp::Delete(_), UploadOp::Delete(_)) => true, + + // Uploads and deletes can bypass an index upload as long as neither the uploaded index + // nor the active index below it references the file. A layer can't be modified or + // deleted while referenced by an index. + // + // Similarly, index uploads can bypass uploads and deletes as long as neither the + // uploaded index nor the active index references the file (the latter would be + // incorrect use by the caller). 
+ (UploadOp::UploadLayer(u, umeta, _), UploadOp::UploadMetadata { uploaded: i }) + | (UploadOp::UploadMetadata { uploaded: i }, UploadOp::UploadLayer(u, umeta, _)) => { + let uname = u.layer_desc().layer_name(); + !i.references(&uname, umeta) && !index.references(&uname, umeta) + } + (UploadOp::Delete(d), UploadOp::UploadMetadata { uploaded: i }) + | (UploadOp::UploadMetadata { uploaded: i }, UploadOp::Delete(d)) => { + d.layers.iter().all(|(dname, dmeta)| { + !i.references(dname, dmeta) && !index.references(dname, dmeta) + }) + } + + // Indexes can never bypass each other. + // TODO: we could coalesce them though, by only uploading the newest ready index. This + // is left for later, out of caution. + (UploadOp::UploadMetadata { .. }, UploadOp::UploadMetadata { .. }) => false, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::tenant::harness::{TenantHarness, TIMELINE_ID}; + use crate::tenant::storage_layer::layer::local_layer_path; + use crate::tenant::storage_layer::Layer; + use crate::tenant::Timeline; + use crate::DEFAULT_PG_VERSION; + use itertools::Itertools as _; + use std::str::FromStr as _; + use utils::shard::{ShardCount, ShardIndex, ShardNumber}; + + /// Test helper which asserts that two operations are the same, in lieu of UploadOp PartialEq. + #[track_caller] + fn assert_same_op(a: &UploadOp, b: &UploadOp) { + use UploadOp::*; + match (a, b) { + (UploadLayer(a, ameta, atype), UploadLayer(b, bmeta, btype)) => { + assert_eq!(a.layer_desc().layer_name(), b.layer_desc().layer_name()); + assert_eq!(ameta, bmeta); + assert_eq!(atype, btype); + } + (Delete(a), Delete(b)) => assert_eq!(a.layers, b.layers), + (UploadMetadata { uploaded: a }, UploadMetadata { uploaded: b }) => assert_eq!(a, b), + (Barrier(_), Barrier(_)) => {} + (Shutdown, Shutdown) => {} + (a, b) => panic!("{a:?} != {b:?}"), + } + } + + /// Test helper which asserts that two sets of operations are the same. 
+ #[track_caller] + fn assert_same_ops<'a>( + a: impl IntoIterator, + b: impl IntoIterator, + ) { + a.into_iter() + .zip_eq(b) + .for_each(|(a, b)| assert_same_op(a, b)) + } + + /// Test helper to construct a test timeline. + /// + /// TODO: it really shouldn't be necessary to construct an entire tenant and timeline just to + /// test the upload queue -- decouple ResidentLayer from Timeline. + /// + /// TODO: the upload queue uses TimelineMetadata::example() instead, because there's no way to + /// obtain a TimelineMetadata from a Timeline. + fn make_timeline() -> Arc { + // Grab the current test name from the current thread name. + // TODO: TenantHarness shouldn't take a &'static str, but just leak the test name for now. + let test_name = std::thread::current().name().unwrap().to_string(); + let test_name = Box::leak(test_name.into_boxed_str()); + + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("failed to create runtime"); + + runtime + .block_on(async { + let harness = TenantHarness::create(test_name).await?; + let (tenant, ctx) = harness.load().await; + tenant + .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx) + .await + }) + .expect("failed to create timeline") + } + + /// Test helper to construct an (empty) resident layer. + fn make_layer(timeline: &Arc, name: &str) -> ResidentLayer { + make_layer_with_size(timeline, name, 0) + } + + /// Test helper to construct a resident layer with the given size. + fn make_layer_with_size(timeline: &Arc, name: &str, size: usize) -> ResidentLayer { + let metadata = LayerFileMetadata { + generation: timeline.generation, + shard: timeline.get_shard_index(), + file_size: size as u64, + }; + make_layer_with_metadata(timeline, name, metadata) + } + + /// Test helper to construct a layer with the given metadata. 
+ fn make_layer_with_metadata( + timeline: &Arc, + name: &str, + metadata: LayerFileMetadata, + ) -> ResidentLayer { + let name = LayerName::from_str(name).expect("invalid name"); + let local_path = local_layer_path( + timeline.conf, + &timeline.tenant_shard_id, + &timeline.timeline_id, + &name, + &metadata.generation, + ); + std::fs::write(&local_path, vec![0; metadata.file_size as usize]) + .expect("failed to write file"); + Layer::for_resident(timeline.conf, timeline, local_path, name, metadata) + } + + /// Test helper to add a layer to an index and return a new index. + fn index_with(index: &IndexPart, layer: &ResidentLayer) -> Box { + let mut index = index.clone(); + index + .layer_metadata + .insert(layer.layer_desc().layer_name(), layer.metadata()); + Box::new(index) + } + + /// Test helper to remove a layer from an index and return a new index. + fn index_without(index: &IndexPart, layer: &ResidentLayer) -> Box { + let mut index = index.clone(); + index + .layer_metadata + .remove(&layer.layer_desc().layer_name()); + Box::new(index) + } + + /// Nothing can bypass a barrier, and it can't bypass inprogress tasks. 
+ #[test] + fn schedule_barrier() -> anyhow::Result<()> { + let mut queue = UploadQueue::Uninitialized; + let queue = queue.initialize_empty_remote(&TimelineMetadata::example())?; + let tli = make_timeline(); + + let index = Box::new(queue.clean.0.clone()); // empty, doesn't matter + let layer0 = make_layer(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"); + let layer1 = make_layer(&tli, "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"); + let layer2 = make_layer(&tli, "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"); + let layer3 = make_layer(&tli, "300000000000000000000000000000000000-400000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"); + let (barrier, _) = tokio::sync::watch::channel(()); + + // Enqueue non-conflicting upload, delete, and index before and after a barrier. + let ops = [ + UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None), + UploadOp::Delete(Delete { + layers: vec![(layer1.layer_desc().layer_name(), layer1.metadata())], + }), + UploadOp::UploadMetadata { + uploaded: index.clone(), + }, + UploadOp::Barrier(barrier), + UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None), + UploadOp::Delete(Delete { + layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())], + }), + UploadOp::UploadMetadata { + uploaded: index.clone(), + }, + ]; + + queue.queued_operations.extend(ops.clone()); + + // Schedule the initial operations ahead of the barrier. + let tasks = queue.schedule_ready(); + + assert_same_ops(tasks.iter().map(|t| &t.op), &ops[0..3]); + assert!(matches!( + queue.queued_operations.front(), + Some(&UploadOp::Barrier(_)) + )); + + // Complete the initial operations. The barrier isn't scheduled while they're pending. 
+ for task in tasks { + assert!(queue.schedule_ready().is_empty()); + queue.complete(task.task_id); + } + + // Schedule the barrier. The later tasks won't schedule until it completes. + let tasks = queue.schedule_ready(); + + assert_eq!(tasks.len(), 1); + assert!(matches!(tasks[0].op, UploadOp::Barrier(_))); + assert_eq!(queue.queued_operations.len(), 3); + + // Complete the barrier. The rest of the tasks schedule immediately. + queue.complete(tasks[0].task_id); + + let tasks = queue.schedule_ready(); + assert_same_ops(tasks.iter().map(|t| &t.op), &ops[4..]); + assert!(queue.queued_operations.is_empty()); + + Ok(()) + } + + /// Deletes can be scheduled in parallel, even if they're for the same file. + #[test] + fn schedule_delete_parallel() -> anyhow::Result<()> { + let mut queue = UploadQueue::Uninitialized; + let queue = queue.initialize_empty_remote(&TimelineMetadata::example())?; + let tli = make_timeline(); + + // Enqueue a bunch of deletes, some with conflicting names. + let layer0 = make_layer(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"); + let layer1 = make_layer(&tli, "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"); + let layer2 = make_layer(&tli, "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"); + let layer3 = make_layer(&tli, "300000000000000000000000000000000000-400000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"); + + let ops = [ + UploadOp::Delete(Delete { + layers: vec![(layer0.layer_desc().layer_name(), layer0.metadata())], + }), + UploadOp::Delete(Delete { + layers: vec![(layer1.layer_desc().layer_name(), layer1.metadata())], + }), + UploadOp::Delete(Delete { + layers: vec![ + (layer1.layer_desc().layer_name(), layer1.metadata()), + (layer2.layer_desc().layer_name(), layer2.metadata()), + ], + }), + UploadOp::Delete(Delete { + 
layers: vec![(layer2.layer_desc().layer_name(), layer2.metadata())], + }), + UploadOp::Delete(Delete { + layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())], + }), + ]; + + queue.queued_operations.extend(ops.clone()); + + // Schedule all ready operations. Since deletes don't conflict, they're all scheduled. + let tasks = queue.schedule_ready(); + + assert_same_ops(tasks.iter().map(|t| &t.op), &ops); + assert!(queue.queued_operations.is_empty()); + + Ok(()) + } + + /// Conflicting uploads are serialized. + #[test] + fn schedule_upload_conflicts() -> anyhow::Result<()> { + let mut queue = UploadQueue::Uninitialized; + let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?; + let tli = make_timeline(); + + // Enqueue three versions of the same layer, with different file sizes. + let layer0a = make_layer_with_size(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51", 1); + let layer0b = make_layer_with_size(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51", 2); + let layer0c = make_layer_with_size(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51", 3); + + let ops = [ + UploadOp::UploadLayer(layer0a.clone(), layer0a.metadata(), None), + UploadOp::UploadLayer(layer0b.clone(), layer0b.metadata(), None), + UploadOp::UploadLayer(layer0c.clone(), layer0c.metadata(), None), + ]; + + queue.queued_operations.extend(ops.clone()); + + // Only one version should be scheduled and uploaded at a time. + for op in ops { + let tasks = queue.schedule_ready(); + assert_eq!(tasks.len(), 1); + assert_same_op(&tasks[0].op, &op); + queue.complete(tasks[0].task_id); + } + assert!(queue.schedule_ready().is_empty()); + assert!(queue.queued_operations.is_empty()); + + Ok(()) + } + + /// Conflicting uploads and deletes are serialized. 
+ #[test] + fn schedule_upload_delete_conflicts() -> anyhow::Result<()> { + let mut queue = UploadQueue::Uninitialized; + let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?; + let tli = make_timeline(); + + // Enqueue two layer uploads, with a delete of both layers in between them. These should be + // scheduled one at a time, since deletes can't bypass uploads and vice versa. + let layer0 = make_layer(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"); + let layer1 = make_layer(&tli, "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"); + + let ops = [ + UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None), + UploadOp::Delete(Delete { + layers: vec![ + (layer0.layer_desc().layer_name(), layer0.metadata()), + (layer1.layer_desc().layer_name(), layer1.metadata()), + ], + }), + UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None), + ]; + + queue.queued_operations.extend(ops.clone()); + + // Only one operation should be scheduled and completed at a time, in queue order. + for op in ops { + let tasks = queue.schedule_ready(); + assert_eq!(tasks.len(), 1); + assert_same_op(&tasks[0].op, &op); + queue.complete(tasks[0].task_id); + } + assert!(queue.schedule_ready().is_empty()); + assert!(queue.queued_operations.is_empty()); + + Ok(()) + } + + /// Non-conflicting uploads and deletes can bypass the queue, avoiding the conflicting + /// delete/upload operations at the head of the queue. + #[test] + fn schedule_upload_delete_conflicts_bypass() -> anyhow::Result<()> { + let mut queue = UploadQueue::Uninitialized; + let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?; + let tli = make_timeline(); + + // Enqueue two layer uploads, with a delete of both layers in between them. These should be + // scheduled one at a time, since deletes can't bypass uploads and vice versa. 
+ // + // Also enqueue non-conflicting uploads and deletes at the end. These can bypass the queue + // and run immediately. + let layer0 = make_layer(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"); + let layer1 = make_layer(&tli, "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"); + let layer2 = make_layer(&tli, "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"); + let layer3 = make_layer(&tli, "300000000000000000000000000000000000-400000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"); + + let ops = [ + UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None), + UploadOp::Delete(Delete { + layers: vec![ + (layer0.layer_desc().layer_name(), layer0.metadata()), + (layer1.layer_desc().layer_name(), layer1.metadata()), + ], + }), + UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None), + UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None), + UploadOp::Delete(Delete { + layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())], + }), + ]; + + queue.queued_operations.extend(ops.clone()); + + // Operations 0, 3, and 4 are scheduled immediately. + let tasks = queue.schedule_ready(); + assert_same_ops(tasks.iter().map(|t| &t.op), [&ops[0], &ops[3], &ops[4]]); + assert_eq!(queue.queued_operations.len(), 2); + + Ok(()) + } + + /// Non-conflicting uploads are parallelized. + #[test] + fn schedule_upload_parallel() -> anyhow::Result<()> { + let mut queue = UploadQueue::Uninitialized; + let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?; + let tli = make_timeline(); + + // Enqueue three different layer uploads. 
+ let layer0 = make_layer(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"); + let layer1 = make_layer(&tli, "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"); + let layer2 = make_layer(&tli, "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"); + + let ops = [ + UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None), + UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None), + UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None), + ]; + + queue.queued_operations.extend(ops.clone()); + + // All uploads should be scheduled concurrently. + let tasks = queue.schedule_ready(); + + assert_same_ops(tasks.iter().map(|t| &t.op), &ops); + assert!(queue.queued_operations.is_empty()); + + Ok(()) + } + + /// Index uploads are serialized. + #[test] + fn schedule_index_serial() -> anyhow::Result<()> { + let mut queue = UploadQueue::Uninitialized; + let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?; + + // Enqueue three uploads of the current empty index. + let index = Box::new(queue.clean.0.clone()); + + let ops = [ + UploadOp::UploadMetadata { + uploaded: index.clone(), + }, + UploadOp::UploadMetadata { + uploaded: index.clone(), + }, + UploadOp::UploadMetadata { + uploaded: index.clone(), + }, + ]; + + queue.queued_operations.extend(ops.clone()); + + // The uploads should run serially. + for op in ops { + let tasks = queue.schedule_ready(); + assert_eq!(tasks.len(), 1); + assert_same_op(&tasks[0].op, &op); + queue.complete(tasks[0].task_id); + } + + assert!(queue.queued_operations.is_empty()); + + Ok(()) + } + + /// Chains of upload/index operations lead to parallel layer uploads and serial index uploads. + /// This is the common case with layer flushes. 
+ #[test] + fn schedule_index_upload_chain() -> anyhow::Result<()> { + let mut queue = UploadQueue::Uninitialized; + let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?; + let tli = make_timeline(); + + // Enqueue three layer uploads, each followed by an index upload (built via `index_with`). + let index = Box::new(queue.clean.0.clone()); + let layer0 = make_layer(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"); + let index0 = index_with(&index, &layer0); + let layer1 = make_layer(&tli, "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"); + let index1 = index_with(&index0, &layer1); + let layer2 = make_layer(&tli, "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"); + let index2 = index_with(&index1, &layer2); + + let ops = [ + UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None), + UploadOp::UploadMetadata { + uploaded: index0.clone(), + }, + UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None), + UploadOp::UploadMetadata { + uploaded: index1.clone(), + }, + UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None), + UploadOp::UploadMetadata { + uploaded: index2.clone(), + }, + ]; + + queue.queued_operations.extend(ops.clone()); + + // The layer uploads should be scheduled immediately. The indexes must wait. + let upload_tasks = queue.schedule_ready(); + assert_same_ops( + upload_tasks.iter().map(|t| &t.op), + [&ops[0], &ops[2], &ops[4]], + ); + + // layer2 completes first. None of the indexes can upload yet. + queue.complete(upload_tasks[2].task_id); + assert!(queue.schedule_ready().is_empty()); + + // layer0 completes. index0 can upload. It completes.
+ queue.complete(upload_tasks[0].task_id); + let index_tasks = queue.schedule_ready(); + assert_eq!(index_tasks.len(), 1); + assert_same_op(&index_tasks[0].op, &ops[1]); + queue.complete(index_tasks[0].task_id); + + // layer1 completes. This unblocks index1, and then index2 once index1 has completed. + queue.complete(upload_tasks[1].task_id); + + let index_tasks = queue.schedule_ready(); + assert_eq!(index_tasks.len(), 1); + assert_same_op(&index_tasks[0].op, &ops[3]); + queue.complete(index_tasks[0].task_id); + + let index_tasks = queue.schedule_ready(); + assert_eq!(index_tasks.len(), 1); + assert_same_op(&index_tasks[0].op, &ops[5]); + queue.complete(index_tasks[0].task_id); + + assert!(queue.queued_operations.is_empty()); + + Ok(()) + } + + /// A delete can't bypass an index upload if an index ahead of it still references the layer. + #[test] + fn schedule_index_delete_dereferenced() -> anyhow::Result<()> { + let mut queue = UploadQueue::Uninitialized; + let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?; + let tli = make_timeline(); + + // Create a layer to upload. + let layer = make_layer(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"); + let index_upload = index_with(&queue.clean.0, &layer); + + // Remove the layer reference in a new index, then delete the layer. + let index_deref = index_without(&index_upload, &layer); + + let ops = [ + // Initial upload. + UploadOp::UploadLayer(layer.clone(), layer.metadata(), None), + UploadOp::UploadMetadata { + uploaded: index_upload.clone(), + }, + // Dereference the layer and delete it. + UploadOp::UploadMetadata { + uploaded: index_deref.clone(), + }, + UploadOp::Delete(Delete { + layers: vec![(layer.layer_desc().layer_name(), layer.metadata())], + }), + ]; + + queue.queued_operations.extend(ops.clone()); + + // Operations are serialized.
+ for op in ops { + let tasks = queue.schedule_ready(); + assert_eq!(tasks.len(), 1); + assert_same_op(&tasks[0].op, &op); + queue.complete(tasks[0].task_id); + } + assert!(queue.queued_operations.is_empty()); + + Ok(()) + } + + /// An upload with a reused layer name doesn't clobber the previous layer. Specifically, a + /// dereference/upload/reference cycle can't allow the upload to bypass the reference. + #[test] + fn schedule_index_upload_dereferenced() -> anyhow::Result<()> { + let mut queue = UploadQueue::Uninitialized; + let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?; + let tli = make_timeline(); + + // Create a layer to upload. + let layer = make_layer(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"); + + // Upload the layer. Then dereference the layer, and upload/reference it again. + let index_upload = index_with(&queue.clean.0, &layer); + let index_deref = index_without(&index_upload, &layer); + let index_ref = index_with(&index_deref, &layer); + + let ops = [ + // Initial upload. + UploadOp::UploadLayer(layer.clone(), layer.metadata(), None), + UploadOp::UploadMetadata { + uploaded: index_upload.clone(), + }, + // Dereference the layer. + UploadOp::UploadMetadata { + uploaded: index_deref.clone(), + }, + // Replace and reference the layer. + UploadOp::UploadLayer(layer.clone(), layer.metadata(), None), + UploadOp::UploadMetadata { + uploaded: index_ref.clone(), + }, + ]; + + queue.queued_operations.extend(ops.clone()); + + // Operations are serialized. + for op in ops { + let tasks = queue.schedule_ready(); + assert_eq!(tasks.len(), 1); + assert_same_op(&tasks[0].op, &op); + queue.complete(tasks[0].task_id); + } + assert!(queue.queued_operations.is_empty()); + + Ok(()) + } + + /// Nothing can bypass a shutdown, and it waits for inprogress tasks. It's never returned from + /// next_ready(), but is left at the head of the queue.
+ #[test] + fn schedule_shutdown() -> anyhow::Result<()> { + let mut queue = UploadQueue::Uninitialized; + let queue = queue.initialize_empty_remote(&TimelineMetadata::example())?; + let tli = make_timeline(); + + let index = Box::new(queue.clean.0.clone()); // empty, doesn't matter + let layer0 = make_layer(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"); + let layer1 = make_layer(&tli, "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"); + let layer2 = make_layer(&tli, "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"); + let layer3 = make_layer(&tli, "300000000000000000000000000000000000-400000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"); + + // Enqueue non-conflicting upload, delete, and index before and after a shutdown. + let ops = [ + UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None), + UploadOp::Delete(Delete { + layers: vec![(layer1.layer_desc().layer_name(), layer1.metadata())], + }), + UploadOp::UploadMetadata { + uploaded: index.clone(), + }, + UploadOp::Shutdown, + UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None), + UploadOp::Delete(Delete { + layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())], + }), + UploadOp::UploadMetadata { + uploaded: index.clone(), + }, + ]; + + queue.queued_operations.extend(ops.clone()); + + // Schedule the initial operations ahead of the shutdown. + let tasks = queue.schedule_ready(); + + assert_same_ops(tasks.iter().map(|t| &t.op), &ops[0..3]); + assert!(matches!( + queue.queued_operations.front(), + Some(&UploadOp::Shutdown) + )); + + // Complete the initial operations. The shutdown isn't triggered while they're pending.
+ for task in tasks { + assert!(queue.schedule_ready().is_empty()); + queue.complete(task.task_id); + } + + // The shutdown is triggered the next time we try to pull an operation. It isn't returned, + // but is left in the queue. + assert!(!queue.shutdown_ready.is_closed()); + assert!(queue.next_ready().is_none()); + assert!(queue.shutdown_ready.is_closed()); + + Ok(()) + } + + /// Tests that can_bypass takes name, generation and shard index into account for all operations. + #[test] + fn can_bypass_path() -> anyhow::Result<()> { + let tli = make_timeline(); + + let name0 = &"000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"; + let name1 = &"100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"; + + // Asserts that layers a and b either can or can't bypass each other. zip() pairs the ops by + // position, so only same-kind pairs are checked (Delete/UploadMetadata are special-cased). + #[track_caller] + fn assert_can_bypass(a: ResidentLayer, b: ResidentLayer, can_bypass: bool) { + let index = IndexPart::empty(TimelineMetadata::example()); + for (a, b) in make_ops(a).into_iter().zip(make_ops(b)) { + match (&a, &b) { + // Deletes can always bypass each other. + (UploadOp::Delete(_), UploadOp::Delete(_)) => assert!(a.can_bypass(&b, &index)), + // Indexes can never bypass each other. + (UploadOp::UploadMetadata { .. }, UploadOp::UploadMetadata { .. }) => { + assert!(!a.can_bypass(&b, &index)) + } + // For the remaining pair (UploadLayer vs UploadLayer), assert as requested.
+ (a, b) => assert_eq!(a.can_bypass(b, &index), can_bypass), + } + } + } + + fn make_ops(layer: ResidentLayer) -> Vec<UploadOp> { + let mut index = IndexPart::empty(TimelineMetadata::example()); + index + .layer_metadata + .insert(layer.layer_desc().layer_name(), layer.metadata()); + vec![ + UploadOp::UploadLayer(layer.clone(), layer.metadata(), None), + UploadOp::Delete(Delete { + layers: vec![(layer.layer_desc().layer_name(), layer.metadata())], + }), + UploadOp::UploadMetadata { + uploaded: Box::new(index), + }, + ] + } + + // Makes a ResidentLayer for the given name, shard number (None is unsharded), and generation. + let layer = |name: &'static str, shard: Option<u8>, generation: u32| -> ResidentLayer { + let shard = shard + .map(|n| ShardIndex::new(ShardNumber(n), ShardCount(8))) + .unwrap_or(ShardIndex::unsharded()); + let metadata = LayerFileMetadata { + shard, + generation: Generation::Valid(generation), + file_size: 0, + }; + make_layer_with_metadata(&tli, name, metadata) + }; + + // Same name and metadata can't bypass. This goes both for unsharded and sharded, as well as + // 0 or >0 generation. + assert_can_bypass(layer(name0, None, 0), layer(name0, None, 0), false); + assert_can_bypass(layer(name0, Some(0), 0), layer(name0, Some(0), 0), false); + assert_can_bypass(layer(name0, None, 1), layer(name0, None, 1), false); + + // Different names can bypass. + assert_can_bypass(layer(name0, None, 0), layer(name1, None, 0), true); + + // Different shards can bypass. Shard 0 is different from unsharded. + assert_can_bypass(layer(name0, Some(0), 0), layer(name0, Some(1), 0), true); + assert_can_bypass(layer(name0, Some(0), 0), layer(name0, None, 0), true); + + // Different generations can bypass, both sharded and unsharded. + assert_can_bypass(layer(name0, None, 0), layer(name0, None, 1), true); + assert_can_bypass(layer(name0, Some(1), 0), layer(name0, Some(1), 1), true); + + Ok(()) + } +}