Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
bobhan1 committed Oct 11, 2024
1 parent ae8756f commit 24069a4
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 73 deletions.
50 changes: 37 additions & 13 deletions be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -602,10 +602,10 @@ void MemTable::_aggregate() {
// !!ATTENTION!!: there may be rows with the same keys after MemTable::_aggregate() in this situation.

// batched_rows store rows with the same key that have different pattern:
// 0 -> row has sequence col with delete sign
// 1 -> row has sequence col and without delete sign
// 2 -> row without sequence col and with delete sign
// 3 -> row without sequence col and without delete sign
// 0 -> row without sequence col and with delete sign
// 1 -> row without sequence col and without delete sign
// 2 -> row has sequence col with delete sign
// 3 -> row has sequence col and without delete sign
std::array<RowInBlock*, 4> batched_rows {};
auto finalize_rows = [&]() {
for (auto& row : batched_rows) {
Expand All @@ -618,7 +618,7 @@ void MemTable::_aggregate() {
batched_rows.fill(nullptr);
};
auto get_idx = [](bool with_seq_col, bool has_delete_sign) {
if (with_seq_col) {
if (!with_seq_col) {
if (has_delete_sign) {
return 0;
} else {
Expand Down Expand Up @@ -651,14 +651,38 @@ void MemTable::_aggregate() {
}
}
if (prev_row != nullptr && (*_vec_row_comparator)(prev_row, cur_row) == 0) {
if (has_delete_sign) {
RowInBlock*& row_without_delete_sign =
(with_seq_col ? batched_rows[1] : batched_rows[3]);
if (row_without_delete_sign != nullptr) {
// if there exists a row without delete sign, remove it first
row_clear_agg(row_without_delete_sign);
_stat.merged_rows++;
row_without_delete_sign = nullptr;
if (!with_seq_col) {
if (has_delete_sign) {
RowInBlock*& row_without_delete_sign = batched_rows[1];
if (row_without_delete_sign != nullptr) {
// if there exists a row without delete sign, remove it first
row_clear_agg(row_without_delete_sign);
_stat.merged_rows++;
row_without_delete_sign = nullptr;
}
}
} else {
// batched_rows[3]'s seq value >= batched_rows[2]'s seq value
prev_row = (batched_rows[3] != nullptr ? batched_rows[3] : batched_rows[2]);
if (prev_row != nullptr) {
auto* col_ptr =
mutable_block.mutable_columns()[_seq_col_idx_in_block].get();
int res = col_ptr->compare_at(prev_row->_row_pos, cur_row->_row_pos,
*col_ptr, -1);
if (res > 0) {
// previous rows have higher sequence value, ignore this row
_stat.merged_rows++;
continue;
}
}
if (has_delete_sign) {
    // This is the branch for rows that carry a sequence col, so the row to drop is the
    // one stored in slot 3 (has sequence col, without delete sign). Slot 1 holds the row
    // without sequence col and is handled by the other branch.
    RowInBlock*& row_without_delete_sign = batched_rows[3];
    if (row_without_delete_sign != nullptr) {
        // if there exists a row without delete sign, remove it first
        row_clear_agg(row_without_delete_sign);
        _stat.merged_rows++;
        row_without_delete_sign = nullptr;
    }
}
}
prev_row = batched_rows[get_idx(with_seq_col, has_delete_sign)];
Expand Down
188 changes: 131 additions & 57 deletions be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
bool has_default_or_nullable = false;
std::vector<bool> use_default_or_null_flag;
use_default_or_null_flag.reserve(data.num_rows);
const auto* delete_sign_column_data =
const auto* delete_signs =
BaseTablet::get_delete_sign_column_data(full_block, data.row_pos + data.num_rows);

DBUG_EXECUTE_IF("VerticalSegmentWriter._append_block_with_partial_content.sleep",
Expand Down Expand Up @@ -494,8 +494,7 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
}

// mark key with delete sign as deleted.
bool have_delete_sign =
(delete_sign_column_data != nullptr && delete_sign_column_data[block_pos] != 0);
bool have_delete_sign = (delete_signs != nullptr && delete_signs[block_pos] != 0);

auto not_found_cb = [&]() {
return _opts.rowset_ctx->partial_update_info->handle_non_strict_mode_not_found_error(
Expand Down Expand Up @@ -619,6 +618,10 @@ Status VerticalSegmentWriter::_append_block_with_flexible_partial_content(
};
std::vector<BitmapValue>* skip_bitmaps = get_skip_bitmaps(data.block);

const auto* delete_signs =
BaseTablet::get_delete_sign_column_data(*data.block, data.row_pos + data.num_rows);
DCHECK(delete_signs != nullptr);

bool has_default_or_nullable = false;
std::vector<bool> use_default_or_null_flag;
use_default_or_null_flag.reserve(data.num_rows);
Expand Down Expand Up @@ -687,32 +690,31 @@ Status VerticalSegmentWriter::_append_block_with_flexible_partial_content(
// some of them don't. We can't do the de-duplication in memtable. We must de-duplicate them here.
if (schema_has_sequence_col) {
RETURN_IF_ERROR(_merge_rows_for_sequence_column(data, skip_bitmaps, key_columns, seq_column,
specified_rowsets, segment_caches));
delete_signs, specified_rowsets,
segment_caches));
if (origin_rows != data.num_rows) {
// data in block has changed, should re-encode key columns, sequence column and re-get skip_bitmaps
_olap_data_convertor->clear_source_content();
RETURN_IF_ERROR(encode_key_columns(key_columns));
RETURN_IF_ERROR(encode_seq_column(seq_column));
skip_bitmaps = get_skip_bitmaps(data.block);
delete_signs = BaseTablet::get_delete_sign_column_data(*data.block,
data.row_pos + data.num_rows);
}
}

const auto* delete_sign_column_data =
BaseTablet::get_delete_sign_column_data(*data.block, data.row_pos + data.num_rows);
DCHECK(delete_sign_column_data != nullptr);

// 4. merge duplicate rows and mark delete for insert after delete
origin_rows = data.num_rows;
RETURN_IF_ERROR(_merge_rows_for_insert_after_delete(data, skip_bitmaps, key_columns, seq_column,
delete_sign_column_data, specified_rowsets,
delete_signs, specified_rowsets,
segment_caches));
if (data.num_rows != origin_rows) {
// data in block has changed, should re-encode key columns, sequence column and re-get skip_bitmaps, delete signs
_olap_data_convertor->clear_source_content();
RETURN_IF_ERROR(encode_key_columns(key_columns));
RETURN_IF_ERROR(encode_seq_column(seq_column));
skip_bitmaps = get_skip_bitmaps(data.block);
delete_sign_column_data =
delete_signs =
BaseTablet::get_delete_sign_column_data(*data.block, data.row_pos + data.num_rows);
}

Expand All @@ -730,8 +732,8 @@ Status VerticalSegmentWriter::_append_block_with_flexible_partial_content(
PartialUpdateStats stats;
RETURN_IF_ERROR(_generate_flexible_read_plan(
read_plan, data, segment_start_pos, schema_has_sequence_col, seq_map_col_unique_id,
skip_bitmaps, key_columns, seq_column, delete_sign_column_data, specified_rowsets,
segment_caches, has_default_or_nullable, use_default_or_null_flag, stats));
skip_bitmaps, key_columns, seq_column, delete_signs, specified_rowsets, segment_caches,
has_default_or_nullable, use_default_or_null_flag, stats));
CHECK_EQ(use_default_or_null_flag.size(), data.num_rows);

if (config::enable_merge_on_write_correctness_check) {
Expand Down Expand Up @@ -828,7 +830,7 @@ Status VerticalSegmentWriter::_generate_flexible_read_plan(
bool schema_has_sequence_col, int32_t seq_map_col_unique_id,
std::vector<BitmapValue>* skip_bitmaps,
const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
vectorized::IOlapColumnDataAccessor* seq_column, const signed char* delete_sign_column_data,
vectorized::IOlapColumnDataAccessor* seq_column, const signed char* delete_signs,
const std::vector<RowsetSharedPtr>& specified_rowsets,
std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches,
bool& has_default_or_nullable, std::vector<bool>& use_default_or_null_flag,
Expand Down Expand Up @@ -861,8 +863,8 @@ Status VerticalSegmentWriter::_generate_flexible_read_plan(
}

// mark key with delete sign as deleted.
bool have_delete_sign = (!skip_bitmap.contains(delete_sign_col_unique_id) &&
delete_sign_column_data[block_pos] != 0);
bool have_delete_sign =
(!skip_bitmap.contains(delete_sign_col_unique_id) && delete_signs[block_pos] != 0);

auto not_found_cb = [&]() {
return _opts.rowset_ctx->partial_update_info->handle_non_strict_mode_not_found_error(
Expand All @@ -883,15 +885,18 @@ Status VerticalSegmentWriter::_generate_flexible_read_plan(
Status VerticalSegmentWriter::_merge_rows_for_sequence_column(
RowsInBlock& data, std::vector<BitmapValue>* skip_bitmaps,
const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
vectorized::IOlapColumnDataAccessor* seq_column,
vectorized::IOlapColumnDataAccessor* seq_column, const signed char* delete_signs,
const std::vector<RowsetSharedPtr>& specified_rowsets,
std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches) {
VLOG_DEBUG << fmt::format(
"VerticalSegmentWriter::_merge_rows_for_sequence_column enter: data.block:{}\n",
data.block->dump_data());
auto seq_col_unique_id = _tablet_schema->column(_tablet_schema->sequence_col_idx()).unique_id();
auto delete_sign_col_unique_id =
_tablet_schema->column(_tablet_schema->delete_sign_idx()).unique_id();
std::string previous_key {};
bool previous_has_seq_col {false};
bool previous_has_delete_sign {false};
int duplicate_keys {0};

auto filter_column = vectorized::ColumnUInt8::create(data.num_rows, 1);
Expand All @@ -901,58 +906,127 @@ Status VerticalSegmentWriter::_merge_rows_for_sequence_column(
RETURN_IF_ERROR(_generate_encoded_default_seq_value(
*_tablet_schema, *_opts.rowset_ctx->partial_update_info, &encoded_default_seq_value));

// batched_rows stores, for the current run of rows sharing the same key, the block position
// of the row matching each pattern; -1 marks a pattern for which no row has been seen:
// 0 -> row without sequence col and with delete sign
// 1 -> row without sequence col and without delete sign
// 2 -> row has sequence col with delete sign
// 3 -> row has sequence col and without delete sign
// Every slot must start at -1: value-initialization would leave zeros, which the
// `!= -1` checks below would mistake for a real row at block position 0.
std::array<int64_t, 4> batched_rows {-1, -1, -1, -1};
// true while the current run contains at least two rows with the same key
bool has_same_rows {false};
// Maps a row's pattern to its slot in batched_rows: rows with a sequence col occupy
// slots 2-3, rows without it slots 0-1; within each pair the row with delete sign
// comes first.
auto get_idx = [](bool with_seq_col, bool has_delete_sign) {
    const int seq_base = with_seq_col ? 2 : 0;
    const int delete_offset = has_delete_sign ? 0 : 1;
    return seq_base + delete_offset;
};
for (size_t block_pos = data.row_pos; block_pos < data.row_pos + data.num_rows; block_pos++) {
size_t delta_pos = block_pos - data.row_pos;
auto& skip_bitmap = skip_bitmaps->at(block_pos);
std::string key = _full_encode_keys(key_columns, delta_pos);
bool row_has_sequence_col = (!skip_bitmap.contains(seq_col_unique_id));
Status st;
bool row_has_delete_sign =
(!skip_bitmap.contains(delete_sign_col_unique_id) && delete_signs[block_pos] != 0);
if (delta_pos > 0 && previous_key == key) {
DCHECK(previous_has_seq_col == !row_has_sequence_col);
++duplicate_keys;
RowLocation loc;
RowsetSharedPtr rowset;
size_t rid_missing_seq {};
size_t rid_with_seq {};
if (row_has_sequence_col) {
rid_missing_seq = block_pos - 1;
rid_with_seq = block_pos;
} else {
rid_missing_seq = block_pos;
rid_with_seq = block_pos - 1;
if (!has_same_rows) {
batched_rows[get_idx(previous_has_seq_col, previous_has_delete_sign)] =
block_pos - 1;
has_same_rows = true;
}
std::string previous_encoded_seq_value {};
st = _tablet->lookup_row_key(key, _tablet_schema.get(), false, specified_rowsets, &loc,
_mow_context->max_version, segment_caches, &rowset, true,
&previous_encoded_seq_value);
DCHECK(st.is<KEY_NOT_FOUND>() || st.ok());

Slice previous_seq_slice {};
if (st.is<KEY_NOT_FOUND>()) {
previous_seq_slice = Slice {encoded_default_seq_value};
} else {
// TODO(bobhan1): we can mark these rows in delete bitmap and eliminate reading them in later phase
_rsid_to_rowset.emplace(rowset->rowset_id(), rowset);
previous_seq_slice = Slice {previous_encoded_seq_value};
}
std::string cur_encoded_seq_value {};
_encode_seq_column(seq_column, rid_with_seq, &cur_encoded_seq_value);
// the encoded value is order-preserving, so we can use Slice::compare() to compare them
int res = previous_seq_slice.compare(Slice {cur_encoded_seq_value});
VLOG_DEBUG << fmt::format(
"VerticalSegmentWriter::_merge_rows_for_sequence_column: rid_with_seq={}, "
"rid_missing_seq={}, res={}",
rid_with_seq, rid_missing_seq, res);
if (res > 0) {
filter_map[rid_with_seq] = 0;
} else if (res < 0) {
filter_map[rid_missing_seq] = 0;
} else {
filter_map[std::min(rid_with_seq, rid_missing_seq)] = 0;
batched_rows[get_idx(row_has_sequence_col, row_has_delete_sign)] = block_pos;
} else {
// Flush the finished run of rows that shared `previous_key`: decide which of the batched
// rows survive by comparing their sequence values with the one already stored for that key.
// NOTE(review): this flush is only reached when a row with a different key follows the run;
// confirm that a run ending at the last row of the block is flushed after the loop as well.
// NOTE(review): empty slots are recognised by -1, so batched_rows must hold -1 in every slot
// before the first run too; confirm its declaration initializes it that way.
if (has_same_rows) {
    // slots 0/1 hold rows without sequence col, slots 2/3 hold rows with sequence col
    bool has_row_without_seq_col = (batched_rows[0] != -1 || batched_rows[1] != -1);
    bool has_row_with_seq_col = (batched_rows[2] != -1 || batched_rows[3] != -1);
    if (has_row_with_seq_col && has_row_without_seq_col) {
        RowLocation loc;
        RowsetSharedPtr rowset;
        std::string previous_encoded_seq_value {};
        std::string row_with_delete_sign_encoded_seq_value {};
        std::string row_without_delete_sign_encoded_seq_value {};

        // The batched rows belong to previous_key; `key` is already the key of the next run.
        Status st = _tablet->lookup_row_key(previous_key, _tablet_schema.get(), false,
                                            specified_rowsets, &loc,
                                            _mow_context->max_version, segment_caches,
                                            &rowset, true, &previous_encoded_seq_value);
        DCHECK(st.is<KEY_NOT_FOUND>() || st.ok());
        Slice previous_seq_slice {};
        if (st.is<KEY_NOT_FOUND>()) {
            // no stored row for this key: compare against the default sequence value
            previous_seq_slice = Slice {encoded_default_seq_value};
        } else {
            // TODO(bobhan1): we can mark these rows in delete bitmap and eliminate reading them in later phase
            _rsid_to_rowset.emplace(rowset->rowset_id(), rowset);
            previous_seq_slice = Slice {previous_encoded_seq_value};
        }
        if (batched_rows[2] != -1) {
            _encode_seq_column(seq_column, batched_rows[2],
                               &row_with_delete_sign_encoded_seq_value);
        }
        if (batched_rows[3] != -1) {
            // slot 3 is the row with sequence col and without delete sign
            _encode_seq_column(seq_column, batched_rows[3],
                               &row_without_delete_sign_encoded_seq_value);
        }

        // Filters out every batched row that lacks a sequence col.
        auto remove_rows_without_seq = [&]() {
            if (batched_rows[0] != -1) {
                filter_map[batched_rows[0]] = 0;
                ++duplicate_keys;
            }
            if (batched_rows[1] != -1) {
                filter_map[batched_rows[1]] = 0;
                ++duplicate_keys;
            }
        };

        // the encoded value is order-preserving, so we can use Slice::compare() to compare them
        if (batched_rows[2] != -1 && batched_rows[3] != -1) {
            if (previous_seq_slice.compare(
                        Slice {row_with_delete_sign_encoded_seq_value}) <= 0) {
                remove_rows_without_seq();
            } else if (previous_seq_slice.compare(
                               Slice {row_without_delete_sign_encoded_seq_value}) <= 0) {
                remove_rows_without_seq();
                filter_map[batched_rows[2]] = 0;
                ++duplicate_keys;
            } else {
                filter_map[batched_rows[2]] = 0;
                ++duplicate_keys;
                filter_map[batched_rows[3]] = 0;
                ++duplicate_keys;
            }
        } else if (batched_rows[2] != -1) {
            if (previous_seq_slice.compare(
                        Slice {row_with_delete_sign_encoded_seq_value}) <= 0) {
                remove_rows_without_seq();
            } else {
                filter_map[batched_rows[2]] = 0;
                ++duplicate_keys;
            }
        } else {
            // only slot 3 is present here, so compare against its own sequence value
            if (previous_seq_slice.compare(
                        Slice {row_without_delete_sign_encoded_seq_value}) <= 0) {
                remove_rows_without_seq();
            } else {
                filter_map[batched_rows[3]] = 0;
                ++duplicate_keys;
            }
        }
    }
    // reset the batch state for the next run of equal keys
    has_same_rows = false;
    batched_rows.fill(-1);
}
}
previous_key = std::move(key);
previous_has_seq_col = row_has_sequence_col;
previous_has_delete_sign = row_has_delete_sign;
}
if (duplicate_keys > 0) {
RETURN_IF_ERROR(_filter_block_for_flexible_partial_update(
Expand Down
Loading

0 comments on commit 24069a4

Please sign in to comment.