From 30de7d0f38ff3afde3fdd5d671ea5951c7a7a71d Mon Sep 17 00:00:00 2001 From: Li0k Date: Tue, 6 Jun 2023 19:35:17 +0800 Subject: [PATCH] feat(storage): cherry pick #10183 (#10194) --- proto/hummock.proto | 1 + src/meta/src/hummock/compactor_manager.rs | 60 ++++++++++++++++--- src/meta/src/hummock/manager/mod.rs | 2 +- src/meta/src/hummock/manager/tests.rs | 2 + src/storage/benches/bench_compactor.rs | 1 + src/storage/hummock_sdk/src/compact.rs | 7 ++- .../src/hummock/compactor/compactor_runner.rs | 2 + src/storage/src/hummock/compactor/mod.rs | 57 ++++++++++++++---- .../compactor/shared_buffer_compact.rs | 2 + .../src/hummock/compactor/task_progress.rs | 8 ++- 10 files changed, 119 insertions(+), 23 deletions(-) diff --git a/proto/hummock.proto b/proto/hummock.proto index 7723a4a5dcd48..84c2fbfda4e8d 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -378,6 +378,7 @@ message CompactTaskProgress { uint64 task_id = 1; uint32 num_ssts_sealed = 2; uint32 num_ssts_uploaded = 3; + uint64 num_progress_key = 4; } // The measurement of the workload on a compactor to determine whether it is idle. diff --git a/src/meta/src/hummock/compactor_manager.rs b/src/meta/src/hummock/compactor_manager.rs index f57893623ad02..76ea43eadff43 100644 --- a/src/meta/src/hummock/compactor_manager.rs +++ b/src/meta/src/hummock/compactor_manager.rs @@ -21,6 +21,7 @@ use std::time::{Duration, Instant, SystemTime}; use fail::fail_point; use itertools::Itertools; use parking_lot::RwLock; +use risingwave_hummock_sdk::compact::estimate_state_for_compaction; use risingwave_hummock_sdk::{HummockCompactionTaskId, HummockContextId}; use risingwave_pb::hummock::subscribe_compact_tasks_response::Task; use risingwave_pb::hummock::{ @@ -58,6 +59,7 @@ struct TaskHeartbeat { task: CompactTask, num_ssts_sealed: u32, num_ssts_uploaded: u32, + num_progress_key: u64, create_time: Instant, expire_at: u64, } @@ -325,14 +327,46 @@ impl CompactorManager { now: u64, ) -> Vec<(HummockCompactionTaskId, (HummockContextId, CompactTask))> { let mut cancellable_tasks = vec![]; + const MAX_TASK_DURATION_SEC: u64 = 2700; + for (context_id, heartbeats) in task_heartbeats { { for TaskHeartbeat { - expire_at, task, .. + expire_at, + task, + create_time, + num_ssts_sealed, + num_ssts_uploaded, + num_progress_key, } in heartbeats.values() { - if *expire_at < now { + let task_duration_too_long = + create_time.elapsed().as_secs() > MAX_TASK_DURATION_SEC; + if *expire_at < now || task_duration_too_long { + // 1. task heartbeat expire + // 2. task duration is too long cancellable_tasks.push((task.get_task_id(), (*context_id, task.clone()))); + + if task_duration_too_long { + let (need_quota, total_file_count, total_key_count) = + estimate_state_for_compaction(task); + tracing::info!( + "CompactionGroupId {} Task {} duration too long create_time {:?} num_ssts_sealed {} num_ssts_uploaded {} num_progress_key {} need_quota {} total_file_count {} total_key_count {} target_level {} base_level {} target_sub_level_id {} task_type {}", + task.compaction_group_id, + task.task_id, + create_time, + num_ssts_sealed, + num_ssts_uploaded, + num_progress_key, + need_quota, + total_file_count, + total_key_count, + task.target_level, + task.base_level, + task.target_sub_level_id, + task.task_type, + ); + } } } } @@ -353,6 +387,7 @@ impl CompactorManager { task, num_ssts_sealed: 0, num_ssts_uploaded: 0, + num_progress_key: 0, create_time: Instant::now(), expire_at: now + self.task_expiry_seconds, }, @@ -382,10 +417,16 @@ impl CompactorManager { if let Some(heartbeats) = guard.get_mut(&context_id) { for progress in progress_list { if let Some(task_ref) = heartbeats.get_mut(&progress.task_id) { - // Refresh the expiry of the task as it is showing progress. - task_ref.expire_at = now + self.task_expiry_seconds; - task_ref.num_ssts_sealed = progress.num_ssts_sealed; - task_ref.num_ssts_uploaded = progress.num_ssts_uploaded; + if task_ref.num_ssts_sealed < progress.num_ssts_sealed + || task_ref.num_ssts_uploaded < progress.num_ssts_uploaded + || task_ref.num_progress_key < progress.num_progress_key + { + // Refresh the expiry of the task as it is showing progress. + task_ref.expire_at = now + self.task_expiry_seconds; + task_ref.num_ssts_sealed = progress.num_ssts_sealed; + task_ref.num_ssts_uploaded = progress.num_ssts_uploaded; + task_ref.num_progress_key = progress.num_progress_key; + } } } } @@ -485,9 +526,10 @@ mod tests { task_id: expired[0].1.task_id, num_ssts_sealed: 0, num_ssts_uploaded: 0, + num_progress_key: 0, }], ); - assert_eq!(compactor_manager.get_expired_tasks().len(), 0); + assert_eq!(compactor_manager.get_expired_tasks().len(), 1); // Mimic compaction heartbeat with invalid task id compactor_manager.update_task_heartbeats( @@ -496,9 +538,10 @@ mod tests { task_id: expired[0].1.task_id + 1, num_ssts_sealed: 1, num_ssts_uploaded: 1, + num_progress_key: 100, }], ); - assert_eq!(compactor_manager.get_expired_tasks().len(), 0); + assert_eq!(compactor_manager.get_expired_tasks().len(), 1); // Mimic effective compaction heartbeat compactor_manager.update_task_heartbeats( @@ -507,6 +550,7 @@ mod tests { task_id: expired[0].1.task_id, num_ssts_sealed: 1, num_ssts_uploaded: 1, + num_progress_key: 100, }], ); assert_eq!(compactor_manager.get_expired_tasks().len(), 0); diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index e943e8a3723cb..b266d991c9c11 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -924,7 +924,7 @@ where (count, size) }; - let (compact_task_size, compact_task_file_count) = + let (compact_task_size, compact_task_file_count, _) = estimate_state_for_compaction(&compact_task); if compact_task.input_ssts[0].level_idx == 0 { diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index 5b7fbf4fa9612..a9238cbce31e6 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -1004,6 +1004,7 @@ async fn test_hummock_compaction_task_heartbeat() { task_id: compact_task.task_id, num_ssts_sealed: i + 1, num_ssts_uploaded: 0, + num_progress_key: 0, }; compactor_manager.update_task_heartbeats(context_id, &vec![req]); tokio::time::sleep(std::time::Duration::from_millis(250)).await; @@ -1133,6 +1134,7 @@ async fn test_hummock_compaction_task_heartbeat_removal_on_node_removal() { task_id: compact_task.task_id, num_ssts_sealed: 1, num_ssts_uploaded: 1, + num_progress_key: 0, }; compactor_manager.update_task_heartbeats(context_id, &vec![req.clone()]); diff --git a/src/storage/benches/bench_compactor.rs b/src/storage/benches/bench_compactor.rs index 2d25e92d54f77..2174b37afaadd 100644 --- a/src/storage/benches/bench_compactor.rs +++ b/src/storage/benches/bench_compactor.rs @@ -201,6 +201,7 @@ async fn compact>(iter: I, sstable_store Arc::new(CompactorMetrics::unused()), iter, DummyCompactionFilter, + None, ) .await .unwrap(); diff --git a/src/storage/hummock_sdk/src/compact.rs b/src/storage/hummock_sdk/src/compact.rs index b60c9b881fe0a..f3a27649fcb86 100644 --- a/src/storage/hummock_sdk/src/compact.rs +++ b/src/storage/hummock_sdk/src/compact.rs @@ -101,22 +101,25 @@ pub fn append_sstable_info_to_string(s: &mut String, sstable_info: &SstableInfo) } } -pub fn estimate_state_for_compaction(task: &CompactTask) -> (u64, usize) { +pub fn estimate_state_for_compaction(task: &CompactTask) -> (u64, usize, u64) { let mut total_memory_size = 0; let mut total_file_count = 0; + let mut total_key_count = 0; for level in &task.input_ssts { if level.level_type == LevelType::Nonoverlapping as i32 { if let Some(table) = level.table_infos.first() { total_memory_size += table.file_size * task.splits.len() as u64; + total_key_count += table.total_key_count; } } else { for table in &level.table_infos { total_memory_size += table.file_size; + total_key_count += table.total_key_count; } } total_file_count += level.table_infos.len(); } - (total_memory_size, total_file_count) + (total_memory_size, total_file_count, total_key_count) } diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 95240a8185dd6..4d74fa3aae6bf 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -102,6 +102,8 @@ impl CompactorRunner { del_agg, filter_key_extractor, Some(task_progress), + Some(self.compact_task.task_id), + Some(self.split_index), ) .await?; Ok((self.split_index, ssts, compaction_stat)) diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index 632c2003ae5e5..1d2931b18e657 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -41,7 +41,7 @@ use itertools::Itertools; use risingwave_hummock_sdk::compact::{compact_task_to_string, estimate_state_for_compaction}; use risingwave_hummock_sdk::key::FullKey; use risingwave_hummock_sdk::table_stats::{add_table_stats_map, TableStats, TableStatsMap}; -use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo}; +use risingwave_hummock_sdk::{HummockCompactionTaskId, HummockEpoch, LocalSstableInfo}; use risingwave_pb::hummock::compact_task::TaskStatus; use risingwave_pb::hummock::subscribe_compact_tasks_response::Task; use risingwave_pb::hummock::{ @@ -165,15 +165,8 @@ impl Compactor { ]) .start_timer(); - let (need_quota, file_counts) = estimate_state_for_compaction(&compact_task); - tracing::info!( - "Ready to handle compaction task: {} need memory: {} input_file_counts {} target_level {} compression_algorithm {:?}", - compact_task.task_id, - need_quota, - file_counts, - compact_task.target_level, - compact_task.compression_algorithm, - ); + let (need_quota, total_file_count, total_key_count) = + estimate_state_for_compaction(&compact_task); let mut multi_filter = build_multi_compaction_filter(&compact_task); @@ -282,6 +275,18 @@ impl Compactor { let task_memory_capacity_with_parallelism = estimate_task_memory_capacity(context.clone(), &compact_task) * parallelism; + tracing::info!( + "Ready to handle compaction task: {} need memory: {} input_file_counts {} total_key_count {} target_level {} compression_algorithm {:?} parallelism {} task_memory_capacity_with_parallelism {}", + compact_task.task_id, + need_quota, + total_file_count, + total_key_count, + compact_task.target_level, + compact_task.compression_algorithm, + parallelism, + task_memory_capacity_with_parallelism + ); + // If the task does not have enough memory, it should cancel the task and let the meta // reschedule it, so that it does not occupy the compactor's resources. let memory_detector = context @@ -498,6 +503,7 @@ impl Compactor { task_id, num_ssts_sealed: progress.num_ssts_sealed.load(Ordering::Relaxed), num_ssts_uploaded: progress.num_ssts_uploaded.load(Ordering::Relaxed), + num_progress_key: progress.num_progress_key.load(Ordering::Relaxed), }); } @@ -623,6 +629,7 @@ impl Compactor { compactor_metrics: Arc, mut iter: impl HummockIterator, mut compaction_filter: impl CompactionFilter, + task_progress: Option>, ) -> HummockResult where F: TableBuilderFactory, @@ -655,7 +662,16 @@ impl Compactor { let mut last_table_stats = TableStats::default(); let mut last_table_id = None; let mut compaction_statistics = CompactionStatistics::default(); + let mut progress_key_num: u64 = 0; + const PROGRESS_KEY_INTERVAL: u64 = 100; while iter.is_valid() { + progress_key_num += 1; + + if let Some(task_progress) = task_progress.as_ref() && progress_key_num >= PROGRESS_KEY_INTERVAL { + task_progress.inc_progress_key(progress_key_num); + progress_key_num = 0; + } + let mut iter_key = iter.key(); compaction_statistics.iter_total_key_counts += 1; @@ -760,6 +776,12 @@ impl Compactor { iter.next().await?; } + + if let Some(task_progress) = task_progress.as_ref() && progress_key_num > 0 { + // Avoid losing the progress_key_num in the last Interval + task_progress.inc_progress_key(progress_key_num); + } + if let Some(last_table_id) = last_table_id.take() { table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats)); } @@ -797,6 +819,8 @@ impl Compactor { del_agg: Arc, filter_key_extractor: Arc, task_progress: Option>, + task_id: Option, + split_index: Option, ) -> HummockResult<(Vec, CompactionStatistics)> { // Monitor time cost building shared buffer to SSTs. let compact_timer = if self.context.is_share_buffer_compact { @@ -908,6 +932,16 @@ impl Compactor { debug_assert!(ssts .iter() .all(|table_info| table_info.sst_info.get_table_ids().is_sorted())); + + if task_id.is_some() { + // skip shared buffer compaction + tracing::info!( + "Finish Task {:?} split_index {:?} sst count {}", + task_id, + split_index, + ssts.len() + ); + } Ok((ssts, table_stats_map)) } @@ -934,7 +968,7 @@ impl Compactor { let mut sst_builder = CapacitySplitTableBuilder::new( builder_factory, self.context.compactor_metrics.clone(), - task_progress, + task_progress.clone(), del_agg, self.task_config.key_range.clone(), self.task_config.is_target_l0_or_lbase, @@ -947,6 +981,7 @@ impl Compactor { self.context.compactor_metrics.clone(), iter, compaction_filter, + task_progress, ) .await?; diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index 304082f44fbc8..8a3f28f6aeadd 100644 --- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -481,6 +481,8 @@ impl SharedBufferCompactRunner { del_agg, filter_key_extractor, None, + None, + None, ) .await?; Ok((self.split_index, ssts, table_stats_map)) diff --git a/src/storage/src/hummock/compactor/task_progress.rs b/src/storage/src/hummock/compactor/task_progress.rs index 2f4f0283a56bf..ae175dbaafb7d 100644 --- a/src/storage/src/hummock/compactor/task_progress.rs +++ b/src/storage/src/hummock/compactor/task_progress.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::collections::HashMap; -use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; use std::sync::Arc; use parking_lot::Mutex; @@ -26,6 +26,7 @@ pub type TaskProgressManagerRef = Arc