diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp index 3a9156f45b6758a..fe0ab0b148e55af 100644 --- a/be/src/pipeline/exec/analytic_source_operator.cpp +++ b/be/src/pipeline/exec/analytic_source_operator.cpp @@ -352,17 +352,17 @@ Status AnalyticLocalState::_get_next_for_rows(size_t current_block_rows) { int64_t range_start, range_end; if (!_parent->cast()._window.__isset.window_start && _parent->cast()._window.window_end.type == - TAnalyticWindowBoundaryType:: - CURRENT_ROW) { //[preceding, current_row],[current_row, following] + TAnalyticWindowBoundaryType::CURRENT_ROW) { + // [preceding, current_row] and [current_row, following] are rewritten to the same form, + // so the previous calculation result can be reused: don't call the _reset_agg_status function, + // keep calculating and accumulating data, no need to reset the state range_start = _shared_state->current_row_position; - range_end = _shared_state->current_row_position + - 1; //going on calculate,add up data, no need to reset state + range_end = _shared_state->current_row_position + 1; } else { _reset_agg_status(); range_end = _shared_state->current_row_position + _rows_end_offset + 1; - if (!_parent->cast() - ._window.__isset - .window_start) { //[preceding, offset] --unbound: [preceding, following] + //[preceding, offset] --unbound: [preceding, following] + if (!_parent->cast()._window.__isset.window_start) { range_start = _partition_by_start.pos; } else { range_start = _shared_state->current_row_position + _rows_start_offset; diff --git a/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h b/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h index dca76ea49a3f486..f2fbd83a605ccb0 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h +++ b/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h @@ -65,8 +65,6 @@ struct Value { _offset = 0; } - size_t offset_pos() { return _offset; } - protected: 
const IColumn* _ptr = nullptr; size_t _offset = 0; @@ -102,11 +100,6 @@ struct CopiedValue : public Value { } } - size_t offset_pos() { - DCHECK(false) << " should call this in CopiedValue"; - return 0; - } - private: Field _copied_value; }; @@ -148,8 +141,6 @@ struct ReaderFirstAndLastData { bool has_set_value() { return _has_value; } - size_t offset_pos() { return _data_value.offset_pos(); } - bool is_null() { return _data_value.is_null(); } protected: diff --git a/be/src/vec/aggregate_functions/aggregate_function_window.cpp b/be/src/vec/aggregate_functions/aggregate_function_window.cpp index 9da838a6b9067c9..2d10083488b40b8 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_window.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_window.cpp @@ -40,6 +40,8 @@ AggregateFunctionPtr create_function_lead_lag_first_last(const String& name, WhichDataType which(*type); bool arg_ignore_null_value = false; + // FE has already rewritten the case first_value(k1,false)--->first_value(k1), + // so when the size is 2, arg_ignore_null_value must be true if (argument_types.size() == 2) { DCHECK(name == "first_value" || name == "last_value") << "invalid function name: " << name; arg_ignore_null_value = true; diff --git a/be/src/vec/aggregate_functions/aggregate_function_window.h b/be/src/vec/aggregate_functions/aggregate_function_window.h index ae8e364f28eb3ce..8803df7de67bbde 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_window.h +++ b/be/src/vec/aggregate_functions/aggregate_function_window.h @@ -473,14 +473,9 @@ struct WindowFunctionFirstImpl : Data { if constexpr (arg_ignore_null) { frame_end = std::min(frame_end, partition_end); - if (this->has_set_value()) { - frame_start = this->offset_pos(); - } - const auto& second_arg = assert_cast&>(*columns[1]); - auto ignore_null_value = second_arg.get_data()[0]; - - if (ignore_null_value && columns[0]->is_nullable()) { + if (columns[0]->is_nullable()) { const auto& arg_nullable = assert_cast(*columns[0]); + // the 
valid range is: [frame_start, frame_end) while (frame_start < frame_end - 1 && arg_nullable.is_null_at(frame_start)) { frame_start++; } @@ -506,17 +501,25 @@ struct WindowFunctionLastImpl : Data { if constexpr (arg_ignore_null) { frame_start = std::max(frame_start, partition_start); - if (this->has_set_value()) { - frame_start = this->offset_pos(); - } - const auto& second_arg = assert_cast&>(*columns[1]); - auto ignore_null_value = second_arg.get_data()[0]; - - if (ignore_null_value && columns[0]->is_nullable()) { + if (columns[0]->is_nullable()) { const auto& arg_nullable = assert_cast(*columns[0]); - while (frame_start < (frame_end - 1) && arg_nullable.is_null_at(frame_end - 1)) { - frame_end--; + // look for a non-null value in [frame_start, frame_end), scanning backwards from the end + // if found: set_value and return directly + // if not found: the while loop runs to completion + // case 1: has_set_value is true, meaning a previous window had a value; reuse it and return directly + // case 2: has_set_value is false, meaning there is no value at all; set the result to NULL + while (frame_start < frame_end) { + if (arg_nullable.is_null_at(frame_end - 1)) { + frame_end--; + } else { + this->set_value(columns, frame_end - 1); + return; + } + } + if (!this->has_set_value()) { + this->set_is_null(); } + return; } }