Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Murphy <[email protected]>
  • Loading branch information
murphyatwork committed Sep 20, 2024
1 parent e9206b6 commit 4917b65
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 19 deletions.
69 changes: 50 additions & 19 deletions be/src/storage/chunk_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -608,9 +608,11 @@ bool ChunkPipelineAccumulator::is_finished() const {
return _finalized && _out_chunk == nullptr && _in_chunk == nullptr;
}

class SegmentedColumnVisitor final : public ColumnVisitorAdapter<SegmentedColumnVisitor> {
// Selective-copy data from SegmentedColumn according to provided index
class SegmentedColumnSelectiveCopy final : public ColumnVisitorAdapter<SegmentedColumnSelectiveCopy> {
public:
SegmentedColumnVisitor(SegmentedColumnPtr segment_column, const uint32_t* indexes, uint32_t from, uint32_t size)
SegmentedColumnSelectiveCopy(SegmentedColumnPtr segment_column, const uint32_t* indexes, uint32_t from,
uint32_t size)
: ColumnVisitorAdapter(this),
_segment_column(std::move(segment_column)),
_indexes(indexes),
Expand All @@ -620,7 +622,7 @@ class SegmentedColumnVisitor final : public ColumnVisitorAdapter<SegmentedColumn
template <class T>
Status do_visit(const FixedLengthColumnBase<T>& column) {
using ColumnT = FixedLengthColumnBase<T>;
using ContainerT = Buffer<T>*;
using ContainerT = typename ColumnT::Container*;

auto output = ColumnHelper::as_column<ColumnT>(_result);
auto& columns = _segment_column->columns();
Expand All @@ -646,36 +648,63 @@ class SegmentedColumnVisitor final : public ColumnVisitorAdapter<SegmentedColumn

auto output = ColumnHelper::as_column<ColumnT>(_result);
auto& columns = _segment_column->columns();
size_t segment_size = _segment_column->segment_size();
std::vector<ContainerT> buffers;
for (auto& seg_column : columns) {
buffers.push_back(&ColumnHelper::as_column<ColumnT>(seg_column)->get_data());
}

for (uint32_t i = _from; i < _size; i++) {
uint32_t idx = _indexes[i];
int segment_id = idx / segment_size;
int segment_offset = idx % segment_size;
auto [segment_id, segment_offset] = _segment_address(idx);
output->append((*buffers[segment_id])[segment_offset]);
}
return {};
}

// TODO
template <class T>
Status do_visit(const ObjectColumn<T>& column) {
return Status::NotSupported("SegmentedColumnVisitor");
// Generic fallback used when no more specific overload matches (typically
// Array/Struct/Map/Json columns). It copies the selected rows one at a time
// through the column's own append(src, offset, count) interface.
// Positions [_from, _size) of _indexes are visited, in order.
template <class ColumnT>
Status do_visit(const ColumnT& column) {
    auto& segments = _segment_column->columns();
    auto destination = ColumnHelper::as_column<ColumnT>(_result);

    uint32_t cursor = _from;
    while (cursor < _size) {
        // Resolve the global row index to the segment that owns it.
        const uint32_t row = _indexes[cursor];
        auto [seg_id, seg_offset] = _segment_address(row);
        destination->append(*segments[seg_id], seg_offset, 1);
        ++cursor;
    }
    return {};
}

// Selective copy for nullable columns.
//
// Every nullable segment is split into its data column and its null-flag
// column. Each of the two families is wrapped in a temporary SegmentedColumn
// and copied with a nested SegmentedColumnSelectiveCopy, and the two copies
// are finally recombined into a NullableColumn that becomes this visitor's
// result.
//
// Returns the first non-OK status reported by a nested copy, OK otherwise.
//
// NOTE(review): the temporaries are built with the vector-only
// SegmentedColumn constructor, which is not given a segment size; confirm
// that segment_size() is meaningful for them.
// NOTE(review): assumes the nested visitors leave their output in result().
Status do_visit(const NullableColumn& column) {
    const auto& segments = _segment_column->columns();

    std::vector<ColumnPtr> data_columns, null_columns;
    data_columns.reserve(segments.size());
    null_columns.reserve(segments.size());
    // `segment` (not `column`) so the parameter is not shadowed.
    for (auto& segment : segments) {
        NullableColumn::Ptr nullable = ColumnHelper::as_column<NullableColumn>(segment);
        data_columns.push_back(nullable->data_column());
        null_columns.push_back(nullable->null_column());
    }

    // This overload is reached by dispatching on the first segment, so both
    // vectors hold at least one element when we index [0] below.
    auto segmented_data_column = std::make_shared<SegmentedColumn>(data_columns);
    SegmentedColumnSelectiveCopy copy_data(segmented_data_column, _indexes, _from, _size);
    if (Status st = data_columns[0]->accept(&copy_data); !st.ok()) {
        return st;
    }

    auto segmented_null_column = std::make_shared<SegmentedColumn>(null_columns);
    SegmentedColumnSelectiveCopy copy_null(segmented_null_column, _indexes, _from, _size);
    if (Status st = null_columns[0]->accept(&copy_null); !st.ok()) {
        return st;
    }

    // Previously both copies were dropped on the floor and _result was never
    // written; publish the recombined column as the result of this visit.
    _result = NullableColumn::create(copy_data.result(), ColumnHelper::as_column<NullColumn>(copy_null.result()));
    return {};
}

Status do_visit(const ArrayColumn& column) { return Status::NotSupported("SegmentedColumnVisitor"); }
Status do_visit(const MapColumn& column) { return Status::NotSupported("SegmentedColumnVisitor"); }
Status do_visit(const StructColumn& column) { return Status::NotSupported("SegmentedColumnVisitor"); }
Status do_visit(const ConstColumn& column) { return Status::NotSupported("SegmentedColumnVisitor"); }
Status do_visit(const NullableColumn& column) { return Status::NotSupported("SegmentedColumnVisitor"); }

ColumnPtr result() { return _result; }

private:
std::pair<int, int> _segment_address(uint32 idx) {
size_t segment_size = _segment_column->segment_size();
int segment_id = idx / segment_size;
int segment_offset = idx % segment_size;
return {segment_id, segment_offset};
}

SegmentedColumnPtr _segment_column;
ColumnPtr _result;
const uint32_t* _indexes;
Expand All @@ -689,8 +718,10 @@ SegmentedColumn::SegmentedColumn(SegmentedChunkPtr chunk, size_t column_index) :
}
}

// Builds a segmented column directly from an existing list of per-segment
// columns; the vector is copied into _columns.
// NOTE(review): only _columns is initialized here and no segment size is
// passed in; confirm what segment_size() returns for objects built this way.
SegmentedColumn::SegmentedColumn(const std::vector<ColumnPtr>& columns) : _columns(columns) {}

// Creates a new column holding the rows selected by `indexes`, gathered
// across all segments of this column.
//
// indexes/from/size are forwarded unchanged to SegmentedColumnSelectiveCopy.
// Returns the column produced by the visitor, or nullptr when this segmented
// column has no segments to dispatch on.
//
// NOTE(review): shared_from_this() needs enable_shared_from_this to be a
// public base and the object to be owned by a shared_ptr; the class
// declaration in chunk_helper.h lists the base without `public`, so verify.
// NOTE(review): the status returned by accept() is intentionally dropped
// because this function can only return a ColumnPtr.
ColumnPtr SegmentedColumn::clone_selective(const uint32_t* indexes, uint32_t from, uint32_t size) {
    if (_columns.empty()) {
        // Nothing to dispatch on; avoid indexing an empty container.
        return nullptr;
    }
    // Dispatch on the first segment so the visitor picks the overload that
    // matches the concrete column type.
    SegmentedColumnSelectiveCopy visitor(shared_from_this(), indexes, from, size);
    (void)_columns[0]->accept(&visitor);
    return visitor.result();
}
Expand Down Expand Up @@ -735,11 +766,11 @@ void SegmentedColumn::upgrade_to_nullable() {
// Creates a segmented chunk with the given segment size and seeds it with a
// single empty Chunk, so that segment 0 always exists.
SegmentedChunk::SegmentedChunk(size_t segment_size) : _segment_size(segment_size) {
    auto first_segment = std::make_shared<Chunk>();
    _segments.resize(1);
    _segments[0] = std::move(first_segment);
}

// Appends `column` under `slot_id` to the first segment.
// It's only used when initializing the chunk, so appending to the first chunk
// is enough.
void SegmentedChunk::append_column(ColumnPtr column, SlotId slot_id) {
    // The constructor already installs segment 0. Re-creating it here would
    // throw away every column appended by an earlier call.
    _segments[0]->append_column(std::move(column), slot_id);
}

Expand Down Expand Up @@ -786,11 +817,11 @@ void SegmentedChunk::append(const SegmentedChunkPtr& chunk, size_t offset) {
size_t segment_offset = offset % chunk->_segment_size;
for (size_t i = segment_index; i < chunk->num_segments(); i++) {
// The segment need to cutoff
if (i == segment_index) {
if (i == segment_index && segment_offset > 0) {
auto cutoff = input_segments[i]->clone_empty();
size_t count = input_segments[i]->num_rows() - segment_offset;
cutoff->append(*cutoff, segment_offset, count);
ChunkPtr shared(cutoff.release());
ChunkPtr shared(std::move(cutoff));
append_chunk(std::move(shared));
} else {
append_chunk(input_segments[i]);
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/chunk_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ class ChunkPipelineAccumulator {
class SegmentedColumn : std::enable_shared_from_this<SegmentedColumn> {
public:
SegmentedColumn(SegmentedChunkPtr chunk, size_t column_index);
SegmentedColumn(const std::vector<ColumnPtr>& columns);
~SegmentedColumn() = default;

ColumnPtr clone_selective(const uint32_t* indexes, uint32_t from, uint32_t size);
Expand Down

0 comments on commit 4917b65

Please sign in to comment.