Merge branch 'case-from-master' of https://github.com/amorynan/doris into case-from-master
amorynan committed Aug 1, 2024
2 parents 473bf9d + 1b6b594 commit 7d42b58
Showing 370 changed files with 29,651 additions and 742 deletions.
2 changes: 1 addition & 1 deletion be/src/clucene
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
@@ -472,6 +472,8 @@ DEFINE_mInt32(migration_remaining_size_threshold_mb, "10");
 // If the task runs longer than this time, the task will be terminated, in seconds.
 // timeout = std::max(migration_task_timeout_secs, tablet size / 1MB/s)
 DEFINE_mInt32(migration_task_timeout_secs, "300");
+// timeout for try_lock migration lock
+DEFINE_Int64(migration_lock_timeout_ms, "1000");
 
 // Port to start debug webserver on
 DEFINE_Int32(webserver_port, "8040");
2 changes: 2 additions & 0 deletions be/src/common/config.h
@@ -521,6 +521,8 @@ DECLARE_mInt32(migration_remaining_size_threshold_mb);
 // If the task runs longer than this time, the task will be terminated, in seconds.
 // timeout = std::max(migration_task_timeout_secs, tablet size / 1MB/s)
 DECLARE_mInt32(migration_task_timeout_secs);
+// timeout for try_lock migration lock
+DECLARE_Int64(migration_lock_timeout_ms);
 
 // Port to start debug webserver on
 DECLARE_Int32(webserver_port);
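
The new knob follows the usual config pairing: DEFINE_Int64 in config.cpp owns the storage and the default, DECLARE_Int64 in config.h exposes the symbol, and call sites read config::migration_lock_timeout_ms (the rowset_builder.cpp hunk later in this commit is the consumer). Roughly — and ignoring the registration for runtime updates that the real macros also perform — the pair expands to something like this sketch:

    #include <cstdint>

    // Roughly what DECLARE_Int64(migration_lock_timeout_ms) amounts to
    // in config.h:
    namespace config {
    extern int64_t migration_lock_timeout_ms;
    } // namespace config

    // Roughly what DEFINE_Int64(migration_lock_timeout_ms, "1000") amounts
    // to in config.cpp; the real macro parses the string default at startup.
    namespace config {
    int64_t migration_lock_timeout_ms = 1000;
    } // namespace config
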
3 changes: 3 additions & 0 deletions be/src/exec/olap_common.h
@@ -44,6 +44,7 @@
 #include "vec/io/io_helper.h"
 #include "vec/runtime/ipv4_value.h"
 #include "vec/runtime/ipv6_value.h"
+#include "vec/runtime/time_value.h"
 #include "vec/runtime/vdatetime_value.h"
 
 namespace doris {
@@ -70,6 +71,8 @@ std::string cast_to_string(T value, int scale) {
         std::stringstream ss;
         ss << buf;
         return ss.str();
+    } else if constexpr (primitive_type == TYPE_TIMEV2) {
+        return TimeValue::to_string(value, scale);
     } else if constexpr (primitive_type == TYPE_IPV4) {
         return IPv4Value::to_string(value);
     } else if constexpr (primitive_type == TYPE_IPV6) {
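
cast_to_string selects its branch at compile time, so each primitive type pays only for its own formatter; the new branch routes TYPE_TIMEV2 through TimeValue::to_string with the column's scale. A self-contained sketch of the same `if constexpr` dispatch, with invented stand-ins for Doris's enum and time formatter:

    #include <cstdint>
    #include <cstdio>
    #include <string>

    // Invented stand-in for Doris's PrimitiveType enum.
    enum PrimitiveType { TYPE_INT, TYPE_TIMEV2 };

    // Invented formatter: assume a TIMEV2 value counts microseconds since
    // midnight; scale handling is omitted for brevity.
    inline std::string timev2_to_string(int64_t micros, int /*scale*/) {
        long long secs = micros / 1000000;
        char buf[16];
        std::snprintf(buf, sizeof(buf), "%02lld:%02lld:%02lld", secs / 3600,
                      (secs / 60) % 60, secs % 60);
        return buf;
    }

    // Compile-time dispatch: only the branch matching the template argument
    // is instantiated, mirroring cast_to_string in olap_common.h.
    template <PrimitiveType primitive_type, typename T>
    std::string cast_to_string(T value, int scale) {
        if constexpr (primitive_type == TYPE_TIMEV2) {
            return timev2_to_string(static_cast<int64_t>(value), scale);
        } else {
            return std::to_string(value);
        }
    }

For example, cast_to_string<TYPE_TIMEV2>(int64_t{3661000000}, 0) would yield "01:01:01" under these assumptions.
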
4 changes: 2 additions & 2 deletions be/src/exec/olap_utils.h
@@ -117,9 +117,9 @@ inline SQLFilterOp to_olap_filter_type(const std::string& function_name, bool op
         return opposite ? FILTER_NOT_IN : FILTER_IN;
     } else if (function_name == "ne") {
         return opposite ? FILTER_IN : FILTER_NOT_IN;
-    } else if (function_name == "in_list") {
+    } else if (function_name == "in") {
         return opposite ? FILTER_NOT_IN : FILTER_IN;
-    } else if (function_name == "not_in_list") {
+    } else if (function_name == "not_in") {
         return opposite ? FILTER_IN : FILTER_NOT_IN;
     } else {
         DCHECK(false) << "Function Name: " << function_name;
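
This hunk only renames the predicate keys — the planner now sends "in" and "not_in" rather than "in_list"/"not_in_list" — while keeping the `opposite` flip, which inverts the filter when the predicate sits under a negation. Condensed, the mapping behaves like this sketch (enum values and error handling simplified):

    #include <stdexcept>
    #include <string>

    enum SQLFilterOp { FILTER_IN, FILTER_NOT_IN };

    // `opposite` is true when the predicate appears negated, e.g. under
    // NOT (...), which swaps IN and NOT_IN.
    inline SQLFilterOp to_olap_filter_type(const std::string& name, bool opposite) {
        if (name == "in") {
            return opposite ? FILTER_NOT_IN : FILTER_IN;
        }
        if (name == "not_in") {
            return opposite ? FILTER_IN : FILTER_NOT_IN;
        }
        throw std::invalid_argument("unsupported function name: " + name);
    }
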
20 changes: 16 additions & 4 deletions be/src/exprs/bloom_filter_func.h
@@ -100,8 +100,12 @@ class BloomFilterFuncBase : public RuntimeFilterFuncBase {
     virtual ~BloomFilterFuncBase() = default;
 
     void init_params(const RuntimeFilterParams* params) {
-        _bloom_filter_length = params->bloom_filter_size;
+        _bloom_filter_length =
+                params->runtime_bloom_filter_min_size > 0
+                        ? std::max(params->bloom_filter_size, params->runtime_bloom_filter_min_size)
+                        : params->bloom_filter_size;
         _build_bf_exactly = params->build_bf_exactly;
+        _runtime_bloom_filter_min_size = params->runtime_bloom_filter_min_size;
         _null_aware = params->null_aware;
         _bloom_filter_size_calculated_by_ndv = params->bloom_filter_size_calculated_by_ndv;
     }
@@ -124,9 +128,16 @@ class BloomFilterFuncBase : public RuntimeFilterFuncBase {
         // if FE do use ndv stat to predict the bf size, BE only use the row count. FE have more
         // exactly row count stat. which one is min is more correctly.
         if (_bloom_filter_size_calculated_by_ndv) {
-            _bloom_filter_length = std::min(be_calculate_size, _bloom_filter_length);
+            _bloom_filter_length =
+                    _runtime_bloom_filter_min_size > 0
+                            ? std::max(_runtime_bloom_filter_min_size,
+                                       std::min(be_calculate_size, _bloom_filter_length))
+                            : std::min(be_calculate_size, _bloom_filter_length);
         } else {
-            _bloom_filter_length = be_calculate_size;
+            _bloom_filter_length =
+                    _runtime_bloom_filter_min_size > 0
+                            ? std::max(_runtime_bloom_filter_min_size, be_calculate_size)
+                            : be_calculate_size;
         }
     }
     return init_with_fixed_length(_bloom_filter_length);
@@ -221,8 +232,9 @@ class BloomFilterFuncBase : public RuntimeFilterFuncBase {
     // bloom filter size
     int32_t _bloom_filter_alloced;
     std::shared_ptr<BloomFilterAdaptor> _bloom_filter;
-    bool _inited {};
+    bool _inited = false;
     int64_t _bloom_filter_length;
+    int64_t _runtime_bloom_filter_min_size;
     bool _build_bf_exactly = false;
    bool _bloom_filter_size_calculated_by_ndv = false;
 };
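
Both sizing paths now reduce to one rule: take the length the pre-patch code would have chosen, then clamp it from below by runtime_bloom_filter_min_size whenever that option is positive (a non-positive value disables the floor). A hypothetical helper that distills the logic:

    #include <algorithm>
    #include <cstdint>

    // Hypothetical distillation of the new sizing rule. `candidate` is the
    // length the pre-patch code computed (params->bloom_filter_size in
    // init_params; min(be_calculate_size, _bloom_filter_length) or
    // be_calculate_size in init_with_cardinality), and `min_size` is
    // runtime_bloom_filter_min_size.
    inline int64_t clamp_bloom_filter_length(int64_t candidate, int64_t min_size) {
        return min_size > 0 ? std::max(min_size, candidate) : candidate;
    }

The unrelated change from `bool _inited {};` to `bool _inited = false;` is purely stylistic; both value-initialize the flag to false.
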
3 changes: 3 additions & 0 deletions be/src/exprs/runtime_filter.cpp
@@ -1393,6 +1393,9 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue
     params.filter_type = _runtime_filter_type;
     params.column_return_type = build_ctx->root()->type().type;
     params.max_in_num = options->runtime_filter_max_in_num;
+    params.runtime_bloom_filter_min_size = options->__isset.runtime_bloom_filter_min_size
+                                                   ? options->runtime_bloom_filter_min_size
+                                                   : 0;
     // We build runtime filter by exact distinct count iff three conditions are met:
     // 1. Only 1 join key
     // 2. Do not have remote target (e.g. do not need to merge), or broadcast join
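
runtime_bloom_filter_min_size is an optional field in the Thrift query options, so the BE checks the generated __isset flag before reading it; an FE that predates the option simply yields the neutral default of 0. The guard-with-default idiom, sketched against a stand-in struct (not the real generated TQueryOptions):

    #include <cstdint>

    // Stand-in for a Thrift-generated struct with one optional field; real
    // generated code carries an __isset flag per optional member.
    struct TQueryOptions {
        struct {
            bool runtime_bloom_filter_min_size = false;
        } __isset;
        int64_t runtime_bloom_filter_min_size = 0;
    };

    // Read the optional field, falling back to 0 ("no minimum") when the
    // peer never set it.
    inline int64_t min_bf_size_or_default(const TQueryOptions& options) {
        return options.__isset.runtime_bloom_filter_min_size
                       ? options.runtime_bloom_filter_min_size
                       : 0;
    }
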
1 change: 1 addition & 0 deletions be/src/exprs/runtime_filter.h
@@ -128,6 +128,7 @@ struct RuntimeFilterParams {
     // used in bloom filter
     int64_t bloom_filter_size;
     int32_t max_in_num;
+    int64_t runtime_bloom_filter_min_size;
     int32_t filter_id;
     bool bitmap_filter_not_in;
     bool build_bf_exactly;
29 changes: 16 additions & 13 deletions be/src/http/action/stream_load.cpp
@@ -144,7 +144,7 @@ Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx) {
     if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
         LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes
                      << ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id;
-        return Status::InternalError("receive body don't equal with body bytes");
+        return Status::InternalError<false>("receive body don't equal with body bytes");
     }
 
     // if we use non-streaming, MessageBodyFileSink.finish will close the file
@@ -230,13 +230,13 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
     // auth information
     if (!parse_basic_auth(*http_req, &ctx->auth)) {
         LOG(WARNING) << "parse basic authorization failed." << ctx->brief();
-        return Status::InternalError("no valid Basic authorization");
+        return Status::InternalError<false>("no valid Basic authorization");
     }
 
     // get format of this put
     if (!http_req->header(HTTP_COMPRESS_TYPE).empty() &&
         iequal(http_req->header(HTTP_FORMAT_KEY), "JSON")) {
-        return Status::InternalError("compress data of JSON format is not supported.");
+        return Status::InternalError<false>("compress data of JSON format is not supported.");
     }
     std::string format_str = http_req->header(HTTP_FORMAT_KEY);
     if (iequal(format_str, BeConsts::CSV_WITH_NAMES) ||
@@ -252,8 +252,8 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
     LoadUtil::parse_format(format_str, http_req->header(HTTP_COMPRESS_TYPE), &ctx->format,
                            &ctx->compress_type);
     if (ctx->format == TFileFormatType::FORMAT_UNKNOWN) {
-        return Status::InternalError("unknown data format, format={}",
-                                     http_req->header(HTTP_FORMAT_KEY));
+        return Status::InternalError<false>("unknown data format, format={}",
+                                            http_req->header(HTTP_FORMAT_KEY));
     }
 
     // check content length
@@ -271,16 +271,16 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
         // json max body size
         if ((ctx->format == TFileFormatType::FORMAT_JSON) &&
             (ctx->body_bytes > json_max_body_bytes) && !read_json_by_line) {
-            return Status::InternalError(
+            return Status::InternalError<false>(
                     "The size of this batch exceed the max size [{}] of json type data "
                     " data [ {} ]. Split the file, or use 'read_json_by_line'",
                    json_max_body_bytes, ctx->body_bytes);
         }
         // csv max body size
         else if (ctx->body_bytes > csv_max_body_bytes) {
             LOG(WARNING) << "body exceed max size." << ctx->brief();
-            return Status::InternalError("body exceed max size: {}, data: {}", csv_max_body_bytes,
-                                         ctx->body_bytes);
+            return Status::InternalError<false>("body exceed max size: {}, data: {}",
+                                                csv_max_body_bytes, ctx->body_bytes);
         }
     } else {
 #ifndef BE_TEST
@@ -298,13 +298,14 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
                   !ctx->is_chunked_transfer))) {
         LOG(WARNING) << "content_length is empty and transfer-encoding!=chunked, please set "
                         "content_length or transfer-encoding=chunked";
-        return Status::InternalError(
+        return Status::InternalError<false>(
                 "content_length is empty and transfer-encoding!=chunked, please set content_length "
                 "or transfer-encoding=chunked");
     } else if (UNLIKELY(!http_req->header(HttpHeaders::CONTENT_LENGTH).empty() &&
                         ctx->is_chunked_transfer)) {
         LOG(WARNING) << "please do not set both content_length and transfer-encoding";
-        return Status::InternalError("please do not set both content_length and transfer-encoding");
+        return Status::InternalError<false>(
+                "please do not set both content_length and transfer-encoding");
     }
 
     if (!http_req->header(HTTP_TIMEOUT).empty()) {
@@ -716,7 +717,8 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
     std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT);
     if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") &&
         !iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) {
-        return Status::InternalError("group_commit can only be [async_mode, sync_mode, off_mode]");
+        return Status::InternalError<false>(
+                "group_commit can only be [async_mode, sync_mode, off_mode]");
     }
     if (config::wait_internal_group_commit_finish) {
         group_commit_mode = "sync_mode";
@@ -729,7 +731,7 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
         ss << "This stream load content length <0 (" << content_length
            << "), please check your content length.";
         LOG(WARNING) << ss.str();
-        return Status::InternalError(ss.str());
+        return Status::InternalError<false>(ss.str());
     }
     // allow chunked stream load in flink
     auto is_chunk = !req->header(HttpHeaders::TRANSFER_ENCODING).empty() &&
@@ -750,7 +752,8 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
     auto partitions = !req->header(HTTP_PARTITIONS).empty();
     if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit) {
         if (!config::wait_internal_group_commit_finish && !ctx->label.empty()) {
-            return Status::InternalError("label and group_commit can't be set at the same time");
+            return Status::InternalError<false>(
+                    "label and group_commit can't be set at the same time");
         }
         ctx->group_commit = true;
         if (iequal(group_commit_mode, "async_mode")) {
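
Every change in this file is the same mechanical substitution: Status::InternalError(...) becomes Status::InternalError<false>(...). In doris::Status the boolean template parameter controls whether a stack trace is captured, so <false> skips the comparatively expensive trace for failures that are expected client mistakes — bad auth headers, oversized bodies, conflicting headers — rather than BE bugs. A simplified sketch of that API shape (not the real doris::Status):

    #include <string>
    #include <utility>

    // Simplified Status: the template flag decides whether an expensive
    // stack trace is attached to the error message.
    class Status {
    public:
        template <bool stacktrace = true>
        static Status InternalError(std::string msg) {
            Status s;
            s._msg = std::move(msg);
            if constexpr (stacktrace) {
                s._msg += " (stack trace would be captured here)";
            }
            return s;
        }

    private:
        std::string _msg;
    };

    // Client-input validation failures are routine, so skip the trace:
    // return Status::InternalError<false>("no valid Basic authorization");
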
17 changes: 16 additions & 1 deletion be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -1481,7 +1481,8 @@ Status SegmentIterator::_lookup_ordinal_from_pk_index(const RowCursor& key, bool
     // for mow with cluster key table, we should get key range from short key index.
     DCHECK(_segment->_tablet_schema->cluster_key_idxes().empty());
 
-    if (has_seq_col) {
+    // if full key is exact_match, the primary key without sequence column should also the same
+    if (has_seq_col && !exact_match) {
         size_t seq_col_length =
                 _segment->_tablet_schema->column(_segment->_tablet_schema->sequence_col_idx())
                         .length() +
@@ -2342,6 +2343,15 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
         return Status::EndOfFile("no more data in segment");
     }
 
+    DBUG_EXECUTE_IF("segment_iterator._rowid_result_for_index", {
+        for (auto& iter : _rowid_result_for_index) {
+            if (iter.second.first) {
+                return Status::Error<ErrorCode::INTERNAL_ERROR>(
+                        "_rowid_result_for_index exists true");
+            }
+        }
+    })
+
     if (!_is_need_vec_eval && !_is_need_short_eval && !_is_need_expr_eval) {
         if (_non_predicate_columns.empty()) {
             return Status::InternalError("_non_predicate_columns is empty");
@@ -2731,6 +2741,11 @@ void SegmentIterator::_calculate_pred_in_remaining_conjunct_root(
     } else if (_is_literal_node(node_type)) {
         auto v_literal_expr = static_cast<const doris::vectorized::VLiteral*>(expr.get());
         _column_predicate_info->query_values.insert(v_literal_expr->value());
+    } else if (node_type == TExprNodeType::NULL_LITERAL) {
+        if (!_column_predicate_info->column_name.empty()) {
+            auto v_literal_expr = static_cast<const doris::vectorized::VLiteral*>(expr.get());
+            _column_predicate_info->query_values.insert(v_literal_expr->value());
+        }
     } else {
         if (node_type == TExprNodeType::MATCH_PRED) {
             _column_predicate_info->query_op = "match";
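
The first hunk narrows the sequence-column key adjustment to non-exact matches, and the third teaches predicate extraction to record NULL literals. The second hunk adds a debug point: DBUG_EXECUTE_IF runs its block only when a regression test activates the named point, letting tests assert that _rowid_result_for_index holds no true entries on this path. A toy sketch of the mechanism (simplified registry, not Doris's real debug-point manager):

    #include <iostream>
    #include <set>
    #include <string>

    // Toy debug-point registry in the spirit of DBUG_EXECUTE_IF: tests
    // enable named points, and guarded blocks run only when active.
    inline std::set<std::string>& active_debug_points() {
        static std::set<std::string> points;
        return points;
    }

    #define DBUG_EXECUTE_IF(name, block) \
        if (active_debug_points().count(name) > 0) block

    int main() {
        // A test would enable the point before running the query:
        active_debug_points().insert("segment_iterator._rowid_result_for_index");

        DBUG_EXECUTE_IF("segment_iterator._rowid_result_for_index", {
            std::cout << "debug point hit\n"; // assert invariants here
        })
        return 0;
    }
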
6 changes: 4 additions & 2 deletions be/src/olap/rowset_builder.cpp
@@ -165,8 +165,10 @@ Status RowsetBuilder::check_tablet_version_count() {
 
 Status RowsetBuilder::prepare_txn() {
     std::shared_lock base_migration_lock(tablet()->get_migration_lock(), std::defer_lock);
-    if (!base_migration_lock.try_lock_for(std::chrono::milliseconds(30))) {
-        return Status::Error<TRY_LOCK_FAILED>("try migration lock failed");
+    if (!base_migration_lock.try_lock_for(
+                std::chrono::milliseconds(config::migration_lock_timeout_ms))) {
+        return Status::Error<TRY_LOCK_FAILED>("try_lock migration lock failed after {}ms",
+                                              config::migration_lock_timeout_ms);
     }
     std::lock_guard<std::mutex> push_lock(tablet()->get_push_lock());
     return _engine.txn_manager()->prepare_txn(_req.partition_id, *tablet(), _req.txn_id,
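
The hard-coded 30 ms wait becomes the configurable config::migration_lock_timeout_ms, and the error message now reports the timeout that was exceeded. The locking idiom is worth noting: constructing std::shared_lock with std::defer_lock wraps the mutex without acquiring it, and try_lock_for then bounds the wait — which requires the migration lock to be a timed mutex. A self-contained illustration, assuming std::shared_timed_mutex:

    #include <chrono>
    #include <iostream>
    #include <shared_mutex>

    int main() {
        std::shared_timed_mutex migration_lock;

        // defer_lock: adopt the mutex without locking it at construction.
        std::shared_lock<std::shared_timed_mutex> lock(migration_lock,
                                                       std::defer_lock);

        // Bounded wait: give up after the configured timeout instead of
        // blocking the caller indefinitely (1000 ms mirrors the default).
        if (!lock.try_lock_for(std::chrono::milliseconds(1000))) {
            std::cout << "try_lock migration lock failed after 1000ms\n";
            return 1;
        }
        std::cout << "acquired shared migration lock\n";
        return 0; // released by the destructor
    }
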
4 changes: 1 addition & 3 deletions be/src/pipeline/exec/analytic_source_operator.cpp
@@ -283,8 +283,6 @@ void AnalyticLocalState::_destroy_agg_status() {
     }
 }
 
-//now is execute for lead/lag row_number/rank/dense_rank/ntile functions
-//sum min max count avg first_value last_value functions
 void AnalyticLocalState::_execute_for_win_func(int64_t partition_start, int64_t partition_end,
                                                int64_t frame_start, int64_t frame_end) {
     for (size_t i = 0; i < _agg_functions_size; ++i) {
@@ -296,7 +294,7 @@ void AnalyticLocalState::_execute_for_win_func(int64_t partition_start, int64_t
                 partition_start, partition_end, frame_start, frame_end,
                 _fn_place_ptr +
                         _parent->cast<AnalyticSourceOperatorX>()._offsets_of_aggregate_states[i],
-                agg_columns.data(), nullptr);
+                agg_columns.data(), _agg_arena_pool.get());
 
         // If the end is not greater than the start, the current window should be empty.
         _current_window_empty =
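
Besides dropping a stale comment, the functional change is passing _agg_arena_pool.get() instead of nullptr into window-function execution, so aggregate implementations that allocate while accumulating (for example, variable-length states) have a live arena to allocate from. A toy illustration of why the arena parameter matters (invented interfaces, not Doris's IAggregateFunction):

    #include <cstddef>
    #include <cstring>
    #include <string>
    #include <vector>

    // Toy bump allocator standing in for vectorized::Arena: memory lives as
    // long as the arena, so states can keep raw pointers into it.
    class Arena {
    public:
        char* alloc(size_t n) {
            _chunks.emplace_back(n);
            return _chunks.back().data();
        }

    private:
        std::vector<std::vector<char>> _chunks;
    };

    // Toy aggregate state for max(string): it must copy the winning value
    // somewhere stable. With a null arena it would have nowhere to put it.
    struct MaxStringState {
        const char* data = nullptr;
        size_t len = 0;

        void add(const std::string& v, Arena* arena) {
            if (data != nullptr && std::string(data, len) >= v) return;
            char* copy = arena->alloc(v.size());
            std::memcpy(copy, v.data(), v.size());
            data = copy;
            len = v.size();
        }
    };
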
6 changes: 2 additions & 4 deletions be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -205,7 +205,6 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
         VLOG_DEBUG << "query: " << print_id(query_id)
                    << " hash probe revoke done, node: " << p.node_id()
                    << ", task: " << state->task_id();
-        _dependency->set_ready();
         return Status::OK();
     };
 
@@ -335,7 +334,6 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
                    << ", task id: " << state->task_id();
         ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
         shared_state_sptr->spilled_streams[partition_index].reset();
-        _dependency->set_ready();
     };
 
     auto exception_catch_func = [read_func, query_id, mem_tracker, shared_state_holder,
@@ -361,6 +359,7 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
             _spill_status_ok = false;
             _spill_status = std::move(status);
         }
+        _dependency->set_ready();
     };
 
     auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
@@ -423,8 +422,6 @@ Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
             ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
             spilled_stream.reset();
         }
-
-        _dependency->set_ready();
     };
 
     auto exception_catch_func = [read_func, mem_tracker, shared_state_holder, execution_context,
@@ -450,6 +447,7 @@ Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
             _spill_status_ok = false;
             _spill_status = std::move(status);
         }
+        _dependency->set_ready();
     };
 
     auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
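
All three spill/recovery callbacks move _dependency->set_ready() out of the happy-path read_func and into the tail of exception_catch_func, so the dependency is marked ready exactly once — after success or failure alike — and an I/O error can no longer leave the downstream pipeline task blocked forever. The shape of the fix, in miniature (toy types):

    #include <iostream>
    #include <stdexcept>

    struct Dependency {
        void set_ready() { std::cout << "dependency ready\n"; }
    };

    int main() {
        Dependency dependency;

        auto read_func = [] {
            // ... recover spilled blocks from disk; may throw ...
            throw std::runtime_error("disk error");
        };

        // The wrapper owns the notification: set_ready() runs on success
        // and on failure, so a failed recovery cannot wedge the pipeline.
        auto exception_catch_func = [&] {
            try {
                read_func();
            } catch (const std::exception& e) {
                std::cout << "spill read failed: " << e.what() << '\n';
            }
            dependency.set_ready();
        };

        exception_catch_func();
        return 0;
    }
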
(Diffs for the remaining changed files are not shown.)
