Skip to content

Commit

Permalink
edit2
Browse files Browse the repository at this point in the history
  • Loading branch information
hust-hhb committed Dec 5, 2024
1 parent 17c0f0f commit b5fdfa0
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 21 deletions.
6 changes: 3 additions & 3 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -491,10 +491,10 @@ Status CloudCumulativeCompaction::pick_rowsets_to_compact() {
}

int64_t max_score = config::cumulative_compaction_max_deltas;
auto process_memory_usage =
static_cast<double>(doris::GlobalMemoryArbitrator::process_memory_usage());
double process_memory_usage =
cast_set<double>(doris::GlobalMemoryArbitrator::process_memory_usage());
bool memory_usage_high =
process_memory_usage > static_cast<double>(MemInfo::soft_mem_limit()) * 0.8;
process_memory_usage > cast_set<double>(MemInfo::soft_mem_limit()) * 0.8;
if (cloud_tablet()->last_compaction_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>() ||
memory_usage_high) {
max_score = std::max(config::cumulative_compaction_max_deltas /
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_cumulative_compaction_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
}

int64_t CloudSizeBasedCumulativeCompactionPolicy::cloud_promotion_size(CloudTablet* t) const {
int64_t promotion_size = int64_t(static_cast<double>(t->base_size()) * _promotion_ratio);
int64_t promotion_size = int64_t(cast_set<double>(t->base_size()) * _promotion_ratio);
// promotion_size is between _size_based_promotion_size and _size_based_promotion_min_size
return promotion_size > _promotion_size ? _promotion_size
: promotion_size < _promotion_min_size ? _promotion_min_size
Expand Down
13 changes: 6 additions & 7 deletions be/src/cloud/cloud_storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ Status CloudStorageEngine::open() {
_memtable_flush_executor = std::make_unique<MemTableFlushExecutor>();
// Use file cache disks number
_memtable_flush_executor->init(
static_cast<int32_t>(io::FileCacheFactory::instance()->get_cache_instance_size()));
cast_set<int32_t>(io::FileCacheFactory::instance()->get_cache_instance_size()));

_calc_delete_bitmap_executor = std::make_unique<CalcDeleteBitmapExecutor>();
_calc_delete_bitmap_executor->init();
Expand Down Expand Up @@ -323,9 +323,8 @@ void CloudStorageEngine::_check_file_cache_ttl_block_valid() {
for (const auto& rowset : rowsets) {
int64_t ttl_seconds = tablet->tablet_meta()->ttl_seconds();
if (rowset->newest_write_timestamp() + ttl_seconds <= UnixSeconds()) continue;
for (int64_t seg_id = 0; seg_id < rowset->num_segments(); seg_id++) {
auto hash = Segment::file_cache_key(rowset->rowset_id().to_string(),
static_cast<uint32_t>(seg_id));
for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); seg_id++) {
auto hash = Segment::file_cache_key(rowset->rowset_id().to_string(), seg_id);
auto* file_cache = io::FileCacheFactory::instance()->get_by_path(hash);
file_cache->update_ttl_atime(hash);
}
Expand Down Expand Up @@ -548,7 +547,7 @@ std::vector<CloudTabletSPtr> CloudStorageEngine::_generate_cloud_compaction_task
std::accumulate(submitted_cumu_compactions.begin(), submitted_cumu_compactions.end(), 0,
[](int a, auto& b) { return a + b.second.size(); });
int num_base =
static_cast<int>(submitted_base_compactions.size() + submitted_full_compactions.size());
cast_set<int>(submitted_base_compactions.size() + submitted_full_compactions.size());
int n = thread_per_disk - num_cumu - num_base;
if (compaction_type == CompactionType::BASE_COMPACTION) {
// We need to reserve at least one thread for cumulative compaction,
Expand Down Expand Up @@ -826,7 +825,7 @@ Status CloudStorageEngine::get_compaction_status_json(std::string* result) {
// cumu
std::string_view cumu = "CumulativeCompaction";
rapidjson::Value cumu_key;
cumu_key.SetString(cumu.data(), static_cast<uint32_t>(cumu.length()), root.GetAllocator());
cumu_key.SetString(cumu.data(), cast_set<uint32_t>(cumu.length()), root.GetAllocator());
rapidjson::Document cumu_arr;
cumu_arr.SetArray();
for (auto& [tablet_id, v] : _submitted_cumu_compactions) {
Expand All @@ -838,7 +837,7 @@ Status CloudStorageEngine::get_compaction_status_json(std::string* result) {
// base
std::string_view base = "BaseCompaction";
rapidjson::Value base_key;
base_key.SetString(base.data(), static_cast<uint32_t>(base.length()), root.GetAllocator());
base_key.SetString(base.data(), cast_set<uint32_t>(base.length()), root.GetAllocator());
rapidjson::Document base_arr;
base_arr.SetArray();
for (auto& [tablet_id, _] : _submitted_base_compactions) {
Expand Down
6 changes: 3 additions & 3 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_transient_rowset_write

return RowsetFactory::create_rowset_writer(_engine, context, false)
.transform([&](auto&& writer) {
writer->set_segment_start_id(rowset.num_segments());
writer->set_segment_start_id(cast_set<int32_t>(rowset.num_segments()));
return writer;
});
}
Expand Down Expand Up @@ -618,7 +618,7 @@ void CloudTablet::get_compaction_status(std::string* json_result) {
}
rapidjson::Value value;
std::string version_str = rowset->get_rowset_info_str();
value.SetString(version_str.c_str(), static_cast<uint32_t>(version_str.length()),
value.SetString(version_str.c_str(), cast_set<uint32_t>(version_str.length()),
versions_arr.GetAllocator());
versions_arr.PushBack(value, versions_arr.GetAllocator());
last_version = ver.second;
Expand All @@ -632,7 +632,7 @@ void CloudTablet::get_compaction_status(std::string* json_result) {
for (auto& rowset : stale_rowsets) {
rapidjson::Value value;
std::string version_str = rowset->get_rowset_info_str();
value.SetString(version_str.c_str(), static_cast<uint32_t>(version_str.length()),
value.SetString(version_str.c_str(), cast_set<uint32_t>(version_str.length()),
stale_versions_arr.GetAllocator());
stale_versions_arr.PushBack(value, stale_versions_arr.GetAllocator());
}
Expand Down
8 changes: 4 additions & 4 deletions be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ Status BetaRowsetWriter::_find_longest_consecutive_small_segment(
SegCompactionCandidatesSharedPtr& segments) {
segments = std::make_shared<SegCompactionCandidates>();
// skip last (maybe active) segment
int64_t last_segment = _num_segment - 1;
int32_t last_segment = _num_segment - 1;
size_t task_bytes = 0;
uint32_t task_rows = 0;
int32_t segid;
Expand Down Expand Up @@ -654,7 +654,7 @@ Status BaseBetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) {
_num_rows_written += rowset->num_rows();
_total_data_size += rowset->rowset_meta()->data_disk_size();
_total_index_size += rowset->rowset_meta()->index_disk_size();
_num_segment += rowset->num_segments();
_num_segment += cast_set<int32_t>(rowset->num_segments());
// append key_bounds to current rowset
RETURN_IF_ERROR(rowset->get_segments_key_bounds(&_segments_encoded_key_bounds));

Expand Down Expand Up @@ -1044,7 +1044,7 @@ Status BaseBetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStati
if (segment_id >= _segment_num_rows.size()) {
_segment_num_rows.resize(segment_id + 1);
}
_segment_num_rows[segid_offset] = static_cast<uint32_t>(segstat.row_num);
_segment_num_rows[segid_offset] = cast_set<uint32_t>(segstat.row_num);
}
VLOG_DEBUG << "_segid_statistics_map add new record. segment_id:" << segment_id
<< " row_num:" << segstat.row_num << " data_size:" << segstat.data_size
Expand All @@ -1053,7 +1053,7 @@ Status BaseBetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStati
{
std::lock_guard<std::mutex> lock(_segment_set_mutex);
_segment_set.add(segid_offset);
while (_segment_set.contains(static_cast<uint32_t>(_num_segment))) {
while (_segment_set.contains(_num_segment)) {
_num_segment++;
}
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/beta_rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ class BaseBetaRowsetWriter : public RowsetWriter {

int32_t allocate_segment_id() override { return _segment_creator.allocate_segment_id(); };

void set_segment_start_id(int64_t start_id) override {
void set_segment_start_id(int32_t start_id) override {
_segment_creator.set_segment_start_id(start_id);
_segment_start_id = start_id;
}
Expand Down Expand Up @@ -226,7 +226,7 @@ class BaseBetaRowsetWriter : public RowsetWriter {
return Status::OK();
}

std::atomic<int64_t> _num_segment; // number of consecutive flushed segments
std::atomic<int32_t> _num_segment; // number of consecutive flushed segments
roaring::Roaring _segment_set; // bitmap set to record flushed segment id
std::mutex _segment_set_mutex; // mutex for _segment_set
int32_t _segment_start_id; // basic write start from 0, partial update may be different
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ class RowsetWriter {

virtual int32_t allocate_segment_id() = 0;

virtual void set_segment_start_id(int64_t num_segment) { LOG(FATAL) << "not supported!"; }
virtual void set_segment_start_id(int num_segment) { LOG(FATAL) << "not supported!"; }

virtual int64_t delete_bitmap_ns() { return 0; }

Expand Down

0 comments on commit b5fdfa0

Please sign in to comment.