Merge branch 'master' into enhancement_nereids_kill-query-support-union
Yao-MR authored Dec 24, 2024
2 parents 627fcfd + c45d468 commit 1f64bf0
Showing 3,707 changed files with 11,622 additions and 105,730 deletions.
The diff you're trying to view is too large. We only load the first 3000 changed files.
1 change: 1 addition & 0 deletions .asf.yaml
@@ -63,6 +63,7 @@ github:
- COMPILE (DORIS_COMPILE)
- Need_2_Approval
- Cloud UT (Doris Cloud UT)
- performance (Doris Performance)

required_pull_request_reviews:
dismiss_stale_reviews: true
7 changes: 2 additions & 5 deletions README.md
@@ -59,12 +59,9 @@ Apache Doris is an easy-to-use, high-performance and real-time analytical databa

All this makes Apache Doris an ideal tool for scenarios including report analysis, ad-hoc query, unified data warehouse, and data lake query acceleration. On Apache Doris, users can build various applications, such as user behavior analysis, AB test platform, log retrieval analysis, user portrait analysis, and order analysis.

🎉 Version 2.1.4 is now released. Check out the 🔗[Release Notes](https://doris.apache.org/docs/releasenotes/release-2.1.4) here. The 2.1 version delivers exceptional performance, with 100% higher out-of-the-box query performance proven by TPC-DS 1TB tests, data lake analytics 4-6 times faster than Trino and Spark, solid support for semi-structured data analysis with the new Variant type and a suite of analytical functions, asynchronous materialized views for query acceleration, optimized real-time writing at scale, and better workload management with stability and runtime SQL resource tracking.
🎉 Check out the 🔗[All releases](https://doris.apache.org/docs/releasenotes/all-release), where you'll find a chronological summary of Apache Doris versions released over the past year.


🎉 Version 2.0.12 is now released! This fully evolved and stable release is ready for all users to upgrade. Check out the 🔗[Release Notes](https://doris.apache.org/docs/2.0/releasenotes/release-2.0.12) here.

👀 Have a look at the 🔗[Official Website](https://doris.apache.org/) for a comprehensive list of Apache Doris's core features, blogs and user cases.
👀 Explore the 🔗[Official Website](https://doris.apache.org/) to discover Apache Doris's core features, blogs, and user cases in detail.

## 📈 Usage Scenarios

7 changes: 4 additions & 3 deletions be/src/cloud/cloud_base_compaction.cpp
@@ -268,8 +268,9 @@ Status CloudBaseCompaction::execute_compact() {
<< ", output_version=" << _output_version;
return res;
}
LOG_INFO("finish CloudBaseCompaction, tablet_id={}, cost={}ms", _tablet->tablet_id(),
duration_cast<milliseconds>(steady_clock::now() - start).count())
LOG_INFO("finish CloudBaseCompaction, tablet_id={}, cost={}ms range=[{}-{}]",
_tablet->tablet_id(), duration_cast<milliseconds>(steady_clock::now() - start).count(),
_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version())
.tag("job_id", _uuid)
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
@@ -343,7 +344,7 @@ Status CloudBaseCompaction::modify_rowsets() {
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
.tag("input_segments", _input_segments)
.tag("update_bitmap_size", output_rowset_delete_bitmap->delete_bitmap.size());
.tag("num_output_delete_bitmap", output_rowset_delete_bitmap->delete_bitmap.size());
compaction_job->set_delete_bitmap_lock_initiator(initiator);
}

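The `LOG_INFO(...).tag(...).tag(...)` chains above are Doris's structured-logging style. A minimal sketch of how such a chainable logger can work — an assumed implementation for illustration, not the actual one in `common/logging.h`:

```cpp
#include <iostream>
#include <sstream>
#include <string>
#include <string_view>

// The message is buffered, each .tag() appends a key=value pair, and the
// destructor flushes the finished line, so the whole chain becomes one
// log record.
class TaggableLogger {
public:
    explicit TaggableLogger(std::string msg) : _msg(std::move(msg)) {}

    template <typename V>
    TaggableLogger& tag(std::string_view key, const V& value) {
        std::ostringstream oss;
        oss << ' ' << key << '=' << value;
        _msg += oss.str();
        return *this; // returning *this is what enables chaining
    }

    ~TaggableLogger() { std::cout << _msg << '\n'; } // flush at end of statement

private:
    std::string _msg;
};

// Usage mirroring the diff; the temporary lives until the ';':
// TaggableLogger("finish CloudBaseCompaction").tag("job_id", "uuid").tag("input_rows", 1000);
```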
8 changes: 5 additions & 3 deletions be/src/cloud/cloud_cumulative_compaction.cpp
@@ -204,8 +204,9 @@ Status CloudCumulativeCompaction::execute_compact() {
<< ", output_version=" << _output_version;
return res;
}
LOG_INFO("finish CloudCumulativeCompaction, tablet_id={}, cost={}ms", _tablet->tablet_id(),
duration_cast<milliseconds>(steady_clock::now() - start).count())
LOG_INFO("finish CloudCumulativeCompaction, tablet_id={}, cost={}ms, range=[{}-{}]",
_tablet->tablet_id(), duration_cast<milliseconds>(steady_clock::now() - start).count(),
_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version())
.tag("job_id", _uuid)
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
@@ -299,7 +300,8 @@ Status CloudCumulativeCompaction::modify_rowsets() {
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
.tag("input_segments", _input_segments)
.tag("update_bitmap_size", output_rowset_delete_bitmap->delete_bitmap.size());
.tag("number_output_delete_bitmap",
output_rowset_delete_bitmap->delete_bitmap.size());
compaction_job->set_delete_bitmap_lock_initiator(initiator);
}

2 changes: 1 addition & 1 deletion be/src/cloud/cloud_storage_engine.h
@@ -75,7 +75,7 @@ class CloudStorageEngine final : public BaseStorageEngine {
void _check_file_cache_ttl_block_valid();

std::optional<StorageResource> get_storage_resource(const std::string& vault_id) {
LOG(INFO) << "Getting storage resource for vault_id: " << vault_id;
VLOG_DEBUG << "Getting storage resource for vault_id: " << vault_id;

bool synced = false;
do {
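Demoting this per-call `LOG(INFO)` to `VLOG_DEBUG` moves it behind glog's verbosity gate. A hedged sketch of the underlying mechanism, assuming `VLOG_DEBUG` wraps glog's `VLOG` at some level:

```cpp
#include <string>

#include <glog/logging.h>

void lookup_storage_resource(const std::string& vault_id) {
    // Compiled in, but only emitted when runtime verbosity is at least 5
    // (e.g. the process is started with --v=5), so this hot path stays
    // quiet by default instead of logging on every lookup.
    VLOG(5) << "Getting storage resource for vault_id: " << vault_id;
}
```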
38 changes: 26 additions & 12 deletions be/src/cloud/cloud_tablet.cpp
@@ -33,6 +33,7 @@
#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet_mgr.h"
#include "common/logging.h"
#include "io/cache/block_file_cache_downloader.h"
#include "io/cache/block_file_cache_factory.h"
#include "olap/cumulative_compaction_time_series_policy.h"
@@ -54,6 +55,7 @@ namespace doris {
using namespace ErrorCode;

static constexpr int COMPACTION_DELETE_BITMAP_LOCK_ID = -1;
static constexpr int LOAD_INITIATOR_ID = -1;

CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta)
: BaseTablet(std::move(tablet_meta)), _engine(engine) {}
@@ -407,6 +409,9 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() {
auto rs_it = _stale_rs_version_map.find(v_ts->version());
if (rs_it != _stale_rs_version_map.end()) {
expired_rowsets.push_back(rs_it->second);
LOG(INFO) << "erase stale rowset, tablet_id=" << tablet_id()
<< " rowset_id=" << rs_it->second->rowset_id().to_string()
<< " version=" << rs_it->first.to_string();
_stale_rs_version_map.erase(rs_it);
} else {
LOG(WARNING) << "cannot find stale rowset " << v_ts->version() << " in tablet "
@@ -504,13 +509,19 @@ Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_rowset_writer(
Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_transient_rowset_writer(
const Rowset& rowset, std::shared_ptr<PartialUpdateInfo> partial_update_info,
int64_t txn_expiration) {
if (rowset.rowset_meta()->rowset_state() != RowsetStatePB::BEGIN_PARTIAL_UPDATE) [[unlikely]] {
// May cause the segment files generated by the transient rowset writer unable to be
// recycled, see `CloudRowsetWriter::build` for detail.
LOG(WARNING) << "Wrong rowset state: " << rowset.rowset_meta()->rowset_state();
DCHECK(false) << rowset.rowset_meta()->rowset_state();
if (rowset.rowset_meta_state() != RowsetStatePB::BEGIN_PARTIAL_UPDATE &&
rowset.rowset_meta_state() != RowsetStatePB::COMMITTED) [[unlikely]] {
auto msg = fmt::format(
"wrong rowset state when create_transient_rowset_writer, rowset state should be "
"BEGIN_PARTIAL_UPDATE or COMMITTED, but found {}, rowset_id={}, tablet_id={}",
RowsetStatePB_Name(rowset.rowset_meta_state()), rowset.rowset_id().to_string(),
tablet_id());
// see `CloudRowsetWriter::build` for detail.
// if this is in a retry task, the rowset state may have been changed to RowsetStatePB::COMMITTED
// in `RowsetMeta::merge_rowset_meta()` in previous trials.
LOG(WARNING) << msg;
DCHECK(false) << msg;
}

RowsetWriterContext context;
context.rowset_state = PREPARED;
context.segments_overlap = OVERLAPPING;
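The relaxed precondition above accepts two states because a retried partial-update load can arrive with the rowset already COMMITTED. The check isolated as a sketch, with a stand-in enum for the `RowsetStatePB` proto:

```cpp
// Stand-in for the RowsetStatePB proto enum used in the real code.
enum class RowsetState { PREPARED, BEGIN_PARTIAL_UPDATE, COMMITTED };

bool is_valid_transient_writer_state(RowsetState s) {
    // BEGIN_PARTIAL_UPDATE: normal first attempt of a partial-update load.
    // COMMITTED: a retry, after RowsetMeta::merge_rowset_meta() already
    //            flipped the state in a previous trial.
    return s == RowsetState::BEGIN_PARTIAL_UPDATE || s == RowsetState::COMMITTED;
}
```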
@@ -650,11 +661,14 @@ void CloudTablet::get_compaction_status(std::string* json_result) {
}

void CloudTablet::set_cumulative_layer_point(int64_t new_point) {
if (new_point == Tablet::K_INVALID_CUMULATIVE_POINT || new_point >= _cumulative_point) {
_cumulative_point = new_point;
return;
}
// cumulative point should only be reset to -1, or be increased
CHECK(new_point == Tablet::K_INVALID_CUMULATIVE_POINT || new_point >= _cumulative_point)
<< "Unexpected cumulative point: " << new_point
<< ", origin: " << _cumulative_point.load();
_cumulative_point = new_point;
// FIXME: could happen in currently unresolved race conditions
LOG(WARNING) << "Unexpected cumulative point: " << new_point
<< ", origin: " << _cumulative_point.load();
}

std::vector<RowsetSharedPtr> CloudTablet::pick_candidate_rowsets_to_base_compaction() {
Expand Down Expand Up @@ -719,8 +733,8 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx
}

auto ms_lock_id = lock_id == -1 ? txn_id : lock_id;
RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(
*this, ms_lock_id, COMPACTION_DELETE_BITMAP_LOCK_ID, new_delete_bitmap.get()));
RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(*this, ms_lock_id, LOAD_INITIATOR_ID,
new_delete_bitmap.get()));

// store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason,
// it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do
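The change to `set_cumulative_layer_point` above replaces a hard `CHECK` with a warning, because the violated invariant can still occur under the unresolved race noted in the FIXME. A self-contained sketch of the new behavior, with names assumed to mirror the diff:

```cpp
#include <atomic>
#include <cstdint>
#include <iostream>

constexpr int64_t K_INVALID_CUMULATIVE_POINT = -1; // assumed sentinel value

std::atomic<int64_t> cumulative_point{0};

void set_cumulative_layer_point(int64_t new_point) {
    // The point may only be reset to the sentinel or move forward.
    if (new_point == K_INVALID_CUMULATIVE_POINT || new_point >= cumulative_point) {
        cumulative_point = new_point;
        return;
    }
    // Previously a fatal CHECK; now only a warning, since the race that
    // reaches this branch is known but not yet fixed.
    std::cerr << "Unexpected cumulative point: " << new_point
              << ", origin: " << cumulative_point.load() << '\n';
}
```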
94 changes: 56 additions & 38 deletions be/src/cloud/cloud_tablet_hotspot.cpp
@@ -57,18 +57,55 @@ TabletHotspot::~TabletHotspot() {
}
}

struct MapKeyHash {
int64_t operator()(const std::pair<int64_t, int64_t>& key) const {
return std::hash<int64_t> {}(key.first) + std::hash<int64_t> {}(key.second);
void get_return_partitions(
const std::unordered_map<TabletHotspotMapKey,
std::unordered_map<int64_t, TabletHotspotMapValue>, MapKeyHash>&
hot_partition,
const std::unordered_map<TabletHotspotMapKey,
std::unordered_map<int64_t, TabletHotspotMapValue>, MapKeyHash>&
last_hot_partition,
std::vector<THotTableMessage>* hot_tables, int& return_partitions, int N) {
for (const auto& [key, partition_to_value] : hot_partition) {
THotTableMessage msg;
msg.table_id = key.first;
msg.index_id = key.second;
for (const auto& [partition_id, value] : partition_to_value) {
if (return_partitions > N) {
return;
}
auto last_value_iter = last_hot_partition.find(key);
if (last_value_iter != last_hot_partition.end()) {
auto last_partition_iter = last_value_iter->second.find(partition_id);
if (last_partition_iter != last_value_iter->second.end()) {
const auto& last_value = last_partition_iter->second;
if (std::abs(static_cast<int64_t>(value.qpd) -
static_cast<int64_t>(last_value.qpd)) < 5 &&
std::abs(static_cast<int64_t>(value.qpw) -
static_cast<int64_t>(last_value.qpw)) < 10 &&
std::abs(static_cast<int64_t>(value.last_access_time) -
static_cast<int64_t>(last_value.last_access_time)) < 60) {
LOG(INFO) << "skip partition_id=" << partition_id << " qpd=" << value.qpd
<< " qpw=" << value.qpw
<< " last_access_time=" << value.last_access_time
<< " last_qpd=" << last_value.qpd
<< " last_qpw=" << last_value.qpw
<< " last_access_time=" << last_value.last_access_time;
continue;
}
}
}
THotPartition hot_partition;
hot_partition.__set_partition_id(partition_id);
hot_partition.__set_query_per_day(value.qpd);
hot_partition.__set_query_per_week(value.qpw);
hot_partition.__set_last_access_time(value.last_access_time);
msg.hot_partitions.push_back(hot_partition);
return_partitions++;
}
msg.__isset.hot_partitions = !msg.hot_partitions.empty();
hot_tables->push_back(std::move(msg));
}
};
struct TabletHotspotMapValue {
uint64_t qpd = 0; // query per day
uint64_t qpw = 0; // query per week
int64_t last_access_time;
};

using TabletHotspotMapKey = std::pair<int64_t, int64_t>;
}

void TabletHotspot::get_top_n_hot_partition(std::vector<THotTableMessage>* hot_tables) {
// map<pair<table_id, index_id>, map<partition_id, value>> for day
@@ -108,33 +145,14 @@ void TabletHotspot::get_top_n_hot_partition(std::vector<THotTableMessage>* hot_t
});
constexpr int N = 50;
int return_partitions = 0;
auto get_return_partitions =
[=, &return_partitions](
const std::unordered_map<TabletHotspotMapKey,
std::unordered_map<int64_t, TabletHotspotMapValue>,
MapKeyHash>& hot_partition) {
for (const auto& [key, partition_to_value] : hot_partition) {
THotTableMessage msg;
msg.table_id = key.first;
msg.index_id = key.second;
for (const auto& [partition_id, value] : partition_to_value) {
if (return_partitions > N) {
return;
}
THotPartition hot_partition;
hot_partition.__set_partition_id(partition_id);
hot_partition.__set_query_per_day(value.qpd);
hot_partition.__set_query_per_week(value.qpw);
hot_partition.__set_last_access_time(value.last_access_time);
msg.hot_partitions.push_back(hot_partition);
return_partitions++;
}
msg.__isset.hot_partitions = !msg.hot_partitions.empty();
hot_tables->push_back(std::move(msg));
}
};
get_return_partitions(day_hot_partitions);
get_return_partitions(week_hot_partitions);

get_return_partitions(day_hot_partitions, _last_day_hot_partitions, hot_tables,
return_partitions, N);
get_return_partitions(week_hot_partitions, _last_week_hot_partitions, hot_tables,
return_partitions, N);

_last_day_hot_partitions = std::move(day_hot_partitions);
_last_week_hot_partitions = std::move(week_hot_partitions);
}

void HotspotCounter::make_dot_point() {
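The new comparison against the previous report is a change-threshold filter: a partition is skipped when its day/week query counts and last access time barely moved, so the top-N report stops repeating near-identical entries. The core condition isolated as a sketch, with the thresholds copied from the diff:

```cpp
#include <cstdint>
#include <cstdlib>

struct Value {
    uint64_t qpd = 0;            // queries per day
    uint64_t qpw = 0;            // queries per week
    int64_t last_access_time = 0;
};

// True when the stats moved so little since the last report that re-sending
// this partition would only duplicate the previous entry.
bool barely_changed(const Value& cur, const Value& last) {
    return std::llabs(static_cast<int64_t>(cur.qpd) - static_cast<int64_t>(last.qpd)) < 5 &&
           std::llabs(static_cast<int64_t>(cur.qpw) - static_cast<int64_t>(last.qpw)) < 10 &&
           std::llabs(cur.last_access_time - last.last_access_time) < 60;
}
```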
19 changes: 19 additions & 0 deletions be/src/cloud/cloud_tablet_hotspot.h
@@ -49,6 +49,19 @@ struct HotspotCounter {
};

using HotspotCounterPtr = std::shared_ptr<HotspotCounter>;
using TabletHotspotMapKey = std::pair<int64_t, int64_t>;

struct TabletHotspotMapValue {
uint64_t qpd = 0; // query per day
uint64_t qpw = 0; // query per week
int64_t last_access_time;
};

struct MapKeyHash {
int64_t operator()(const std::pair<int64_t, int64_t>& key) const {
return std::hash<int64_t> {}(key.first) + std::hash<int64_t> {}(key.second);
}
};

class TabletHotspot {
public:
@@ -71,6 +84,12 @@ class TabletHotspot {
bool _closed {false};
std::mutex _mtx;
std::condition_variable _cond;
std::unordered_map<TabletHotspotMapKey, std::unordered_map<int64_t, TabletHotspotMapValue>,
MapKeyHash>
_last_day_hot_partitions;
std::unordered_map<TabletHotspotMapKey, std::unordered_map<int64_t, TabletHotspotMapValue>,
MapKeyHash>
_last_week_hot_partitions;
};

} // namespace doris
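One side note on `MapKeyHash` above, offered as a sketch rather than a suggested change: summing the two element hashes is symmetric, so the keys (a, b) and (b, a) always collide. A boost-style `hash_combine` mixes in order, if that ever matters for these (table_id, index_id) pairs:

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <utility>

struct PairHash {
    size_t operator()(const std::pair<int64_t, int64_t>& key) const {
        size_t h = std::hash<int64_t>{}(key.first);
        // boost::hash_combine-style mixing; not symmetric in the two fields.
        h ^= std::hash<int64_t>{}(key.second) + 0x9e3779b97f4a7c15ULL + (h << 6) + (h >> 2);
        return h;
    }
};
```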
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
@@ -1404,6 +1404,8 @@ DEFINE_Bool(enable_table_size_correctness_check, "false");
DEFINE_Bool(force_regenerate_rowsetid_on_start_error, "false");
DEFINE_mBool(enable_sleep_between_delete_cumu_compaction, "false");

DEFINE_mInt32(compaction_num_per_round, "1");

// clang-format off
#ifdef BE_TEST
// test s3
2 changes: 2 additions & 0 deletions be/src/common/config.h
@@ -1490,6 +1490,8 @@ DECLARE_Bool(enable_table_size_correctness_check);
// Enable sleep 5s between delete cumulative compaction.
DECLARE_mBool(enable_sleep_between_delete_cumu_compaction);

DECLARE_mInt32(compaction_num_per_round);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
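The `DEFINE_mInt32`/`DECLARE_mInt32` pair introduces `compaction_num_per_round` with a default of 1; the `m` prefix marks the option as runtime-mutable in Doris's config convention. A hypothetical read site — the `config::` symbol is what the macros above generate:

```cpp
#include "common/config.h"

int compactions_per_round() {
    // Read on each use rather than cached once: a runtime-mutable config
    // can change between compaction rounds.
    return doris::config::compaction_num_per_round;
}
```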
7 changes: 7 additions & 0 deletions be/src/common/daemon.cpp
@@ -230,6 +230,11 @@ void refresh_memory_state_after_memory_change() {
}

void refresh_cache_capacity() {
if (doris::GlobalMemoryArbitrator::cache_adjust_capacity_notify.load(
std::memory_order_relaxed)) {
// the last cache capacity adjustment has not been completed.
return;
}
if (refresh_cache_capacity_sleep_time_ms <= 0) {
auto cache_capacity_reduce_mem_limit = int64_t(
doris::MemInfo::soft_mem_limit() * config::cache_capacity_reduce_mem_limit_frac);
@@ -247,6 +252,8 @@
new_cache_capacity_adjust_weighted;
doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity();
refresh_cache_capacity_sleep_time_ms = config::memory_gc_sleep_time_ms;
} else {
refresh_cache_capacity_sleep_time_ms = 0;
}
}
refresh_cache_capacity_sleep_time_ms -= config::memory_maintenance_sleep_time_ms;
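The early return added to `refresh_cache_capacity()` guards on a flag that stays set while the previous adjustment is still being applied. A minimal sketch of the pattern under assumed names mirroring `GlobalMemoryArbitrator`:

```cpp
#include <atomic>

std::atomic<bool> cache_adjust_capacity_notify{false}; // assumed flag

void refresh_cache_capacity_sketch() {
    // Skip this round if the adjuster thread has not consumed the last
    // request yet; otherwise adjustments would pile up behind each other.
    if (cache_adjust_capacity_notify.load(std::memory_order_relaxed)) {
        return;
    }
    // ... compute the new capacity weight, then wake the adjuster ...
    cache_adjust_capacity_notify.store(true, std::memory_order_relaxed);
}
```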
11 changes: 10 additions & 1 deletion be/src/common/status.h
@@ -293,7 +293,8 @@ namespace ErrorCode {
E(ENTRY_NOT_FOUND, -7002, false); \
E(INVALID_TABLET_STATE, -7211, false); \
E(ROWSETS_EXPIRED, -7311, false); \
E(CGROUP_ERROR, -7411, false);
E(CGROUP_ERROR, -7411, false); \
E(FATAL_ERROR, -7412, false);

// Define constexpr int error_code_name = error_code_value
#define M(NAME, ERRORCODE, ENABLESTACKTRACE) constexpr int NAME = ERRORCODE;
@@ -446,6 +447,14 @@ class [[nodiscard]] Status {

static Status OK() { return {}; }

template <bool stacktrace = true, typename... Args>
static Status FatalError(std::string_view msg, Args&&... args) {
#ifndef NDEBUG
LOG(FATAL) << fmt::format(msg, std::forward<Args>(args)...);
#endif
return Error<ErrorCode::FATAL_ERROR, stacktrace>(msg, std::forward<Args>(args)...);
}

// default have stacktrace. could disable manually.
#define ERROR_CTOR(name, code) \
template <bool stacktrace = true, typename... Args> \
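A hypothetical call site for the new `Status::FatalError` helper (the tablet_id value is made up): in a debug build (`NDEBUG` unset) the `LOG(FATAL)` aborts at the fault, while a release build returns an ordinary `FATAL_ERROR` status the caller can propagate.

```cpp
Status check_invariant(int64_t tablet_id, bool ok) {
    if (!ok) {
        // Aborts under !NDEBUG; otherwise equivalent to returning
        // Status::Error<ErrorCode::FATAL_ERROR>(...).
        return Status::FatalError("invariant violated, tablet_id={}", tablet_id);
    }
    return Status::OK();
}
```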
6 changes: 4 additions & 2 deletions be/src/exec/schema_scanner/schema_active_queries_scanner.cpp
@@ -26,6 +26,8 @@
#include "vec/data_types/data_type_factory.hpp"

namespace doris {
#include "common/compile_check_begin.h"

std::vector<SchemaScanner::ColumnDesc> SchemaActiveQueriesScanner::_s_tbls_columns = {
// name, type, size
{"QUERY_ID", TYPE_VARCHAR, sizeof(StringRef), true},
@@ -92,7 +94,7 @@ Status SchemaActiveQueriesScanner::_get_active_queries_block_from_fe() {
_active_query_block->reserve(_block_rows_limit);

if (result_data.size() > 0) {
int col_size = result_data[0].column_value.size();
auto col_size = result_data[0].column_value.size();
if (col_size != _s_tbls_columns.size()) {
return Status::InternalError<false>("active queries schema is not match for FE and BE");
}
@@ -119,7 +121,7 @@ Status SchemaActiveQueriesScanner::get_next_block_internal(vectorized::Block* bl

if (_active_query_block == nullptr) {
RETURN_IF_ERROR(_get_active_queries_block_from_fe());
_total_rows = _active_query_block->rows();
_total_rows = (int)_active_query_block->rows();
}

if (_row_idx == _total_rows) {
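Both edits in this file are narrowing fixes, presumably required once `common/compile_check_begin.h` enables stricter conversion checking inside the namespace: `size()` returns `size_t`, so storing it in an `int` must either keep the wide type (`auto`) or spell out the cast. A standalone sketch:

```cpp
#include <vector>

void example(const std::vector<int>& rows) {
    auto col_size = rows.size();                    // keeps size_t; no narrowing
    int total_rows = static_cast<int>(rows.size()); // narrowing made explicit
    (void)col_size;
    (void)total_rows;
}
```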