fix key_columns
Signed-off-by: Murphy <[email protected]>
murphyatwork committed Oct 18, 2024
1 parent 9a8f37a commit 80fd5dc
Showing 4 changed files with 54 additions and 13 deletions.
27 changes: 21 additions & 6 deletions be/src/exec/join_hash_map.cpp
@@ -542,6 +542,15 @@ Status JoinHashTable::build(RuntimeState* state) {
     RETURN_IF_ERROR(_table_items->build_chunk->upgrade_if_overflow());
     _table_items->has_large_column = _table_items->build_chunk->has_large_column();
 
+    // build key_columns
+    size_t join_key_count = _table_items->join_keys.size();
+    for (size_t i = 0; i < join_key_count; i++) {
+        if (_table_items->join_keys[i].col_ref != nullptr) {
+            SlotId slot_id = _table_items->join_keys[i].col_ref->slot_id();
+            _table_items->key_columns[i] = _table_items->build_chunk->get_column_by_slot_id(slot_id)->materialize();
+        }
+    }
+
     RETURN_IF_ERROR(_upgrade_key_columns_if_overflow());
 
     _hash_map_type = _choose_join_hash_map();
@@ -622,17 +631,23 @@ void JoinHashTable::append_chunk(const ChunkPtr& chunk, const Columns& key_colum
     }
     _table_items->build_chunk->append_chunk(chunk, slots);
 
+    // TODO: it's useless for the optimizer, but there are still some UTs depending on it
     for (size_t i = 0; i < _table_items->key_columns.size(); i++) {
-        // upgrade to nullable column
-        if (!_table_items->key_columns[i]->is_nullable() && key_columns[i]->is_nullable()) {
-            size_t row_count = _table_items->key_columns[i]->size();
-            _table_items->key_columns[i] =
-                    NullableColumn::create(_table_items->key_columns[i], NullColumn::create(row_count, 0));
+        // If the join key is a slot ref, it is taken from the build chunk directly;
+        // otherwise it is appended from the key_columns of the input
+        if (_table_items->join_keys[i].col_ref == nullptr) {
+            // upgrade to nullable column
+            if (!_table_items->key_columns[i]->is_nullable() && key_columns[i]->is_nullable()) {
+                size_t row_count = _table_items->key_columns[i]->size();
+                _table_items->key_columns[i] =
+                        NullableColumn::create(_table_items->key_columns[i], NullColumn::create(row_count, 0));
+            }
+            _table_items->key_columns[i]->append(*key_columns[i]);
         }
-        _table_items->key_columns[i]->append(*key_columns[i]);
     }
 
     _table_items->row_count += chunk->num_rows();
+    DCHECK_EQ(_table_items->row_count + 1, _table_items->build_chunk->num_rows());
 }
 
 void JoinHashTable::merge_ht(const JoinHashTable& ht) {
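To make the shape of this change concrete, below is a minimal standalone sketch of the new build-side key-column flow. All names here (Column, SegmentedColumn, JoinKeyDesc, materialize, build_chains) are simplified stand-ins invented for illustration, not the StarRocks classes: join keys that are plain slot references are materialized once from the segmented build chunk, while computed key expressions are still accumulated chunk by chunk.

#include <cassert>
#include <cstddef>
#include <optional>
#include <vector>

// Hypothetical stand-ins for the StarRocks types, for illustration only.
using Column = std::vector<int>;             // one flat column
using SegmentedColumn = std::vector<Column>; // a column split into segments

// Flatten all segments into one contiguous column, in the spirit of
// SegmentedColumn::materialize() in this commit.
Column materialize(const SegmentedColumn& segmented) {
    Column result;
    for (const Column& part : segmented) {
        result.insert(result.end(), part.begin(), part.end());
    }
    return result;
}

struct JoinKeyDesc {
    std::optional<size_t> slot_index; // set => the key is a plain slot reference
};

int main() {
    // Build chunk with two slots, each stored as two segments.
    std::vector<SegmentedColumn> build_chunk = {
        {{1, 2}, {3}},    // slot 0
        {{10, 20}, {30}}, // slot 1
    };

    // Key 0 references slot 1 directly; key 1 is a computed expression.
    std::vector<JoinKeyDesc> join_keys = {{1}, {std::nullopt}};

    // Values a key expression would have produced chunk by chunk in append_chunk().
    Column accumulated_expr_key = {11, 22, 33};

    std::vector<Column> key_columns(join_keys.size());
    for (size_t i = 0; i < join_keys.size(); i++) {
        if (join_keys[i].slot_index) {
            // Slot-ref key: materialize once from the build chunk, as build() now does.
            key_columns[i] = materialize(build_chunk[*join_keys[i].slot_index]);
        } else {
            // Expression key: use the per-chunk accumulated values.
            key_columns[i] = accumulated_expr_key;
        }
    }

    assert(key_columns[0] == (Column{10, 20, 30}));
    assert(key_columns[1] == (Column{11, 22, 33}));
    return 0;
}

Splitting the two cases this way means append_chunk() no longer has to keep slot-ref key columns in sync row by row; a single materialize() at build time produces them from the chunk that was appended anyway.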
2 changes: 2 additions & 0 deletions be/src/exec/join_hash_map.tpp
@@ -59,6 +59,7 @@ void JoinBuildFunc<LT>::construct_hash_table(RuntimeState* state, JoinHashTableI
     if (table_items->key_columns[0]->is_nullable()) {
         auto* nullable_column = ColumnHelper::as_raw_column<NullableColumn>(table_items->key_columns[0]);
         auto& null_array = nullable_column->null_column()->get_data();
+        DCHECK_EQ(data.size(), table_items->row_count + 1);
         for (size_t i = 1; i < table_items->row_count + 1; i++) {
             if (null_array[i] == 0) {
                 uint32_t bucket_num = JoinHashMapHelper::calc_bucket_num<CppType>(data[i], table_items->bucket_size);
@@ -67,6 +68,7 @@
             }
         }
     } else {
+        DCHECK_EQ(data.size(), table_items->row_count + 1);
         for (size_t i = 1; i < table_items->row_count + 1; i++) {
             uint32_t bucket_num = JoinHashMapHelper::calc_bucket_num<CppType>(data[i], table_items->bucket_size);
             table_items->next[i] = table_items->first[bucket_num];
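The two new DCHECK_EQ calls document the one-based layout this loop depends on: index 0 of the key data is a reserved sentinel row, so a table holding row_count rows stores row_count + 1 entries and insertion starts at index 1. Here is a minimal sketch of the same chained-bucket insertion; the masking bucket function and all names are illustrative stand-ins, not the real JoinHashMapHelper.

#include <cassert>
#include <cstdint>
#include <vector>

// Insert rows 1..row_count into chained hash buckets: first[bucket] holds the
// chain head, next[i] links row i to the previous head (0 terminates a chain,
// which is why row index 0 is reserved as a sentinel).
void build_chains(const std::vector<int64_t>& data, size_t row_count, size_t bucket_size,
                  std::vector<uint32_t>& first, std::vector<uint32_t>& next) {
    assert(data.size() == row_count + 1); // the invariant the new DCHECK_EQ asserts
    first.assign(bucket_size, 0);
    next.assign(row_count + 1, 0);
    for (size_t i = 1; i < row_count + 1; i++) {
        // Toy bucket function (bucket_size must be a power of two here).
        size_t bucket_num = static_cast<size_t>(data[i]) & (bucket_size - 1);
        next[i] = first[bucket_num]; // prepend row i to its bucket's chain
        first[bucket_num] = static_cast<uint32_t>(i);
    }
}

int main() {
    std::vector<int64_t> data = {0 /* sentinel row */, 5, 9, 13}; // 3 real rows
    std::vector<uint32_t> first, next;
    build_chains(data, /*row_count=*/3, /*bucket_size=*/4, first, next);
    // 5, 9, and 13 all land in bucket 1, so the chain reads 3 -> 2 -> 1 -> end.
    assert(first[1] == 3 && next[3] == 2 && next[2] == 1 && next[1] == 0);
    return 0;
}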
28 changes: 25 additions & 3 deletions be/src/storage/chunk_helper.cpp
@@ -662,9 +662,9 @@ class SegmentedColumnSelectiveCopy final : public ColumnVisitorAdapter<Segmented
     Status do_visit(const BinaryColumnBase<Offset>& column) {
         using ColumnT = BinaryColumnBase<Offset>;
         using ContainerT = typename ColumnT::Container*;
-        using Bytes = ColumnT::Bytes;
-        using Byte = ColumnT::Byte;
-        using Offsets = ColumnT::Offsets;
+        using Bytes = typename ColumnT::Bytes;
+        using Byte = typename ColumnT::Byte;
+        using Offsets = typename ColumnT::Offsets;
 
         _result = column.clone_empty();
         auto output = ColumnHelper::as_column<ColumnT>(_result);
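The typename fix above is needed because ColumnT::Bytes and friends are names that depend on the template parameter Offset, and a dependent qualified name is assumed not to be a type unless typename says otherwise (C++20 relaxed this for alias declarations, but earlier standard modes reject it). A minimal reproduction of the rule, with Wrapper and User as made-up names:

#include <vector>

template <typename T>
struct Wrapper {
    using Bytes = std::vector<T>;
};

template <typename T>
struct User {
    using ColumnT = Wrapper<T>;
    // ColumnT::Bytes depends on T, so `typename` is needed to tell the
    // compiler that Bytes names a type (ill-formed without it before C++20).
    using Bytes = typename ColumnT::Bytes;
    Bytes buffer;
};

int main() {
    User<unsigned char> user;
    user.buffer.push_back(0x2a); // Bytes resolved to std::vector<unsigned char>
    return 0;
}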
@@ -789,6 +789,18 @@ ColumnPtr SegmentedColumn::clone_selective(const uint32_t* indexes, uint32_t fro
     return visitor.result();
 }
 
+ColumnPtr SegmentedColumn::materialize() const {
+    auto actual_columns = columns();
+    if (actual_columns.empty()) {
+        return {};
+    }
+    ColumnPtr result = actual_columns[0]->clone_empty();
+    for (size_t i = 0; i < actual_columns.size(); i++) {
+        result->append(*actual_columns[i]);
+    }
+    return result;
+}
+
 size_t SegmentedColumn::segment_size() const {
     return _segment_size;
 }
Expand Down Expand Up @@ -932,6 +944,16 @@ size_t SegmentedChunk::num_rows() const {
return result;
}

SegmentedColumnPtr SegmentedChunk::get_column_by_slot_id(SlotId slot_id) {
DCHECK(!!_segments[0]);
auto& map = _segments[0]->get_slot_id_to_index_map();
auto iter = map.find(slot_id);
if (iter == map.end()) {
return nullptr;
}
return _columns[iter->second];
}

const SegmentedColumns& SegmentedChunk::columns() const {
return _columns;
}
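get_column_by_slot_id above resolves the slot id through the first segment's slot-to-index map, which is sound because every segment of a SegmentedChunk shares one column layout. The sketch below models just that lookup; Segment, SegChunk, and the flat containers are invented stand-ins, and it returns an empty vector where the real method returns nullptr.

#include <cassert>
#include <cstddef>
#include <unordered_map>
#include <vector>

using SlotId = int;
using Column = std::vector<int>;

struct Segment {
    std::unordered_map<SlotId, size_t> slot_id_to_index; // identical in every segment
    std::vector<Column> columns;
};

struct SegChunk {
    std::vector<Segment> segments;

    // Consult the first segment's map, then gather that column position from
    // every segment, mirroring the lookup in SegmentedChunk::get_column_by_slot_id.
    std::vector<Column> get_column_by_slot_id(SlotId slot_id) const {
        const auto& map = segments.front().slot_id_to_index;
        auto iter = map.find(slot_id);
        if (iter == map.end()) {
            return {}; // unknown slot id
        }
        std::vector<Column> parts;
        for (const Segment& segment : segments) {
            parts.push_back(segment.columns[iter->second]);
        }
        return parts;
    }
};

int main() {
    Segment seg0{{{7, 0}, {8, 1}}, {{1, 2}, {10, 20}}}; // slot 7 -> col 0, slot 8 -> col 1
    Segment seg1{{{7, 0}, {8, 1}}, {{3}, {30}}};        // same layout, next segment
    SegChunk chunk{{seg0, seg1}};

    auto parts = chunk.get_column_by_slot_id(8);
    assert(parts.size() == 2 && parts[0] == (Column{10, 20}) && parts[1] == (Column{30}));
    assert(chunk.get_column_by_slot_id(99).empty());
    return 0;
}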
10 changes: 6 additions & 4 deletions be/src/storage/chunk_helper.h
@@ -161,6 +161,7 @@ class SegmentedColumn final : public std::enable_shared_from_this<SegmentedColum
     ~SegmentedColumn() = default;
 
     ColumnPtr clone_selective(const uint32_t* indexes, uint32_t from, uint32_t size);
+    ColumnPtr materialize() const;
 
     bool is_nullable() const;
     bool has_null() const;
@@ -192,17 +193,18 @@ class SegmentedChunk final : public std::enable_shared_from_this<SegmentedChunk>
     void append(const SegmentedChunkPtr& chunk, size_t offset);
     void build_columns();
 
-    size_t memory_usage() const;
-    size_t num_rows() const;
+    SegmentedColumnPtr get_column_by_slot_id(SlotId slot_id);
     const SegmentedColumns& columns() const;
     SegmentedColumns& columns();
     size_t num_segments() const;
     const std::vector<ChunkPtr>& segments() const;
     std::vector<ChunkPtr>& segments();
-    size_t segment_size() const;
-    void reset();
     ChunkUniquePtr clone_empty(size_t reserve);
 
+    size_t segment_size() const;
+    void reset();
+    size_t memory_usage() const;
+    size_t num_rows() const;
     Status upgrade_if_overflow();
     Status downgrade();
     bool has_large_column() const;
