From e9206b62edb3f94289693f0821e996eed644c24c Mon Sep 17 00:00:00 2001 From: Murphy Date: Thu, 19 Sep 2024 19:47:37 +0800 Subject: [PATCH] impl Signed-off-by: Murphy --- be/src/column/column_helper.cpp | 31 +- be/src/column/column_helper.h | 6 +- be/src/exec/join_hash_map.cpp | 45 +-- be/src/exec/join_hash_map.h | 12 +- be/src/exec/join_hash_map.tpp | 34 +-- .../spillable_hash_join_build_operator.cpp | 2 +- .../spillable_hash_join_build_operator.h | 2 +- be/src/exec/spill/mem_table.cpp | 2 +- be/src/storage/chunk_helper.cpp | 265 ++++++++++++++++++ be/src/storage/chunk_helper.h | 61 ++-- 10 files changed, 374 insertions(+), 86 deletions(-) diff --git a/be/src/column/column_helper.cpp b/be/src/column/column_helper.cpp index 2cb9675f9288e1..c49518ba02b5c1 100644 --- a/be/src/column/column_helper.cpp +++ b/be/src/column/column_helper.cpp @@ -25,6 +25,7 @@ #include "column/vectorized_fwd.h" #include "gutil/casts.h" #include "simd/simd.h" +#include "storage/chunk_helper.h" #include "types/logical_type_infra.h" #include "util/date_func.h" #include "util/percentile_value.h" @@ -469,7 +470,7 @@ size_t ChunkSliceTemplate::skip(size_t skip_rows) { // Cutoff required rows from this chunk template -Ptr ChunkSliceTemplate::cutoff(size_t required_rows) { +ChunkUniquePtr ChunkSliceTemplate::cutoff(size_t required_rows) { DCHECK(!empty()); size_t cut_rows = std::min(rows(), required_rows); auto res = chunk->clone_empty(cut_rows); @@ -482,7 +483,35 @@ Ptr ChunkSliceTemplate::cutoff(size_t required_rows) { return res; } +// Specialized for SegmentedChunkPtr +template <> +ChunkUniquePtr ChunkSliceTemplate::cutoff(size_t required_rows) { + DCHECK(!empty()); + // cutoff a chunk from current segment, if it doesn't meet the requirement just let it be + ChunkPtr segment = chunk->segments()[segment_id]; + size_t segment_offset = offset % chunk->segment_size(); + size_t cut_rows = std::min(segment->num_rows() - segment_offset, required_rows); + + auto res = segment->clone_empty(cut_rows); + res->append(*segment, segment_offset, cut_rows); + offset += cut_rows; + + // move to next segment and release previous one + size_t new_segment_id = offset / chunk->segment_size(); + if (new_segment_id != segment_id) { + chunk->segments()[segment_id].reset(); + segment_id = new_segment_id; + } + + if (empty()) { + chunk.reset(); + offset = 0; + } + return res; +} + template struct ChunkSliceTemplate; template struct ChunkSliceTemplate; +template struct ChunkSliceTemplate; } // namespace starrocks diff --git a/be/src/column/column_helper.h b/be/src/column/column_helper.h index 9d6cea7fe5b00b..b31c27635b05bc 100644 --- a/be/src/column/column_helper.h +++ b/be/src/column/column_helper.h @@ -345,8 +345,6 @@ class ColumnHelper { } } - static SegmentedColumnPtr get_data_column(SegmentedColumnPtr column); - static BinaryColumn* get_binary_column(Column* column) { return down_cast(get_data_column(column)); } static bool is_all_const(const Columns& columns); @@ -515,12 +513,13 @@ class ColumnHelper { template struct ChunkSliceTemplate { Ptr chunk; + size_t segment_id = 0; size_t offset = 0; bool empty() const; size_t rows() const; size_t skip(size_t skip_rows); - Ptr cutoff(size_t required_rows); + ChunkUniquePtr cutoff(size_t required_rows); void reset(Ptr input); }; @@ -550,5 +549,6 @@ APPLY_FOR_ALL_STRING_TYPE(GET_CONTAINER) using ChunkSlice = ChunkSliceTemplate; using ChunkSharedSlice = ChunkSliceTemplate; +using SegmentedChunkSlice = ChunkSliceTemplate; } // namespace starrocks diff --git a/be/src/exec/join_hash_map.cpp 
b/be/src/exec/join_hash_map.cpp index d04789ee8b2dc4..b4e56ed411253e 100644 --- a/be/src/exec/join_hash_map.cpp +++ b/be/src/exec/join_hash_map.cpp @@ -335,7 +335,7 @@ void JoinHashTable::create(const HashTableParam& param) { _probe_state->probe_counter = param.probe_counter; } - _table_items->build_chunk = std::make_shared(); + _table_items->build_chunk = std::make_shared(param.build_chunk_segment_size); _table_items->with_other_conjunct = param.with_other_conjunct; _table_items->join_type = param.join_type; _table_items->mor_reader_mode = param.mor_reader_mode; @@ -545,15 +545,6 @@ Status JoinHashTable::build(RuntimeState* state) { RETURN_IF_ERROR(_table_items->build_chunk->upgrade_if_overflow()); _table_items->has_large_column = _table_items->build_chunk->has_large_column(); - // If the join key is column ref of build chunk, fetch from build chunk directly - size_t join_key_count = _table_items->join_keys.size(); - for (size_t i = 0; i < join_key_count; i++) { - if (_table_items->join_keys[i].col_ref != nullptr) { - SlotId slot_id = _table_items->join_keys[i].col_ref->slot_id(); - _table_items->key_columns[i] = _table_items->build_chunk->get_column_by_slot_id(slot_id); - } - } - RETURN_IF_ERROR(_upgrade_key_columns_if_overflow()); _hash_map_type = _choose_join_hash_map(); @@ -626,31 +617,22 @@ Status JoinHashTable::probe_remain(RuntimeState* state, ChunkPtr* chunk, bool* e } void JoinHashTable::append_chunk(const ChunkPtr& chunk, const Columns& key_columns) { - auto& columns = _table_items->build_chunk->columns(); - + // TODO: simplify the SlotId mapping, if the slot of input chunk is same as build_chunk, we don't need to remap them + std::vector slots; for (size_t i = 0; i < _table_items->build_column_count; i++) { SlotDescriptor* slot = _table_items->build_slots[i].slot; - ColumnPtr& column = chunk->get_column_by_slot_id(slot->id()); - - if (!columns[i]->is_nullable() && column->is_nullable()) { - // upgrade to nullable column - columns[i] = NullableColumn::create(columns[i], NullColumn::create(columns[i]->size(), 0)); - } - columns[i]->append(*column); + slots.push_back(slot->id()); } + _table_items->build_chunk->append_chunk(chunk, slots); for (size_t i = 0; i < _table_items->key_columns.size(); i++) { - // If the join key is slot ref, will get from build chunk directly, - // otherwise will append from key_column of input - if (_table_items->join_keys[i].col_ref == nullptr) { - // upgrade to nullable column - if (!_table_items->key_columns[i]->is_nullable() && key_columns[i]->is_nullable()) { - size_t row_count = _table_items->key_columns[i]->size(); - _table_items->key_columns[i] = - NullableColumn::create(_table_items->key_columns[i], NullColumn::create(row_count, 0)); - } - _table_items->key_columns[i]->append(*key_columns[i]); + // upgrade to nullable column + if (!_table_items->key_columns[i]->is_nullable() && key_columns[i]->is_nullable()) { + size_t row_count = _table_items->key_columns[i]->size(); + _table_items->key_columns[i] = + NullableColumn::create(_table_items->key_columns[i], NullColumn::create(row_count, 0)); } + _table_items->key_columns[i]->append(*key_columns[i]); } _table_items->row_count += chunk->num_rows(); @@ -665,10 +647,11 @@ void JoinHashTable::merge_ht(const JoinHashTable& ht) { for (size_t i = 0; i < _table_items->build_column_count; i++) { if (!columns[i]->is_nullable() && other_columns[i]->is_nullable()) { // upgrade to nullable column - columns[i] = NullableColumn::create(columns[i], NullColumn::create(columns[i]->size(), 0)); + // columns[i] = 
NullableColumn::create(columns[i], NullColumn::create(columns[i]->size(), 0)); + columns[i]->upgrade_to_nullable(); } - columns[i]->append(*other_columns[i], 1, other_columns[i]->size() - 1); } + _table_items->build_chunk->append(ht._table_items->build_chunk, 1); } ChunkPtr JoinHashTable::convert_to_spill_schema(const ChunkPtr& chunk) const { diff --git a/be/src/exec/join_hash_map.h b/be/src/exec/join_hash_map.h index fcfeb0280c8e5b..9de6bd544d15d4 100644 --- a/be/src/exec/join_hash_map.h +++ b/be/src/exec/join_hash_map.h @@ -102,7 +102,7 @@ struct HashTableSlotDescriptor { struct JoinHashTableItems { //TODO: memory continues problem? SegmentedChunkPtr build_chunk = nullptr; - SegmentedColumns key_columns; + Columns key_columns; std::vector build_slots; std::vector probe_slots; // A hash value is the bucket index of the hash map. "JoinHashTableItems.first" is the @@ -295,6 +295,9 @@ struct HashTableParam { RuntimeProfile::Counter* output_probe_column_timer = nullptr; RuntimeProfile::Counter* probe_counter = nullptr; bool mor_reader_mode = false; + + // TODO: optimize this according to chunk width + size_t build_chunk_segment_size = 2 << 15; }; template @@ -394,7 +397,7 @@ class JoinBuildFunc { using ColumnType = typename RunTimeTypeTraits::ColumnType; static void prepare(RuntimeState* runtime, JoinHashTableItems* table_items); - static const Buffer& get_key_data(const JoinHashTableItems& table_items, size_t segment_index); + static const Buffer& get_key_data(const JoinHashTableItems& table_items); static void construct_hash_table(RuntimeState* state, JoinHashTableItems* table_items, HashTableProbeState* probe_state); }; @@ -689,9 +692,10 @@ class JoinHashMap { void _copy_probe_nullable_column(ColumnPtr* src_column, ChunkPtr* chunk, const SlotDescriptor* slot); - void _copy_build_column(const ColumnPtr& src_column, ChunkPtr* chunk, const SlotDescriptor* slot, bool to_nullable); + void _copy_build_column(const SegmentedColumnPtr& src_column, ChunkPtr* chunk, const SlotDescriptor* slot, + bool to_nullable); - void _copy_build_nullable_column(const ColumnPtr& src_column, ChunkPtr* chunk, const SlotDescriptor* slot); + void _copy_build_nullable_column(const SegmentedColumnPtr& src_column, ChunkPtr* chunk, const SlotDescriptor* slot); void _probe_index_output(ChunkPtr* chunk); void _build_index_output(ChunkPtr* chunk); diff --git a/be/src/exec/join_hash_map.tpp b/be/src/exec/join_hash_map.tpp index 2d093679409caf..ba0a49f0a51cc3 100644 --- a/be/src/exec/join_hash_map.tpp +++ b/be/src/exec/join_hash_map.tpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+#include "column/vectorized_fwd.h" #include "simd/simd.h" #include "util/runtime_profile.h" @@ -31,13 +32,13 @@ void JoinBuildFunc::prepare(RuntimeState* runtime, JoinHashTableItems* table template const Buffer::CppType>& JoinBuildFunc::get_key_data( - const JoinHashTableItems& table_items, size_t segment_index) { + const JoinHashTableItems& table_items) { ColumnPtr data_column; if (table_items.key_columns[0]->is_nullable()) { - auto* null_column = ColumnHelper::as_raw_column(table_items.key_columns[0]->get_segmented_column(segment_index)); + auto* null_column = ColumnHelper::as_raw_column(table_items.key_columns[0]); data_column = null_column->data_column(); } else { - data_column = table_items.key_columns[0]->get_segmented_column(segment_index); + data_column = table_items.key_columns[0]; } if constexpr (lt_is_string) { @@ -54,12 +55,9 @@ const Buffer::CppType>& JoinBuildFunc::get_key_da template void JoinBuildFunc::construct_hash_table(RuntimeState* state, JoinHashTableItems* table_items, HashTableProbeState* probe_state) { - size_t table_index = 1; - for (int i = 0; i < table_items->key_columns[0]->num_segments(); i++) { auto& data = get_key_data(*table_items); - if (table_items->key_columns[0]->is_nullable()) { - auto* nullable_column = ColumnHelper::as_raw_column(table_items->key_columns[0]->get_segmented_column(i)); + auto* nullable_column = ColumnHelper::as_raw_column(table_items->key_columns[0]); auto& null_array = nullable_column->null_column()->get_data(); for (size_t i = 1; i < table_items->row_count + 1; i++) { if (null_array[i] == 0) { @@ -75,7 +73,6 @@ void JoinBuildFunc::construct_hash_table(RuntimeState* state, JoinHashTableI table_items->first[bucket_num] = i; } } - } table_items->calculate_ht_info(table_items->key_columns[0]->byte_size()); } @@ -417,6 +414,7 @@ void JoinHashMap::probe_prepare(RuntimeState* state) { template void JoinHashMap::build(RuntimeState* state) { + _table_items->build_chunk->append_finished(); BuildFunc().construct_hash_table(state, _table_items, _probe_state); } @@ -623,7 +621,7 @@ void JoinHashMap::_build_output(ChunkPtr* chunk) { bool need_output = is_lazy ? 
hash_table_slot.need_lazy_materialize : hash_table_slot.need_output; if (need_output) { - ColumnPtr& column = _table_items->build_chunk->columns()[i]; + auto& column = _table_items->build_chunk->columns()[i]; if (!column->is_nullable()) { _copy_build_column(column, chunk, slot, to_nullable); } else { @@ -688,11 +686,10 @@ void JoinHashMap::_copy_probe_nullable_column(ColumnPt } template -void JoinHashMap::_copy_build_column(const ColumnPtr& src_column, ChunkPtr* chunk, +void JoinHashMap::_copy_build_column(const SegmentedColumnPtr& src_column, ChunkPtr* chunk, const SlotDescriptor* slot, bool to_nullable) { if (to_nullable) { - auto data_column = src_column->clone_empty(); - data_column->append_selective(*src_column, _probe_state->build_index.data(), 0, _probe_state->count); + auto data_column = src_column->clone_selective(_probe_state->build_index.data(), 0, _probe_state->count); // When left outer join is executed, // build_index[i] Equal to 0 means it is not found in the hash table, @@ -708,18 +705,15 @@ void JoinHashMap::_copy_build_column(const ColumnPtr& auto dest_column = NullableColumn::create(std::move(data_column), null_column); (*chunk)->append_column(std::move(dest_column), slot->id()); } else { - auto dest_column = src_column->clone_empty(); - dest_column->append_selective(*src_column, _probe_state->build_index.data(), 0, _probe_state->count); - (*chunk)->append_column(std::move(dest_column), slot->id()); + auto data_column = src_column->clone_selective(_probe_state->build_index.data(), 0, _probe_state->count); + (*chunk)->append_column(std::move(data_column), slot->id()); } } template -void JoinHashMap::_copy_build_nullable_column(const ColumnPtr& src_column, ChunkPtr* chunk, - const SlotDescriptor* slot) { - ColumnPtr dest_column = src_column->clone_empty(); - - dest_column->append_selective(*src_column, _probe_state->build_index.data(), 0, _probe_state->count); +void JoinHashMap::_copy_build_nullable_column(const SegmentedColumnPtr& src_column, + ChunkPtr* chunk, const SlotDescriptor* slot) { + ColumnPtr dest_column = src_column->clone_selective(_probe_state->build_index.data(), 0, _probe_state->count); // When left outer join is executed, // build_index[i] Equal to 0 means it is not found in the hash table, diff --git a/be/src/exec/pipeline/hashjoin/spillable_hash_join_build_operator.cpp b/be/src/exec/pipeline/hashjoin/spillable_hash_join_build_operator.cpp index 3c0166e292d2f9..e2dae20f5fc6b6 100644 --- a/be/src/exec/pipeline/hashjoin/spillable_hash_join_build_operator.cpp +++ b/be/src/exec/pipeline/hashjoin/spillable_hash_join_build_operator.cpp @@ -259,7 +259,7 @@ StatusOr()>> SpillableHashJoinBuildOperator::_c } } - auto chunk = _hash_table_build_chunk_slice.cutoff(runtime_state()->chunk_size()); + ChunkPtr chunk = _hash_table_build_chunk_slice.cutoff(runtime_state()->chunk_size()); RETURN_IF_ERROR(chunk->downgrade()); RETURN_IF_ERROR(append_hash_columns(chunk)); _join_builder->update_build_rows(chunk->num_rows()); diff --git a/be/src/exec/pipeline/hashjoin/spillable_hash_join_build_operator.h b/be/src/exec/pipeline/hashjoin/spillable_hash_join_build_operator.h index 723f8232e39d74..77c253d1257f60 100644 --- a/be/src/exec/pipeline/hashjoin/spillable_hash_join_build_operator.h +++ b/be/src/exec/pipeline/hashjoin/spillable_hash_join_build_operator.h @@ -66,7 +66,7 @@ class SpillableHashJoinBuildOperator final : public HashJoinBuildOperator { size_t _hash_table_iterate_idx = 0; std::vector _hash_tables; - ChunkSharedSlice _hash_table_build_chunk_slice; + 
SegmentedChunkSlice _hash_table_build_chunk_slice; std::function()> _hash_table_slice_iterator; bool _is_first_time_spill = true; DECLARE_ONCE_DETECTOR(_set_finishing_once); diff --git a/be/src/exec/spill/mem_table.cpp b/be/src/exec/spill/mem_table.cpp index c89f112d91e7d1..98cd79270f4e41 100644 --- a/be/src/exec/spill/mem_table.cpp +++ b/be/src/exec/spill/mem_table.cpp @@ -167,7 +167,7 @@ Status OrderedMemTable::finalize(workgroup::YieldContext& yield_ctx, const Spill return Status::OK(); } SCOPED_RAW_TIMER(&yield_ctx.time_spent_ns); - auto chunk = _chunk_slice.cutoff(_runtime_state->chunk_size()); + ChunkPtr chunk = _chunk_slice.cutoff(_runtime_state->chunk_size()); bool need_aligned = _runtime_state->spill_enable_direct_io(); RETURN_IF_ERROR(serde->serialize(_runtime_state, serde_ctx, chunk, output, need_aligned)); diff --git a/be/src/storage/chunk_helper.cpp b/be/src/storage/chunk_helper.cpp index e948e9ca94afd9..9b15bb5fe45eee 100644 --- a/be/src/storage/chunk_helper.cpp +++ b/be/src/storage/chunk_helper.cpp @@ -14,6 +14,9 @@ #include "storage/chunk_helper.h" +#include +#include + #include "column/array_column.h" #include "column/chunk.h" #include "column/column_helper.h" @@ -22,6 +25,7 @@ #include "column/schema.h" #include "column/struct_column.h" #include "column/type_traits.h" +#include "column/vectorized_fwd.h" #include "gutil/strings/fastmem.h" #include "runtime/current_thread.h" #include "runtime/descriptors.h" @@ -604,4 +608,265 @@ bool ChunkPipelineAccumulator::is_finished() const { return _finalized && _out_chunk == nullptr && _in_chunk == nullptr; } +class SegmentedColumnVisitor final : public ColumnVisitorAdapter { +public: + SegmentedColumnVisitor(SegmentedColumnPtr segment_column, const uint32_t* indexes, uint32_t from, uint32_t size) + : ColumnVisitorAdapter(this), + _segment_column(std::move(segment_column)), + _indexes(indexes), + _from(from), + _size(size) {} + + template + Status do_visit(const FixedLengthColumnBase& column) { + using ColumnT = FixedLengthColumnBase; + using ContainerT = Buffer*; + + auto output = ColumnHelper::as_column(_result); + auto& columns = _segment_column->columns(); + size_t segment_size = _segment_column->segment_size(); + std::vector buffers; + for (auto& seg_column : columns) { + buffers.push_back(&ColumnHelper::as_column(seg_column)->get_data()); + } + + for (uint32_t i = _from; i < _size; i++) { + uint32_t idx = _indexes[i]; + int segment_id = idx / segment_size; + int segment_offset = idx % segment_size; + output->append((*buffers[segment_id])[segment_offset]); + } + return {}; + } + + template + Status do_visit(const BinaryColumnBase& column) { + using ColumnT = BinaryColumnBase; + using ContainerT = typename ColumnT::Container*; + + auto output = ColumnHelper::as_column(_result); + auto& columns = _segment_column->columns(); + size_t segment_size = _segment_column->segment_size(); + std::vector buffers; + for (auto& seg_column : columns) { + buffers.push_back(&ColumnHelper::as_column(seg_column)->get_data()); + } + + for (uint32_t i = _from; i < _size; i++) { + uint32_t idx = _indexes[i]; + int segment_id = idx / segment_size; + int segment_offset = idx % segment_size; + output->append((*buffers[segment_id])[segment_offset]); + } + return {}; + } + + // TODO + template + Status do_visit(const ObjectColumn& column) { + return Status::NotSupported("SegmentedColumnVisitor"); + } + + Status do_visit(const ArrayColumn& column) { return Status::NotSupported("SegmentedColumnVisitor"); } + Status do_visit(const MapColumn& column) { return 
Status::NotSupported("SegmentedColumnVisitor"); } + Status do_visit(const StructColumn& column) { return Status::NotSupported("SegmentedColumnVisitor"); } + Status do_visit(const ConstColumn& column) { return Status::NotSupported("SegmentedColumnVisitor"); } + Status do_visit(const NullableColumn& column) { return Status::NotSupported("SegmentedColumnVisitor"); } + + ColumnPtr result() { return _result; } + +private: + SegmentedColumnPtr _segment_column; + ColumnPtr _result; + const uint32_t* _indexes; + uint32_t _from; + uint32_t _size; +}; + +SegmentedColumn::SegmentedColumn(SegmentedChunkPtr chunk, size_t column_index) : _chunk(std::move(chunk)) { + for (auto& segment : _chunk->segments()) { + _columns.push_back(segment->get_column_by_index(column_index)); + } +} + +ColumnPtr SegmentedColumn::clone_selective(const uint32_t* indexes, uint32_t from, uint32_t size) { + SegmentedColumnVisitor visitor(shared_from_this(), indexes, from, size); + (void)_columns[0]->accept(&visitor); + return visitor.result(); +} + +size_t SegmentedColumn::segment_size() const { + return _chunk->segment_size(); +} + +size_t SegmentedChunk::segment_size() const { + return _segment_size; +} + +bool SegmentedColumn::is_nullable() const { + return _columns[0]->is_nullable(); +} + +bool SegmentedColumn::has_null() const { + for (auto& column : _columns) { + RETURN_IF(column->has_null(), true); + } + return false; +} + +size_t SegmentedColumn::size() const { + size_t result = 0; + for (auto& column : _columns) { + result += column->size(); + } + return result; +} + +const std::vector& SegmentedColumn::columns() const { + return _columns; +} + +void SegmentedColumn::upgrade_to_nullable() { + for (auto& column : _columns) { + column = NullableColumn::wrap_if_necessary(column); + } +} + +SegmentedChunk::SegmentedChunk(size_t segment_size) : _segment_size(segment_size) { + // put at least one chunk there + _segments.resize(1); +} + +void SegmentedChunk::append_column(ColumnPtr column, SlotId slot_id) { + _segments.resize(1); + _segments[0] = std::make_shared(); + _segments[0]->append_column(std::move(column), slot_id); +} + +void SegmentedChunk::append_chunk(const ChunkPtr& chunk, const std::vector& slots) { + ChunkPtr open_segment = _segments[_segments.size() - 1]; + size_t append_rows = chunk->num_rows(); + size_t append_index = 0; + while (append_rows > 0) { + size_t open_segment_append_rows = std::min(_segment_size - open_segment->num_rows(), append_rows); + for (int i = 0; i < slots.size(); i++) { + SlotId slot = slots[i]; + ColumnPtr column = chunk->get_column_by_slot_id(slot); + open_segment->columns()[i]->append(*column, append_index, open_segment_append_rows); + } + append_index += open_segment_append_rows; + append_rows -= open_segment_append_rows; + if (open_segment->num_rows() == _segment_size) { + _segments.emplace_back(); + } + } +} + +void SegmentedChunk::append_chunk(const ChunkPtr& chunk) { + ChunkPtr open_segment = _segments[_segments.size() - 1]; + size_t append_rows = chunk->num_rows(); + size_t append_index = 0; + while (append_rows > 0) { + size_t open_segment_append_rows = std::min(_segment_size - open_segment->num_rows(), append_rows); + for (int i = 0; i < chunk->num_columns(); i++) { + ColumnPtr column = chunk->get_column_by_index(i); + open_segment->columns()[i]->append(*column, append_index, open_segment_append_rows); + } + append_index += open_segment_append_rows; + append_rows -= open_segment_append_rows; + if (open_segment->num_rows() == _segment_size) { + _segments.emplace_back(); + } + } 
+}
+
+void SegmentedChunk::append(const SegmentedChunkPtr& chunk, size_t offset) {
+    auto& input_segments = chunk->segments();
+    size_t segment_index = offset / chunk->_segment_size;
+    size_t segment_offset = offset % chunk->_segment_size;
+    for (size_t i = segment_index; i < chunk->num_segments(); i++) {
+        // The first segment needs to be cut off at segment_offset: copy the remaining rows from the source segment
+        if (i == segment_index) {
+            auto cutoff = input_segments[i]->clone_empty();
+            size_t count = input_segments[i]->num_rows() - segment_offset;
+            cutoff->append(*input_segments[i], segment_offset, count);
+            ChunkPtr shared(cutoff.release());
+            append_chunk(std::move(shared));
+        } else {
+            append_chunk(input_segments[i]);
+        }
+    }
+}
+
+void SegmentedChunk::append_finished() {
+    size_t num_columns = _segments[0]->num_columns();
+    for (int i = 0; i < num_columns; i++) {
+        _columns.emplace_back(std::make_shared<SegmentedColumn>(shared_from_this(), i));
+    }
+}
+
+size_t SegmentedChunk::memory_usage() const {
+    size_t result = 0;
+    for (auto& chunk : _segments) {
+        result += chunk->memory_usage();
+    }
+    return result;
+}
+
+size_t SegmentedChunk::num_rows() const {
+    size_t result = 0;
+    for (auto& chunk : _segments) {
+        result += chunk->num_rows();
+    }
+    return result;
+}
+
+const SegmentedColumns& SegmentedChunk::columns() const {
+    return _columns;
+}
+
+SegmentedColumns& SegmentedChunk::columns() {
+    return _columns;
+}
+
+Status SegmentedChunk::upgrade_if_overflow() {
+    for (auto& chunk : _segments) {
+        RETURN_IF_ERROR(chunk->upgrade_if_overflow());
+    }
+    return {};
+}
+
+Status SegmentedChunk::downgrade() {
+    for (auto& chunk : _segments) {
+        RETURN_IF_ERROR(chunk->downgrade());
+    }
+    return {};
+}
+
+bool SegmentedChunk::has_large_column() const {
+    for (auto& chunk : _segments) {
+        if (chunk->has_large_column()) {
+            return true;
+        }
+    }
+    return false;
+}
+
+size_t SegmentedChunk::num_segments() const {
+    return _segments.size();
+}
+
+const std::vector<ChunkPtr>& SegmentedChunk::segments() const {
+    return _segments;
+}
+std::vector<ChunkPtr>& SegmentedChunk::segments() {
+    return _segments;
+}
+
+void SegmentedChunk::reset() {
+    for (auto& chunk : _segments) {
+        chunk.reset();
+    }
+}
+
 } // namespace starrocks
diff --git a/be/src/storage/chunk_helper.h b/be/src/storage/chunk_helper.h
index 755433694750f5..2223adb56ab494 100644
--- a/be/src/storage/chunk_helper.h
+++ b/be/src/storage/chunk_helper.h
@@ -17,6 +17,10 @@
 #include
 #include
+#include "column/column_visitor.h"
+#include "column/column_visitor_adapter.h"
+#include "column/datum.h"
+#include "column/fixed_length_column_base.h"
 #include "column/vectorized_fwd.h"
 #include "storage/olap_common.h"
 #include "storage/olap_type_infra.h"
@@ -146,48 +150,57 @@ class ChunkPipelineAccumulator {
     bool _finalized = false;
 };
 
-class SegmentedColumn {
+class SegmentedColumn : public std::enable_shared_from_this<SegmentedColumn> {
 public:
-    void append(const Column& src);
-    void append(const Column& src, size_t offset, size_t count);
+    SegmentedColumn(SegmentedChunkPtr chunk, size_t column_index);
+    ~SegmentedColumn() = default;
+
+    ColumnPtr clone_selective(const uint32_t* indexes, uint32_t from, uint32_t size);
+    bool is_nullable() const;
     bool has_null() const;
     size_t size() const;
-    StatusOr upgrade_if_overflow();
-
-    template
-    std::deque> get_data();
-
-    std::vector null_data() const;
-
-    size_t num_segments() const;
-    ColumnPtr get_segmented_column(size_t segment_index);
-
-    size_t byte_size() const;
+    void upgrade_to_nullable();
+    const std::vector<ColumnPtr>& columns() const;
+    size_t segment_size() const;
 
 private:
+    SegmentedChunkPtr _chunk;        // The chunk it belongs to
+    std::vector<ColumnPtr> _columns; // All segmented columns
 };
 
 // A big chunk is segmented into multiple small ones to avoid allocating large contiguous memory.
 // It's not a transparent replacement for Chunk: callers must be aware of it and set a reasonable chunk_size.
-class SegmentedChunk {
+class SegmentedChunk : public std::enable_shared_from_this<SegmentedChunk> {
 public:
-    size_t memory_usage() const;
-    Status upgrade_if_overflow();
-    Status downgrade();
-    bool has_large_column() const;
+    SegmentedChunk(size_t segment_size);
+    ~SegmentedChunk() = default;
 
     void append_column(ColumnPtr column, SlotId slot_id);
+    void append_chunk(const ChunkPtr& chunk, const std::vector<SlotId>& slots);
+    void append_chunk(const ChunkPtr& chunk);
+    void append(const SegmentedChunkPtr& chunk, size_t offset);
+    void append_finished();
 
-    const std::vector& get_chunks() const;
-    std::vector& get_chunks();
+    size_t memory_usage() const;
+    size_t num_rows() const;
     const SegmentedColumns& columns() const;
     SegmentedColumns& columns();
-    const SegmentedColumnPtr& get_column_by_slot_id(SlotId slot_id) const;
-    SegmentedColumnPtr get_column_by_slot_id(SlotId slot_id);
+    size_t num_segments() const;
+    const std::vector<ChunkPtr>& segments() const;
+    std::vector<ChunkPtr>& segments();
+    size_t segment_size() const;
+    void reset();
+
+    Status upgrade_if_overflow();
+    Status downgrade();
+    bool has_large_column() const;
 
 private:
-    std::vector _chunks;
+    std::vector<ChunkPtr> _segments;
+    SegmentedColumns _columns;
+
+    const size_t _segment_size;
 };
 
 } // namespace starrocks
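
The comment on SegmentedChunk above states the intent: segment one big build chunk into fixed-size pieces so the hash-join build side never needs a single large contiguous allocation. A minimal usage sketch of the new API from this patch follows. The API calls (append_chunk, append_finished, clone_selective, SegmentedChunkSlice::cutoff) are taken from the patch itself; the driver function, slot ids, index values, and included headers are illustrative assumptions, not part of the patch.

// Illustrative sketch only: drives the SegmentedChunk API added by this patch.
// The surrounding setup (input chunks, build slots, row counts) is hypothetical.
#include <vector>

#include "column/column_helper.h"   // assumed location of SegmentedChunkSlice
#include "column/vectorized_fwd.h"
#include "storage/chunk_helper.h"

using namespace starrocks;

void build_side_sketch(const std::vector<ChunkPtr>& input_chunks, const std::vector<SlotId>& build_slots) {
    // 2 << 15 == 65536 rows per segment, mirroring HashTableParam::build_chunk_segment_size.
    auto build_chunk = std::make_shared<SegmentedChunk>(2 << 15);

    // Append every incoming chunk; rows are distributed across fixed-size segments internally.
    for (const auto& chunk : input_chunks) {
        build_chunk->append_chunk(chunk, build_slots);
    }

    // Seal the chunk: per-column SegmentedColumn views are only created by append_finished().
    build_chunk->append_finished();

    // Gather rows by global row index (as the probe phase does with build_index);
    // clone_selective maps each index to (segment_id, offset) under the hood.
    // Assumes the build side holds at least 8 rows.
    std::vector<uint32_t> indexes = {0, 3, 7};
    ColumnPtr gathered = build_chunk->columns()[0]->clone_selective(indexes.data(), 0, indexes.size());
    (void)gathered;

    // Spilling consumes the segmented chunk in batches via SegmentedChunkSlice;
    // cutoff() never crosses a segment boundary, so a piece may be smaller than requested.
    SegmentedChunkSlice slice;
    slice.reset(build_chunk);
    while (!slice.empty()) {
        ChunkUniquePtr piece = slice.cutoff(4096);
        (void)piece;
    }
}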