-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Support _state/_union/_merge aggregate funciton combinator
Signed-off-by: shuming.li <[email protected]>
- Loading branch information
Showing
38 changed files
with
4,715 additions
and
201 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
// Copyright 2021-present StarRocks, Inc. All rights reserved. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
#pragma once | ||
|
||
#include "column/vectorized_fwd.h" | ||
#include "exprs/agg/aggregate.h" | ||
|
||
namespace starrocks { | ||
struct AggStateMergeState {}; | ||
|
||
/** | ||
* @brief Merge combinator for aggregate function to merge the agg state to return the final result of aggregate function. | ||
* DESC: return_type {agg_func}_merge(immediate_type) | ||
* input type : aggregate function's immediate_type | ||
* intermediate type : aggregate function's immediate_type | ||
* return type : aggregate function's return type | ||
*/ | ||
class AggStateMerge final : public AggregateFunctionBatchHelper<AggStateMergeState, AggStateMerge> { | ||
public: | ||
AggStateMerge(AggStateDesc agg_state_desc, const AggregateFunction* function) | ||
: _agg_state_desc(std::move(agg_state_desc)), _function(function) { | ||
DCHECK(_function != nullptr); | ||
} | ||
const AggStateDesc* get_agg_state_desc() const { return &_agg_state_desc; } | ||
|
||
void create(FunctionContext* ctx, AggDataPtr __restrict ptr) const override { _function->create(ctx, ptr); } | ||
|
||
void destroy(FunctionContext* ctx, AggDataPtr __restrict ptr) const override { _function->destroy(ctx, ptr); } | ||
|
||
size_t size() const override { return _function->size(); } | ||
|
||
size_t alignof_size() const override { return _function->alignof_size(); } | ||
|
||
bool is_pod_state() const override { return _function->is_pod_state(); } | ||
|
||
void reset(FunctionContext* ctx, const Columns& args, AggDataPtr state) const override { | ||
_function->reset(ctx, args, state); | ||
} | ||
|
||
void update(FunctionContext* ctx, const Column** columns, AggDataPtr __restrict state, | ||
size_t row_num) const override { | ||
_function->merge(ctx, columns[0], state, row_num); | ||
} | ||
|
||
void merge(FunctionContext* ctx, const Column* column, AggDataPtr __restrict state, size_t row_num) const override { | ||
_function->merge(ctx, column, state, row_num); | ||
} | ||
|
||
void get_values(FunctionContext* ctx, ConstAggDataPtr __restrict state, Column* dst, size_t start, | ||
size_t end) const override { | ||
DCHECK_GT(end, start); | ||
_function->get_values(ctx, state, dst, start, end); | ||
} | ||
|
||
void serialize_to_column([[maybe_unused]] FunctionContext* ctx, ConstAggDataPtr __restrict state, | ||
Column* to) const override { | ||
_function->serialize_to_column(ctx, state, to); | ||
} | ||
|
||
void convert_to_serialize_format([[maybe_unused]] FunctionContext* ctx, const Columns& srcs, size_t chunk_size, | ||
ColumnPtr* dst) const override { | ||
DCHECK_EQ(1, srcs.size()); | ||
*dst = srcs[0]; | ||
} | ||
|
||
void finalize_to_column(FunctionContext* ctx __attribute__((unused)), ConstAggDataPtr __restrict state, | ||
Column* to) const override { | ||
_function->finalize_to_column(ctx, state, to); | ||
} | ||
|
||
std::string get_name() const override { return "agg_state_merge"; } | ||
|
||
private: | ||
const AggStateDesc _agg_state_desc; | ||
const AggregateFunction* _function; | ||
}; | ||
|
||
} // namespace starrocks |
Oops, something went wrong.