Skip to content

Commit

Permalink
feat(storage): lazily remove instance data in uploader to avoid data …
Browse files Browse the repository at this point in the history
…loss (#19931)
  • Loading branch information
wenym1 authored Dec 25, 2024
1 parent 78ab59d commit f660c2e
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 52 deletions.
138 changes: 104 additions & 34 deletions src/storage/src/hummock/event_handler/uploader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ struct LocalInstanceUnsyncData {
sealed_data: VecDeque<LocalInstanceEpochData>,
// newer data comes first
flushing_imms: VecDeque<SharedBufferBatchId>,
is_destroyed: bool,
}

impl LocalInstanceUnsyncData {
Expand All @@ -476,10 +477,12 @@ impl LocalInstanceUnsyncData {
current_epoch_data: Some(LocalInstanceEpochData::new(init_epoch)),
sealed_data: VecDeque::new(),
flushing_imms: Default::default(),
is_destroyed: false,
}
}

fn add_imm(&mut self, imm: UploaderImm) {
assert!(!self.is_destroyed);
assert_eq!(self.table_id, imm.table_id);
self.current_epoch_data
.as_mut()
Expand All @@ -488,6 +491,7 @@ impl LocalInstanceUnsyncData {
}

fn local_seal_epoch(&mut self, next_epoch: HummockEpoch) -> HummockEpoch {
assert!(!self.is_destroyed);
let data = self
.current_epoch_data
.as_mut()
Expand Down Expand Up @@ -611,6 +615,10 @@ impl LocalInstanceUnsyncData {
}
}
}

fn is_finished(&self) -> bool {
self.is_destroyed && self.sealed_data.is_empty()
}
}

struct TableUnsyncData {
Expand Down Expand Up @@ -889,18 +897,19 @@ impl UnsyncData {
}
}

fn may_destroy_instance(&mut self, instance_id: LocalInstanceId) -> Option<TableUnsyncData> {
if let Some(table_id) = self.instance_table_id.remove(&instance_id) {
fn may_destroy_instance(&mut self, instance_id: LocalInstanceId) {
if let Some(table_id) = self.instance_table_id.get(&instance_id) {
debug!(instance_id, "destroy instance");
let table_data = self.table_data.get_mut(&table_id).expect("should exist");
assert!(table_data.instance_data.remove(&instance_id).is_some());
if table_data.is_empty() {
Some(self.table_data.remove(&table_id).expect("should exist"))
} else {
None
}
} else {
None
let table_data = self.table_data.get_mut(table_id).expect("should exist");
let instance_data = table_data
.instance_data
.get_mut(&instance_id)
.expect("should exist");
assert!(
!instance_data.is_destroyed,
"cannot destroy an instance for twice"
);
instance_data.is_destroyed = true;
}
}

Expand All @@ -918,9 +927,20 @@ impl UnsyncData {
}
}
assert!(
table_unsync_data.instance_data.is_empty(),
table_unsync_data
.instance_data
.values()
.all(|instance| instance.is_destroyed),
"should be clear when dropping the read version instance"
);
for instance_id in table_unsync_data.instance_data.keys() {
assert_eq!(
*table_id,
self.instance_table_id
.remove(instance_id)
.expect("should exist")
);
}
}
}
debug_assert!(self
Expand Down Expand Up @@ -997,6 +1017,18 @@ impl UploaderData {
flush_payload.insert(instance_id, payload);
}
}
table_data.instance_data.retain(|instance_id, data| {
// remove the finished instances
if data.is_finished() {
assert_eq!(
self.unsync_data.instance_table_id.remove(instance_id),
Some(*table_id)
);
false
} else {
true
}
});
if let Some((direction, watermarks)) = table_watermarks {
Self::add_table_watermarks(
&mut all_table_watermarks,
Expand Down Expand Up @@ -1414,28 +1446,7 @@ impl HummockUploader {
let UploaderState::Working(data) = &mut self.state else {
return;
};
if let Some(removed_table_data) = data.unsync_data.may_destroy_instance(instance_id) {
data.task_manager.remove_table_spill_tasks(
removed_table_data.table_id,
removed_table_data
.spill_tasks
.into_values()
.flat_map(|task_ids| task_ids.into_iter())
.filter(|task_id| {
if let Some((_, table_ids)) = data.unsync_data.spilled_data.get_mut(task_id)
{
assert!(table_ids.remove(&removed_table_data.table_id));
if table_ids.is_empty() {
data.unsync_data.spilled_data.remove(task_id);
}
false
} else {
true
}
}),
)
}
data.check_upload_task_consistency();
data.unsync_data.may_destroy_instance(instance_id);
}

pub(crate) fn min_uncommitted_sst_id(&self) -> Option<HummockSstableObjectId> {
Expand Down Expand Up @@ -1785,6 +1796,65 @@ pub(crate) mod tests {
);
}

#[tokio::test]
async fn test_uploader_destroy_instance_before_sync() {
let mut uploader = test_uploader(dummy_success_upload_future);
let epoch1 = INITIAL_EPOCH.next_epoch();
uploader.start_epochs_for_test([epoch1]);
let imm = gen_imm(epoch1).await;
uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm.clone());
uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
uploader.may_destroy_instance(TEST_LOCAL_INSTANCE_ID);

let (sync_tx, sync_rx) = oneshot::channel();
uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
assert_eq!(epoch1 as HummockEpoch, uploader.test_max_syncing_epoch());
assert_eq!(1, uploader.data().syncing_data.len());
let (_, syncing_data) = uploader.data().syncing_data.first_key_value().unwrap();
assert_eq!(epoch1 as HummockEpoch, syncing_data.sync_table_epochs[0].0);
assert!(syncing_data.uploaded.is_empty());
assert!(!syncing_data.remaining_uploading_tasks.is_empty());

let staging_sst = uploader.next_uploaded_sst().await;
assert_eq!(&vec![epoch1], staging_sst.epochs());
assert_eq!(
&HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
staging_sst.imm_ids()
);
assert_eq!(
&dummy_success_upload_output().new_value_ssts,
staging_sst.sstable_infos()
);

match sync_rx.await {
Ok(Ok(data)) => {
let SyncedData {
uploaded_ssts,
table_watermarks,
} = data;
assert_eq!(1, uploaded_ssts.len());
let staging_sst = &uploaded_ssts[0];
assert_eq!(&vec![epoch1], staging_sst.epochs());
assert_eq!(
&HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
staging_sst.imm_ids()
);
assert_eq!(
&dummy_success_upload_output().new_value_ssts,
staging_sst.sstable_infos()
);
assert!(table_watermarks.is_empty());
}
_ => unreachable!(),
};
assert!(!uploader
.data()
.unsync_data
.table_data
.contains_key(&TEST_TABLE_ID));
}

#[tokio::test]
async fn test_empty_uploader_sync() {
let mut uploader = test_uploader(dummy_success_upload_future);
Expand Down
18 changes: 0 additions & 18 deletions src/storage/src/hummock/event_handler/uploader/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,24 +136,6 @@ impl TaskManager {
)
}

pub(super) fn remove_table_spill_tasks(
&mut self,
table_id: TableId,
task_ids: impl Iterator<Item = UploadingTaskId>,
) {
for task_id in task_ids {
let entry = self.tasks.get_mut(&task_id).expect("should exist");
let empty = must_match!(&mut entry.status, UploadingTaskStatus::Spilling(table_ids) => {
assert!(table_ids.remove(&table_id));
table_ids.is_empty()
});
if empty {
let task = self.tasks.remove(&task_id).expect("should exist").task;
task.join_handle.abort();
}
}
}

pub(super) fn sync(
&mut self,
context: &UploaderContext,
Expand Down

0 comments on commit f660c2e

Please sign in to comment.