diff --git a/be/src/exec/join_hash_map.cpp b/be/src/exec/join_hash_map.cpp
index cd4a9da7c6aa0..77bee12d89339 100644
--- a/be/src/exec/join_hash_map.cpp
+++ b/be/src/exec/join_hash_map.cpp
@@ -542,6 +542,15 @@ Status JoinHashTable::build(RuntimeState* state) {
     RETURN_IF_ERROR(_table_items->build_chunk->upgrade_if_overflow());
     _table_items->has_large_column = _table_items->build_chunk->has_large_column();
 
+    // build key_columns: keys that are plain slot refs are materialized once from
+    // the segmented build chunk instead of being accumulated per input chunk
+    size_t join_key_count = _table_items->join_keys.size();
+    for (size_t i = 0; i < join_key_count; i++) {
+        if (_table_items->join_keys[i].col_ref != nullptr) {
+            SlotId slot_id = _table_items->join_keys[i].col_ref->slot_id();
+            _table_items->key_columns[i] = _table_items->build_chunk->get_column_by_slot_id(slot_id)->materialize();
+        }
+    }
+
     RETURN_IF_ERROR(_upgrade_key_columns_if_overflow());
 
     _hash_map_type = _choose_join_hash_map();
@@ -622,17 +631,23 @@ void JoinHashTable::append_chunk(const ChunkPtr& chunk, const Columns& key_colum
     }
 
     _table_items->build_chunk->append_chunk(chunk, slots);
+    // TODO: it's useless for the optimizer, but there're still some UT depending on it
     for (size_t i = 0; i < _table_items->key_columns.size(); i++) {
-        // upgrade to nullable column
-        if (!_table_items->key_columns[i]->is_nullable() && key_columns[i]->is_nullable()) {
-            size_t row_count = _table_items->key_columns[i]->size();
-            _table_items->key_columns[i] =
-                    NullableColumn::create(_table_items->key_columns[i], NullColumn::create(row_count, 0));
+        // If the join key is slot ref, will get from build chunk directly,
+        // otherwise will append from key_column of input
+        if (_table_items->join_keys[i].col_ref == nullptr) {
+            // upgrade to nullable column
+            if (!_table_items->key_columns[i]->is_nullable() && key_columns[i]->is_nullable()) {
+                size_t row_count = _table_items->key_columns[i]->size();
+                _table_items->key_columns[i] =
+                        NullableColumn::create(_table_items->key_columns[i], NullColumn::create(row_count, 0));
+            }
+            _table_items->key_columns[i]->append(*key_columns[i]);
         }
-        _table_items->key_columns[i]->append(*key_columns[i]);
     }
 
     _table_items->row_count += chunk->num_rows();
+    DCHECK_EQ(_table_items->row_count + 1, _table_items->build_chunk->num_rows());
 }
 
 void JoinHashTable::merge_ht(const JoinHashTable& ht) {
diff --git a/be/src/exec/join_hash_map.tpp b/be/src/exec/join_hash_map.tpp
index 34e0b50624822..41bda49995a0d 100644
--- a/be/src/exec/join_hash_map.tpp
+++ b/be/src/exec/join_hash_map.tpp
@@ -59,6 +59,7 @@ void JoinBuildFunc<LT>::construct_hash_table(RuntimeState* state, JoinHashTableI
     if (table_items->key_columns[0]->is_nullable()) {
         auto* nullable_column = ColumnHelper::as_raw_column<NullableColumn>(table_items->key_columns[0]);
         auto& null_array = nullable_column->null_column()->get_data();
+        DCHECK_EQ(data.size(), table_items->row_count + 1);
         for (size_t i = 1; i < table_items->row_count + 1; i++) {
             if (null_array[i] == 0) {
                 uint32_t bucket_num = JoinHashMapHelper::calc_bucket_num<CppType>(data[i], table_items->bucket_size);
@@ -67,6 +68,7 @@ void JoinBuildFunc<LT>::construct_hash_table(RuntimeState* state, JoinHashTableI
             }
         }
     } else {
+        DCHECK_EQ(data.size(), table_items->row_count + 1);
         for (size_t i = 1; i < table_items->row_count + 1; i++) {
             uint32_t bucket_num = JoinHashMapHelper::calc_bucket_num<CppType>(data[i], table_items->bucket_size);
             table_items->next[i] = table_items->first[bucket_num];
diff --git a/be/src/storage/chunk_helper.cpp b/be/src/storage/chunk_helper.cpp
index 341ec1c832493..857466af47d7e 100644
--- a/be/src/storage/chunk_helper.cpp
+++ b/be/src/storage/chunk_helper.cpp
@@ -662,9 +662,9 @@ class SegmentedColumnSelectiveCopy final : public ColumnVisitorAdapter<Segmented
     Status do_visit(const BinaryColumnBase<T>& column) {
         using ColumnT = BinaryColumnBase<T>;
         using ContainerT = typename ColumnT::Container*;
-        using Bytes = ColumnT::Bytes;
-        using Byte = ColumnT::Byte;
-        using Offsets = ColumnT::Offsets;
+        using Bytes = typename ColumnT::Bytes;
+        using Byte = typename ColumnT::Byte;
+        using Offsets = typename ColumnT::Offsets;
 
         _result = column.clone_empty();
         auto output = ColumnHelper::as_column<ColumnT>(_result);
@@ -789,6 +789,18 @@ ColumnPtr SegmentedColumn::clone_selective(const uint32_t* indexes, uint32_t fro
     return visitor.result();
 }
 
+ColumnPtr SegmentedColumn::materialize() const {
+    auto actual_columns = columns();
+    if (actual_columns.empty()) {
+        return {};
+    }
+    ColumnPtr result = actual_columns[0]->clone_empty();
+    for (size_t i = 0; i < actual_columns.size(); i++) {
+        result->append(*actual_columns[i]);
+    }
+    return result;
+}
+
 size_t SegmentedColumn::segment_size() const {
     return _segment_size;
 }
@@ -932,6 +944,16 @@ size_t SegmentedChunk::num_rows() const {
     return result;
 }
 
+SegmentedColumnPtr SegmentedChunk::get_column_by_slot_id(SlotId slot_id) {
+    DCHECK(!!_segments[0]);
+    auto& map = _segments[0]->get_slot_id_to_index_map();
+    auto iter = map.find(slot_id);
+    if (iter == map.end()) {
+        return nullptr;
+    }
+    return _columns[iter->second];
+}
+
 const SegmentedColumns& SegmentedChunk::columns() const {
     return _columns;
 }
diff --git a/be/src/storage/chunk_helper.h b/be/src/storage/chunk_helper.h
index caf4a5258d65b..c582214055890 100644
--- a/be/src/storage/chunk_helper.h
+++ b/be/src/storage/chunk_helper.h
@@ -161,6 +161,7 @@ class SegmentedColumn final : public std::enable_shared_from_this<SegmentedColum
     ~SegmentedColumn() = default;
 
     ColumnPtr clone_selective(const uint32_t* indexes, uint32_t from, uint32_t size);
+    ColumnPtr materialize() const;
 
     bool is_nullable() const;
     bool has_null() const;
@@ -183,16 +184,17 @@ class SegmentedChunk : public std::enable_shared_from_this<SegmentedChunk> {
     void append(const SegmentedChunkPtr& chunk, size_t offset);
     void build_columns();
 
-    size_t memory_usage() const;
-    size_t num_rows() const;
+    SegmentedColumnPtr get_column_by_slot_id(SlotId slot_id);
     const SegmentedColumns& columns() const;
     SegmentedColumns& columns();
     size_t num_segments() const;
    const std::vector<ChunkPtr>& segments() const;
    std::vector<ChunkPtr>& segments();
-    size_t segment_size() const;
-    void reset();
     ChunkUniquePtr clone_empty(size_t reserve);
+    size_t segment_size() const;
+    void reset();
+    size_t memory_usage() const;
+    size_t num_rows() const;
     Status upgrade_if_overflow();
     Status downgrade();
     bool has_large_column() const;