Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Add [[nodiscard]] to functions #30935

Merged
merged 2 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions be/src/exec/hash_join_components.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@

namespace starrocks {

void HashJoinProber::push_probe_chunk(RuntimeState* state, ChunkPtr&& chunk) {
Status HashJoinProber::push_probe_chunk(RuntimeState* state, ChunkPtr&& chunk) {
DCHECK(!_probe_chunk);
_probe_chunk = std::move(chunk);
_current_probe_has_remain = true;
_hash_joiner.prepare_probe_key_columns(&_key_columns, _probe_chunk);
RETURN_IF_ERROR(_hash_joiner.prepare_probe_key_columns(&_key_columns, _probe_chunk));
return Status::OK();
}

StatusOr<ChunkPtr> HashJoinProber::probe_chunk(RuntimeState* state, JoinHashTable* hash_table) {
Expand Down Expand Up @@ -70,15 +71,15 @@ void HashJoinBuilder::reset(const HashTableParam& param) {

void HashJoinBuilder::reset_probe(RuntimeState* state) {
_key_columns.clear();
_ht.reset_probe_state(state);
(void)_ht.reset_probe_state(state);
satanson marked this conversation as resolved.
Show resolved Hide resolved
}

Status HashJoinBuilder::append_chunk(RuntimeState* state, const ChunkPtr& chunk) {
if (UNLIKELY(_ht.get_row_count() + chunk->num_rows() >= max_hash_table_element_size)) {
return Status::NotSupported(strings::Substitute("row count of right table in hash join > $0", UINT32_MAX));
}

_hash_joiner.prepare_build_key_columns(&_key_columns, chunk);
RETURN_IF_ERROR(_hash_joiner.prepare_build_key_columns(&_key_columns, chunk));
// copy chunk of right table
SCOPED_TIMER(_hash_joiner.build_metrics().copy_right_table_chunk_timer);
TRY_CATCH_BAD_ALLOC(_ht.append_chunk(state, chunk, _key_columns));
Expand Down
6 changes: 3 additions & 3 deletions be/src/exec/hash_join_components.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ class HashJoinProber {

bool probe_chunk_empty() const { return _probe_chunk == nullptr; }

void push_probe_chunk(RuntimeState* state, ChunkPtr&& chunk);
[[nodiscard]] Status push_probe_chunk(RuntimeState* state, ChunkPtr&& chunk);

// probe hash table
StatusOr<ChunkPtr> probe_chunk(RuntimeState* state, JoinHashTable* hash_table);
[[nodiscard]] StatusOr<ChunkPtr> probe_chunk(RuntimeState* state, JoinHashTable* hash_table);

StatusOr<ChunkPtr> probe_remain(RuntimeState* state, JoinHashTable* hash_table, bool* has_remain);
[[nodiscard]] StatusOr<ChunkPtr> probe_remain(RuntimeState* state, JoinHashTable* hash_table, bool* has_remain);

void reset();

Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/hash_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -889,7 +889,7 @@ Status HashJoinNode::_push_down_in_filter(RuntimeState* state) {
builder.use_as_join_runtime_filter();
Status st = builder.create();
if (!st.ok()) continue;
builder.add_values(column, kHashJoinKeyColumnOffset);
RETURN_IF_ERROR(builder.add_values(column, kHashJoinKeyColumnOffset));
_runtime_in_filters.push_back(builder.get_in_const_predicate());
}
}
Expand Down
8 changes: 4 additions & 4 deletions be/src/exec/hash_joiner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,9 @@ bool HashJoiner::has_output() const {
return false;
}

void HashJoiner::push_chunk(RuntimeState* state, ChunkPtr&& chunk) {
Status HashJoiner::push_chunk(RuntimeState* state, ChunkPtr&& chunk) {
DCHECK(chunk && !chunk->is_empty());
_hash_join_prober->push_probe_chunk(state, std::move(chunk));
return _hash_join_prober->push_probe_chunk(state, std::move(chunk));
}

StatusOr<ChunkPtr> HashJoiner::pull_chunk(RuntimeState* state) {
Expand Down Expand Up @@ -337,7 +337,7 @@ void HashJoiner::reference_hash_table(HashJoiner* src_join_builder) {

void HashJoiner::set_prober_finished() {
if (++_num_finished_probers == _num_probers) {
set_finished();
(void)set_finished();
}
}
void HashJoiner::decr_prober(RuntimeState* state) {
Expand Down Expand Up @@ -546,7 +546,7 @@ Status HashJoiner::_create_runtime_in_filters(RuntimeState* state) {
if (probe_expr->type().is_string_type()) {
_string_key_columns.emplace_back(column);
}
builder.add_values(column, kHashJoinKeyColumnOffset);
RETURN_IF_ERROR(builder.add_values(column, kHashJoinKeyColumnOffset));
_runtime_in_filters.push_back(builder.get_in_const_predicate());
}
}
Expand Down
60 changes: 32 additions & 28 deletions be/src/exec/hash_joiner.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ class HashJoiner final : public pipeline::ContextWithDependency {
}
}

Status prepare_builder(RuntimeState* state, RuntimeProfile* runtime_profile);
Status prepare_prober(RuntimeState* state, RuntimeProfile* runtime_profile);
[[nodiscard]] Status prepare_builder(RuntimeState* state, RuntimeProfile* runtime_profile);
[[nodiscard]] Status prepare_prober(RuntimeState* state, RuntimeProfile* runtime_profile);
void close(RuntimeState* state) override;

bool need_input() const;
Expand All @@ -199,16 +199,16 @@ class HashJoiner final : public pipeline::ContextWithDependency {

void enter_eos_phase() { _phase = HashJoinPhase::EOS; }
// build phase
Status append_chunk_to_ht(RuntimeState* state, const ChunkPtr& chunk);
[[nodiscard]] Status append_chunk_to_ht(RuntimeState* state, const ChunkPtr& chunk);

Status append_chunk_to_spill_buffer(RuntimeState* state, const ChunkPtr& chunk);
[[nodiscard]] Status append_chunk_to_spill_buffer(RuntimeState* state, const ChunkPtr& chunk);

Status append_spill_task(RuntimeState* state, std::function<StatusOr<ChunkPtr>()>& spill_task);
[[nodiscard]] Status append_spill_task(RuntimeState* state, std::function<StatusOr<ChunkPtr>()>& spill_task);

Status build_ht(RuntimeState* state);
[[nodiscard]] Status build_ht(RuntimeState* state);
// probe phase
void push_chunk(RuntimeState* state, ChunkPtr&& chunk);
StatusOr<ChunkPtr> pull_chunk(RuntimeState* state);
[[nodiscard]] Status push_chunk(RuntimeState* state, ChunkPtr&& chunk);
[[nodiscard]] StatusOr<ChunkPtr> pull_chunk(RuntimeState* state);

pipeline::RuntimeInFilters& get_runtime_in_filters() { return _runtime_in_filters; }
pipeline::RuntimeBloomFilters& get_runtime_bloom_filters() { return _build_runtime_filters; }
Expand All @@ -220,7 +220,7 @@ class HashJoiner final : public pipeline::ContextWithDependency {

HashJoinBuilder* hash_join_builder() { return _hash_join_builder; }

Status create_runtime_filters(RuntimeState* state);
[[nodiscard]] Status create_runtime_filters(RuntimeState* state);

void reference_hash_table(HashJoiner* src_join_builder);

Expand All @@ -231,7 +231,7 @@ class HashJoiner final : public pipeline::ContextWithDependency {
bool has_referenced_hash_table() const { return _has_referenced_hash_table; }

Columns string_key_columns() { return _string_key_columns; }
Status reset_probe(RuntimeState* state);
[[nodiscard]] Status reset_probe(RuntimeState* state);

float avg_keys_per_bucket() const;

Expand Down Expand Up @@ -260,22 +260,24 @@ class HashJoiner final : public pipeline::ContextWithDependency {
void set_spill_strategy(spill::SpillStrategy strategy) { _spill_strategy = strategy; }
spill::SpillStrategy spill_strategy() { return _spill_strategy; }

void prepare_probe_key_columns(Columns* key_columns, const ChunkPtr& chunk) {
[[nodiscard]] Status prepare_probe_key_columns(Columns* key_columns, const ChunkPtr& chunk) {
SCOPED_TIMER(probe_metrics().probe_conjunct_evaluate_timer);
_prepare_key_columns(*key_columns, chunk, _probe_expr_ctxs);
RETURN_IF_ERROR(_prepare_key_columns(*key_columns, chunk, _probe_expr_ctxs));
return Status::OK();
}

void prepare_build_key_columns(Columns* key_columns, const ChunkPtr& chunk) {
[[nodiscard]] Status prepare_build_key_columns(Columns* key_columns, const ChunkPtr& chunk) {
SCOPED_TIMER(build_metrics().build_conjunct_evaluate_timer);
_prepare_key_columns(*key_columns, chunk, _build_expr_ctxs);
RETURN_IF_ERROR(_prepare_key_columns(*key_columns, chunk, _build_expr_ctxs));
return Status::OK();
}

const std::vector<ExprContext*> probe_expr_ctxs() { return _probe_expr_ctxs; }

HashJoinProber* new_prober(ObjectPool* pool) { return _hash_join_prober->clone_empty(pool); }
HashJoinBuilder* new_builder(ObjectPool* pool) { return _hash_join_builder->clone_empty(pool); }

Status filter_probe_output_chunk(ChunkPtr& chunk, JoinHashTable& hash_table) {
[[nodiscard]] Status filter_probe_output_chunk(ChunkPtr& chunk, JoinHashTable& hash_table) {
// Probe in JoinHashMap is divided into probe with other_conjuncts and without other_conjuncts.
// Probe without other_conjuncts directly labels the hash table as hit, while _process_other_conjunct()
// only remains the rows which are not hit the hash table before. Therefore, _process_other_conjunct can
Expand All @@ -292,7 +294,7 @@ class HashJoiner final : public pipeline::ContextWithDependency {
return Status::OK();
}

Status filter_post_probe_output_chunk(ChunkPtr& chunk) {
[[nodiscard]] Status filter_post_probe_output_chunk(ChunkPtr& chunk) {
// Post probe needn't process _other_join_conjunct_ctxs, because they
// are `ON` predicates, which need to be processed only on probe phase.
if (chunk && !chunk->is_empty() && !_conjunct_ctxs.empty()) {
Expand All @@ -309,7 +311,8 @@ class HashJoiner final : public pipeline::ContextWithDependency {

void _init_hash_table_param(HashTableParam* param);

Status _prepare_key_columns(Columns& key_columns, const ChunkPtr& chunk, const vector<ExprContext*>& expr_ctxs) {
[[nodiscard]] Status _prepare_key_columns(Columns& key_columns, const ChunkPtr& chunk,
const vector<ExprContext*>& expr_ctxs) {
key_columns.resize(0);
for (auto& expr_ctx : expr_ctxs) {
ASSIGN_OR_RETURN(auto column_ptr, expr_ctx->evaluate(chunk.get()));
Expand Down Expand Up @@ -361,24 +364,25 @@ class HashJoiner final : public pipeline::ContextWithDependency {
}
}

Status _build(RuntimeState* state);
[[nodiscard]] Status _build(RuntimeState* state);

StatusOr<ChunkPtr> _pull_probe_output_chunk(RuntimeState* state);
[[nodiscard]] StatusOr<ChunkPtr> _pull_probe_output_chunk(RuntimeState* state);

Status _calc_filter_for_other_conjunct(ChunkPtr* chunk, Filter& filter, bool& filter_all, bool& hit_all);
[[nodiscard]] Status _calc_filter_for_other_conjunct(ChunkPtr* chunk, Filter& filter, bool& filter_all,
bool& hit_all);
static void _process_row_for_other_conjunct(ChunkPtr* chunk, size_t start_column, size_t column_count,
bool filter_all, bool hit_all, const Filter& filter);

Status _process_outer_join_with_other_conjunct(ChunkPtr* chunk, size_t start_column, size_t column_count,
JoinHashTable& hash_table);
Status _process_semi_join_with_other_conjunct(ChunkPtr* chunk, JoinHashTable& hash_table);
Status _process_right_anti_join_with_other_conjunct(ChunkPtr* chunk, JoinHashTable& hash_table);
Status _process_other_conjunct(ChunkPtr* chunk, JoinHashTable& hash_table);
Status _process_where_conjunct(ChunkPtr* chunk);
[[nodiscard]] Status _process_outer_join_with_other_conjunct(ChunkPtr* chunk, size_t start_column,
size_t column_count, JoinHashTable& hash_table);
[[nodiscard]] Status _process_semi_join_with_other_conjunct(ChunkPtr* chunk, JoinHashTable& hash_table);
[[nodiscard]] Status _process_right_anti_join_with_other_conjunct(ChunkPtr* chunk, JoinHashTable& hash_table);
[[nodiscard]] Status _process_other_conjunct(ChunkPtr* chunk, JoinHashTable& hash_table);
[[nodiscard]] Status _process_where_conjunct(ChunkPtr* chunk);

Status _create_runtime_in_filters(RuntimeState* state);
[[nodiscard]] Status _create_runtime_in_filters(RuntimeState* state);

Status _create_runtime_bloom_filters(RuntimeState* state, int64_t limit);
[[nodiscard]] Status _create_runtime_bloom_filters(RuntimeState* state, int64_t limit);

private:
const THashJoinNode& _hash_join_node;
Expand Down
13 changes: 7 additions & 6 deletions be/src/exec/join_hash_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -759,14 +759,15 @@ class JoinHashTable {
void create(const HashTableParam& param);
void close();

Status build(RuntimeState* state);
Status reset_probe_state(RuntimeState* state);
Status probe(RuntimeState* state, const Columns& key_columns, ChunkPtr* probe_chunk, ChunkPtr* chunk, bool* eos);
Status probe_remain(RuntimeState* state, ChunkPtr* chunk, bool* eos);
[[nodiscard]] Status build(RuntimeState* state);
[[nodiscard]] Status reset_probe_state(RuntimeState* state);
[[nodiscard]] Status probe(RuntimeState* state, const Columns& key_columns, ChunkPtr* probe_chunk, ChunkPtr* chunk,
bool* eos);
[[nodiscard]] Status probe_remain(RuntimeState* state, ChunkPtr* chunk, bool* eos);

void append_chunk(RuntimeState* state, const ChunkPtr& chunk, const Columns& key_columns);
// convert input column to spill schema order
StatusOr<ChunkPtr> convert_to_spill_schema(const ChunkPtr& chunk) const;
[[nodiscard]] StatusOr<ChunkPtr> convert_to_spill_schema(const ChunkPtr& chunk) const;

const ChunkPtr& get_build_chunk() const { return _table_items->build_chunk; }
Columns& get_key_columns() { return _table_items->key_columns; }
Expand All @@ -783,7 +784,7 @@ class JoinHashTable {
JoinHashMapType _choose_join_hash_map();
static size_t _get_size_of_fixed_and_contiguous_type(LogicalType data_type);

Status _upgrade_key_columns_if_overflow();
[[nodiscard]] Status _upgrade_key_columns_if_overflow();

void _remove_duplicate_index_for_left_outer_join(Filter* filter);
void _remove_duplicate_index_for_left_semi_join(Filter* filter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ bool HashJoinProbeOperator::is_ready() const {

Status HashJoinProbeOperator::push_chunk(RuntimeState* state, const ChunkPtr& chunk) {
RETURN_IF_ERROR(_reference_builder_hash_table_once());
_join_prober->push_chunk(state, std::move(const_cast<ChunkPtr&>(chunk)));
RETURN_IF_ERROR(_join_prober->push_chunk(state, std::move(const_cast<ChunkPtr&>(chunk))));
return Status::OK();
}

Expand Down
22 changes: 12 additions & 10 deletions be/src/exec/pipeline/runtime_filter_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ class RefCountedRuntimeFilterProbeCollector {
_num_operators_generated(num_operators_generated),
_rf_probe_collector(std::move(rf_probe_collector)) {}

Status prepare(RuntimeState* state, const RowDescriptor& row_desc, RuntimeProfile* p) {
[[nodiscard]] Status prepare(RuntimeState* state, const RowDescriptor& row_desc, RuntimeProfile* p) {
if ((_count.fetch_sub(1) & PREPARE_COUNTER_MASK) == _num_operators_generated) {
RETURN_IF_ERROR(_rf_probe_collector.prepare(state, row_desc, p));
RETURN_IF_ERROR(_rf_probe_collector.open(state));
Expand Down Expand Up @@ -220,14 +220,16 @@ class PartialRuntimeFilterMerger {
// mark runtime_filter as always true.
bool set_always_true() {
_always_true = true;
return _try_do_merge({});
(void)_try_do_merge({});
silverbullet233 marked this conversation as resolved.
Show resolved Hide resolved
return true;
}

// HashJoinBuildOperator call add_partial_filters to gather partial runtime filters. the last HashJoinBuildOperator
// will merge partial runtime filters into total one finally.
StatusOr<bool> add_partial_filters(size_t idx, size_t ht_row_count, RuntimeInFilters&& partial_in_filters,
OptRuntimeBloomFilterBuildParams&& partial_bloom_filter_build_params,
RuntimeBloomFilters&& bloom_filter_descriptors) {
[[nodiscard]] StatusOr<bool> add_partial_filters(
size_t idx, size_t ht_row_count, RuntimeInFilters&& partial_in_filters,
OptRuntimeBloomFilterBuildParams&& partial_bloom_filter_build_params,
RuntimeBloomFilters&& bloom_filter_descriptors) {
DCHECK(idx < _partial_bloom_filter_build_params.size());
// both _ht_row_counts, _partial_in_filters, _partial_bloom_filter_build_params are reserved beforehand,
// each HashJoinBuildOperator mutates its corresponding slot indexed by driver_sequence, so concurrent
Expand All @@ -247,7 +249,7 @@ class PartialRuntimeFilterMerger {
return {_bloom_filter_descriptors.begin(), _bloom_filter_descriptors.end()};
}

Status merge_local_in_filters() {
[[nodiscard]] Status merge_local_in_filters() {
bool can_merge_in_filters = true;
size_t num_rows = 0;
ssize_t k = -1;
Expand Down Expand Up @@ -305,7 +307,7 @@ class PartialRuntimeFilterMerger {
return Status::OK();
}

Status merge_local_bloom_filters() {
[[nodiscard]] Status merge_local_bloom_filters() {
if (_partial_bloom_filter_build_params.empty()) {
return Status::OK();
}
Expand Down Expand Up @@ -384,16 +386,16 @@ class PartialRuntimeFilterMerger {
size_t limit() const { return _limit; }

private:
bool _try_do_merge(RuntimeBloomFilters&& bloom_filter_descriptors) {
StatusOr<bool> _try_do_merge(RuntimeBloomFilters&& bloom_filter_descriptors) {
if (1 == _num_active_builders--) {
if (_always_true) {
_partial_in_filters.clear();
_bloom_filter_descriptors.clear();
return true;
}
_bloom_filter_descriptors = std::move(bloom_filter_descriptors);
merge_local_in_filters();
merge_local_bloom_filters();
RETURN_IF_ERROR(merge_local_in_filters());
RETURN_IF_ERROR(merge_local_bloom_filters());
return true;
}
return false;
Expand Down
5 changes: 2 additions & 3 deletions be/src/exec/query_cache/cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@ static void delete_cache_entry(const CacheKey& key, void* value) {
delete cache_value;
}

Status CacheManager::populate(const std::string& key, const CacheValue& value) {
void CacheManager::populate(const std::string& key, const CacheValue& value) {
auto* cache_value = new CacheValue(value);
auto* handle = _cache.insert(key, cache_value, cache_value->size(), &delete_cache_entry, CachePriority::NORMAL);
DeferOp defer([this, handle]() { _cache.release(handle); });
return handle != nullptr ? Status::OK() : Status::InternalError("Insert failure");
_cache.release(handle);
}

StatusOr<CacheValue> CacheManager::probe(const std::string& key) {
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/query_cache/cache_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ class CacheManager {
public:
explicit CacheManager(size_t capacity);
~CacheManager() = default;
Status populate(const std::string& key, const CacheValue& value);
StatusOr<CacheValue> probe(const std::string& key);
void populate(const std::string& key, const CacheValue& value);
[[nodiscard]] StatusOr<CacheValue> probe(const std::string& key);
size_t memory_usage();
size_t capacity();
size_t lookup_count();
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/query_cache/cache_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -522,9 +522,9 @@ Status CacheOperator::reset_lane(RuntimeState* state, LaneOwnerType lane_owner)
_lane_arbiter->enable_passthrough_mode();
for (auto i = 0; i <= premature_finished_idx; ++i) {
auto& multi_op = _multilane_operators[i];
multi_op->set_finished(state);
(void)multi_op->set_finished(state);
}
_scan_operator->set_finished(state);
(void)_scan_operator->set_finished(state);
}
return Status::OK();
}
Expand Down
Loading
Loading