Skip to content

Commit

Permalink
pageserver: reorder upload queue when possible
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Jan 2, 2025
1 parent 4ab9eaa commit dad6139
Show file tree
Hide file tree
Showing 2 changed files with 913 additions and 89 deletions.
96 changes: 19 additions & 77 deletions pageserver/src/tenant/remote_timeline_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,22 @@
//! The contract between client and its user is that the user is responsible of
//! scheduling operations in an order that keeps the remote consistent as
//! described above.
//!
//! From the user's perspective, the operations are executed sequentially.
//! Internally, the client knows which operations can be performed in parallel,
//! and which operations act like a "barrier" that require preceding operations
//! to finish. The calling code just needs to call the schedule-functions in the
//! correct order, and the client will parallelize the operations in a way that
//! is safe.
//! is safe. For more details, see `UploadOp::can_bypass`.
//!
//! The caller should be careful with deletion, though. They should not delete
//! local files that have been scheduled for upload but not yet finished uploading.
//! Otherwise the upload will fail. To wait for an upload to finish, use
//! the 'wait_completion' function (more on that later.)
//!
//! (TODO: the above isn't accurate with the introduction of `UploadOp::can_bypass`,
//! consider removing it as well as `wait_completion`)
//!
//! All of this relies on the following invariants:
//!
//! - We rely on read-after write consistency in the remote storage.
Expand Down Expand Up @@ -1797,57 +1801,16 @@ impl RemoteTimelineClient {
Ok(())
}

///
/// Pick next tasks from the queue, and start as many of them as possible without violating
/// the ordering constraints.
///
/// The caller needs to already hold the `upload_queue` lock.
/// TODO: consider limiting the number of in-progress tasks, beyond what remote_storage does,
/// to avoid tenants starving other tenants.
fn launch_queued_tasks(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
while let Some(next_op) = upload_queue.queued_operations.front() {
// Can we run this task now?
let can_run_now = match next_op {
UploadOp::UploadLayer(..) => {
// Can always be scheduled.
true
}
UploadOp::UploadMetadata { .. } => {
// These can only be performed after all the preceding operations
// have finished.
upload_queue.inprogress_tasks.is_empty()
}
UploadOp::Delete(..) => {
// Wait for preceding uploads to finish. Concurrent deletions are OK, though.
upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len()
}

UploadOp::Barrier(_) | UploadOp::Shutdown => {
upload_queue.inprogress_tasks.is_empty()
}
};

// If we cannot launch this task, don't look any further.
//
// In some cases, we could let some non-frontmost tasks to "jump the queue" and launch
// them now, but we don't try to do that currently. For example, if the frontmost task
// is an index-file upload that cannot proceed until preceding uploads have finished, we
// could still start layer uploads that were scheduled later.
if !can_run_now {
break;
}
while let Some(mut next_op) = upload_queue.next_ready() {
debug!("starting op: {next_op}");

if let UploadOp::Shutdown = next_op {
// leave the op in the queue but do not start more tasks; it will be dropped when
// the stop is called.
upload_queue.shutdown_ready.close();
break;
}

// We can launch this task. Remove it from the queue first.
let mut next_op = upload_queue.queued_operations.pop_front().unwrap();

debug!("starting op: {}", next_op);

// Update the counters and prepare
// Prepare upload.
match &mut next_op {
UploadOp::UploadLayer(layer, meta, mode) => {
if upload_queue
Expand All @@ -1858,18 +1821,14 @@ impl RemoteTimelineClient {
} else {
*mode = Some(OpType::MayReorder)
}
upload_queue.num_inprogress_layer_uploads += 1;
}
UploadOp::UploadMetadata { .. } => {
upload_queue.num_inprogress_metadata_uploads += 1;
}
UploadOp::UploadMetadata { .. } => {}
UploadOp::Delete(Delete { layers }) => {
for (name, meta) in layers {
upload_queue
.recently_deleted
.insert((name.clone(), meta.generation));
}
upload_queue.num_inprogress_deletions += 1;
}
UploadOp::Barrier(sender) => {
sender.send_replace(());
Expand Down Expand Up @@ -1967,6 +1926,8 @@ impl RemoteTimelineClient {

let upload_result: anyhow::Result<()> = match &task.op {
UploadOp::UploadLayer(ref layer, ref layer_metadata, mode) => {
// TODO: check if this mechanism can be removed now that can_bypass() performs
// conflict checks during scheduling.
if let Some(OpType::FlushDeletion) = mode {
if self.config.read().unwrap().block_deletions {
// Of course, this is not efficient... but usually the queue should be empty.
Expand Down Expand Up @@ -2189,13 +2150,8 @@ impl RemoteTimelineClient {
upload_queue.inprogress_tasks.remove(&task.task_id);

let lsn_update = match task.op {
UploadOp::UploadLayer(_, _, _) => {
upload_queue.num_inprogress_layer_uploads -= 1;
None
}
UploadOp::UploadLayer(_, _, _) => None,
UploadOp::UploadMetadata { ref uploaded } => {
upload_queue.num_inprogress_metadata_uploads -= 1;

// the task id is reused as a monotonicity check for storing the "clean"
// IndexPart.
let last_updater = upload_queue.clean.1;
Expand Down Expand Up @@ -2229,10 +2185,7 @@ impl RemoteTimelineClient {
None
}
}
UploadOp::Delete(_) => {
upload_queue.num_inprogress_deletions -= 1;
None
}
UploadOp::Delete(_) => None,
UploadOp::Barrier(..) | UploadOp::Shutdown => unreachable!(),
};

Expand Down Expand Up @@ -2356,9 +2309,6 @@ impl RemoteTimelineClient {
visible_remote_consistent_lsn: initialized
.visible_remote_consistent_lsn
.clone(),
num_inprogress_layer_uploads: 0,
num_inprogress_metadata_uploads: 0,
num_inprogress_deletions: 0,
inprogress_tasks: HashMap::default(),
queued_operations: VecDeque::default(),
#[cfg(feature = "testing")]
Expand All @@ -2385,14 +2335,6 @@ impl RemoteTimelineClient {
}
};

// consistency check
assert_eq!(
qi.num_inprogress_layer_uploads
+ qi.num_inprogress_metadata_uploads
+ qi.num_inprogress_deletions,
qi.inprogress_tasks.len()
);

// We don't need to do anything here for in-progress tasks. They will finish
// on their own, decrement the unfinished-task counter themselves, and observe
// that the queue is Stopped.
Expand Down Expand Up @@ -2852,8 +2794,8 @@ mod tests {
let mut guard = client.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut().unwrap();
assert!(upload_queue.queued_operations.is_empty());
assert!(upload_queue.inprogress_tasks.len() == 2);
assert!(upload_queue.num_inprogress_layer_uploads == 2);
assert_eq!(upload_queue.inprogress_tasks.len(), 2);
assert_eq!(upload_queue.num_inprogress_layer_uploads(), 2);

// also check that `latest_file_changes` was updated
assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 2);
Expand Down Expand Up @@ -2923,8 +2865,8 @@ mod tests {
// Deletion schedules upload of the index file, and the file deletion itself
assert_eq!(upload_queue.queued_operations.len(), 2);
assert_eq!(upload_queue.inprogress_tasks.len(), 1);
assert_eq!(upload_queue.num_inprogress_layer_uploads, 1);
assert_eq!(upload_queue.num_inprogress_deletions, 0);
assert_eq!(upload_queue.num_inprogress_layer_uploads(), 1);
assert_eq!(upload_queue.num_inprogress_deletions(), 0);
assert_eq!(
upload_queue.latest_files_changes_since_metadata_upload_scheduled,
0
Expand Down
Loading

0 comments on commit dad6139

Please sign in to comment.