Skip to content

Commit

Permalink
[Optimize](Variant) optimize schema update performance (#45480)
Browse files Browse the repository at this point in the history
When schemas are updated under high concurrency, the cost of updating
them is expensive.
1. update the schema only when the number of rows is not 0
2. copy_from is expensive, so use the copy constructor instead
  • Loading branch information
eldenmoon authored Dec 20, 2024
1 parent 9bde47c commit 62a6360
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 15 deletions.
4 changes: 3 additions & 1 deletion be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,9 @@ Status SegmentWriter::append_block_with_variant_subcolumns(vectorized::Block& da
continue;
}
if (_flush_schema == nullptr) {
_flush_schema = std::make_shared<TabletSchema>(*_tablet_schema);
_flush_schema = std::make_shared<TabletSchema>();
// deep copy
_flush_schema->copy_from(*_tablet_schema);
}
auto column_ref = data.get_by_position(i).column;
const vectorized::ColumnObject& object_column = assert_cast<vectorized::ColumnObject&>(
Expand Down
24 changes: 13 additions & 11 deletions be/src/olap/rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -346,21 +346,22 @@ Status RowsetBuilder::commit_txn() {
SCOPED_TIMER(_commit_txn_timer);

const RowsetWriterContext& rw_ctx = _rowset_writer->context();
if (rw_ctx.tablet_schema->num_variant_columns() > 0) {
if (rw_ctx.tablet_schema->num_variant_columns() > 0 && _rowset->num_rows() > 0) {
// Need to merge schema with `rw_ctx.merged_tablet_schema` in prior,
// merged schema keeps the newest merged schema for the rowset, which is updated and merged
// during flushing segments.
if (rw_ctx.merged_tablet_schema != nullptr) {
RETURN_IF_ERROR(tablet()->update_by_least_common_schema(rw_ctx.merged_tablet_schema));
} else {
// We should merge rowset schema further, in case that the merged_tablet_schema maybe null
// when enable_memtable_on_sink_node is true, the merged_tablet_schema will not be passed to
// the destination backend.
// update tablet schema when meet variant columns, before commit_txn
// Eg. rowset schema: A(int), B(float), C(int), D(int)
// _tabelt->tablet_schema: A(bigint), B(double)
// => update_schema: A(bigint), B(double), C(int), D(int)
RETURN_IF_ERROR(tablet()->update_by_least_common_schema(rw_ctx.tablet_schema));
}
// We should merge rowset schema further, in case that the merged_tablet_schema maybe null
// when enable_memtable_on_sink_node is true, the merged_tablet_schema will not be passed to
// the destination backend.
// update tablet schema when meet variant columns, before commit_txn
// Eg. rowset schema: A(int), B(float), C(int), D(int)
// _tabelt->tablet_schema: A(bigint), B(double)
// => update_schema: A(bigint), B(double), C(int), D(int)
RETURN_IF_ERROR(tablet()->update_by_least_common_schema(rw_ctx.tablet_schema));
}

// Transfer ownership of `PendingRowsetGuard` to `TxnManager`
Expand Down Expand Up @@ -398,7 +399,6 @@ Status BaseRowsetBuilder::cancel() {
void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id,
const OlapTableSchemaParam* table_schema_param,
const TabletSchema& ori_tablet_schema) {
_tablet_schema->copy_from(ori_tablet_schema);
// find the right index id
int i = 0;
auto indexes = table_schema_param->indexes();
Expand All @@ -407,11 +407,13 @@ void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id,
break;
}
}

if (!indexes.empty() && !indexes[i]->columns.empty() &&
indexes[i]->columns[0]->unique_id() >= 0) {
_tablet_schema->shawdow_copy_without_columns(ori_tablet_schema);
_tablet_schema->build_current_tablet_schema(index_id, table_schema_param->version(),
indexes[i], ori_tablet_schema);
} else {
_tablet_schema->copy_from(ori_tablet_schema);
}
if (_tablet_schema->schema_version() > ori_tablet_schema.schema_version()) {
// After schema change, should include extracted column
Expand Down
15 changes: 15 additions & 0 deletions be/src/olap/tablet_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1064,6 +1064,21 @@ void TabletSchema::copy_from(const TabletSchema& tablet_schema) {
_table_id = tablet_schema.table_id();
}

// Lightweight copy of `tablet_schema` that leaves this schema without columns.
// Every member is first copied from `tablet_schema` by assignment; afterwards
// all column-related state (lookup indexes, column list, cache handlers,
// counters and the variable-length field size) is reset.
void TabletSchema::shawdow_copy_without_columns(const TabletSchema& tablet_schema) {
    // Start from a member-wise copy of the source schema.
    *this = tablet_schema;

    // Forget every column lookup index that was copied over.
    _field_id_to_index.clear();
    _field_name_to_index.clear();
    _field_path_to_index.clear();

    // Drop the copied column list itself.
    _cols.clear();
    // notice : do not ref columns
    _column_cache_handlers.clear();

    // With no columns left, all column-derived statistics go back to zero.
    _num_key_columns = 0;
    _num_null_columns = 0;
    _num_variant_columns = 0;
    _num_columns = 0;
    _vl_field_mem_size = 0;
}

void TabletSchema::update_index_info_from(const TabletSchema& tablet_schema) {
for (auto& col : _cols) {
if (col->unique_id() < 0) {
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/tablet_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,8 @@ class TabletSchema : public MetadataAdder<TabletSchema> {
// Must make sure the row column is always the last column
void add_row_column();
void copy_from(const TabletSchema& tablet_schema);
// lightweight copy, take care of lifecycle of TabletColumn
void shawdow_copy_without_columns(const TabletSchema& tablet_schema);
void update_index_info_from(const TabletSchema& tablet_schema);
std::string to_key() const;
// get_metadata_size is only the memory of the TabletSchema itself, not include child objects.
Expand Down Expand Up @@ -531,6 +533,7 @@ class TabletSchema : public MetadataAdder<TabletSchema> {
private:
friend bool operator==(const TabletSchema& a, const TabletSchema& b);
friend bool operator!=(const TabletSchema& a, const TabletSchema& b);
TabletSchema(const TabletSchema&) = default;

void clear_column_cache_handlers();

Expand Down
5 changes: 2 additions & 3 deletions be/src/vec/common/schema_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -415,9 +415,8 @@ Status get_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
// duplicated paths following the update_least_common_schema process.
auto build_schema_without_extracted_columns = [&](const TabletSchemaSPtr& base_schema) {
output_schema = std::make_shared<TabletSchema>();
output_schema->copy_from(*base_schema);
// Merge columns from other schemas
output_schema->clear_columns();
// not copy columns but only shadow copy other attributes
output_schema->shawdow_copy_without_columns(*base_schema);
// Get all columns without extracted columns and collect variant col unique id
for (const TabletColumnPtr& col : base_schema->columns()) {
if (col->is_variant_type()) {
Expand Down

0 comments on commit 62a6360

Please sign in to comment.