From b014cf0c2d32caa0ede6c22e0c30e4ecfdcae3cf Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Tue, 12 Nov 2024 12:32:38 +0800 Subject: [PATCH] 1 --- be/src/cloud/cloud_cumulative_compaction.cpp | 8 +- .../cloud_cumulative_compaction_policy.cpp | 180 ++++++++++++++++++ be/src/common/config.cpp | 4 +- .../pipeline/exec/exchange_sink_operator.cpp | 4 +- 4 files changed, 189 insertions(+), 7 deletions(-) diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp index b4ad6cf0d28ee2..8828a32a88dfb5 100644 --- a/be/src/cloud/cloud_cumulative_compaction.cpp +++ b/be/src/cloud/cloud_cumulative_compaction.cpp @@ -143,7 +143,7 @@ Status CloudCumulativeCompaction::prepare_compact() { LOG_WARNING("failed to prepare cumu compaction") .tag("job_id", _uuid) .tag("msg", resp.status().msg()); - return Status::Error("no suitable versions"); + return Status::Error("no suitable versions1"); } } else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) { (static_cast(_tablet.get()))->set_alter_version(resp.alter_version()); @@ -481,7 +481,7 @@ Status CloudCumulativeCompaction::pick_rowsets_to_compact() { }); } if (candidate_rowsets.empty()) { - return Status::Error("no suitable versions"); + return Status::Error("no suitable versions2"); } std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator); if (auto st = check_version_continuity(candidate_rowsets); !st.ok()) { @@ -507,12 +507,12 @@ Status CloudCumulativeCompaction::pick_rowsets_to_compact() { &_last_delete_version, &compaction_score); if (_input_rowsets.empty()) { - return Status::Error("no suitable versions"); + return Status::Error("no suitable versions3"); } else if (_input_rowsets.size() == 1 && !_input_rowsets.front()->rowset_meta()->is_segments_overlapping()) { VLOG_DEBUG << "there is only one rowset and not overlapping. 
tablet_id=" << _tablet->tablet_id() << ", version=" << _input_rowsets.front()->version(); - return Status::Error("no suitable versions"); + return Status::Error("no suitable versions4"); } return Status::OK(); } diff --git a/be/src/cloud/cloud_cumulative_compaction_policy.cpp b/be/src/cloud/cloud_cumulative_compaction_policy.cpp index f9af469e56f60a..1fea9ae1b0ade4 100644 --- a/be/src/cloud/cloud_cumulative_compaction_policy.cpp +++ b/be/src/cloud/cloud_cumulative_compaction_policy.cpp @@ -17,6 +17,8 @@ #include "cloud/cloud_cumulative_compaction_policy.h" +#include + #include #include #include @@ -53,21 +55,43 @@ int32_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets( const int64_t max_compaction_score, const int64_t min_compaction_score, std::vector* input_rowsets, Version* last_delete_version, size_t* compaction_score, bool allow_delete) { + LOG_WARNING("[lyk_debug]:") + .tag("tablet id", tablet->tablet_id()) + .tag("candidate_rowsets", candidate_rowsets.size()) + .tag("start rowset", candidate_rowsets.empty() ? "none" : candidate_rowsets.front()->rowset_id().to_string()) /* guard: begin()/front() of an empty vector must not be dereferenced; the loop below already tolerates empty input */ + .tag("max comapction score", max_compaction_score) + .tag("min comapction score", min_compaction_score) + .tag("last delete version", last_delete_version->to_string()) + .tag("allow delete", allow_delete); size_t promotion_size = cloud_promotion_size(tablet); auto max_version = tablet->max_version().first; + LOG_WARNING("[lyk_debug]: Cloud promotion size and max version") + .tag("promotion_size", promotion_size) + .tag("max_version", max_version); int transient_size = 0; *compaction_score = 0; int64_t total_size = 0; for (auto& rowset : candidate_rowsets) { // check whether this rowset is delete version + LOG_WARNING("[lyk_debug]: Processing rowset") + .tag("rowset_id", rowset->rowset_id()) + .tag("rowset_version", rowset->version().to_string()) + .tag("rowset_size", rowset->rowset_meta()->total_disk_size()); + if (!allow_delete && rowset->rowset_meta()->has_delete_predicate()) { *last_delete_version = 
rowset->version(); + LOG_WARNING("[lyk_debug]: Rowset has delete predicate, handling delete version") + .tag("last_delete_version", last_delete_version->to_string()); if (!input_rowsets->empty()) { // we meet a delete version, and there were other versions before. // we should compact those version before handling them over to base compaction + LOG_WARNING("[lyk_debug]: Break to compact earlier versions due to delete version"); break; } else { // we meet a delete version, and no other versions before, skip it and continue + LOG_WARNING( + "[lyk_debug]: Clear input rowsets and reset compaction score due to delete " + "version"); input_rowsets->clear(); *compaction_score = 0; transient_size = 0; @@ -77,11 +101,19 @@ int32_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets( if (tablet->tablet_state() == TABLET_NOTREADY) { // If tablet under alter, keep latest 10 version so that base tablet max version // not merged in new tablet, and then we can copy data from base tablet + LOG_WARNING("[lyk_debug]: Tablet is not ready, checking version constraint") + .tag("tablet_state", "TABLET_NOTREADY") + .tag("max_version", max_version); if (rowset->version().second < max_version - 10) { + LOG_WARNING("[lyk_debug]: Skipping rowset due to version being too old") + .tag("rowset_version", rowset->version().to_string()); continue; } } if (*compaction_score >= max_compaction_score) { + LOG_WARNING("[lyk_debug]: Reached maximum compaction score") + .tag("compaction_score", *compaction_score) + .tag("max_compaction_score", max_compaction_score); // got enough segments break; } @@ -90,21 +122,35 @@ int32_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets( transient_size += 1; input_rowsets->push_back(rowset); + LOG_WARNING("[lyk_debug]: Added rowset to compaction") + .tag("current_compaction_score", *compaction_score) + .tag("total_size", total_size); } if (total_size >= promotion_size) { + LOG_WARNING("[lyk_debug]: Total size exceeds promotion size, returning 
transient size") + .tag("total_size", total_size) + .tag("promotion_size", promotion_size); return transient_size; } // if there is delete version, do compaction directly if (last_delete_version->first != -1) { + LOG_WARNING("[lyk_debug]: Delete version detected, performing compaction directly") + .tag("last_delete_version", last_delete_version->to_string()); if (input_rowsets->size() == 1) { auto rs_meta = input_rowsets->front()->rowset_meta(); + LOG_WARNING("[lyk_debug]: Checking if rowset is overlapping") + .tag("rowset_id", input_rowsets->front()->rowset_id()) + .tag("is_segments_overlapping", rs_meta->is_segments_overlapping()); // if there is only one rowset and not overlapping, // we do not need to do cumulative compaction if (!rs_meta->is_segments_overlapping()) { input_rowsets->clear(); *compaction_score = 0; + LOG_WARNING( + "[lyk_debug]: Rowset not overlapping, clearing input rowsets and resetting " + "score"); } } return transient_size; @@ -112,41 +158,71 @@ int32_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets( auto rs_begin = input_rowsets->begin(); size_t new_compaction_score = *compaction_score; + LOG_WARNING("[lyk_debug]: Starting rowset compaction checks"); while (rs_begin != input_rowsets->end()) { auto& rs_meta = (*rs_begin)->rowset_meta(); int current_level = _level_size(rs_meta->total_disk_size()); int remain_level = _level_size(total_size - rs_meta->total_disk_size()); + // Log the current level and remaining level + LOG_WARNING("[lyk_debug]: Checking rowset compaction level") + .tag("rowset_id", (*rs_begin)->rowset_id()) + .tag("current_level", current_level) + .tag("remain_level", remain_level); // if current level less then remain level, input rowsets contain current rowset // and process return; otherwise, input rowsets do not contain current rowset. 
if (current_level <= remain_level) { + LOG_WARNING( + "[lyk_debug]: Current level is less than or equal to remain level, breaking " + "the loop") + .tag("rowset_id", (*rs_begin)->rowset_id()); break; } total_size -= rs_meta->total_disk_size(); new_compaction_score -= rs_meta->get_compaction_score(); ++rs_begin; + LOG_WARNING("[lyk_debug]: Adjusting total size and compaction score") + .tag("new_total_size", total_size) + .tag("new_compaction_score", new_compaction_score) + .tag("next_rowset_id", rs_begin != input_rowsets->end() + ? (*rs_begin)->rowset_id().to_string() + : "end"); } if (rs_begin == input_rowsets->end()) { // No suitable level size found in `input_rowsets` + LOG_WARNING("[lyk_debug]: No suitable level size found, checking configurations"); if (config::prioritize_query_perf_in_compaction && tablet->keys_type() != DUP_KEYS) { // While tablet's key type is not `DUP_KEYS`, compacting rowset in such tablets has a significant // positive impact on queries and reduces space amplification, so we ignore level limitation and // pick candidate rowsets as input rowsets. + LOG_WARNING("[lyk_debug]: Prioritizing query performance, bypassing level limitation") + .tag("tablet_keys_type", tablet->keys_type()) + .tag("config_prioritize_query_perf", + config::prioritize_query_perf_in_compaction); return transient_size; } else if (*compaction_score >= max_compaction_score) { // Score of `input_rowsets` exceed max compaction score, which means `input_rowsets` will never change and // this tablet will never execute cumulative compaction. MUST execute compaction on these `input_rowsets` // to reduce compaction score. 
+ LOG_WARNING("[lyk_debug]: Compaction score exceeds max score, terminating compaction") + .tag("compaction_score", *compaction_score) + .tag("max_compaction_score", max_compaction_score); RowsetSharedPtr rs_with_max_score; uint32_t max_score = 1; for (auto& rs : *input_rowsets) { if (rs->rowset_meta()->get_compaction_score() > max_score) { max_score = rs->rowset_meta()->get_compaction_score(); rs_with_max_score = rs; + LOG_WARNING("[lyk_debug]: Found rowset with higher compaction score") + .tag("rowset_id", rs->rowset_id()) + .tag("compaction_score", max_score); } } if (rs_with_max_score) { input_rowsets->clear(); input_rowsets->push_back(std::move(rs_with_max_score)); *compaction_score = max_score; + LOG_WARNING("[lyk_debug]: Chosen rowset for compaction with max score") + .tag("rowset_id", input_rowsets->front()->rowset_id()) /* rs_with_max_score is empty after the std::move above; read the id from the vector that now owns the rowset */ + .tag("compaction_score", max_score); return transient_size; } // Exceeding max compaction score, do compaction on all candidate rowsets anyway @@ -155,6 +231,9 @@ int32_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets( } input_rowsets->erase(input_rowsets->begin(), rs_begin); *compaction_score = new_compaction_score; + LOG_WARNING("[lyk_debug]: After level size check, updated compaction score and total size") + .tag("new_compaction_score", *compaction_score) + .tag("total_size", total_size); VLOG_CRITICAL << "cumulative compaction size_based policy, compaction_score = " << *compaction_score << ", total_size = " << total_size @@ -164,19 +243,34 @@ int32_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets( // empty return if (input_rowsets->empty()) { + LOG_WARNING("[lyk_debug]: Input rowsets are empty, skipping compaction") + .tag("transient_size", transient_size); return transient_size; } // if we have a sufficient number of segments, we should process the compaction. // otherwise, we check number of segments and total_size whether can do compaction. 
if (total_size < _compaction_min_size && *compaction_score < min_compaction_score) { + LOG_WARNING("[lyk_debug]: Insufficient size and compaction score, clearing rowsets") + .tag("total_size", total_size) + .tag("compaction_score", *compaction_score) + .tag("min_compaction_score", min_compaction_score); input_rowsets->clear(); *compaction_score = 0; } else if (total_size >= _compaction_min_size && input_rowsets->size() == 1) { + LOG_WARNING( + "[lyk_debug]: Only one rowset and sufficient total size, checking for overlapping " + "segments") + .tag("total_size", total_size) + .tag("compaction_min_size", _compaction_min_size) + .tag("input_rowsets_size", input_rowsets->size()); auto rs_meta = input_rowsets->front()->rowset_meta(); // if there is only one rowset and not overlapping, // we do not need to do compaction if (!rs_meta->is_segments_overlapping()) { + LOG_WARNING( + "[lyk_debug]: Rowset segments are not overlapping, clearing rowsets and " + "resetting compaction score"); input_rowsets->clear(); *compaction_score = 0; } @@ -220,7 +314,17 @@ int32_t CloudTimeSeriesCumulativeCompactionPolicy::pick_input_rowsets( const int64_t max_compaction_score, const int64_t min_compaction_score, std::vector* input_rowsets, Version* last_delete_version, size_t* compaction_score, bool allow_delete) { + LOG_WARNING("[time lyk_debug]:") + .tag("tablet id", tablet->tablet_id()) + .tag("candidate_rowsets", candidate_rowsets.size()) + .tag("start rowset", candidate_rowsets.empty() ? "none" : candidate_rowsets.front()->rowset_id().to_string()) /* guard: begin()/front() of an empty vector must not be dereferenced */ + .tag("max comapction score", max_compaction_score) + .tag("min comapction score", min_compaction_score) + .tag("last delete version", last_delete_version->to_string()) + .tag("allow delete", allow_delete); if (tablet->tablet_state() == TABLET_NOTREADY) { + LOG_WARNING("[time lyk_debug]: Tablet state is not ready, skipping compaction") + .tag("tablet_state", tablet->tablet_state()); return 0; } @@ -233,15 +337,27 @@ int32_t 
CloudTimeSeriesCumulativeCompactionPolicy::pick_input_rowsets( int64_t total_size = 0; for (const auto& rowset : candidate_rowsets) { + LOG_WARNING("[time lyk_debug]: Processing rowset") + .tag("rowset_id", rowset->rowset_id()) + .tag("has_delete_predicate", rowset->rowset_meta()->has_delete_predicate()); + // check whether this rowset is delete version if (!allow_delete && rowset->rowset_meta()->has_delete_predicate()) { *last_delete_version = rowset->version(); + LOG_WARNING("[time lyk_debug]: Rowset has delete predicate, processing delete version") + .tag("last_delete_version", last_delete_version->to_string()); if (!input_rowsets->empty()) { + LOG_WARNING( + "[time lyk_debug]: Found delete version, and there were other versions before. " + "Break compaction."); // we meet a delete version, and there were other versions before. // we should compact those version before handling them over to base compaction break; } else { // we meet a delete version, and no other versions before, skip it and continue + LOG_WARNING( + "[time lyk_debug]: Found delete version, but no previous versions, skipping it " + "and clearing rowsets"); input_rowsets->clear(); *compaction_score = 0; transient_size = 0; @@ -255,11 +371,20 @@ int32_t CloudTimeSeriesCumulativeCompactionPolicy::pick_input_rowsets( transient_size += 1; input_rowsets->push_back(rowset); + LOG_WARNING("[time lyk_debug]: Updated compaction score and total size") + .tag("compaction_score", *compaction_score) + .tag("total_size", total_size) + .tag("transient_size", transient_size); // Condition 1: the size of input files for compaction meets the requirement of parameter compaction_goal_size if (total_size >= (compaction_goal_size_mbytes * 1024 * 1024)) { + LOG_WARNING("[time lyk_debug]: Compaction goal size met, checking rowset overlap") + .tag("compaction_goal_size_mbytes", compaction_goal_size_mbytes) + .tag("total_size", total_size); if (input_rowsets->size() == 1 && 
!input_rowsets->front()->rowset_meta()->is_segments_overlapping()) { + LOG_WARNING("[time lyk_debug]: Only one non-overlapping rowset, skipping compaction") + .tag("rowset_id", input_rowsets->front()->rowset_id()); // Only 1 non-overlapping rowset, skip it input_rowsets->clear(); *compaction_score = 0; @@ -270,40 +395,76 @@ int32_t CloudTimeSeriesCumulativeCompactionPolicy::pick_input_rowsets( } else if ( *compaction_score >= config::compaction_max_rowset_count) { // If the number of rowsets is too large: FDB_ERROR_CODE_TXN_TOO_LARGE + LOG_WARNING( + "[time lyk_debug]: Compaction score exceeds max rowset count, terminating " + "compaction") + .tag("compaction_score", *compaction_score) + .tag("compaction_max_rowset_count", config::compaction_max_rowset_count); return transient_size; } } // if there is delete version, do compaction directly if (last_delete_version->first != -1) { + LOG_WARNING("[time lyk_debug]: Last delete version found, checking rowset compaction logic") + .tag("last_delete_version", last_delete_version->to_string()); + // if there is only one rowset and not overlapping, // we do not need to do cumulative compaction if (input_rowsets->size() == 1 && !input_rowsets->front()->rowset_meta()->is_segments_overlapping()) { + LOG_WARNING( + "[time lyk_debug]: Only one rowset and segments are not overlapping, clearing " + "rowsets") + .tag("input_rowsets_size", input_rowsets->size()) + .tag("is_segments_overlapping", + input_rowsets->front()->rowset_meta()->is_segments_overlapping()); input_rowsets->clear(); *compaction_score = 0; } return transient_size; } + LOG_WARNING("[time lyk_debug]: Checking compaction score against file count threshold") + .tag("compaction_score", *compaction_score) + .tag("compaction_file_count_threshold", + tablet->tablet_meta()->time_series_compaction_file_count_threshold()); // Condition 2: the number of input files reaches the threshold specified by parameter compaction_file_count_threshold if (*compaction_score >= 
tablet->tablet_meta()->time_series_compaction_file_count_threshold()) { + LOG_WARNING( + "[time lyk_debug]: Compaction score exceeds or meets file count threshold, skipping " + "compaction") + .tag("compaction_score", *compaction_score); return transient_size; } // Condition 3: level1 achieve compaction_goal_size std::vector level1_rowsets; if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) { + LOG_WARNING("[time lyk_debug]: Checking if level 1 rowsets can meet compaction goal size") + .tag("level_threshold", + tablet->tablet_meta()->time_series_compaction_level_threshold()) + .tag("compaction_goal_size_mbytes", compaction_goal_size_mbytes); int64_t continuous_size = 0; for (const auto& rowset : candidate_rowsets) { const auto& rs_meta = rowset->rowset_meta(); if (rs_meta->compaction_level() == 0) { + LOG_WARNING("[time lyk_debug]: Encountered level 0 rowset, breaking out of loop") + .tag("rowset_id", rowset->rowset_id()) + .tag("compaction_level", rs_meta->compaction_level()); break; } level1_rowsets.push_back(rowset); continuous_size += rs_meta->total_disk_size(); + LOG_WARNING("[time lyk_debug]: Adding rowset to level1 rowsets") + .tag("rowset_id", rowset->rowset_id()) + .tag("continuous_size", continuous_size); if (level1_rowsets.size() >= 2) { if (continuous_size >= compaction_goal_size_mbytes * 1024 * 1024) { + LOG_WARNING( + "[time lyk_debug]: Achieved compaction goal size, swapping level1 rowsets") + .tag("continuous_size", continuous_size) + .tag("compaction_goal_size", compaction_goal_size_mbytes * 1024 * 1024); input_rowsets->swap(level1_rowsets); return input_rowsets->size(); } @@ -317,10 +478,24 @@ int32_t CloudTimeSeriesCumulativeCompactionPolicy::pick_input_rowsets( int64_t cumu_interval = now - last_cumu; // Condition 4: the time interval between compactions exceeds the value specified by parameter compaction_time_threshold_second + LOG_WARNING("[time lyk_debug]: Checking compaction time interval") + .tag("last_cumu", 
last_cumu) + .tag("current_time", now) + .tag("cumu_interval", cumu_interval) + .tag("compaction_time_threshold", + tablet->tablet_meta()->time_series_compaction_time_threshold_seconds()); if (cumu_interval > (tablet->tablet_meta()->time_series_compaction_time_threshold_seconds() * 1000)) { + LOG_WARNING( + "[time lyk_debug]: Compaction time interval exceeded, checking level threshold and " + "rowsets") + .tag("compaction_time_interval", cumu_interval); if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) { if (input_rowsets->empty() && level1_rowsets.size() >= 2) { + LOG_WARNING( + "[time lyk_debug]: Time threshold exceeded and enough level1 rowsets, " + "swapping") + .tag("level1_rowsets_size", level1_rowsets.size()); input_rowsets->swap(level1_rowsets); return input_rowsets->size(); } @@ -335,11 +510,16 @@ int32_t CloudTimeSeriesCumulativeCompactionPolicy::pick_input_rowsets( input_rowsets, candidate_rowsets, tablet->tablet_meta()->time_series_compaction_empty_rowsets_threshold()); if (!input_rowsets->empty()) { + LOG_WARNING("[time lyk_debug]: Too many consecutive empty rowsets detected") + .tag("tablet_id", tablet->tablet_id()) + .tag("empty_rowsets_size", input_rowsets->size()); VLOG_NOTICE << "tablet is " << tablet->tablet_id() << ", there are too many consecutive empty rowsets, size is " << input_rowsets->size(); return 0; } + LOG_WARNING("[time lyk_debug]: Reset compaction score after empty rowsets check") + .tag("compaction_score", *compaction_score); *compaction_score = 0; return 0; diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 0cda0ffdcc515c..b1c3ca2e228978 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1691,7 +1691,9 @@ bool init(const char* conf_file, bool fill_conf_map, bool must_exist, bool set_t if (config::is_cloud_mode()) { auto st = config::set_config("enable_file_cache", "true", true, true); - LOG(INFO) << "set config enable_file_cache " << "true" << " " << st; + LOG(INFO) 
<< "set config enable_file_cache " + << "true" + << " " << st; } return true; diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 5f63aaf5973d77..675b0250c1c0bd 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -343,8 +343,8 @@ Status ExchangeSinkOperatorX::open(RuntimeState* state) { _compression_type = state->fragement_transmission_compression_type(); if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { if (_output_tuple_id == -1) { - RETURN_IF_ERROR( - vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, state, _child_x->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, state, + _child_x->row_desc())); } else { auto* output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); auto* output_row_desc = _pool->add(new RowDescriptor(output_tuple_desc, false));