diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 1888273507eae4c..c8ccbe7c1afcc15 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -602,10 +602,10 @@ void MemTable::_aggregate() { // !!ATTENTION!!: there may be rows with the same keys after MemTable::_aggregate() in this situation. // batched_rows store rows with the same key that have different pattern: - // 0 -> row has sequence col with delete sign - // 1 -> row has sequence col and without delete sign - // 2 -> row without sequence col and with delete sign - // 3 -> row without sequence col and without delete sign + // 0 -> row without sequence col and with delete sign + // 1 -> row without sequence col and without delete sign + // 2 -> row has sequence col with delete sign + // 3 -> row has sequence col and without delete sign std::array batched_rows {}; auto finalize_rows = [&]() { for (auto& row : batched_rows) { @@ -618,7 +618,7 @@ void MemTable::_aggregate() { batched_rows.fill(nullptr); }; auto get_idx = [](bool with_seq_col, bool has_delete_sign) { - if (with_seq_col) { + if (!with_seq_col) { if (has_delete_sign) { return 0; } else { @@ -651,14 +651,38 @@ void MemTable::_aggregate() { } } if (prev_row != nullptr && (*_vec_row_comparator)(prev_row, cur_row) == 0) { - if (has_delete_sign) { - RowInBlock*& row_without_delete_sign = - (with_seq_col ? batched_rows[1] : batched_rows[3]); - if (row_without_delete_sign != nullptr) { - // if there exits row without delete sign, remove it first - row_clear_agg(row_without_delete_sign); - _stat.merged_rows++; - row_without_delete_sign = nullptr; + if (!with_seq_col) { + if (has_delete_sign) { + RowInBlock*& row_without_delete_sign = batched_rows[1]; + if (row_without_delete_sign != nullptr) { + // if there exits row without delete sign, remove it first + row_clear_agg(row_without_delete_sign); + _stat.merged_rows++; + row_without_delete_sign = nullptr; + } + } + } else { + // batched_rows[3]'s seq value >= batched_rows[2]'s seq value + prev_row = (batched_rows[3] != nullptr ? batched_rows[3] : batched_rows[2]); + if (prev_row != nullptr) { + auto* col_ptr = + mutable_block.mutable_columns()[_seq_col_idx_in_block].get(); + int res = col_ptr->compare_at(prev_row->_row_pos, cur_row->_row_pos, + *col_ptr, -1); + if (res > 0) { + // previous rows have higher sequence value, ignore this row + _stat.merged_rows++; + continue; + } + } + if (has_delete_sign) { + RowInBlock*& row_without_delete_sign = batched_rows[1]; + if (row_without_delete_sign != nullptr) { + // if there exits row without delete sign, remove it first + row_clear_agg(row_without_delete_sign); + _stat.merged_rows++; + row_without_delete_sign = nullptr; + } } } prev_row = batched_rows[get_idx(with_seq_col, has_delete_sign)]; diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index 74b8cfb4ba2f7e0..3865ee0e2172421 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -459,7 +459,7 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da bool has_default_or_nullable = false; std::vector use_default_or_null_flag; use_default_or_null_flag.reserve(data.num_rows); - const auto* delete_sign_column_data = + const auto* delete_signs = BaseTablet::get_delete_sign_column_data(full_block, data.row_pos + data.num_rows); DBUG_EXECUTE_IF("VerticalSegmentWriter._append_block_with_partial_content.sleep", @@ -494,8 +494,7 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da } // mark key with delete sign as deleted. - bool have_delete_sign = - (delete_sign_column_data != nullptr && delete_sign_column_data[block_pos] != 0); + bool have_delete_sign = (delete_signs != nullptr && delete_signs[block_pos] != 0); auto not_found_cb = [&]() { return _opts.rowset_ctx->partial_update_info->handle_non_strict_mode_not_found_error( @@ -619,6 +618,10 @@ Status VerticalSegmentWriter::_append_block_with_flexible_partial_content( }; std::vector* skip_bitmaps = get_skip_bitmaps(data.block); + const auto* delete_signs = + BaseTablet::get_delete_sign_column_data(*data.block, data.row_pos + data.num_rows); + DCHECK(delete_signs != nullptr); + bool has_default_or_nullable = false; std::vector use_default_or_null_flag; use_default_or_null_flag.reserve(data.num_rows); @@ -687,24 +690,23 @@ Status VerticalSegmentWriter::_append_block_with_flexible_partial_content( // some of them don't. We can't do the de-duplication in memtable. We must de-duplicate them here. if (schema_has_sequence_col) { RETURN_IF_ERROR(_merge_rows_for_sequence_column(data, skip_bitmaps, key_columns, seq_column, - specified_rowsets, segment_caches)); + delete_signs, specified_rowsets, + segment_caches)); if (origin_rows != data.num_rows) { // data in block has changed, should re-encode key columns, sequence column and re-get skip_bitmaps _olap_data_convertor->clear_source_content(); RETURN_IF_ERROR(encode_key_columns(key_columns)); RETURN_IF_ERROR(encode_seq_column(seq_column)); skip_bitmaps = get_skip_bitmaps(data.block); + delete_signs = BaseTablet::get_delete_sign_column_data(*data.block, + data.row_pos + data.num_rows); } } - const auto* delete_sign_column_data = - BaseTablet::get_delete_sign_column_data(*data.block, data.row_pos + data.num_rows); - DCHECK(delete_sign_column_data != nullptr); - // 4. merge duplicate rows and mark delete for insert after delete origin_rows = data.num_rows; RETURN_IF_ERROR(_merge_rows_for_insert_after_delete(data, skip_bitmaps, key_columns, seq_column, - delete_sign_column_data, specified_rowsets, + delete_signs, specified_rowsets, segment_caches)); if (data.num_rows != origin_rows) { // data in block has changed, should re-encode key columns, sequence column and re-get skip_bitmaps, delete signs @@ -712,7 +714,7 @@ Status VerticalSegmentWriter::_append_block_with_flexible_partial_content( RETURN_IF_ERROR(encode_key_columns(key_columns)); RETURN_IF_ERROR(encode_seq_column(seq_column)); skip_bitmaps = get_skip_bitmaps(data.block); - delete_sign_column_data = + delete_signs = BaseTablet::get_delete_sign_column_data(*data.block, data.row_pos + data.num_rows); } @@ -730,8 +732,8 @@ Status VerticalSegmentWriter::_append_block_with_flexible_partial_content( PartialUpdateStats stats; RETURN_IF_ERROR(_generate_flexible_read_plan( read_plan, data, segment_start_pos, schema_has_sequence_col, seq_map_col_unique_id, - skip_bitmaps, key_columns, seq_column, delete_sign_column_data, specified_rowsets, - segment_caches, has_default_or_nullable, use_default_or_null_flag, stats)); + skip_bitmaps, key_columns, seq_column, delete_signs, specified_rowsets, segment_caches, + has_default_or_nullable, use_default_or_null_flag, stats)); CHECK_EQ(use_default_or_null_flag.size(), data.num_rows); if (config::enable_merge_on_write_correctness_check) { @@ -828,7 +830,7 @@ Status VerticalSegmentWriter::_generate_flexible_read_plan( bool schema_has_sequence_col, int32_t seq_map_col_unique_id, std::vector* skip_bitmaps, const std::vector& key_columns, - vectorized::IOlapColumnDataAccessor* seq_column, const signed char* delete_sign_column_data, + vectorized::IOlapColumnDataAccessor* seq_column, const signed char* delete_signs, const std::vector& specified_rowsets, std::vector>& segment_caches, bool& has_default_or_nullable, std::vector& use_default_or_null_flag, @@ -861,8 +863,8 @@ Status VerticalSegmentWriter::_generate_flexible_read_plan( } // mark key with delete sign as deleted. - bool have_delete_sign = (!skip_bitmap.contains(delete_sign_col_unique_id) && - delete_sign_column_data[block_pos] != 0); + bool have_delete_sign = + (!skip_bitmap.contains(delete_sign_col_unique_id) && delete_signs[block_pos] != 0); auto not_found_cb = [&]() { return _opts.rowset_ctx->partial_update_info->handle_non_strict_mode_not_found_error( @@ -883,15 +885,18 @@ Status VerticalSegmentWriter::_generate_flexible_read_plan( Status VerticalSegmentWriter::_merge_rows_for_sequence_column( RowsInBlock& data, std::vector* skip_bitmaps, const std::vector& key_columns, - vectorized::IOlapColumnDataAccessor* seq_column, + vectorized::IOlapColumnDataAccessor* seq_column, const signed char* delete_signs, const std::vector& specified_rowsets, std::vector>& segment_caches) { VLOG_DEBUG << fmt::format( "VerticalSegmentWriter::_merge_rows_for_sequence_column enter: data.block:{}\n", data.block->dump_data()); auto seq_col_unique_id = _tablet_schema->column(_tablet_schema->sequence_col_idx()).unique_id(); + auto delete_sign_col_unique_id = + _tablet_schema->column(_tablet_schema->delete_sign_idx()).unique_id(); std::string previous_key {}; bool previous_has_seq_col {false}; + bool previous_has_delete_sign {false}; int duplicate_keys {0}; auto filter_column = vectorized::ColumnUInt8::create(data.num_rows, 1); @@ -901,58 +906,127 @@ Status VerticalSegmentWriter::_merge_rows_for_sequence_column( RETURN_IF_ERROR(_generate_encoded_default_seq_value( *_tablet_schema, *_opts.rowset_ctx->partial_update_info, &encoded_default_seq_value)); + // batched_rows store rows with the same key that have different pattern: + // 0 -> row without sequence col and with delete sign + // 1 -> row without sequence col and without delete sign + // 2 -> row has sequence col with delete sign + // 3 -> row has sequence col and without delete sign + std::array batched_rows {}; + bool has_same_rows {false}; + auto get_idx = [](bool with_seq_col, bool has_delete_sign) { + if (!with_seq_col) { + if (has_delete_sign) { + return 0; + } else { + return 1; + } + } else { + if (has_delete_sign) { + return 2; + } else { + return 3; + } + } + }; for (size_t block_pos = data.row_pos; block_pos < data.row_pos + data.num_rows; block_pos++) { size_t delta_pos = block_pos - data.row_pos; auto& skip_bitmap = skip_bitmaps->at(block_pos); std::string key = _full_encode_keys(key_columns, delta_pos); bool row_has_sequence_col = (!skip_bitmap.contains(seq_col_unique_id)); - Status st; + bool row_has_delete_sign = + (!skip_bitmap.contains(delete_sign_col_unique_id) && delete_signs[block_pos] != 0); if (delta_pos > 0 && previous_key == key) { - DCHECK(previous_has_seq_col == !row_has_sequence_col); - ++duplicate_keys; - RowLocation loc; - RowsetSharedPtr rowset; - size_t rid_missing_seq {}; - size_t rid_with_seq {}; - if (row_has_sequence_col) { - rid_missing_seq = block_pos - 1; - rid_with_seq = block_pos; - } else { - rid_missing_seq = block_pos; - rid_with_seq = block_pos - 1; + if (!has_same_rows) { + batched_rows[get_idx(previous_has_seq_col, previous_has_delete_sign)] = + block_pos - 1; + has_same_rows = true; } - std::string previous_encoded_seq_value {}; - st = _tablet->lookup_row_key(key, _tablet_schema.get(), false, specified_rowsets, &loc, - _mow_context->max_version, segment_caches, &rowset, true, - &previous_encoded_seq_value); - DCHECK(st.is() || st.ok()); - - Slice previous_seq_slice {}; - if (st.is()) { - previous_seq_slice = Slice {encoded_default_seq_value}; - } else { - // TODO(bobhan1): we can mark these rows in delete bitmap and eliminate reading them in later phase - _rsid_to_rowset.emplace(rowset->rowset_id(), rowset); - previous_seq_slice = Slice {previous_encoded_seq_value}; - } - std::string cur_encoded_seq_value {}; - _encode_seq_column(seq_column, rid_with_seq, &cur_encoded_seq_value); - // the encoded value is order-preserving, so we can use Slice::compare() to compare them - int res = previous_seq_slice.compare(Slice {cur_encoded_seq_value}); - VLOG_DEBUG << fmt::format( - "VerticalSegmentWriter::_merge_rows_for_sequence_column: rid_with_seq={}, " - "rid_missing_seq={}, res={}", - rid_with_seq, rid_missing_seq, res); - if (res > 0) { - filter_map[rid_with_seq] = 0; - } else if (res < 0) { - filter_map[rid_missing_seq] = 0; - } else { - filter_map[std::min(rid_with_seq, rid_missing_seq)] = 0; + batched_rows[get_idx(row_has_sequence_col, row_has_delete_sign)] = block_pos; + } else { + if (has_same_rows) { + bool has_row_with_seq_col = (batched_rows[0] != -1 || batched_rows[1] != -1); + bool has_row_without_seq_col = (batched_rows[2] != -1 || batched_rows[3] != -1); + if (has_row_with_seq_col && has_row_without_seq_col) { + RowLocation loc; + RowsetSharedPtr rowset; + std::string previous_encoded_seq_value {}; + std::string row_with_delete_sign_encoded_seq_value {}; + std::string row_without_delete_sign_encoded_seq_value {}; + + Status st = _tablet->lookup_row_key(key, _tablet_schema.get(), false, + specified_rowsets, &loc, + _mow_context->max_version, segment_caches, + &rowset, true, &previous_encoded_seq_value); + DCHECK(st.is() || st.ok()); + Slice previous_seq_slice {}; + if (st.is()) { + previous_seq_slice = Slice {encoded_default_seq_value}; + } else { + // TODO(bobhan1): we can mark these rows in delete bitmap and eliminate reading them in later phase + _rsid_to_rowset.emplace(rowset->rowset_id(), rowset); + previous_seq_slice = Slice {previous_encoded_seq_value}; + } + if (batched_rows[2] != -1) { + _encode_seq_column(seq_column, batched_rows[2], + &row_with_delete_sign_encoded_seq_value); + } + if (batched_rows[3] != -1) { + _encode_seq_column(seq_column, batched_rows[2], + &row_without_delete_sign_encoded_seq_value); + } + + auto remove_rows_without_seq = [&]() { + if (batched_rows[0] != -1) { + filter_map[batched_rows[0]] = 0; + ++duplicate_keys; + } + if (batched_rows[1] != -1) { + filter_map[batched_rows[1]] = 0; + ++duplicate_keys; + } + }; + + // the encoded value is order-preserving, so we can use Slice::compare() to compare them + if (batched_rows[2] != -1 && batched_rows[3] != -1) { + if (previous_seq_slice.compare( + Slice {row_with_delete_sign_encoded_seq_value}) <= 0) { + remove_rows_without_seq(); + } else if (previous_seq_slice.compare(Slice { + row_without_delete_sign_encoded_seq_value}) <= 0) { + remove_rows_without_seq(); + filter_map[batched_rows[2]] = 0; + ++duplicate_keys; + } else { + filter_map[batched_rows[2]] = 0; + ++duplicate_keys; + filter_map[batched_rows[3]] = 0; + ++duplicate_keys; + } + } else if (batched_rows[2] != -1) { + if (previous_seq_slice.compare( + Slice {row_with_delete_sign_encoded_seq_value}) <= 0) { + remove_rows_without_seq(); + } else { + filter_map[batched_rows[2]] = 0; + ++duplicate_keys; + } + } else { + if (previous_seq_slice.compare( + Slice {row_with_delete_sign_encoded_seq_value}) <= 0) { + remove_rows_without_seq(); + } else { + filter_map[batched_rows[3]] = 0; + ++duplicate_keys; + } + } + } + has_same_rows = false; + batched_rows.fill(-1); } } previous_key = std::move(key); previous_has_seq_col = row_has_sequence_col; + previous_has_delete_sign = row_has_delete_sign; } if (duplicate_keys > 0) { RETURN_IF_ERROR(_filter_block_for_flexible_partial_update( diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h index 705586a1677f2a5..658c2051bf125e9 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h @@ -181,8 +181,7 @@ class VerticalSegmentWriter { bool schema_has_sequence_col, int32_t seq_map_col_unique_id, std::vector* skip_bitmaps, const std::vector& key_columns, - vectorized::IOlapColumnDataAccessor* seq_column, - const signed char* delete_sign_column_data, + vectorized::IOlapColumnDataAccessor* seq_column, const signed char* delete_signs, const std::vector& specified_rowsets, std::vector>& segment_caches, bool& has_default_or_nullable, std::vector& use_default_or_null_flag, @@ -190,7 +189,7 @@ class VerticalSegmentWriter { Status _merge_rows_for_sequence_column( RowsInBlock& data, std::vector* skip_bitmaps, const std::vector& key_columns, - vectorized::IOlapColumnDataAccessor* seq_column, + vectorized::IOlapColumnDataAccessor* seq_column, const signed char* delete_signs, const std::vector& specified_rowsets, std::vector>& segment_caches); Status _merge_rows_for_insert_after_delete(