diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp index c2a4469d97f324..e0eb7534123a86 100644 --- a/be/src/olap/rowset/segment_creator.cpp +++ b/be/src/olap/rowset/segment_creator.cpp @@ -115,8 +115,7 @@ Status SegmentFlusher::close() { bool SegmentFlusher::need_buffering() { // buffering variants for schema change return _context.write_type == DataWriteType::TYPE_SCHEMA_CHANGE && - (_context.tablet_schema->num_variant_columns() > 0 || - !_context.tablet_schema->cluster_key_idxes().empty()); + _context.tablet_schema->num_variant_columns() > 0; } Status SegmentFlusher::_add_rows(std::unique_ptr& segment_writer, diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index ec291d8d2f0068..cd1b10d733a88c 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -198,6 +198,21 @@ class MultiBlockMerger { pushed_row_refs.push_back(row_refs[i]); } } + if (!_tablet->tablet_schema()->cluster_key_idxes().empty()) { + std::vector ids; + for (const auto& cid : _tablet->tablet_schema()->cluster_key_idxes()) { + auto index = _tablet->tablet_schema()->field_index(cid); + if (index == -1) { + return Status::InternalError( + "could not find cluster key column with unique_id=" + + std::to_string(cid) + " in tablet schema"); + } + ids.push_back(index); + } + // sort by cluster key + std::stable_sort(pushed_row_refs.begin(), pushed_row_refs.end(), + ClusterKeyRowRefComparator(ids)); + } } // update real inserted row number @@ -249,6 +264,20 @@ class MultiBlockMerger { const size_t _num_columns; }; + struct ClusterKeyRowRefComparator { + ClusterKeyRowRefComparator(std::vector columns) : _columns(columns) {} + + int compare(const RowRef& lhs, const RowRef& rhs) const { + return lhs.block->compare_at(lhs.position, rhs.position, &_columns, *rhs.block, -1); + } + + bool operator()(const RowRef& lhs, const RowRef& rhs) const { + return compare(lhs, rhs) < 0; + } + + const std::vector _columns; + }; + BaseTabletSPtr _tablet; RowRefComparator _cmp; vectorized::Arena _arena; @@ -1158,6 +1187,7 @@ Status SchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParams& sc } context.write_type = DataWriteType::TYPE_SCHEMA_CHANGE; + // TODO if support VerticalSegmentWriter, also need to handle cluster key primary key index auto result = _new_tablet->create_rowset_writer(context, false); if (!result.has_value()) { res = Status::Error("create_rowset_writer failed, reason={}",