Skip to content

Commit

Permalink
Abort the current split and forbid later split under illegal split p…
Browse files Browse the repository at this point in the history
…oint, instead of exception (#2214)(#2220)

Signed-off-by: ti-srebot <[email protected]>

Co-authored-by: Flowyi <[email protected]>
  • Loading branch information
ti-srebot and flowbehappy authored Jun 21, 2021
1 parent d74c23e commit 8c3b94e
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 12 deletions.
8 changes: 5 additions & 3 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1229,7 +1229,7 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const
return false;
};
auto try_bg_split = [&](const SegmentPtr & seg) {
if (should_split)
if (should_split && !seg->isSplitForbidden())
{
delta_last_try_split_rows = delta_rows;
delta_last_try_split_bytes = delta_bytes;
Expand All @@ -1241,7 +1241,7 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const
auto try_fg_split = [&](const SegmentPtr & my_segment) -> bool {
auto my_segment_rows = my_segment->getEstimatedRows();
auto my_should_split = my_segment_rows >= dm_context->segment_limit_rows * 3;
if (my_should_split)
if (my_should_split && !my_segment->isSplitForbidden())
{
if (segmentSplit(*dm_context, my_segment, true).first)
return true;
Expand Down Expand Up @@ -1591,7 +1591,9 @@ SegmentPair DeltaMergeStore::segmentSplit(DMContext & dm_context, const SegmentP

if (!split_info_opt.has_value())
{
LOG_WARNING(log, "Give up segment [" << segment->segmentId() << "] split because of prepare split failed");
// Likely we can not find an appropriate split point for this segment later, forbid the split until this segment get updated through applying delta-merge. Or it will slow down the write a lot.
segment->forbidSplit();
LOG_WARNING(log, "Giving up and forbid later split. Segment [" << segment->segmentId() << "]. Because of prepare split failed");
return {};
}

Expand Down
48 changes: 39 additions & 9 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,21 @@ std::optional<RowKeyValue> Segment::getSplitPointFast(DMContext & dm_context, co
stream.readSuffix();

RowKeyColumnContainer rowkey_column(block.getByPosition(0).column, is_common_handle);
return {RowKeyValue(rowkey_column.getRowKeyValue(read_row_in_pack))};
RowKeyValue split_point(rowkey_column.getRowKeyValue(read_row_in_pack));


if (!rowkey_range.check(split_point.toRowKeyValueRef())
|| RowKeyRange(rowkey_range.start, split_point, is_common_handle, rowkey_column_size).none()
|| RowKeyRange(split_point, rowkey_range.end, is_common_handle, rowkey_column_size).none())
{
LOG_WARNING(log,
__FUNCTION__ << " unexpected split_handle: " << split_point.toRowKeyValueRef().toDebugString()
<< ", should be in range " << rowkey_range.toDebugString() << ", cur_rows: " << cur_rows
<< ", read_row_in_pack: " << read_row_in_pack << ", file_index: " << file_index);
return {};
}

return {split_point};
}

std::optional<RowKeyValue>
Expand Down Expand Up @@ -775,10 +789,16 @@ Segment::getSplitPointSlow(DMContext & dm_context, const ReadInfo & read_info, c
}
stream->readSuffix();

if (!rowkey_range.check(split_point.toRowKeyValueRef()))
throw Exception("getSplitPointSlow unexpected split_handle: " + split_point.toRowKeyValueRef().toDebugString()
+ ", should be in range " + rowkey_range.toDebugString() + ", exact_rows: " + DB::toString(exact_rows)
+ ", cur count:" + DB::toString(count));
if (!rowkey_range.check(split_point.toRowKeyValueRef())
|| RowKeyRange(rowkey_range.start, split_point, is_common_handle, rowkey_column_size).none()
|| RowKeyRange(split_point, rowkey_range.end, is_common_handle, rowkey_column_size).none())
{
LOG_WARNING(log,
__FUNCTION__ << " unexpected split_handle: " << split_point.toRowKeyValueRef().toDebugString()
<< ", should be in range " << rowkey_range.toDebugString() << ", exact_rows: " << DB::toString(exact_rows)
<< ", cur count: " << DB::toString(count) << ", split_row_index: " << split_row_index);
return {};
}

return {split_point};
}
Expand All @@ -792,7 +812,9 @@ std::optional<Segment::SplitInfo> Segment::prepareSplit(DMContext &
if (!dm_context.enable_logical_split //
|| segment_snap->stable->getPacks() <= 3 //
|| segment_snap->delta->getRows() > segment_snap->stable->getRows())
{
return prepareSplitPhysical(dm_context, schema_snap, segment_snap, wbs, need_rate_limit);
}
else
{
auto split_point_opt = getSplitPointFast(dm_context, segment_snap->stable);
Expand Down Expand Up @@ -828,8 +850,12 @@ std::optional<Segment::SplitInfo> Segment::prepareSplitLogical(DMContext & dm_co
RowKeyRange other_range(split_point, rowkey_range.end, is_common_handle, rowkey_column_size);

if (my_range.none() || other_range.none())
throw Exception("prepareSplitLogical: unexpected range! my_range: " + my_range.toDebugString()
+ ", other_range: " + other_range.toDebugString());
{
LOG_WARNING(log,
__FUNCTION__ << ": unexpected range! my_range: " << my_range.toDebugString()
<< ", other_range: " << other_range.toDebugString() << ", aborted");
return {};
}

GenPageId log_gen_page_id = std::bind(&StoragePool::newLogPageId, &storage_pool);

Expand Down Expand Up @@ -892,8 +918,12 @@ std::optional<Segment::SplitInfo> Segment::prepareSplitPhysical(DMContext &
RowKeyRange other_range(split_point, rowkey_range.end, is_common_handle, rowkey_column_size);

if (my_range.none() || other_range.none())
throw Exception("prepareSplitPhysical: unexpected range! my_range: " + my_range.toDebugString()
+ ", other_range: " + other_range.toDebugString());
{
LOG_WARNING(log,
__FUNCTION__ << ": unexpected range! my_range: " << my_range.toDebugString()
<< ", other_range: " << other_range.toDebugString() << ", aborted");
return {};
}

StableValueSpacePtr my_new_stable;
StableValueSpacePtr other_stable;
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,9 @@ class Segment : private boost::noncopyable
}
bool hasAbandoned() { return delta->hasAbandoned(); }

bool isSplitForbidden() { return split_forbidden; }
void forbidSplit() { split_forbidden = true; }

void drop(const FileProviderPtr & file_provider) { stable->drop(file_provider); }

RowsAndBytes
Expand Down Expand Up @@ -329,6 +332,8 @@ class Segment : private boost::noncopyable
const DeltaValueSpacePtr delta;
const StableValueSpacePtr stable;

bool split_forbidden = false;

Logger * log;
};

Expand Down

0 comments on commit 8c3b94e

Please sign in to comment.