diff --git a/src/storage/src/hummock/event_handler/uploader/mod.rs b/src/storage/src/hummock/event_handler/uploader/mod.rs index f7b02649efae..37b64ebb6e17 100644 --- a/src/storage/src/hummock/event_handler/uploader/mod.rs +++ b/src/storage/src/hummock/event_handler/uploader/mod.rs @@ -466,6 +466,7 @@ struct LocalInstanceUnsyncData { sealed_data: VecDeque, // newer data comes first flushing_imms: VecDeque, + is_destroyed: bool, } impl LocalInstanceUnsyncData { @@ -476,10 +477,12 @@ impl LocalInstanceUnsyncData { current_epoch_data: Some(LocalInstanceEpochData::new(init_epoch)), sealed_data: VecDeque::new(), flushing_imms: Default::default(), + is_destroyed: false, } } fn add_imm(&mut self, imm: UploaderImm) { + assert!(!self.is_destroyed); assert_eq!(self.table_id, imm.table_id); self.current_epoch_data .as_mut() @@ -488,6 +491,7 @@ impl LocalInstanceUnsyncData { } fn local_seal_epoch(&mut self, next_epoch: HummockEpoch) -> HummockEpoch { + assert!(!self.is_destroyed); let data = self .current_epoch_data .as_mut() @@ -611,6 +615,10 @@ impl LocalInstanceUnsyncData { } } } + + fn is_finished(&self) -> bool { + self.is_destroyed && self.sealed_data.is_empty() + } } struct TableUnsyncData { @@ -889,18 +897,19 @@ impl UnsyncData { } } - fn may_destroy_instance(&mut self, instance_id: LocalInstanceId) -> Option { - if let Some(table_id) = self.instance_table_id.remove(&instance_id) { + fn may_destroy_instance(&mut self, instance_id: LocalInstanceId) { + if let Some(table_id) = self.instance_table_id.get(&instance_id) { debug!(instance_id, "destroy instance"); - let table_data = self.table_data.get_mut(&table_id).expect("should exist"); - assert!(table_data.instance_data.remove(&instance_id).is_some()); - if table_data.is_empty() { - Some(self.table_data.remove(&table_id).expect("should exist")) - } else { - None - } - } else { - None + let table_data = self.table_data.get_mut(table_id).expect("should exist"); + let instance_data = table_data + .instance_data + .get_mut(&instance_id) + .expect("should exist"); + assert!( + !instance_data.is_destroyed, + "cannot destroy an instance for twice" + ); + instance_data.is_destroyed = true; } } @@ -918,9 +927,20 @@ impl UnsyncData { } } assert!( - table_unsync_data.instance_data.is_empty(), + table_unsync_data + .instance_data + .values() + .all(|instance| instance.is_destroyed), "should be clear when dropping the read version instance" ); + for instance_id in table_unsync_data.instance_data.keys() { + assert_eq!( + *table_id, + self.instance_table_id + .remove(instance_id) + .expect("should exist") + ); + } } } debug_assert!(self @@ -997,6 +1017,18 @@ impl UploaderData { flush_payload.insert(instance_id, payload); } } + table_data.instance_data.retain(|instance_id, data| { + // remove the finished instances + if data.is_finished() { + assert_eq!( + self.unsync_data.instance_table_id.remove(instance_id), + Some(*table_id) + ); + false + } else { + true + } + }); if let Some((direction, watermarks)) = table_watermarks { Self::add_table_watermarks( &mut all_table_watermarks, @@ -1414,28 +1446,7 @@ impl HummockUploader { let UploaderState::Working(data) = &mut self.state else { return; }; - if let Some(removed_table_data) = data.unsync_data.may_destroy_instance(instance_id) { - data.task_manager.remove_table_spill_tasks( - removed_table_data.table_id, - removed_table_data - .spill_tasks - .into_values() - .flat_map(|task_ids| task_ids.into_iter()) - .filter(|task_id| { - if let Some((_, table_ids)) = data.unsync_data.spilled_data.get_mut(task_id) - { - assert!(table_ids.remove(&removed_table_data.table_id)); - if table_ids.is_empty() { - data.unsync_data.spilled_data.remove(task_id); - } - false - } else { - true - } - }), - ) - } - data.check_upload_task_consistency(); + data.unsync_data.may_destroy_instance(instance_id); } pub(crate) fn min_uncommitted_sst_id(&self) -> Option { @@ -1785,6 +1796,65 @@ pub(crate) mod tests { ); } + #[tokio::test] + async fn test_uploader_destroy_instance_before_sync() { + let mut uploader = test_uploader(dummy_success_upload_future); + let epoch1 = INITIAL_EPOCH.next_epoch(); + uploader.start_epochs_for_test([epoch1]); + let imm = gen_imm(epoch1).await; + uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1); + uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm.clone()); + uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1); + uploader.may_destroy_instance(TEST_LOCAL_INSTANCE_ID); + + let (sync_tx, sync_rx) = oneshot::channel(); + uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID])); + assert_eq!(epoch1 as HummockEpoch, uploader.test_max_syncing_epoch()); + assert_eq!(1, uploader.data().syncing_data.len()); + let (_, syncing_data) = uploader.data().syncing_data.first_key_value().unwrap(); + assert_eq!(epoch1 as HummockEpoch, syncing_data.sync_table_epochs[0].0); + assert!(syncing_data.uploaded.is_empty()); + assert!(!syncing_data.remaining_uploading_tasks.is_empty()); + + let staging_sst = uploader.next_uploaded_sst().await; + assert_eq!(&vec![epoch1], staging_sst.epochs()); + assert_eq!( + &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]), + staging_sst.imm_ids() + ); + assert_eq!( + &dummy_success_upload_output().new_value_ssts, + staging_sst.sstable_infos() + ); + + match sync_rx.await { + Ok(Ok(data)) => { + let SyncedData { + uploaded_ssts, + table_watermarks, + } = data; + assert_eq!(1, uploaded_ssts.len()); + let staging_sst = &uploaded_ssts[0]; + assert_eq!(&vec![epoch1], staging_sst.epochs()); + assert_eq!( + &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]), + staging_sst.imm_ids() + ); + assert_eq!( + &dummy_success_upload_output().new_value_ssts, + staging_sst.sstable_infos() + ); + assert!(table_watermarks.is_empty()); + } + _ => unreachable!(), + }; + assert!(!uploader + .data() + .unsync_data + .table_data + .contains_key(&TEST_TABLE_ID)); + } + #[tokio::test] async fn test_empty_uploader_sync() { let mut uploader = test_uploader(dummy_success_upload_future); diff --git a/src/storage/src/hummock/event_handler/uploader/task_manager.rs b/src/storage/src/hummock/event_handler/uploader/task_manager.rs index fd53fae1db32..c23aa648a722 100644 --- a/src/storage/src/hummock/event_handler/uploader/task_manager.rs +++ b/src/storage/src/hummock/event_handler/uploader/task_manager.rs @@ -136,24 +136,6 @@ impl TaskManager { ) } - pub(super) fn remove_table_spill_tasks( - &mut self, - table_id: TableId, - task_ids: impl Iterator, - ) { - for task_id in task_ids { - let entry = self.tasks.get_mut(&task_id).expect("should exist"); - let empty = must_match!(&mut entry.status, UploadingTaskStatus::Spilling(table_ids) => { - assert!(table_ids.remove(&table_id)); - table_ids.is_empty() - }); - if empty { - let task = self.tasks.remove(&task_id).expect("should exist").task; - task.join_handle.abort(); - } - } - } - pub(super) fn sync( &mut self, context: &UploaderContext,