Skip to content

Commit

Permalink
impl
Browse files Browse the repository at this point in the history
Signed-off-by: Murphy <[email protected]>
  • Loading branch information
murphyatwork committed Sep 20, 2024
1 parent dfd3afd commit e9206b6
Show file tree
Hide file tree
Showing 10 changed files with 374 additions and 86 deletions.
31 changes: 30 additions & 1 deletion be/src/column/column_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "column/vectorized_fwd.h"
#include "gutil/casts.h"
#include "simd/simd.h"
#include "storage/chunk_helper.h"
#include "types/logical_type_infra.h"
#include "util/date_func.h"
#include "util/percentile_value.h"
Expand Down Expand Up @@ -469,7 +470,7 @@ size_t ChunkSliceTemplate<Ptr>::skip(size_t skip_rows) {

// Cutoff required rows from this chunk
template <class Ptr>
Ptr ChunkSliceTemplate<Ptr>::cutoff(size_t required_rows) {
ChunkUniquePtr ChunkSliceTemplate<Ptr>::cutoff(size_t required_rows) {
DCHECK(!empty());
size_t cut_rows = std::min(rows(), required_rows);
auto res = chunk->clone_empty(cut_rows);
Expand All @@ -482,7 +483,35 @@ Ptr ChunkSliceTemplate<Ptr>::cutoff(size_t required_rows) {
return res;
}

// Specialized for SegmentedChunkPtr
template <>
ChunkUniquePtr ChunkSliceTemplate<SegmentedChunkPtr>::cutoff(size_t required_rows) {
DCHECK(!empty());
// cutoff a chunk from current segment, if it doesn't meet the requirement just let it be
ChunkPtr segment = chunk->segments()[segment_id];
size_t segment_offset = offset % chunk->segment_size();
size_t cut_rows = std::min(segment->num_rows() - segment_offset, required_rows);

auto res = segment->clone_empty(cut_rows);
res->append(*segment, segment_offset, cut_rows);
offset += cut_rows;

// move to next segment and release previous one
size_t new_segment_id = offset / chunk->segment_size();
if (new_segment_id != segment_id) {
chunk->segments()[segment_id].reset();
segment_id = new_segment_id;
}

if (empty()) {
chunk.reset();
offset = 0;
}
return res;
}

template struct ChunkSliceTemplate<ChunkPtr>;
template struct ChunkSliceTemplate<ChunkUniquePtr>;
template struct ChunkSliceTemplate<SegmentedChunkPtr>;

} // namespace starrocks
6 changes: 3 additions & 3 deletions be/src/column/column_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -345,8 +345,6 @@ class ColumnHelper {
}
}

static SegmentedColumnPtr get_data_column(SegmentedColumnPtr column);

static BinaryColumn* get_binary_column(Column* column) { return down_cast<BinaryColumn*>(get_data_column(column)); }

static bool is_all_const(const Columns& columns);
Expand Down Expand Up @@ -515,12 +513,13 @@ class ColumnHelper {
template <class Ptr = ChunkUniquePtr>
struct ChunkSliceTemplate {
Ptr chunk;
size_t segment_id = 0;
size_t offset = 0;

bool empty() const;
size_t rows() const;
size_t skip(size_t skip_rows);
Ptr cutoff(size_t required_rows);
ChunkUniquePtr cutoff(size_t required_rows);
void reset(Ptr input);
};

Expand Down Expand Up @@ -550,5 +549,6 @@ APPLY_FOR_ALL_STRING_TYPE(GET_CONTAINER)

using ChunkSlice = ChunkSliceTemplate<ChunkUniquePtr>;
using ChunkSharedSlice = ChunkSliceTemplate<ChunkPtr>;
using SegmentedChunkSlice = ChunkSliceTemplate<SegmentedChunkPtr>;

} // namespace starrocks
45 changes: 14 additions & 31 deletions be/src/exec/join_hash_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ void JoinHashTable::create(const HashTableParam& param) {
_probe_state->probe_counter = param.probe_counter;
}

_table_items->build_chunk = std::make_shared<SegmentedChunk>();
_table_items->build_chunk = std::make_shared<SegmentedChunk>(param.build_chunk_segment_size);
_table_items->with_other_conjunct = param.with_other_conjunct;
_table_items->join_type = param.join_type;
_table_items->mor_reader_mode = param.mor_reader_mode;
Expand Down Expand Up @@ -545,15 +545,6 @@ Status JoinHashTable::build(RuntimeState* state) {
RETURN_IF_ERROR(_table_items->build_chunk->upgrade_if_overflow());
_table_items->has_large_column = _table_items->build_chunk->has_large_column();

// If the join key is column ref of build chunk, fetch from build chunk directly
size_t join_key_count = _table_items->join_keys.size();
for (size_t i = 0; i < join_key_count; i++) {
if (_table_items->join_keys[i].col_ref != nullptr) {
SlotId slot_id = _table_items->join_keys[i].col_ref->slot_id();
_table_items->key_columns[i] = _table_items->build_chunk->get_column_by_slot_id(slot_id);
}
}

RETURN_IF_ERROR(_upgrade_key_columns_if_overflow());

_hash_map_type = _choose_join_hash_map();
Expand Down Expand Up @@ -626,31 +617,22 @@ Status JoinHashTable::probe_remain(RuntimeState* state, ChunkPtr* chunk, bool* e
}

void JoinHashTable::append_chunk(const ChunkPtr& chunk, const Columns& key_columns) {
auto& columns = _table_items->build_chunk->columns();

// TODO: simplify the SlotId mapping, if the slot of input chunk is same as build_chunk, we don't need to remap them
std::vector<SlotId> slots;
for (size_t i = 0; i < _table_items->build_column_count; i++) {
SlotDescriptor* slot = _table_items->build_slots[i].slot;
ColumnPtr& column = chunk->get_column_by_slot_id(slot->id());

if (!columns[i]->is_nullable() && column->is_nullable()) {
// upgrade to nullable column
columns[i] = NullableColumn::create(columns[i], NullColumn::create(columns[i]->size(), 0));
}
columns[i]->append(*column);
slots.push_back(slot->id());
}
_table_items->build_chunk->append_chunk(chunk, slots);

for (size_t i = 0; i < _table_items->key_columns.size(); i++) {
// If the join key is slot ref, will get from build chunk directly,
// otherwise will append from key_column of input
if (_table_items->join_keys[i].col_ref == nullptr) {
// upgrade to nullable column
if (!_table_items->key_columns[i]->is_nullable() && key_columns[i]->is_nullable()) {
size_t row_count = _table_items->key_columns[i]->size();
_table_items->key_columns[i] =
NullableColumn::create(_table_items->key_columns[i], NullColumn::create(row_count, 0));
}
_table_items->key_columns[i]->append(*key_columns[i]);
// upgrade to nullable column
if (!_table_items->key_columns[i]->is_nullable() && key_columns[i]->is_nullable()) {
size_t row_count = _table_items->key_columns[i]->size();
_table_items->key_columns[i] =
NullableColumn::create(_table_items->key_columns[i], NullColumn::create(row_count, 0));
}
_table_items->key_columns[i]->append(*key_columns[i]);
}

_table_items->row_count += chunk->num_rows();
Expand All @@ -665,10 +647,11 @@ void JoinHashTable::merge_ht(const JoinHashTable& ht) {
for (size_t i = 0; i < _table_items->build_column_count; i++) {
if (!columns[i]->is_nullable() && other_columns[i]->is_nullable()) {
// upgrade to nullable column
columns[i] = NullableColumn::create(columns[i], NullColumn::create(columns[i]->size(), 0));
// columns[i] = NullableColumn::create(columns[i], NullColumn::create(columns[i]->size(), 0));
columns[i]->upgrade_to_nullable();
}
columns[i]->append(*other_columns[i], 1, other_columns[i]->size() - 1);
}
_table_items->build_chunk->append(ht._table_items->build_chunk, 1);
}

ChunkPtr JoinHashTable::convert_to_spill_schema(const ChunkPtr& chunk) const {
Expand Down
12 changes: 8 additions & 4 deletions be/src/exec/join_hash_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ struct HashTableSlotDescriptor {
struct JoinHashTableItems {
//TODO: memory continues problem?
SegmentedChunkPtr build_chunk = nullptr;
SegmentedColumns key_columns;
Columns key_columns;
std::vector<HashTableSlotDescriptor> build_slots;
std::vector<HashTableSlotDescriptor> probe_slots;
// A hash value is the bucket index of the hash map. "JoinHashTableItems.first" is the
Expand Down Expand Up @@ -295,6 +295,9 @@ struct HashTableParam {
RuntimeProfile::Counter* output_probe_column_timer = nullptr;
RuntimeProfile::Counter* probe_counter = nullptr;
bool mor_reader_mode = false;

// TODO: optimize this according to chunk width
size_t build_chunk_segment_size = 2 << 15;
};

template <class T>
Expand Down Expand Up @@ -394,7 +397,7 @@ class JoinBuildFunc {
using ColumnType = typename RunTimeTypeTraits<LT>::ColumnType;

static void prepare(RuntimeState* runtime, JoinHashTableItems* table_items);
static const Buffer<CppType>& get_key_data(const JoinHashTableItems& table_items, size_t segment_index);
static const Buffer<CppType>& get_key_data(const JoinHashTableItems& table_items);
static void construct_hash_table(RuntimeState* state, JoinHashTableItems* table_items,
HashTableProbeState* probe_state);
};
Expand Down Expand Up @@ -689,9 +692,10 @@ class JoinHashMap {

void _copy_probe_nullable_column(ColumnPtr* src_column, ChunkPtr* chunk, const SlotDescriptor* slot);

void _copy_build_column(const ColumnPtr& src_column, ChunkPtr* chunk, const SlotDescriptor* slot, bool to_nullable);
void _copy_build_column(const SegmentedColumnPtr& src_column, ChunkPtr* chunk, const SlotDescriptor* slot,
bool to_nullable);

void _copy_build_nullable_column(const ColumnPtr& src_column, ChunkPtr* chunk, const SlotDescriptor* slot);
void _copy_build_nullable_column(const SegmentedColumnPtr& src_column, ChunkPtr* chunk, const SlotDescriptor* slot);

void _probe_index_output(ChunkPtr* chunk);
void _build_index_output(ChunkPtr* chunk);
Expand Down
34 changes: 14 additions & 20 deletions be/src/exec/join_hash_map.tpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "column/vectorized_fwd.h"
#include "simd/simd.h"
#include "util/runtime_profile.h"

Expand All @@ -31,13 +32,13 @@ void JoinBuildFunc<LT>::prepare(RuntimeState* runtime, JoinHashTableItems* table

template <LogicalType LT>
const Buffer<typename JoinBuildFunc<LT>::CppType>& JoinBuildFunc<LT>::get_key_data(
const JoinHashTableItems& table_items, size_t segment_index) {
const JoinHashTableItems& table_items) {
ColumnPtr data_column;
if (table_items.key_columns[0]->is_nullable()) {
auto* null_column = ColumnHelper::as_raw_column<NullableColumn>(table_items.key_columns[0]->get_segmented_column(segment_index));
auto* null_column = ColumnHelper::as_raw_column<NullableColumn>(table_items.key_columns[0]);
data_column = null_column->data_column();
} else {
data_column = table_items.key_columns[0]->get_segmented_column(segment_index);
data_column = table_items.key_columns[0];
}

if constexpr (lt_is_string<LT>) {
Expand All @@ -54,12 +55,9 @@ const Buffer<typename JoinBuildFunc<LT>::CppType>& JoinBuildFunc<LT>::get_key_da
template <LogicalType LT>
void JoinBuildFunc<LT>::construct_hash_table(RuntimeState* state, JoinHashTableItems* table_items,
HashTableProbeState* probe_state) {
size_t table_index = 1;
for (int i = 0; i < table_items->key_columns[0]->num_segments(); i++) {
auto& data = get_key_data(*table_items);

if (table_items->key_columns[0]->is_nullable()) {
auto* nullable_column = ColumnHelper::as_raw_column<NullableColumn>(table_items->key_columns[0]->get_segmented_column(i));
auto* nullable_column = ColumnHelper::as_raw_column<NullableColumn>(table_items->key_columns[0]);
auto& null_array = nullable_column->null_column()->get_data();
for (size_t i = 1; i < table_items->row_count + 1; i++) {
if (null_array[i] == 0) {
Expand All @@ -75,7 +73,6 @@ void JoinBuildFunc<LT>::construct_hash_table(RuntimeState* state, JoinHashTableI
table_items->first[bucket_num] = i;
}
}
}
table_items->calculate_ht_info(table_items->key_columns[0]->byte_size());
}

Expand Down Expand Up @@ -417,6 +414,7 @@ void JoinHashMap<LT, BuildFunc, ProbeFunc>::probe_prepare(RuntimeState* state) {

template <LogicalType LT, class BuildFunc, class ProbeFunc>
void JoinHashMap<LT, BuildFunc, ProbeFunc>::build(RuntimeState* state) {
_table_items->build_chunk->append_finished();
BuildFunc().construct_hash_table(state, _table_items, _probe_state);
}

Expand Down Expand Up @@ -623,7 +621,7 @@ void JoinHashMap<LT, BuildFunc, ProbeFunc>::_build_output(ChunkPtr* chunk) {

bool need_output = is_lazy ? hash_table_slot.need_lazy_materialize : hash_table_slot.need_output;
if (need_output) {
ColumnPtr& column = _table_items->build_chunk->columns()[i];
auto& column = _table_items->build_chunk->columns()[i];
if (!column->is_nullable()) {
_copy_build_column(column, chunk, slot, to_nullable);
} else {
Expand Down Expand Up @@ -688,11 +686,10 @@ void JoinHashMap<LT, BuildFunc, ProbeFunc>::_copy_probe_nullable_column(ColumnPt
}

template <LogicalType LT, class BuildFunc, class ProbeFunc>
void JoinHashMap<LT, BuildFunc, ProbeFunc>::_copy_build_column(const ColumnPtr& src_column, ChunkPtr* chunk,
void JoinHashMap<LT, BuildFunc, ProbeFunc>::_copy_build_column(const SegmentedColumnPtr& src_column, ChunkPtr* chunk,
const SlotDescriptor* slot, bool to_nullable) {
if (to_nullable) {
auto data_column = src_column->clone_empty();
data_column->append_selective(*src_column, _probe_state->build_index.data(), 0, _probe_state->count);
auto data_column = src_column->clone_selective(_probe_state->build_index.data(), 0, _probe_state->count);

// When left outer join is executed,
// build_index[i] Equal to 0 means it is not found in the hash table,
Expand All @@ -708,18 +705,15 @@ void JoinHashMap<LT, BuildFunc, ProbeFunc>::_copy_build_column(const ColumnPtr&
auto dest_column = NullableColumn::create(std::move(data_column), null_column);
(*chunk)->append_column(std::move(dest_column), slot->id());
} else {
auto dest_column = src_column->clone_empty();
dest_column->append_selective(*src_column, _probe_state->build_index.data(), 0, _probe_state->count);
(*chunk)->append_column(std::move(dest_column), slot->id());
auto data_column = src_column->clone_selective(_probe_state->build_index.data(), 0, _probe_state->count);
(*chunk)->append_column(std::move(data_column), slot->id());
}
}

template <LogicalType LT, class BuildFunc, class ProbeFunc>
void JoinHashMap<LT, BuildFunc, ProbeFunc>::_copy_build_nullable_column(const ColumnPtr& src_column, ChunkPtr* chunk,
const SlotDescriptor* slot) {
ColumnPtr dest_column = src_column->clone_empty();

dest_column->append_selective(*src_column, _probe_state->build_index.data(), 0, _probe_state->count);
void JoinHashMap<LT, BuildFunc, ProbeFunc>::_copy_build_nullable_column(const SegmentedColumnPtr& src_column,
ChunkPtr* chunk, const SlotDescriptor* slot) {
ColumnPtr dest_column = src_column->clone_selective(_probe_state->build_index.data(), 0, _probe_state->count);

// When left outer join is executed,
// build_index[i] Equal to 0 means it is not found in the hash table,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ StatusOr<std::function<StatusOr<ChunkPtr>()>> SpillableHashJoinBuildOperator::_c
}
}

auto chunk = _hash_table_build_chunk_slice.cutoff(runtime_state()->chunk_size());
ChunkPtr chunk = _hash_table_build_chunk_slice.cutoff(runtime_state()->chunk_size());
RETURN_IF_ERROR(chunk->downgrade());
RETURN_IF_ERROR(append_hash_columns(chunk));
_join_builder->update_build_rows(chunk->num_rows());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class SpillableHashJoinBuildOperator final : public HashJoinBuildOperator {

size_t _hash_table_iterate_idx = 0;
std::vector<JoinHashTable*> _hash_tables;
ChunkSharedSlice _hash_table_build_chunk_slice;
SegmentedChunkSlice _hash_table_build_chunk_slice;
std::function<StatusOr<ChunkPtr>()> _hash_table_slice_iterator;
bool _is_first_time_spill = true;
DECLARE_ONCE_DETECTOR(_set_finishing_once);
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/spill/mem_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ Status OrderedMemTable::finalize(workgroup::YieldContext& yield_ctx, const Spill
return Status::OK();
}
SCOPED_RAW_TIMER(&yield_ctx.time_spent_ns);
auto chunk = _chunk_slice.cutoff(_runtime_state->chunk_size());
ChunkPtr chunk = _chunk_slice.cutoff(_runtime_state->chunk_size());
bool need_aligned = _runtime_state->spill_enable_direct_io();

RETURN_IF_ERROR(serde->serialize(_runtime_state, serde_ctx, chunk, output, need_aligned));
Expand Down
Loading

0 comments on commit e9206b6

Please sign in to comment.