Skip to content

Commit

Permalink
Merge branch 'master' into fuzzy_disable_rf
Browse files Browse the repository at this point in the history
  • Loading branch information
mrhhsg authored Dec 24, 2024
2 parents d8ec9fd + ff690e4 commit ac68047
Show file tree
Hide file tree
Showing 3,501 changed files with 7,477 additions and 103,731 deletions.
The diff you're trying to view is too large. We only load the first 3000 changed files.
7 changes: 4 additions & 3 deletions be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,9 @@ Status CloudBaseCompaction::execute_compact() {
<< ", output_version=" << _output_version;
return res;
}
LOG_INFO("finish CloudBaseCompaction, tablet_id={}, cost={}ms", _tablet->tablet_id(),
duration_cast<milliseconds>(steady_clock::now() - start).count())
LOG_INFO("finish CloudBaseCompaction, tablet_id={}, cost={}ms range=[{}-{}]",
_tablet->tablet_id(), duration_cast<milliseconds>(steady_clock::now() - start).count(),
_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version())
.tag("job_id", _uuid)
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
Expand Down Expand Up @@ -343,7 +344,7 @@ Status CloudBaseCompaction::modify_rowsets() {
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
.tag("input_segments", _input_segments)
.tag("update_bitmap_size", output_rowset_delete_bitmap->delete_bitmap.size());
.tag("num_output_delete_bitmap", output_rowset_delete_bitmap->delete_bitmap.size());
compaction_job->set_delete_bitmap_lock_initiator(initiator);
}

Expand Down
8 changes: 5 additions & 3 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,9 @@ Status CloudCumulativeCompaction::execute_compact() {
<< ", output_version=" << _output_version;
return res;
}
LOG_INFO("finish CloudCumulativeCompaction, tablet_id={}, cost={}ms", _tablet->tablet_id(),
duration_cast<milliseconds>(steady_clock::now() - start).count())
LOG_INFO("finish CloudCumulativeCompaction, tablet_id={}, cost={}ms, range=[{}-{}]",
_tablet->tablet_id(), duration_cast<milliseconds>(steady_clock::now() - start).count(),
_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version())
.tag("job_id", _uuid)
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
Expand Down Expand Up @@ -299,7 +300,8 @@ Status CloudCumulativeCompaction::modify_rowsets() {
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
.tag("input_segments", _input_segments)
.tag("update_bitmap_size", output_rowset_delete_bitmap->delete_bitmap.size());
.tag("number_output_delete_bitmap",
output_rowset_delete_bitmap->delete_bitmap.size());
compaction_job->set_delete_bitmap_lock_initiator(initiator);
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class CloudStorageEngine final : public BaseStorageEngine {
void _check_file_cache_ttl_block_valid();

std::optional<StorageResource> get_storage_resource(const std::string& vault_id) {
LOG(INFO) << "Getting storage resource for vault_id: " << vault_id;
VLOG_DEBUG << "Getting storage resource for vault_id: " << vault_id;

bool synced = false;
do {
Expand Down
15 changes: 11 additions & 4 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet_mgr.h"
#include "common/logging.h"
#include "io/cache/block_file_cache_downloader.h"
#include "io/cache/block_file_cache_factory.h"
#include "olap/cumulative_compaction_time_series_policy.h"
Expand Down Expand Up @@ -408,6 +409,9 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() {
auto rs_it = _stale_rs_version_map.find(v_ts->version());
if (rs_it != _stale_rs_version_map.end()) {
expired_rowsets.push_back(rs_it->second);
LOG(INFO) << "erase stale rowset, tablet_id=" << tablet_id()
<< " rowset_id=" << rs_it->second->rowset_id().to_string()
<< " version=" << rs_it->first.to_string();
_stale_rs_version_map.erase(rs_it);
} else {
LOG(WARNING) << "cannot find stale rowset " << v_ts->version() << " in tablet "
Expand Down Expand Up @@ -657,11 +661,14 @@ void CloudTablet::get_compaction_status(std::string* json_result) {
}

void CloudTablet::set_cumulative_layer_point(int64_t new_point) {
if (new_point == Tablet::K_INVALID_CUMULATIVE_POINT || new_point >= _cumulative_point) {
_cumulative_point = new_point;
return;
}
// cumulative point should only be reset to -1, or be increased
CHECK(new_point == Tablet::K_INVALID_CUMULATIVE_POINT || new_point >= _cumulative_point)
<< "Unexpected cumulative point: " << new_point
<< ", origin: " << _cumulative_point.load();
_cumulative_point = new_point;
// FIXME: could happen in currently unresolved race conditions
LOG(WARNING) << "Unexpected cumulative point: " << new_point
<< ", origin: " << _cumulative_point.load();
}

std::vector<RowsetSharedPtr> CloudTablet::pick_candidate_rowsets_to_base_compaction() {
Expand Down
6 changes: 4 additions & 2 deletions be/src/exec/schema_scanner/schema_active_queries_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
#include "vec/data_types/data_type_factory.hpp"

namespace doris {
#include "common/compile_check_begin.h"

std::vector<SchemaScanner::ColumnDesc> SchemaActiveQueriesScanner::_s_tbls_columns = {
// name, type, size
{"QUERY_ID", TYPE_VARCHAR, sizeof(StringRef), true},
Expand Down Expand Up @@ -92,7 +94,7 @@ Status SchemaActiveQueriesScanner::_get_active_queries_block_from_fe() {
_active_query_block->reserve(_block_rows_limit);

if (result_data.size() > 0) {
int col_size = result_data[0].column_value.size();
auto col_size = result_data[0].column_value.size();
if (col_size != _s_tbls_columns.size()) {
return Status::InternalError<false>("active queries schema is not match for FE and BE");
}
Expand All @@ -119,7 +121,7 @@ Status SchemaActiveQueriesScanner::get_next_block_internal(vectorized::Block* bl

if (_active_query_block == nullptr) {
RETURN_IF_ERROR(_get_active_queries_block_from_fe());
_total_rows = _active_query_block->rows();
_total_rows = (int)_active_query_block->rows();
}

if (_row_idx == _total_rows) {
Expand Down
4 changes: 3 additions & 1 deletion be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include "vec/data_types/data_type_factory.hpp"

namespace doris {
#include "common/compile_check_begin.h"

std::vector<SchemaScanner::ColumnDesc> SchemaBackendActiveTasksScanner::_s_tbls_columns = {
// name, type, size
{"BE_ID", TYPE_BIGINT, sizeof(int64_t), false},
Expand Down Expand Up @@ -76,7 +78,7 @@ Status SchemaBackendActiveTasksScanner::get_next_block_internal(vectorized::Bloc

ExecEnv::GetInstance()->runtime_query_statistics_mgr()->get_active_be_tasks_block(
_task_stats_block.get());
_total_rows = _task_stats_block->rows();
_total_rows = (int)_task_stats_block->rows();
}

if (_row_idx == _total_rows) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
#include "vec/data_types/data_type_factory.hpp"

namespace doris {
#include "common/compile_check_begin.h"

std::vector<SchemaScanner::ColumnDesc> SchemaCatalogMetaCacheStatsScanner::_s_tbls_columns = {
{"CATALOG_NAME", TYPE_STRING, sizeof(StringRef), true},
{"CACHE_NAME", TYPE_STRING, sizeof(StringRef), true},
Expand Down Expand Up @@ -86,7 +88,7 @@ Status SchemaCatalogMetaCacheStatsScanner::_get_meta_cache_from_fe() {
_block->reserve(_block_rows_limit);

if (result_data.size() > 0) {
int col_size = result_data[0].column_value.size();
auto col_size = result_data[0].column_value.size();
if (col_size != _s_tbls_columns.size()) {
return Status::InternalError<false>(
"catalog meta cache stats schema is not match for FE and BE");
Expand Down Expand Up @@ -115,7 +117,7 @@ Status SchemaCatalogMetaCacheStatsScanner::get_next_block_internal(vectorized::B

if (_block == nullptr) {
RETURN_IF_ERROR(_get_meta_cache_from_fe());
_total_rows = _block->rows();
_total_rows = (int)_block->rows();
}

if (_row_idx == _total_rows) {
Expand Down
12 changes: 7 additions & 5 deletions be/src/exec/schema_scanner/schema_columns_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
#include "vec/common/string_ref.h"

namespace doris {
#include "common/compile_check_begin.h"

class RuntimeState;

namespace vectorized {
Expand Down Expand Up @@ -411,7 +413,7 @@ Status SchemaColumnsScanner::_fill_block_impl(vectorized::Block* block) {
{
std::vector<StringRef> strs(columns_num);
int offset_index = 0;
int cur_table_index = _table_index - _desc_result.tables_offset.size();
int cur_table_index = int(_table_index - _desc_result.tables_offset.size());

for (int i = 0; i < columns_num; ++i) {
while (_desc_result.tables_offset[offset_index] <= i) {
Expand Down Expand Up @@ -609,14 +611,14 @@ Status SchemaColumnsScanner::_fill_block_impl(vectorized::Block* block) {
// EXTRA
{
StringRef str = StringRef("", 0);
std::vector<void*> datas(columns_num, &str);
RETURN_IF_ERROR(fill_dest_column_for_range(block, 17, datas));
std::vector<void*> filled_values(columns_num, &str);
RETURN_IF_ERROR(fill_dest_column_for_range(block, 17, filled_values));
}
// PRIVILEGES
{
StringRef str = StringRef("", 0);
std::vector<void*> datas(columns_num, &str);
RETURN_IF_ERROR(fill_dest_column_for_range(block, 18, datas));
std::vector<void*> filled_values(columns_num, &str);
RETURN_IF_ERROR(fill_dest_column_for_range(block, 18, filled_values));
}
// COLUMN_COMMENT
{
Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/schema_scanner/schema_file_cache_statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "vec/data_types/data_type_factory.hpp"

namespace doris {
#include "common/compile_check_begin.h"

std::vector<SchemaScanner::ColumnDesc> SchemaFileCacheStatisticsScanner::_s_tbls_columns = {
// name, type, size
Expand Down Expand Up @@ -68,7 +69,7 @@ Status SchemaFileCacheStatisticsScanner::get_next_block_internal(vectorized::Blo
_stats_block->reserve(_block_rows_limit);

ExecEnv::GetInstance()->file_cache_factory()->get_cache_stats_block(_stats_block.get());
_total_rows = _stats_block->rows();
_total_rows = (int)_stats_block->rows();
}

if (_row_idx == _total_rows) {
Expand Down
6 changes: 4 additions & 2 deletions be/src/exec/schema_scanner/schema_partitions_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
#include "vec/data_types/data_type_factory.hpp"

namespace doris {
#include "common/compile_check_begin.h"

class RuntimeState;
namespace vectorized {
class Block;
Expand Down Expand Up @@ -138,7 +140,7 @@ Status SchemaPartitionsScanner::get_onedb_info_from_fe(int64_t dbId) {
}
_partitions_block->reserve(_block_rows_limit);
if (result_data.size() > 0) {
int col_size = result_data[0].column_value.size();
auto col_size = result_data[0].column_value.size();
if (col_size != _s_tbls_columns.size()) {
return Status::InternalError<false>("table options schema is not match for FE and BE");
}
Expand Down Expand Up @@ -178,7 +180,7 @@ Status SchemaPartitionsScanner::get_next_block_internal(vectorized::Block* block
if (_db_index < _db_result.db_ids.size()) {
RETURN_IF_ERROR(get_onedb_info_from_fe(_db_result.db_ids[_db_index]));
_row_idx = 0; // reset row index so that it start filling for next block.
_total_rows = _partitions_block->rows();
_total_rows = (int)_partitions_block->rows();
_db_index++;
}
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/schema_scanner/schema_processlist_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "vec/data_types/data_type_factory.hpp"

namespace doris {
#include "common/compile_check_begin.h"

std::vector<SchemaScanner::ColumnDesc> SchemaProcessListScanner::_s_processlist_columns = {
{"CURRENT_CONNECTED", TYPE_VARCHAR, sizeof(StringRef), false},
Expand Down Expand Up @@ -126,7 +127,7 @@ Status SchemaProcessListScanner::_fill_block_impl(vectorized::Block* block) {
datas[row_idx] = &int_vals[row_idx];
} else if (_s_processlist_columns[col_idx].type == TYPE_DATETIMEV2) {
auto* dv = reinterpret_cast<DateV2Value<DateTimeV2ValueType>*>(&int_vals[row_idx]);
if (!dv->from_date_str(column_value.data(), column_value.size(), -1,
if (!dv->from_date_str(column_value.data(), (int)column_value.size(), -1,
config::allow_zero_date)) {
return Status::InternalError(
"process list meet invalid data, column={}, data={}, reason={}",
Expand Down
6 changes: 4 additions & 2 deletions be/src/exec/schema_scanner/schema_routine_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
#include "vec/data_types/data_type_factory.hpp"

namespace doris {
#include "common/compile_check_begin.h"

std::vector<SchemaScanner::ColumnDesc> SchemaRoutinesScanner::_s_tbls_columns = {
{"SPECIFIC_NAME", TYPE_VARCHAR, sizeof(StringRef), true},
{"ROUTINE_CATALOG", TYPE_VARCHAR, sizeof(StringRef), true},
Expand Down Expand Up @@ -94,7 +96,7 @@ Status SchemaRoutinesScanner::get_block_from_fe() {
}
_routines_block->reserve(_block_rows_limit);
if (result_data.size() > 0) {
int col_size = result_data[0].column_value.size();
auto col_size = result_data[0].column_value.size();
if (col_size != _s_tbls_columns.size()) {
return Status::InternalError<false>("routine table schema is not match for FE and BE");
}
Expand All @@ -121,7 +123,7 @@ Status SchemaRoutinesScanner::get_next_block_internal(vectorized::Block* block,

if (_routines_block == nullptr) {
RETURN_IF_ERROR(get_block_from_fe());
_total_rows = _routines_block->rows();
_total_rows = (int)_routines_block->rows();
}

if (_row_idx == _total_rows) {
Expand Down
Loading

0 comments on commit ac68047

Please sign in to comment.