Skip to content

Commit

Permalink
Do background compaction when the ratio of stable data covered by del…
Browse files Browse the repository at this point in the history
…ete range is too large (#3698)
  • Loading branch information
lidezhu authored Jan 28, 2022
1 parent 18e3df0 commit 29ddfbe
Show file tree
Hide file tree
Showing 8 changed files with 277 additions and 134 deletions.
1 change: 1 addition & 0 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ struct Settings
M(SettingUInt64, dt_bg_gc_check_interval, 600, "Background gc thread check interval, the unit is second.")\
M(SettingInt64, dt_bg_gc_max_segments_to_check_every_round, 15, "Max segments to check in every gc round, value less than or equal to 0 means gc no segments.")\
M(SettingFloat, dt_bg_gc_ratio_threhold_to_trigger_gc, 1.2, "Trigger segment's gc when the ratio of invalid version exceed this threhold. Values smaller than or equal to 1.0 means gc all segments")\
M(SettingFloat, dt_bg_gc_delta_delete_ratio_to_trigger_gc, 0.3, "Trigger segment's gc when the ratio of delta delete range to stable exceeds this ratio.")\
M(SettingUInt64, dt_insert_max_rows, 0, "Max rows of insert blocks when write into DeltaTree Engine. By default 0 means no limit.")\
M(SettingBool, dt_enable_rough_set_filter, true, "Whether to parse where expression as Rough Set Index filter or not.") \
M(SettingBool, dt_raw_filter_range, true, "Do range filter or not when read data in raw mode in DeltaTree Engine.")\
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,8 @@ class DeltaValueSnapshot : public std::enable_shared_from_this<DeltaValueSnapsho
size_t getBytes() const { return bytes; }
size_t getDeletes() const { return deletes; }

RowKeyRange getSquashDeleteRange() const;

const auto & getStorageSnapshot() { return storage_snap; }
const auto & getSharedDeltaIndex() { return shared_delta_index; }
};
Expand Down
13 changes: 12 additions & 1 deletion dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,22 @@ DeltaSnapshotPtr DeltaValueSpace::createSnapshot(const DMContext & context, bool
return snap;
}

RowKeyRange DeltaValueSnapshot::getSquashDeleteRange() const
{
RowKeyRange squashed_delete_range = RowKeyRange::newNone(is_common_handle, rowkey_column_size);
for (auto iter = packs.cbegin(); iter != packs.cend(); ++iter)
{
const auto & pack = *iter;
if (auto dp_delete = pack->tryToDeleteRange(); dp_delete)
squashed_delete_range = squashed_delete_range.merge(dp_delete->getDeleteRange());
}
return squashed_delete_range;
}

// ================================================
// DeltaValueReader
// ================================================


DeltaValueReader::DeltaValueReader(const DMContext & context,
const DeltaSnapshotPtr & delta_snap_,
const ColumnDefinesPtr & col_defs_,
Expand Down
171 changes: 107 additions & 64 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp

Large diffs are not rendered by default.

56 changes: 31 additions & 25 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,9 @@ class DeltaMergeStore : private boost::noncopyable

enum TaskRunThread
{
Thread_BG_Thread_Pool,
Thread_FG,
Thread_BG_GC,
BackgroundThreadPool,
Foreground,
BackgroundGCThread,
};

static std::string toString(ThreadType type)
Expand Down Expand Up @@ -204,21 +204,6 @@ class DeltaMergeStore : private boost::noncopyable
}
}

static std::string toString(TaskRunThread type)
{
switch (type)
{
case Thread_BG_Thread_Pool:
return "BackgroundThreadPool";
case Thread_FG:
return "Foreground";
case Thread_BG_GC:
return "BackgroundGCThread";
default:
return "Unknown";
}
}

static std::string toString(TaskType type)
{
switch (type)
Expand All @@ -240,6 +225,21 @@ class DeltaMergeStore : private boost::noncopyable
}
}

static std::string toString(TaskRunThread type)
{
switch (type)
{
case BackgroundThreadPool:
return "BackgroundThreadPool";
case Foreground:
return "Foreground";
case BackgroundGCThread:
return "BackgroundGCThread";
default:
return "Unknown";
}
}

struct BackgroundTask
{
TaskType type;
Expand Down Expand Up @@ -354,7 +354,10 @@ class DeltaMergeStore : private boost::noncopyable
/// Compact fregment packs into bigger one.
void compact(const Context & context, const RowKeyRange & range);

/// Apply `commands` on `table_columns`
/// Iterator over all segments and apply gc jobs.
UInt64 onSyncGc(Int64 limit);

/// Apply DDL `commands` on `table_columns`
void applyAlters(const AlterCommands & commands, //
const OptionTableInfoConstRef table_info,
ColumnID & max_column_id_used,
Expand All @@ -378,8 +381,6 @@ class DeltaMergeStore : private boost::noncopyable
bool isCommonHandle() const { return is_common_handle; }
size_t getRowKeyColumnSize() const { return rowkey_column_size; }

UInt64 onSyncGc(Int64 limit);

public:
/// Methods mainly used by region split.

Expand All @@ -393,8 +394,7 @@ class DeltaMergeStore : private boost::noncopyable
RegionSplitRes getRegionSplitPoint(DMContext & dm_context, const RowKeyRange & check_range, size_t max_region_size, size_t split_size);

private:

DMContextPtr newDMContext(const Context & db_context, const DB::Settings & db_settings, const String & query_id="");
DMContextPtr newDMContext(const Context & db_context, const DB::Settings & db_settings, const String & query_id = "");

static bool pkIsHandle(const ColumnDefine & handle_define) { return handle_define.id != EXTRA_HANDLE_COLUMN_ID; }

Expand All @@ -405,13 +405,19 @@ class DeltaMergeStore : private boost::noncopyable

SegmentPair segmentSplit(DMContext & dm_context, const SegmentPtr & segment, bool is_foreground);
void segmentMerge(DMContext & dm_context, const SegmentPtr & left, const SegmentPtr & right, bool is_foreground);
SegmentPtr segmentMergeDelta(DMContext & dm_context, const SegmentPtr & segment, const TaskRunThread thread);
SegmentPtr segmentMergeDelta(DMContext & dm_context,
const SegmentPtr & segment,
const TaskRunThread thread,
SegmentSnapshotPtr segment_snap = nullptr);

bool updateGCSafePoint();

bool handleBackgroundTask(bool heavy);

bool isSegmentValid(const SegmentPtr & segment);
// isSegmentValid should be protected by lock on `read_write_mutex`
inline bool isSegmentValid(std::shared_lock<std::shared_mutex> &, const SegmentPtr & segment) { return doIsSegmentValid(segment); }
inline bool isSegmentValid(std::unique_lock<std::shared_mutex> &, const SegmentPtr & segment) { return doIsSegmentValid(segment); }
bool doIsSegmentValid(const SegmentPtr & segment);

void restoreStableFiles();

Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Storages/DeltaMerge/RowKeyRange.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#pragma once
#include <Columns/ColumnString.h>
#include <Core/Types.h>
#include <Functions/FunctionHelpers.h>
#include <IO/WriteHelpers.h>
Expand All @@ -10,7 +11,6 @@
#include <Storages/Transaction/TiKVKeyValue.h>
#include <Storages/Transaction/TiKVRecordFormat.h>
#include <Storages/Transaction/Types.h>
#include <Columns/ColumnString.h>

namespace DB::DM
{
Expand Down Expand Up @@ -116,7 +116,7 @@ struct RowKeyValue
RowKeyValue toPrefixNext()
{
std::vector<UInt8> keys(value->begin(), value->end());
int index = keys.size() - 1;
int index = keys.size() - 1;
for (; index >= 0; index--)
{
if (keys[index] == std::numeric_limits<UInt8>::max())
Expand All @@ -133,8 +133,8 @@ struct RowKeyValue
keys.insert(keys.end(), value->begin(), value->end());
keys.push_back(0);
}
HandleValuePtr prefix_value = std::make_shared<String>(keys.begin(), keys.end());
Int64 prefix_int_value = int_value;
HandleValuePtr prefix_value = std::make_shared<String>(keys.begin(), keys.end());
Int64 prefix_int_value = int_value;
if (!is_common_handle && prefix_int_value != std::numeric_limits<Int64>::max())
{
prefix_int_value++;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,13 @@ class Segment : private boost::noncopyable
size_t expected_block_size = DEFAULT_BLOCK_SIZE);

/// Return a stream which is suitable for exporting data.
/// reorgize_block: put those rows with the same pk rows into the same block or not.
/// reorganize_block: put those rows with the same pk rows into the same block or not.
BlockInputStreamPtr getInputStreamForDataExport(const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const SegmentSnapshotPtr & segment_snap,
const RowKeyRange & data_range,
size_t expected_block_size = DEFAULT_BLOCK_SIZE,
bool reorgnize_block = true) const;
bool reorganize_block = true) const;

BlockInputStreamPtr getInputStreamRaw(const DMContext & dm_context,
const ColumnDefines & columns_to_read,
Expand Down
Loading

0 comments on commit 29ddfbe

Please sign in to comment.