Skip to content

Commit

Permalink
Merge branch 'master' into ut-scanner-schedule
Browse files Browse the repository at this point in the history
  • Loading branch information
zhiqiang-hhhh authored Feb 13, 2025
2 parents 2ba0da5 + f015d9a commit 4432f1c
Show file tree
Hide file tree
Showing 277 changed files with 9,066 additions and 1,924 deletions.
4 changes: 1 addition & 3 deletions .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ github:
- P0 Regression (Doris Regression)
- External Regression (Doris External Regression)
- cloud_p0 (Doris Cloud Regression)

required_pull_request_reviews:
required_pull_request_reviews:
require_code_owner_reviews: true
required_approving_review_count: 1
dismiss_stale_reviews: true
Expand Down Expand Up @@ -147,7 +146,6 @@ github:
required_approving_review_count: 1

collaborators:
- LemonLiTree
- Yukang-Lian
- TangSiyang2001
- freemandealer
Expand Down
22 changes: 14 additions & 8 deletions be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include <fmt/format.h>

#include <memory>
#include <random>
#include <thread>

#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_tablet.h"
Expand Down Expand Up @@ -192,6 +194,18 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
}

int64_t t3 = MonotonicMicros();
DBUG_EXECUTE_IF("CloudEngineCalcDeleteBitmapTask.handle.inject_sleep", {
auto p = dp->param("percent", 0.01);
// 100s > Config.calculate_delete_bitmap_task_timeout_seconds = 60s
auto sleep_time = dp->param("sleep", 100);
std::mt19937 gen {std::random_device {}()};
std::bernoulli_distribution inject_fault {p};
if (inject_fault(gen)) {
LOG_INFO("injection sleep for {} seconds, txn={}, tablet_id={}", sleep_time,
_transaction_id, _tablet_id);
std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
}
});
Status status;
if (_sub_txn_ids.empty()) {
status = _handle_rowset(tablet, _version);
Expand Down Expand Up @@ -262,7 +276,6 @@ Status CloudTabletCalcDeleteBitmapTask::_handle_rowset(
return status;
}

int64_t t3 = MonotonicMicros();
rowset->set_version(Version(version, version));
TabletTxnInfo txn_info;
txn_info.rowset = rowset;
Expand All @@ -274,7 +287,6 @@ Status CloudTabletCalcDeleteBitmapTask::_handle_rowset(
.base_compaction_cnt = _ms_base_compaction_cnt,
.cumulative_compaction_cnt = _ms_cumulative_compaction_cnt,
.cumulative_point = _ms_cumulative_point};
int64_t update_delete_bitmap_time_us = 0;
if (txn_info.publish_status && (*(txn_info.publish_status) == PublishStatus::SUCCEED) &&
version == previous_publish_info.publish_version &&
_ms_base_compaction_cnt == previous_publish_info.base_compaction_cnt &&
Expand Down Expand Up @@ -304,7 +316,6 @@ Status CloudTabletCalcDeleteBitmapTask::_handle_rowset(
status = CloudTablet::update_delete_bitmap(tablet, &txn_info, transaction_id,
txn_expiration, tablet_delete_bitmap);
}
update_delete_bitmap_time_us = MonotonicMicros() - t3;
}
if (status != Status::OK()) {
LOG(WARNING) << "failed to calculate delete bitmap. rowset_id=" << rowset->rowset_id()
Expand All @@ -314,11 +325,6 @@ Status CloudTabletCalcDeleteBitmapTask::_handle_rowset(
}

_engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet_id);
LOG(INFO) << "calculate delete bitmap successfully on tablet"
<< ", table_id=" << tablet->table_id() << ", " << txn_str
<< ", tablet_id=" << tablet->tablet_id() << ", num_rows=" << rowset->num_rows()
<< ", update_delete_bitmap_time_us=" << update_delete_bitmap_time_us
<< ", res=" << status;
if (invisible_rowsets != nullptr) {
invisible_rowsets->push_back(rowset);
// see CloudTablet::save_delete_bitmap
Expand Down
18 changes: 18 additions & 0 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1123,6 +1123,24 @@ Status CloudMetaMgr::update_delete_bitmap(const CloudTablet& tablet, int64_t loc
bitmap.write(bitmap_data.data());
*(req.add_segment_delete_bitmaps()) = std::move(bitmap_data);
}
DBUG_EXECUTE_IF("CloudMetaMgr::test_update_big_delete_bitmap", {
LOG(INFO) << "test_update_big_delete_bitmap for tablet " << tablet.tablet_id();
auto count = dp->param<int>("count", 30000);
if (!delete_bitmap->delete_bitmap.empty()) {
auto& key = delete_bitmap->delete_bitmap.begin()->first;
auto& bitmap = delete_bitmap->delete_bitmap.begin()->second;
for (int i = 1000; i < (1000 + count); i++) {
req.add_rowset_ids(std::get<0>(key).to_string());
req.add_segment_ids(std::get<1>(key));
req.add_versions(i);
// To save space, convert array and bitmap containers to run containers
bitmap.runOptimize();
std::string bitmap_data(bitmap.getSizeInBytes(), '\0');
bitmap.write(bitmap_data.data());
*(req.add_segment_delete_bitmaps()) = std::move(bitmap_data);
}
}
});
auto st = retry_rpc("update delete bitmap", req, &res, &MetaService_Stub::update_delete_bitmap);
if (res.status().code() == MetaServiceCode::LOCK_EXPIRED) {
return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>(
Expand Down
5 changes: 0 additions & 5 deletions be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,6 @@ Status CloudTxnDeleteBitmapCache::update_tablet_txn_info(TTransactionId transact
// must call release handle to reduce the reference count,
// otherwise there will be memory leak
release(handle);
LOG_INFO("update txn related delete bitmap")
.tag("txn_id", transaction_id)
.tag("tablt_id", tablet_id)
.tag("delete_bitmap_size", charge)
.tag("publish_status", static_cast<int>(publish_status));
return Status::OK();
}

Expand Down
1 change: 0 additions & 1 deletion be/src/io/fs/hdfs_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,6 @@ Status HdfsFileWriter::_append(std::string_view content) {
if (_batch_buffer.full()) {
auto error_msg = fmt::format("invalid batch buffer status, capacity {}, size {}",
_batch_buffer.capacity(), _batch_buffer.size());
DCHECK(false) << error_msg;
return Status::InternalError(error_msg);
}
size_t append_size = _batch_buffer.append(content);
Expand Down
47 changes: 32 additions & 15 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <cstdint>
#include <iterator>
#include <random>

#include "common/cast_set.h"
#include "common/logging.h"
Expand Down Expand Up @@ -541,10 +542,7 @@ Status BaseTablet::calc_delete_bitmap(const BaseTabletSPtr& tablet, RowsetShared
DeleteBitmapPtr delete_bitmap, int64_t end_version,
CalcDeleteBitmapToken* token, RowsetWriter* rowset_writer,
DeleteBitmapPtr tablet_delete_bitmap) {
auto rowset_id = rowset->rowset_id();
if (specified_rowsets.empty() || segments.empty()) {
LOG(INFO) << "skip to construct delete bitmap tablet: " << tablet->tablet_id()
<< " rowset: " << rowset_id;
return Status::OK();
}

Expand Down Expand Up @@ -682,6 +680,18 @@ Status BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
continue;
}

DBUG_EXECUTE_IF("BaseTablet::calc_segment_delete_bitmap.inject_err", {
auto p = dp->param("percent", 0.01);
std::mt19937 gen {std::random_device {}()};
std::bernoulli_distribution inject_fault {p};
if (inject_fault(gen)) {
return Status::InternalError(
"injection error in calc_segment_delete_bitmap, "
"tablet_id={}, rowset_id={}",
tablet_id(), rowset_id.to_string());
}
});

RowsetSharedPtr rowset_find;
Status st = Status::OK();
if (tablet_delete_bitmap == nullptr) {
Expand Down Expand Up @@ -808,22 +818,29 @@ Status BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
partial_update_info->partial_update_mode_str(), new_generated_rows,
rowset_writer->num_rows(), rids_be_overwritten.size(), tablet_id());
}
LOG(INFO) << "calc segment delete bitmap for "
<< partial_update_info->partial_update_mode_str() << ", tablet: " << tablet_id()
auto cost_us = watch.get_elapse_time_us();
if (cost_us > 10 * 1000) {
LOG(INFO) << "calc segment delete bitmap for "
<< partial_update_info->partial_update_mode_str()
<< ", tablet: " << tablet_id() << " rowset: " << rowset_id
<< " seg_id: " << seg->id() << " dummy_version: " << end_version + 1
<< " rows: " << seg->num_rows() << " conflict rows: " << conflict_rows
<< " filtered rows: " << rids_be_overwritten.size()
<< " new generated rows: " << new_generated_rows
<< " bimap num: " << delete_bitmap->delete_bitmap.size()
<< " cost: " << cost_us << "(us)";
}
return Status::OK();
}
auto cost_us = watch.get_elapse_time_us();
if (cost_us > 10 * 1000) {
LOG(INFO) << "calc segment delete bitmap, tablet: " << tablet_id()
<< " rowset: " << rowset_id << " seg_id: " << seg->id()
<< " dummy_version: " << end_version + 1 << " rows: " << seg->num_rows()
<< " conflict rows: " << conflict_rows
<< " filtered rows: " << rids_be_overwritten.size()
<< " new generated rows: " << new_generated_rows
<< " bimap num: " << delete_bitmap->delete_bitmap.size()
<< " cost: " << watch.get_elapse_time_us() << "(us)";
return Status::OK();
<< " bitmap num: " << delete_bitmap->delete_bitmap.size() << " cost: " << cost_us
<< "(us)";
}
LOG(INFO) << "calc segment delete bitmap, tablet: " << tablet_id() << " rowset: " << rowset_id
<< " seg_id: " << seg->id() << " dummy_version: " << end_version + 1
<< " rows: " << seg->num_rows() << " conflict rows: " << conflict_rows
<< " bitmap num: " << delete_bitmap->delete_bitmap.size()
<< " cost: " << watch.get_elapse_time_us() << "(us)";
return Status::OK();
}

Expand Down
12 changes: 6 additions & 6 deletions be/src/olap/memtable_flush_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,18 +142,16 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, in
<< ", memsize: " << memtable->memory_usage()
<< ", rows: " << memtable->stat().raw_rows;
memtable->update_mem_type(MemType::FLUSH);
int64_t duration_ns;
SCOPED_RAW_TIMER(&duration_ns);
SCOPED_ATTACH_TASK(memtable->resource_ctx());
signal::set_signal_task_id(_rowset_writer->load_id());
signal::tablet_id = memtable->tablet_id();
int64_t duration_ns = 0;
{
SCOPED_RAW_TIMER(&duration_ns);
SCOPED_ATTACH_TASK(memtable->resource_ctx());
SCOPED_CONSUME_MEM_TRACKER(memtable->mem_tracker());
std::unique_ptr<vectorized::Block> block;
RETURN_IF_ERROR(memtable->to_block(&block));
RETURN_IF_ERROR(_rowset_writer->flush_memtable(block.get(), segment_id, flush_size));
memtable->set_flush_success();
}
memtable->set_flush_success();
_memtable_stat += memtable->stat();
DorisMetrics::instance()->memtable_flush_total->increment(1);
DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000);
Expand All @@ -164,6 +162,8 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, in

void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable_ptr, int32_t segment_id,
int64_t submit_task_time) {
signal::set_signal_task_id(_rowset_writer->load_id());
signal::tablet_id = memtable_ptr->tablet_id();
Defer defer {[&]() {
std::lock_guard<std::mutex> lock(_mutex);
_stats.flush_running_count--;
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/push_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
#include <butil/macros.h>
#include <gen_cpp/AgentService_types.h>
#include <gen_cpp/Exprs_types.h>
#include <stdint.h>

#include <cstdint>
#include <memory>
#include <string>
#include <vector>
Expand All @@ -35,6 +35,7 @@
#include "olap/rowset/rowset_fwd.h"
#include "olap/tablet_fwd.h"
#include "runtime/runtime_state.h"
#include "vec/core/block.h"
#include "vec/exec/format/generic_reader.h"

namespace doris {
Expand All @@ -48,7 +49,6 @@ class TTabletInfo;
class StorageEngine;

namespace vectorized {
class Block;
class GenericReader;
class VExprContext;
} // namespace vectorized
Expand Down
10 changes: 8 additions & 2 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <utility>

#include "cloud/config.h"
#include "common/exception.h"
#include "common/logging.h"
#include "common/status.h"
#include "cpp/sync_point.h"
Expand Down Expand Up @@ -498,8 +499,13 @@ Status Segment::_load_pk_bloom_filter(OlapReaderStatistics* stats) {
}

Status Segment::load_pk_index_and_bf(OlapReaderStatistics* index_load_stats) {
RETURN_IF_ERROR(load_index(index_load_stats));
RETURN_IF_ERROR(_load_pk_bloom_filter(index_load_stats));
// `DorisCallOnce` may catch exception in calling stack A and re-throw it in
// a different calling stack B which doesn't have catch block. So we add catch block here
// to prevent coreudmp
RETURN_IF_CATCH_EXCEPTION({
RETURN_IF_ERROR(load_index(index_load_stats));
RETURN_IF_ERROR(_load_pk_bloom_filter(index_load_stats));
});
return Status::OK();
}

Expand Down
2 changes: 0 additions & 2 deletions be/src/olap/rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,6 @@ Status BaseRowsetBuilder::wait_calc_delete_bitmap() {
std::lock_guard<std::mutex> l(_lock);
SCOPED_TIMER(_wait_delete_bitmap_timer);
RETURN_IF_ERROR(_calc_delete_bitmap_token->wait());
LOG(INFO) << "Got result of calc delete bitmap task from executor, tablet_id: "
<< _tablet->tablet_id() << ", txn_id: " << _req.txn_id;
return Status::OK();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,6 @@ DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, i
_is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation;
if (_is_streaming_preagg) {
DCHECK(!tnode.agg_node.grouping_exprs.empty()) << "Streaming preaggs do grouping";
DCHECK(_limit == -1) << "Preaggs have no limits";
}
} else {
_is_streaming_preagg = false;
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/exchange_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ Status ExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::Block
if (_is_merging && !local_state.is_ready) {
SCOPED_TIMER(local_state.create_merger_timer);
RETURN_IF_ERROR(local_state.stream_recvr->create_merger(
local_state.vsort_exec_exprs.lhs_ordering_expr_ctxs(), _is_asc_order, _nulls_first,
local_state.vsort_exec_exprs.ordering_expr_ctxs(), _is_asc_order, _nulls_first,
state->batch_size(), _limit, _offset));
local_state.is_ready = true;
return Status::OK();
Expand Down
6 changes: 6 additions & 0 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,9 @@ class DataSinkOperatorXBase : public OperatorBase {

DataSinkOperatorXBase(const int operator_id, const int node_id, std::vector<int>& sources)
: OperatorBase(), _operator_id(operator_id), _node_id(node_id), _dests_id(sources) {}
#ifdef BE_TEST
DataSinkOperatorXBase() : _operator_id(-1), _node_id(0), _dests_id({-1}) {};
#endif

~DataSinkOperatorXBase() override = default;

Expand Down Expand Up @@ -537,6 +540,9 @@ class DataSinkOperatorX : public DataSinkOperatorXBase {

DataSinkOperatorX(const int id, const int node_id, std::vector<int> sources)
: DataSinkOperatorXBase(id, node_id, sources) {}
#ifdef BE_TEST
DataSinkOperatorX() = default;
#endif
~DataSinkOperatorX() override = default;

Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) override;
Expand Down
13 changes: 0 additions & 13 deletions be/src/pipeline/exec/sort_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,5 @@ const vectorized::SortDescription& SortSourceOperatorX::get_sort_description(
return local_state._shared_state->sorter->get_sort_description();
}

Status SortSourceOperatorX::build_merger(RuntimeState* state,
std::unique_ptr<vectorized::VSortedRunMerger>& merger,
RuntimeProfile* profile) {
// now only use in LocalMergeSortExchanger::get_block
vectorized::VSortExecExprs vsort_exec_exprs;
// clone vsort_exec_exprs in LocalMergeSortExchanger
RETURN_IF_ERROR(_vsort_exec_exprs.clone(state, vsort_exec_exprs));
merger = std::make_unique<vectorized::VSortedRunMerger>(
vsort_exec_exprs.lhs_ordering_expr_ctxs(), _is_asc_order, _nulls_first,
state->batch_size(), _limit, _offset, profile);
return Status::OK();
}

#include "common/compile_check_end.h"
} // namespace doris::pipeline
5 changes: 2 additions & 3 deletions be/src/pipeline/exec/sort_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,10 @@ class SortSourceOperatorX final : public OperatorX<SortLocalState> {
bool use_local_merge() const { return _merge_by_exchange; }
const vectorized::SortDescription& get_sort_description(RuntimeState* state) const;

Status build_merger(RuntimeState* state, std::unique_ptr<vectorized::VSortedRunMerger>& merger,
RuntimeProfile* profile);

private:
friend class PipelineFragmentContext;
friend class SortLocalState;

const bool _merge_by_exchange;
std::vector<bool> _is_asc_order;
std::vector<bool> _nulls_first;
Expand Down
9 changes: 9 additions & 0 deletions be/src/pipeline/local_exchange/local_exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ class LocalExchangeSinkOperatorX final : public DataSinkOperatorX<LocalExchangeS
_texprs(texprs),
_partitioned_exprs_num(texprs.size()),
_shuffle_idx_to_instance_idx(bucket_seq_to_instance_idx) {}
#ifdef BE_TEST
LocalExchangeSinkOperatorX(const std::vector<TExpr>& texprs,
const std::map<int, int>& bucket_seq_to_instance_idx)
: Base(),
_num_partitions(0),
_texprs(texprs),
_partitioned_exprs_num(texprs.size()),
_shuffle_idx_to_instance_idx(bucket_seq_to_instance_idx) {}
#endif

Status init(const TPlanNode& tnode, RuntimeState* state) override {
return Status::InternalError("{} should not init with TPlanNode", Base::_name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ class LocalExchangeSourceOperatorX final : public OperatorX<LocalExchangeSourceL
public:
using Base = OperatorX<LocalExchangeSourceLocalState>;
LocalExchangeSourceOperatorX(ObjectPool* pool, int id) : Base(pool, id, id) {}
#ifdef BE_TEST
LocalExchangeSourceOperatorX() = default;
#endif
Status init(ExchangeType type) override {
_op_name = "LOCAL_EXCHANGE_OPERATOR (" + get_exchange_type_name(type) + ")";
_exchange_type = type;
Expand Down
Loading

0 comments on commit 4432f1c

Please sign in to comment.