Skip to content

Commit

Permalink
Merge branch 'branch-2.1' into case-from-master
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaokang authored Aug 11, 2024
2 parents 7d42b58 + 0ee0dd6 commit b1edda3
Show file tree
Hide file tree
Showing 1,050 changed files with 33,203 additions and 8,958 deletions.
18 changes: 14 additions & 4 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ std::unordered_map<TTaskType::type, std::unordered_set<int64_t>> s_task_signatur

std::atomic_ulong s_report_version(time(nullptr) * 10000);

// Advances the process-wide report version counter (s_report_version) by one.
// The increment uses relaxed memory ordering: only the atomicity of the
// counter update itself is requested, not ordering against other memory.
void increase_report_version() {
    constexpr unsigned long kVersionStep = 1;
    s_report_version.fetch_add(kVersionStep, std::memory_order_relaxed);
}

// FIXME(plat1ko): Paired register and remove task info
bool register_task_info(const TTaskType::type task_type, int64_t signature) {
if (task_type == TTaskType::type::PUSH_STORAGE_POLICY ||
Expand Down Expand Up @@ -197,7 +201,7 @@ void alter_tablet(StorageEngine& engine, const TAgentTaskRequest& agent_task_req
}

if (status.ok()) {
s_report_version.fetch_add(1, std::memory_order_relaxed);
increase_report_version();
}

// Return result to fe
Expand Down Expand Up @@ -1363,7 +1367,9 @@ void create_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req)
COUNTER_UPDATE(profile->total_time_counter(), elapsed_time);
std::stringstream ss;
profile->pretty_print(&ss);
LOG(WARNING) << "create tablet cost(s) " << elapsed_time / 1e9 << std::endl << ss.str();
LOG(WARNING) << "create tablet " << create_tablet_req.tablet_id << " cost(s) "
<< elapsed_time / 1e9 << std::endl
<< ss.str();
}
});
DorisMetrics::instance()->create_tablet_requests_total->increment(1);
Expand All @@ -1379,7 +1385,7 @@ void create_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req)
.tag("tablet_id", create_tablet_req.tablet_id)
.error(status);
} else {
s_report_version.fetch_add(1, std::memory_order_relaxed);
increase_report_version();
// get path hash of the created tablet
TabletSharedPtr tablet;
{
Expand Down Expand Up @@ -1474,7 +1480,7 @@ void push_callback(const TAgentTaskRequest& req) {
.tag("signature", req.signature)
.tag("tablet_id", push_req.tablet_id)
.tag("push_type", push_req.push_type);
++s_report_version;
increase_report_version();
finish_task_request.__set_finish_tablet_infos(tablet_infos);
} else {
LOG_WARNING("failed to execute push task")
Expand Down Expand Up @@ -1741,6 +1747,10 @@ void clone_callback(StorageEngine& engine, const TMasterInfo& master_info,
LOG_INFO("successfully clone tablet")
.tag("signature", req.signature)
.tag("tablet_id", clone_req.tablet_id);
if (engine_task.is_new_tablet()) {
increase_report_version();
finish_task_request.__set_report_version(s_report_version);
}
finish_task_request.__set_finish_tablet_infos(tablet_infos);
}

Expand Down
14 changes: 9 additions & 5 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ DEFINE_mInt32(max_single_replica_compaction_threads, "-1");

DEFINE_Bool(enable_base_compaction_idle_sched, "true");
DEFINE_mInt64(base_compaction_min_rowset_num, "5");
DEFINE_mInt64(base_compaction_max_compaction_score, "20");
DEFINE_mDouble(base_compaction_min_data_ratio, "0.3");
DEFINE_mInt64(base_compaction_dup_key_max_file_size_mbytes, "1024");

Expand Down Expand Up @@ -408,6 +409,7 @@ DEFINE_mInt64(compaction_min_size_mbytes, "64");
// cumulative compaction policy: min and max delta file's number
DEFINE_mInt64(cumulative_compaction_min_deltas, "5");
DEFINE_mInt64(cumulative_compaction_max_deltas, "1000");
DEFINE_mInt32(cumulative_compaction_max_deltas_factor, "10");

// This config can be set to limit thread number in multiget thread pool.
DEFINE_mInt32(multi_get_max_threads, "10");
Expand Down Expand Up @@ -1033,11 +1035,11 @@ DEFINE_Int32(inverted_index_read_buffer_size, "4096");
// tree depth for bkd index
DEFINE_Int32(max_depth_in_bkd_tree, "32");
// index compaction
DEFINE_mBool(inverted_index_compaction_enable, "false");
DEFINE_mBool(inverted_index_compaction_enable, "true");
// Only for debug, do not use in production
DEFINE_mBool(debug_inverted_index_compaction, "false");
// index by RAM directory
DEFINE_mBool(inverted_index_ram_dir_enable, "false");
DEFINE_mBool(inverted_index_ram_dir_enable, "true");
// use num_broadcast_buffer blocks as buffer to do broadcast
DEFINE_Int32(num_broadcast_buffer, "32");

Expand All @@ -1051,8 +1053,6 @@ DEFINE_mInt64(max_tablet_io_errors, "-1");
DEFINE_Int32(tablet_path_check_interval_seconds, "-1");
DEFINE_mInt32(tablet_path_check_batch_size, "1000");

// Page size of row column, default 4KB
DEFINE_mInt64(row_column_page_size, "4096");
// it must be larger than or equal to 5MB
DEFINE_mInt32(s3_write_buffer_size, "5242880");
// The timeout config for S3 buffer allocation
Expand Down Expand Up @@ -1256,10 +1256,14 @@ DEFINE_Int64(min_row_group_size, "134217728");
// The time out milliseconds for remote fetch schema RPC, default 60s
DEFINE_mInt64(fetch_remote_schema_rpc_timeout_ms, "60000");

DEFINE_mInt64(compaction_memory_bytes_limit, "1073741824");

DEFINE_mInt64(compaction_batch_size, "-1");

// If set to false, the parquet reader will not use the page index to filter data.
// This is only for debugging purposes, in case the page index sometimes
// filters out the wrong data.
DEFINE_mBool(enable_parquet_page_index, "true");
DEFINE_mBool(enable_parquet_page_index, "false");

DEFINE_mBool(ignore_not_found_file_in_external_table, "true");

Expand Down
8 changes: 6 additions & 2 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ DECLARE_mInt32(max_single_replica_compaction_threads);

DECLARE_Bool(enable_base_compaction_idle_sched);
DECLARE_mInt64(base_compaction_min_rowset_num);
DECLARE_mInt64(base_compaction_max_compaction_score);
DECLARE_mDouble(base_compaction_min_data_ratio);
DECLARE_mInt64(base_compaction_dup_key_max_file_size_mbytes);

Expand Down Expand Up @@ -464,6 +465,7 @@ DECLARE_mInt64(compaction_min_size_mbytes);
// cumulative compaction policy: min and max delta file's number
DECLARE_mInt64(cumulative_compaction_min_deltas);
DECLARE_mInt64(cumulative_compaction_max_deltas);
DECLARE_mInt32(cumulative_compaction_max_deltas_factor);

// This config can be set to limit thread number in multiget thread pool.
DECLARE_mInt32(multi_get_max_threads);
Expand Down Expand Up @@ -1095,8 +1097,6 @@ DECLARE_mInt64(max_tablet_io_errors);
DECLARE_Int32(tablet_path_check_interval_seconds);
DECLARE_mInt32(tablet_path_check_batch_size);

// Page size of row column, default 4KB
DECLARE_mInt64(row_column_page_size);
// it must be larger than or equal to 5MB
DECLARE_mInt32(s3_write_buffer_size);
// The timeout config for S3 buffer allocation
Expand Down Expand Up @@ -1346,6 +1346,10 @@ DECLARE_mInt64(fetch_remote_schema_rpc_timeout_ms);
// The minimum row group size when exporting Parquet files.
DECLARE_Int64(min_row_group_size);

DECLARE_mInt64(compaction_memory_bytes_limit);

DECLARE_mInt64(compaction_batch_size);

DECLARE_mBool(enable_parquet_page_index);

// Whether to ignore files that are not found in an external table (e.g., hive)
Expand Down
7 changes: 7 additions & 0 deletions be/src/common/status.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ void Status::to_thrift(TStatus* s) const {
// << "The error code has to > 0 because TStatusCode need it > 0, it's actual value is "
// << _code;
s->status_code = (int16_t)_code > 0 ? (TStatusCode::type)_code : TStatusCode::INTERNAL_ERROR;

if (_code == ErrorCode::VERSION_ALREADY_MERGED) {
s->status_code = TStatusCode::OLAP_ERR_VERSION_ALREADY_MERGED;
} else if (_code == ErrorCode::TABLE_NOT_FOUND) {
s->status_code = TStatusCode::TABLET_MISSING;
}

s->error_msgs.push_back(fmt::format("({})[{}]{}", BackendOptions::get_localhost(),
code_as_string(), _err_msg ? _err_msg->_msg : ""));
s->__isset.error_msgs = true;
Expand Down
69 changes: 68 additions & 1 deletion be/src/exec/schema_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@
#include "exec/schema_scanner/schema_workload_groups_scanner.h"
#include "exec/schema_scanner/schema_workload_sched_policy_scanner.h"
#include "olap/hll.h"
#include "pipeline/pipeline_x/dependency.h"
#include "runtime/define_primitive_type.h"
#include "runtime/fragment_mgr.h"
#include "runtime/types.h"
#include "util/string_util.h"
#include "util/types.h"
#include "vec/columns/column.h"
Expand All @@ -64,6 +67,7 @@
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_factory.hpp"

namespace doris {
class ObjectPool;
Expand All @@ -84,7 +88,60 @@ Status SchemaScanner::start(RuntimeState* state) {
return Status::OK();
}

Status SchemaScanner::get_next_block(vectorized::Block* block, bool* eos) {
// Hands the rows buffered in _data_block over to `block`, then, unless the
// end of the stream has been reached, schedules the next asynchronous fetch.
//
// @param state  runtime state forwarded to get_next_block_async().
// @param block  destination block; the buffered rows are appended column by
//               column, using `block`'s own column count as the bound.
// @param eos    set to the end-of-stream flag recorded by the last fetch.
// @return InternalError if no buffer exists, the recorded scanner status if
//         it is an error, the error from scheduling the next fetch, or OK.
Status SchemaScanner::get_next_block(RuntimeState* state, vectorized::Block* block, bool* eos) {
    if (!_data_block) {
        return Status::InternalError("No data left!");
    }
    DCHECK(_async_thread_running == false);
    RETURN_IF_ERROR(_scanner_status.status());

    // Neither value changes while the columns are being copied.
    const size_t column_count = block->columns();
    const auto buffered_rows = _data_block->rows();
    for (size_t col = 0; col < column_count; ++col) {
        const auto& source_column = *_data_block->get_by_position(col).column;
        auto target_column = std::move(*block->get_by_position(col).column).mutate();
        target_column->insert_range_from(source_column, 0, buffered_rows);
    }

    // The buffer is reused by the next fetch, so drop the rows just copied.
    _data_block->clear_column_data();

    *eos = _eos;
    if (*eos) {
        return Status::OK();
    }
    RETURN_IF_ERROR(get_next_block_async(state));
    return Status::OK();
}

// Schedules one background fetch that fills _data_block through
// get_next_block_internal(), using the fragment manager's thread pool.
//
// The function returns as soon as the work item has been submitted. The
// outcome of the fetch is not returned here: it is recorded in
// _scanner_status and _eos, which get_next_block() reads afterwards.
//
// @param state  runtime state; supplies the task execution context and is
//               passed to start() and SCOPED_ATTACH_TASK inside the task.
// @return the submission error if submit_func() fails, otherwise OK.
Status SchemaScanner::get_next_block_async(RuntimeState* state) {
// Block the dependency before the task is queued; it is set ready again at
// the end of the task body below.
// NOTE(review): if submit_func() fails, the dependency is left blocked when
// the error is returned -- confirm callers tear the task down in that case.
_dependency->block();
auto task_ctx = state->get_task_execution_context();
RETURN_IF_ERROR(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func(
[this, task_ctx, state]() {
DCHECK(_async_thread_running == false);
// task_ctx is locked before `this` or `state` is used; a null result is
// reported as an error and the task does nothing else.
// NOTE(review): presumably a null lock means the owning task is gone, so
// leaving _dependency blocked on this path is intentional -- confirm.
auto task_lock = task_ctx.lock();
if (task_lock == nullptr) {
_scanner_status.update(Status::InternalError("Task context not exists!"));
return;
}
SCOPED_ATTACH_TASK(state);
_dependency->block();
_async_thread_running = true;
_finish_dependency->block();
// First run only: create the buffer block, give it the scanner's columns,
// and start the scanner.
if (!_opened) {
_data_block = vectorized::Block::create_unique();
_init_block(_data_block.get());
_scanner_status.update(start(state));
_opened = true;
}
bool eos = false;
// NOTE(review): this call is made even when start() above reported an
// error -- confirm AtomicStatus::update() keeps the first error.
_scanner_status.update(get_next_block_internal(_data_block.get(), &eos));
_eos = eos;
_async_thread_running = false;
// Results are published above before the dependency is set ready.
_dependency->set_ready();
if (eos) {
_finish_dependency->set_ready();
}
}));
return Status::OK();
}

Status SchemaScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("used before initialized.");
}
Expand Down Expand Up @@ -173,6 +230,16 @@ std::unique_ptr<SchemaScanner> SchemaScanner::create(TSchemaTableType::type type
}
}

// Appends one freshly created, empty column to `src_block` for every entry
// returned by get_column_desc(), in the same order and with the same names.
//
// @param src_block  block that receives the columns; it is only appended to.
//
// The descriptor list is walked with a range-for so that no signed index is
// compared against the unsigned container size.
void SchemaScanner::_init_block(vectorized::Block* src_block) {
    for (const SchemaScanner::ColumnDesc& column_desc : get_column_desc()) {
        TypeDescriptor descriptor(column_desc.type);
        // NOTE(review): the `true` argument is presumably the is_nullable
        // flag of create_data_type() -- confirm against DataTypeFactory.
        auto data_type = vectorized::DataTypeFactory::instance().create_data_type(descriptor, true);
        src_block->insert(vectorized::ColumnWithTypeAndName(data_type->create_column(), data_type,
                                                            column_desc.name));
    }
}

Status SchemaScanner::fill_dest_column_for_range(vectorized::Block* block, size_t pos,
const std::vector<void*>& datas) {
const ColumnDesc& col_desc = _columns[pos];
Expand Down
25 changes: 24 additions & 1 deletion be/src/exec/schema_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <stddef.h>
#include <stdint.h>

#include <condition_variable>
#include <memory>
#include <string>
#include <utility>
#include <vector>
Expand All @@ -43,6 +44,10 @@ namespace vectorized {
class Block;
}

namespace pipeline {
class Dependency;
}

struct SchemaScannerCommonParam {
SchemaScannerCommonParam()
: db(nullptr),
Expand All @@ -64,6 +69,7 @@ struct SchemaScannerCommonParam {
int32_t port; // frontend thrift port
int64_t thread_id;
const std::string* catalog = nullptr;
std::set<TNetworkAddress> fe_addr_list;
};

// scanner parameter from frontend
Expand Down Expand Up @@ -94,15 +100,23 @@ class SchemaScanner {

// init object need information, schema etc.
virtual Status init(SchemaScannerParam* param, ObjectPool* pool);
Status get_next_block(RuntimeState* state, vectorized::Block* block, bool* eos);
// Start to work
virtual Status start(RuntimeState* state);
virtual Status get_next_block(vectorized::Block* block, bool* eos);
virtual Status get_next_block_internal(vectorized::Block* block, bool* eos);
const std::vector<ColumnDesc>& get_column_desc() const { return _columns; }
// factory function
static std::unique_ptr<SchemaScanner> create(TSchemaTableType::type type);
TSchemaTableType::type type() const { return _schema_table_type; }
// Stores the two dependencies that get_next_block_async() blocks and sets
// ready around each background fetch.
//
// @param dep      stored in _dependency.
// @param fin_dep  stored in _finish_dependency.
//
// Both arguments are taken by value and moved into the members, which avoids
// an extra atomic reference-count increment and decrement per pointer.
void set_dependency(std::shared_ptr<pipeline::Dependency> dep,
                    std::shared_ptr<pipeline::Dependency> fin_dep) {
    _dependency = std::move(dep);
    _finish_dependency = std::move(fin_dep);
}
Status get_next_block_async(RuntimeState* state);

protected:
void _init_block(vectorized::Block* src_block);
Status fill_dest_column_for_range(vectorized::Block* block, size_t pos,
const std::vector<void*>& datas);

Expand All @@ -125,6 +139,15 @@ class SchemaScanner {
RuntimeProfile::Counter* _get_table_timer = nullptr;
RuntimeProfile::Counter* _get_describe_timer = nullptr;
RuntimeProfile::Counter* _fill_block_timer = nullptr;

std::shared_ptr<pipeline::Dependency> _dependency = nullptr;
std::shared_ptr<pipeline::Dependency> _finish_dependency = nullptr;

std::unique_ptr<vectorized::Block> _data_block;
AtomicStatus _scanner_status;
std::atomic<bool> _eos = false;
std::atomic<bool> _opened = false;
std::atomic<bool> _async_thread_running = false;
};

} // namespace doris
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ Status SchemaActiveQueriesScanner::_get_active_queries_block_from_fe() {
return Status::OK();
}

Status SchemaActiveQueriesScanner::get_next_block(vectorized::Block* block, bool* eos) {
Status SchemaActiveQueriesScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("Used before initialized.");
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_active_queries_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class SchemaActiveQueriesScanner : public SchemaScanner {
~SchemaActiveQueriesScanner() override;

Status start(RuntimeState* state) override;
Status get_next_block(vectorized::Block* block, bool* eos) override;
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;

static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;

Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ Status SchemaBackendActiveTasksScanner::start(RuntimeState* state) {
return Status::OK();
}

Status SchemaBackendActiveTasksScanner::get_next_block(vectorized::Block* block, bool* eos) {
Status SchemaBackendActiveTasksScanner::get_next_block_internal(vectorized::Block* block,
bool* eos) {
if (!_is_init) {
return Status::InternalError("Used before initialized.");
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_backend_active_tasks.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class SchemaBackendActiveTasksScanner : public SchemaScanner {
~SchemaBackendActiveTasksScanner() override;

Status start(RuntimeState* state) override;
Status get_next_block(vectorized::Block* block, bool* eos) override;
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;

static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;

Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_charsets_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ SchemaCharsetsScanner::SchemaCharsetsScanner()

SchemaCharsetsScanner::~SchemaCharsetsScanner() {}

Status SchemaCharsetsScanner::get_next_block(vectorized::Block* block, bool* eos) {
Status SchemaCharsetsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("call this before initial.");
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_charsets_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class SchemaCharsetsScanner : public SchemaScanner {
SchemaCharsetsScanner();
~SchemaCharsetsScanner() override;

Status get_next_block(vectorized::Block* block, bool* eos) override;
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;

private:
struct CharsetStruct {
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_collations_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ SchemaCollationsScanner::SchemaCollationsScanner()

SchemaCollationsScanner::~SchemaCollationsScanner() {}

Status SchemaCollationsScanner::get_next_block(vectorized::Block* block, bool* eos) {
Status SchemaCollationsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("call this before initial.");
}
Expand Down
Loading

0 comments on commit b1edda3

Please sign in to comment.