feat(storage): cherry pick #10183 (#10194)
Li0k authored Jun 6, 2023
1 parent cdfc947 commit 30de7d0
Showing 10 changed files with 119 additions and 23 deletions.
1 change: 1 addition & 0 deletions proto/hummock.proto
@@ -378,6 +378,7 @@ message CompactTaskProgress {
uint64 task_id = 1;
uint32 num_ssts_sealed = 2;
uint32 num_ssts_uploaded = 3;
+ uint64 num_progress_key = 4;
}

// The measurement of the workload on a compactor to determine whether it is idle.
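The new num_progress_key field lets a compactor report how many input keys it has processed, which the meta service (below) uses to tell a slow-but-advancing task from a stuck one. A minimal sketch of populating the extended message, assuming the four fields above are the whole message at this commit:

```rust
use risingwave_pb::hummock::CompactTaskProgress;

// Hypothetical helper: snapshot the compactor-side counters into the
// progress report that rides along with each heartbeat.
fn build_progress(task_id: u64, sealed: u32, uploaded: u32, keys: u64) -> CompactTaskProgress {
    CompactTaskProgress {
        task_id,
        num_ssts_sealed: sealed,
        num_ssts_uploaded: uploaded,
        num_progress_key: keys,
    }
}
```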
60 changes: 52 additions & 8 deletions src/meta/src/hummock/compactor_manager.rs
@@ -21,6 +21,7 @@ use std::time::{Duration, Instant, SystemTime};
use fail::fail_point;
use itertools::Itertools;
use parking_lot::RwLock;
+ use risingwave_hummock_sdk::compact::estimate_state_for_compaction;
use risingwave_hummock_sdk::{HummockCompactionTaskId, HummockContextId};
use risingwave_pb::hummock::subscribe_compact_tasks_response::Task;
use risingwave_pb::hummock::{
@@ -58,6 +59,7 @@ struct TaskHeartbeat {
task: CompactTask,
num_ssts_sealed: u32,
num_ssts_uploaded: u32,
+ num_progress_key: u64,
create_time: Instant,
expire_at: u64,
}
@@ -325,14 +327,46 @@ impl CompactorManager {
now: u64,
) -> Vec<(HummockCompactionTaskId, (HummockContextId, CompactTask))> {
let mut cancellable_tasks = vec![];
+ const MAX_TASK_DURATION_SEC: u64 = 2700;

for (context_id, heartbeats) in task_heartbeats {
{
for TaskHeartbeat {
- expire_at, task, ..
+ expire_at,
+ task,
+ create_time,
+ num_ssts_sealed,
+ num_ssts_uploaded,
+ num_progress_key,
} in heartbeats.values()
{
- if *expire_at < now {
+ let task_duration_too_long =
+ create_time.elapsed().as_secs() > MAX_TASK_DURATION_SEC;
+ if *expire_at < now || task_duration_too_long {
+ // 1. task heartbeat expired
+ // 2. task duration is too long
cancellable_tasks.push((task.get_task_id(), (*context_id, task.clone())));

+ if task_duration_too_long {
+ let (need_quota, total_file_count, total_key_count) =
+ estimate_state_for_compaction(task);
+ tracing::info!(
+ "CompactionGroupId {} Task {} duration too long create_time {:?} num_ssts_sealed {} num_ssts_uploaded {} num_progress_key {} need_quota {} total_file_count {} total_key_count {} target_level {} base_level {} target_sub_level_id {} task_type {}",
+ task.compaction_group_id,
+ task.task_id,
+ create_time,
+ num_ssts_sealed,
+ num_ssts_uploaded,
+ num_progress_key,
+ need_quota,
+ total_file_count,
+ total_key_count,
+ task.target_level,
+ task.base_level,
+ task.target_sub_level_id,
+ task.task_type,
+ );
+ }
}
}
}
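A task is now cancellable under either of two conditions: its heartbeat lease has lapsed, or its wall-clock runtime exceeds MAX_TASK_DURATION_SEC (2700 s, i.e. 45 minutes), no matter what its heartbeats say. The predicate in isolation, as a sketch:

```rust
use std::time::Instant;

const MAX_TASK_DURATION_SEC: u64 = 2700; // 45 minutes, as in the hunk above

// Cancellable once the lease expires *or* the task has simply run too
// long, even if heartbeats keep arriving.
fn is_cancellable(expire_at: u64, now: u64, create_time: &Instant) -> bool {
    expire_at < now || create_time.elapsed().as_secs() > MAX_TASK_DURATION_SEC
}
```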
@@ -353,6 +387,7 @@ impl CompactorManager {
task,
num_ssts_sealed: 0,
num_ssts_uploaded: 0,
+ num_progress_key: 0,
create_time: Instant::now(),
expire_at: now + self.task_expiry_seconds,
},
@@ -382,10 +417,16 @@
if let Some(heartbeats) = guard.get_mut(&context_id) {
for progress in progress_list {
if let Some(task_ref) = heartbeats.get_mut(&progress.task_id) {
- // Refresh the expiry of the task as it is showing progress.
- task_ref.expire_at = now + self.task_expiry_seconds;
- task_ref.num_ssts_sealed = progress.num_ssts_sealed;
- task_ref.num_ssts_uploaded = progress.num_ssts_uploaded;
+ if task_ref.num_ssts_sealed < progress.num_ssts_sealed
+ || task_ref.num_ssts_uploaded < progress.num_ssts_uploaded
+ || task_ref.num_progress_key < progress.num_progress_key
+ {
+ // Refresh the expiry of the task as it is showing progress.
+ task_ref.expire_at = now + self.task_expiry_seconds;
+ task_ref.num_ssts_sealed = progress.num_ssts_sealed;
+ task_ref.num_ssts_uploaded = progress.num_ssts_uploaded;
+ task_ref.num_progress_key = progress.num_progress_key;
+ }
}
}
}
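The lease refresh is now conditional: only a report in which at least one counter strictly advances resets expire_at, so a compactor that keeps heartbeating without making progress lets the lease run out and the task is reclaimed. A sketch of the rule over a simplified stand-in for TaskHeartbeat:

```rust
// Simplified stand-in for TaskHeartbeat; illustration only.
struct Hb {
    sealed: u32,
    uploaded: u32,
    keys: u64,
    expire_at: u64,
}

// Renew the lease only on strict advancement; a repeated report is a no-op.
fn on_heartbeat(hb: &mut Hb, sealed: u32, uploaded: u32, keys: u64, now: u64, expiry_secs: u64) {
    if hb.sealed < sealed || hb.uploaded < uploaded || hb.keys < keys {
        hb.expire_at = now + expiry_secs;
        hb.sealed = sealed;
        hb.uploaded = uploaded;
        hb.keys = keys;
    }
}
```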
@@ -485,9 +526,10 @@ mod tests {
task_id: expired[0].1.task_id,
num_ssts_sealed: 0,
num_ssts_uploaded: 0,
+ num_progress_key: 0,
}],
);
- assert_eq!(compactor_manager.get_expired_tasks().len(), 0);
+ assert_eq!(compactor_manager.get_expired_tasks().len(), 1);

// Mimic compaction heartbeat with invalid task id
compactor_manager.update_task_heartbeats(
@@ -496,9 +538,10 @@
task_id: expired[0].1.task_id + 1,
num_ssts_sealed: 1,
num_ssts_uploaded: 1,
+ num_progress_key: 100,
}],
);
- assert_eq!(compactor_manager.get_expired_tasks().len(), 0);
+ assert_eq!(compactor_manager.get_expired_tasks().len(), 1);

// Mimic effective compaction heartbeat
compactor_manager.update_task_heartbeats(
@@ -507,6 +550,7 @@
task_id: expired[0].1.task_id,
num_ssts_sealed: 1,
num_ssts_uploaded: 1,
+ num_progress_key: 100,
}],
);
assert_eq!(compactor_manager.get_expired_tasks().len(), 0);
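The expected expired-task count in the first two assertions changes from 0 to 1: a heartbeat that repeats the previous counters, and one that targets an unknown task id, no longer refresh the lease, so the expired task stays expired. Only the third heartbeat, which actually advances the counters, clears it.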
2 changes: 1 addition & 1 deletion src/meta/src/hummock/manager/mod.rs
@@ -924,7 +924,7 @@ where
(count, size)
};

- let (compact_task_size, compact_task_file_count) =
+ let (compact_task_size, compact_task_file_count, _) =
estimate_state_for_compaction(&compact_task);

if compact_task.input_ssts[0].level_idx == 0 {
2 changes: 2 additions & 0 deletions src/meta/src/hummock/manager/tests.rs
@@ -1004,6 +1004,7 @@ async fn test_hummock_compaction_task_heartbeat() {
task_id: compact_task.task_id,
num_ssts_sealed: i + 1,
num_ssts_uploaded: 0,
+ num_progress_key: 0,
};
compactor_manager.update_task_heartbeats(context_id, &vec![req]);
tokio::time::sleep(std::time::Duration::from_millis(250)).await;
@@ -1133,6 +1134,7 @@ async fn test_hummock_compaction_task_heartbeat_removal_on_node_removal() {
task_id: compact_task.task_id,
num_ssts_sealed: 1,
num_ssts_uploaded: 1,
+ num_progress_key: 0,
};
compactor_manager.update_task_heartbeats(context_id, &vec![req.clone()]);

1 change: 1 addition & 0 deletions src/storage/benches/bench_compactor.rs
@@ -201,6 +201,7 @@ async fn compact<I: HummockIterator<Direction = Forward>>(iter: I, sstable_store
Arc::new(CompactorMetrics::unused()),
iter,
DummyCompactionFilter,
+ None,
)
.await
.unwrap();
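The benchmark passes None for the new task_progress parameter of compact_and_build_sst (added further down in this commit), since there is no heartbeat to feed here.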
7 changes: 5 additions & 2 deletions src/storage/hummock_sdk/src/compact.rs
@@ -101,22 +101,25 @@ pub fn append_sstable_info_to_string(s: &mut String, sstable_info: &SstableInfo)
}
}

- pub fn estimate_state_for_compaction(task: &CompactTask) -> (u64, usize) {
+ pub fn estimate_state_for_compaction(task: &CompactTask) -> (u64, usize, u64) {
let mut total_memory_size = 0;
let mut total_file_count = 0;
+ let mut total_key_count = 0;
for level in &task.input_ssts {
if level.level_type == LevelType::Nonoverlapping as i32 {
if let Some(table) = level.table_infos.first() {
total_memory_size += table.file_size * task.splits.len() as u64;
+ total_key_count += table.total_key_count;
}
} else {
for table in &level.table_infos {
total_memory_size += table.file_size;
+ total_key_count += table.total_key_count;
}
}

total_file_count += level.table_infos.len();
}

- (total_memory_size, total_file_count)
+ (total_memory_size, total_file_count, total_key_count)
}
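estimate_state_for_compaction now also returns the total key count of the input SSTs. For a non-overlapping level it samples only the first table, scaling its file size by the number of splits as a memory proxy; overlapping levels are summed file by file. A hypothetical caller of the widened return value:

```rust
use risingwave_hummock_sdk::compact::estimate_state_for_compaction;
use risingwave_pb::hummock::CompactTask;

// The tuple is (estimated memory in bytes, input file count, total key count).
fn log_estimate(task: &CompactTask) {
    let (need_quota, total_file_count, total_key_count) = estimate_state_for_compaction(task);
    tracing::info!(
        "task {} needs ~{} bytes over {} files / {} keys",
        task.task_id,
        need_quota,
        total_file_count,
        total_key_count
    );
}
```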
2 changes: 2 additions & 0 deletions src/storage/src/hummock/compactor/compactor_runner.rs
@@ -102,6 +102,8 @@ impl CompactorRunner {
del_agg,
filter_key_extractor,
Some(task_progress),
+ Some(self.compact_task.task_id),
+ Some(self.split_index),
)
.await?;
Ok((self.split_index, ssts, compaction_stat))
57 changes: 46 additions & 11 deletions src/storage/src/hummock/compactor/mod.rs
@@ -41,7 +41,7 @@ use itertools::Itertools;
use risingwave_hummock_sdk::compact::{compact_task_to_string, estimate_state_for_compaction};
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::table_stats::{add_table_stats_map, TableStats, TableStatsMap};
- use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo};
+ use risingwave_hummock_sdk::{HummockCompactionTaskId, HummockEpoch, LocalSstableInfo};
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::subscribe_compact_tasks_response::Task;
use risingwave_pb::hummock::{
@@ -165,15 +165,8 @@ impl Compactor {
])
.start_timer();

- let (need_quota, file_counts) = estimate_state_for_compaction(&compact_task);
- tracing::info!(
- "Ready to handle compaction task: {} need memory: {} input_file_counts {} target_level {} compression_algorithm {:?}",
- compact_task.task_id,
- need_quota,
- file_counts,
- compact_task.target_level,
- compact_task.compression_algorithm,
- );
+ let (need_quota, total_file_count, total_key_count) =
+ estimate_state_for_compaction(&compact_task);

let mut multi_filter = build_multi_compaction_filter(&compact_task);

@@ -282,6 +275,18 @@
let task_memory_capacity_with_parallelism =
estimate_task_memory_capacity(context.clone(), &compact_task) * parallelism;

+ tracing::info!(
+ "Ready to handle compaction task: {} need memory: {} input_file_counts {} total_key_count {} target_level {} compression_algorithm {:?} parallelism {} task_memory_capacity_with_parallelism {}",
+ compact_task.task_id,
+ need_quota,
+ total_file_count,
+ total_key_count,
+ compact_task.target_level,
+ compact_task.compression_algorithm,
+ parallelism,
+ task_memory_capacity_with_parallelism
+ );

// If the task does not have enough memory, it should cancel the task and let the meta
// reschedule it, so that it does not occupy the compactor's resources.
let memory_detector = context
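The "Ready to handle compaction task" log line moves below the parallelism and memory-estimation steps (its old position was deleted in the hunk above), so a single line can also report total_key_count, parallelism, and the derived task_memory_capacity_with_parallelism.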
@@ -498,6 +503,7 @@ impl Compactor {
task_id,
num_ssts_sealed: progress.num_ssts_sealed.load(Ordering::Relaxed),
num_ssts_uploaded: progress.num_ssts_uploaded.load(Ordering::Relaxed),
+ num_progress_key: progress.num_progress_key.load(Ordering::Relaxed),
});
}

@@ -623,6 +629,7 @@ impl Compactor {
compactor_metrics: Arc<CompactorMetrics>,
mut iter: impl HummockIterator<Direction = Forward>,
mut compaction_filter: impl CompactionFilter,
+ task_progress: Option<Arc<TaskProgress>>,
) -> HummockResult<CompactionStatistics>
where
F: TableBuilderFactory,
@@ -655,7 +662,16 @@
let mut last_table_stats = TableStats::default();
let mut last_table_id = None;
let mut compaction_statistics = CompactionStatistics::default();
+ let mut progress_key_num: u64 = 0;
+ const PROGRESS_KEY_INTERVAL: u64 = 100;
while iter.is_valid() {
+ progress_key_num += 1;
+
+ if let Some(task_progress) = task_progress.as_ref() && progress_key_num >= PROGRESS_KEY_INTERVAL {
+ task_progress.inc_progress_key(progress_key_num);
+ progress_key_num = 0;
+ }
+
let mut iter_key = iter.key();
compaction_statistics.iter_total_key_counts += 1;

@@ -760,6 +776,12 @@

iter.next().await?;
}

+ if let Some(task_progress) = task_progress.as_ref() && progress_key_num > 0 {
+ // Avoid losing the progress_key_num in the last interval
+ task_progress.inc_progress_key(progress_key_num);
+ }

if let Some(last_table_id) = last_table_id.take() {
table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
}
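Keys are counted in a local progress_key_num and flushed to the shared atomic once per PROGRESS_KEY_INTERVAL (100) keys rather than on every key, keeping atomic traffic off the per-key hot path; the post-loop flush publishes the remainder. The same pattern in isolation, over a plain counter:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

const PROGRESS_KEY_INTERVAL: u64 = 100;

// Batch per-key increments locally and publish every 100 keys, with a
// final flush so the tail of the iteration is not lost.
fn scan_keys(keys: impl Iterator<Item = u64>, progress: &AtomicU64) {
    let mut pending: u64 = 0;
    for _key in keys {
        pending += 1;
        if pending >= PROGRESS_KEY_INTERVAL {
            progress.fetch_add(pending, Ordering::Relaxed);
            pending = 0;
        }
    }
    if pending > 0 {
        progress.fetch_add(pending, Ordering::Relaxed);
    }
}
```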
@@ -797,6 +819,8 @@ impl Compactor {
del_agg: Arc<CompactionDeleteRanges>,
filter_key_extractor: Arc<FilterKeyExtractorImpl>,
task_progress: Option<Arc<TaskProgress>>,
+ task_id: Option<HummockCompactionTaskId>,
+ split_index: Option<usize>,
) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
// Monitor time cost building shared buffer to SSTs.
let compact_timer = if self.context.is_share_buffer_compact {
Expand Down Expand Up @@ -908,6 +932,16 @@ impl Compactor {
debug_assert!(ssts
.iter()
.all(|table_info| table_info.sst_info.get_table_ids().is_sorted()));

if task_id.is_some() {
// skip shared buffer compaction
tracing::info!(
"Finish Task {:?} split_index {:?} sst count {}",
task_id,
split_index,
ssts.len()
);
}
Ok((ssts, table_stats_map))
}
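compact_key_range threads the optional task_id and split_index through purely for this completion log: real compaction tasks pass Some(...) (see compactor_runner.rs above), while shared-buffer compaction passes None below and stays quiet.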

@@ -934,7 +968,7 @@ impl Compactor {
let mut sst_builder = CapacitySplitTableBuilder::new(
builder_factory,
self.context.compactor_metrics.clone(),
- task_progress,
+ task_progress.clone(),
del_agg,
self.task_config.key_range.clone(),
self.task_config.is_target_l0_or_lbase,
@@ -947,6 +981,7 @@
self.context.compactor_metrics.clone(),
iter,
compaction_filter,
+ task_progress,
)
.await?;

2 changes: 2 additions & 0 deletions src/storage/src/hummock/compactor/shared_buffer_compact.rs
@@ -481,6 +481,8 @@ impl SharedBufferCompactRunner {
del_agg,
filter_key_extractor,
None,
+ None,
+ None,
)
.await?;
Ok((self.split_index, ssts, table_stats_map))
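Shared-buffer (flush) compaction is not a meta-assigned task, so it passes None for the new task_id and split_index parameters; the per-split "Finish Task" log above therefore fires only for real compaction tasks.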
8 changes: 7 additions & 1 deletion src/storage/src/hummock/compactor/task_progress.rs
@@ -13,7 +13,7 @@
// limitations under the License.

use std::collections::HashMap;
- use std::sync::atomic::{AtomicU32, Ordering};
+ use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;

use parking_lot::Mutex;
@@ -26,6 +26,7 @@ pub type TaskProgressManagerRef = Arc<Mutex<HashMap<HummockCompactionTaskId, Arc
pub struct TaskProgress {
pub num_ssts_sealed: AtomicU32,
pub num_ssts_uploaded: AtomicU32,
+ pub num_progress_key: AtomicU64,
}

impl TaskProgress {
@@ -36,6 +37,11 @@ impl TaskProgress {
pub fn inc_ssts_uploaded(&self) {
self.num_ssts_uploaded.fetch_add(1, Ordering::Relaxed);
}

+ pub fn inc_progress_key(&self, inc_key_num: u64) {
+ self.num_progress_key
+ .fetch_add(inc_key_num, Ordering::Relaxed);
+ }
}

/// An RAII object that contains a [`TaskProgress`] and shares it to all the splits of the task.
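All three counters are monotonic statistics, written by the task's splits and read by the heartbeat loop, so Relaxed ordering is enough; nothing synchronizes through them. A sketch of sharing one TaskProgress across splits, assuming the struct from this file is in scope and using a hypothetical per-split worker:

```rust
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread;

// Hypothetical driver: every split of a task bumps the same TaskProgress.
fn run_splits(progress: Arc<TaskProgress>, splits: usize) {
    let handles: Vec<_> = (0..splits)
        .map(|_| {
            let p = Arc::clone(&progress);
            // Each split reports its own batched key count.
            thread::spawn(move || p.inc_progress_key(100))
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    // Relaxed loads are fine for reporting: the counters only ever grow.
    println!("{}", progress.num_progress_key.load(Ordering::Relaxed));
}
```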