Skip to content

Commit

Permalink
Merge branch 'master' into drop_repository
Browse files Browse the repository at this point in the history
  • Loading branch information
Vallishp authored Dec 4, 2024
2 parents 945d46e + d22bd83 commit 81f21f5
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 2 deletions.
3 changes: 1 addition & 2 deletions be/src/olap/rowset/segment_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ Status SegmentFlusher::close() {
bool SegmentFlusher::need_buffering() {
// buffering variants for schema change
return _context.write_type == DataWriteType::TYPE_SCHEMA_CHANGE &&
(_context.tablet_schema->num_variant_columns() > 0 ||
!_context.tablet_schema->cluster_key_idxes().empty());
_context.tablet_schema->num_variant_columns() > 0;
}

Status SegmentFlusher::_add_rows(std::unique_ptr<segment_v2::SegmentWriter>& segment_writer,
Expand Down
30 changes: 30 additions & 0 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,21 @@ class MultiBlockMerger {
pushed_row_refs.push_back(row_refs[i]);
}
}
if (!_tablet->tablet_schema()->cluster_key_idxes().empty()) {
std::vector<uint32_t> ids;
for (const auto& cid : _tablet->tablet_schema()->cluster_key_idxes()) {
auto index = _tablet->tablet_schema()->field_index(cid);
if (index == -1) {
return Status::InternalError(
"could not find cluster key column with unique_id=" +
std::to_string(cid) + " in tablet schema");
}
ids.push_back(index);
}
// sort by cluster key
std::stable_sort(pushed_row_refs.begin(), pushed_row_refs.end(),
ClusterKeyRowRefComparator(ids));
}
}

// update real inserted row number
Expand Down Expand Up @@ -249,6 +264,20 @@ class MultiBlockMerger {
const size_t _num_columns;
};

struct ClusterKeyRowRefComparator {
ClusterKeyRowRefComparator(std::vector<uint32_t> columns) : _columns(columns) {}

int compare(const RowRef& lhs, const RowRef& rhs) const {
return lhs.block->compare_at(lhs.position, rhs.position, &_columns, *rhs.block, -1);
}

bool operator()(const RowRef& lhs, const RowRef& rhs) const {
return compare(lhs, rhs) < 0;
}

const std::vector<uint32_t> _columns;
};

BaseTabletSPtr _tablet;
RowRefComparator _cmp;
vectorized::Arena _arena;
Expand Down Expand Up @@ -1158,6 +1187,7 @@ Status SchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParams& sc
}

context.write_type = DataWriteType::TYPE_SCHEMA_CHANGE;
// TODO if support VerticalSegmentWriter, also need to handle cluster key primary key index
auto result = _new_tablet->create_rowset_writer(context, false);
if (!result.has_value()) {
res = Status::Error<ROWSET_BUILDER_INIT>("create_rowset_writer failed, reason={}",
Expand Down

0 comments on commit 81f21f5

Please sign in to comment.