Skip to content

Commit

Permalink
Fix inconsist number of rows after applying snapshot (#2504) (#2516)
Browse files Browse the repository at this point in the history
* Fix bug for loading key-values from SST
* Fix bug for SSTReader->key/value return BaseBuffView

Signed-off-by: JaySon-Huang <[email protected]>
  • Loading branch information
JaySon-Huang committed Jul 28, 2021
1 parent 0e93f79 commit c8fabfb
Show file tree
Hide file tree
Showing 9 changed files with 330 additions and 49 deletions.
9 changes: 6 additions & 3 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,12 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(force_legacy_or_checkpoint_page_file_exists) \
M(exception_in_creating_set_input_stream)

#define APPLY_FOR_FAILPOINTS(M) \
M(force_set_page_file_write_errno) \
M(minimum_block_size_for_cross_join)
#define APPLY_FOR_FAILPOINTS(M) \
M(force_set_page_file_write_errno) \
M(minimum_block_size_for_cross_join) \
M(random_slow_page_storage_remove_expired_snapshots) \
M(random_slow_page_storage_list_all_live_files) \
M(force_set_safepoint_when_decode_block)

#define APPLY_FOR_FAILPOINTS_ONCE_WITH_CHANNEL(M) \
M(pause_after_learner_read) \
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Debug/DBGInvoker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ DBGInvoker::DBGInvoker()
regSchemalessFunc("region_snapshot_pre_handle_block", /**/ MockRaftCommand::dbgFuncRegionSnapshotPreHandleBlock);
regSchemalessFunc("region_snapshot_apply_block", /* */ MockRaftCommand::dbgFuncRegionSnapshotApplyBlock);
regSchemalessFunc("region_snapshot_pre_handle_file", /* */ MockRaftCommand::dbgFuncRegionSnapshotPreHandleDTFiles);
regSchemalessFunc("region_snapshot_pre_handle_file_pks", MockRaftCommand::dbgFuncRegionSnapshotPreHandleDTFilesWithHandles);
regSchemalessFunc("region_snapshot_apply_file", /* */ MockRaftCommand::dbgFuncRegionSnapshotApplyDTFiles);
regSchemalessFunc("region_ingest_sst", MockRaftCommand::dbgFuncIngestSST);

Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Debug/dbgFuncMockRaftCommand.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ struct MockRaftCommand
// ./storage-client.sh "DBGInvoke region_snapshot_pre_handle_file(database_name, table_name, region_id, start, end, schema_string, pk_name[, test-fields=1, cfs="write,default"])"
static void dbgFuncRegionSnapshotPreHandleDTFiles(Context & context, const ASTs & args, DBGInvoker::Printer output);

static void dbgFuncRegionSnapshotPreHandleDTFilesWithHandles(Context & context, const ASTs & args, DBGInvoker::Printer output);

// Apply snapshot for a region. (apply a pre-handle snapshot)
// Usage:
// ./storage-client.sh "DBGInvoke region_snapshot_apply_file(region_id)"
Expand Down
175 changes: 170 additions & 5 deletions dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ namespace FailPoints
{
extern const char force_set_sst_to_dtfile_block_size[];
extern const char force_set_sst_decode_rand[];
extern const char force_set_safepoint_when_decode_block[];
} // namespace FailPoints

namespace ErrorCodes
Expand Down Expand Up @@ -356,19 +357,86 @@ void GenMockSSTData(const TiDB::TableInfo & table_info,
fields.emplace_back(handle_id / 2);
}

// Check the MVCC (key-format and transaction model) for details
// https://en.pingcap.com/blog/2016-11-17-mvcc-in-tikv#mvcc
{
TiKVKey key = RecordKVFormat::genKey(table_id, handle_id);
std::stringstream ss;
RegionBench::encodeRow(table_info, fields, ss);
TiKVValue prewrite_value(ss.str());
UInt64 commit_ts = handle_id;
UInt64 prewrite_ts = commit_ts;
TiKVValue commit_value = RecordKVFormat::encodeWriteCfValue(Region::PutFlag, prewrite_ts);
TiKVKey commit_key = RecordKVFormat::appendTs(key, commit_ts);

UInt64 prewrite_ts = handle_id;
UInt64 commit_ts = prewrite_ts + 100; // Assume that commit_ts is larger than prewrite_ts

TiKVKey prewrite_key = RecordKVFormat::appendTs(key, prewrite_ts);
default_kv_list.emplace_back(std::make_pair(std::move(prewrite_key), std::move(prewrite_value)));

TiKVKey commit_key = RecordKVFormat::appendTs(key, commit_ts);
TiKVValue commit_value = RecordKVFormat::encodeWriteCfValue(Region::PutFlag, prewrite_ts);
write_kv_list.emplace_back(std::make_pair(std::move(commit_key), std::move(commit_value)));
}
}

MockSSTReader::getMockSSTData().clear();

if (cfs.count(ColumnFamilyType::Write) > 0)
MockSSTReader::getMockSSTData()[MockSSTReader::Key{store_key, ColumnFamilyType::Write}] = std::move(write_kv_list);
if (cfs.count(ColumnFamilyType::Default) > 0)
MockSSTReader::getMockSSTData()[MockSSTReader::Key{store_key, ColumnFamilyType::Default}] = std::move(default_kv_list);
}

// TODO: make it a more generic testing function
void GenMockSSTDataByHandles(const TiDB::TableInfo & table_info,
TableID table_id,
const String & store_key,
const std::vector<UInt64> & handles,
UInt64 num_fields = 1,
const std::unordered_set<ColumnFamilyType> & cfs = {ColumnFamilyType::Write, ColumnFamilyType::Default})
{
MockSSTReader::Data write_kv_list, default_kv_list;
size_t num_rows = handles.size();

for (size_t index = 0; index < handles.size(); ++index)
{
const auto handle_id = handles[index];
std::vector<Field> fields;
if (num_fields > 0)
{
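// make it have one column Int64 just for test
// NOTE(review): `handle_id` is UInt64 in this function, so `-handle_id` is evaluated as unsigned
// (wraps around) rather than as a negative Int64 -- confirm the resulting Field type is as intended.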
fields.emplace_back(-handle_id);
}
if (num_fields > 1 && index >= num_rows / 3)
{
// column String for test
std::string s = "_" + DB::toString(handle_id);
Field f(s.data(), s.size());
fields.emplace_back(std::move(f));
}
if (num_fields > 2 && index >= 2 * num_rows / 3)
{
// column UInt64 for test
fields.emplace_back(handle_id / 2);
}

// Check the MVCC (key-format and transaction model) for details
// https://en.pingcap.com/blog/2016-11-17-mvcc-in-tikv#mvcc
// The rows (primary key, timestamp) are sorted by primary key asc, timestamp desc in SSTFiles
// https://github.com/pingcap/tics/issues/1864
{
TiKVKey key = RecordKVFormat::genKey(table_id, handle_id);
std::stringstream ss;
RegionBench::encodeRow(table_info, fields, ss);
TiKVValue prewrite_value(ss.str());

UInt64 prewrite_ts = 100000 + num_rows - index; // make it to be timestamp desc in SSTFiles
UInt64 commit_ts = prewrite_ts + 100; // Assume that commit_ts is larger than prewrite_ts

TiKVKey prewrite_key = RecordKVFormat::appendTs(key, prewrite_ts);
default_kv_list.emplace_back(std::make_pair(std::move(prewrite_key), std::move(prewrite_value)));

TiKVKey commit_key = RecordKVFormat::appendTs(key, commit_ts);
TiKVValue commit_value = RecordKVFormat::encodeWriteCfValue(Region::PutFlag, prewrite_ts);
write_kv_list.emplace_back(std::make_pair(std::move(commit_key), std::move(commit_value)));
}
}

Expand Down Expand Up @@ -592,7 +660,102 @@ void MockRaftCommand::dbgFuncRegionSnapshotPreHandleDTFiles(Context & context, c
// We may call this function multiple times to mock some situations; try to reuse the region in `GLOBAL_REGION_MAP`
// so that we can collect uncommitted data.
UInt64 index = MockTiKV::instance().getRaftIndex(region_id) + 1;
RegionPtr new_region = RegionBench::createRegion(table->id(), region_id, start_handle, end_handle, index);
RegionPtr new_region = RegionBench::createRegion(table->id(), region_id, start_handle, end_handle + 10000, index);

// Register some mock SST reading methods so that we can decode data in `MockSSTReader::MockSSTData`
RegionMockTest mock_test(kvstore, new_region);

std::vector<SSTView> sst_views;
{
if (cfs.count(ColumnFamilyType::Write) > 0)
sst_views.push_back(SSTView{
ColumnFamilyType::Write,
BaseBuffView{region_name.data(), region_name.length()},
});
if (cfs.count(ColumnFamilyType::Default) > 0)
sst_views.push_back(SSTView{
ColumnFamilyType::Default,
BaseBuffView{region_name.data(), region_name.length()},
});
}

// set block size so that we can test for schema-sync while decoding dt files
FailPointHelper::enableFailPoint(FailPoints::force_set_sst_to_dtfile_block_size);
FailPointHelper::enableFailPoint(FailPoints::force_set_safepoint_when_decode_block);

auto ingest_ids = kvstore->preHandleSnapshotToFiles(
new_region, SSTViewVec{sst_views.data(), sst_views.size()}, index, MockTiKV::instance().getRaftTerm(region_id), tmt);
GLOBAL_REGION_MAP.insertRegionSnap(region_name, {new_region, ingest_ids});

FailPointHelper::disableFailPoint(FailPoints::force_set_safepoint_when_decode_block);
{
std::stringstream ss;
ss << "Generate " << ingest_ids.size() << " files for [region_id=" << region_id << "]";
output(ss.str());
}
}

// Simulate a region pre-handling snapshot data to DTFiles, with rows generated for an explicit list of handles
// ./storage-client.sh "DBGInvoke region_snapshot_pre_handle_file_pks(database_name, table_name, region_id, schema_string, pk_name, handle0, handle1, ..., handlek)"
void MockRaftCommand::dbgFuncRegionSnapshotPreHandleDTFilesWithHandles(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
if (args.size() < 6)
throw Exception(
"Args not matched, should be: database_name, table_name, region_id, schema_string, pk_name, handle0, handle1, ..., handlek",
ErrorCodes::BAD_ARGUMENTS);

const String & database_name = typeid_cast<const ASTIdentifier &>(*args[0]).name;
const String & table_name = typeid_cast<const ASTIdentifier &>(*args[1]).name;
RegionID region_id = (RegionID)safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*args[2]).value);

const String schema_str = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[3]).value);
String handle_pk_name = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[4]).value);

std::vector<UInt64> handles;
for (size_t i = 5; i < args.size(); ++i)
{
handles.push_back(safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*args[i]).value));
}

UInt64 test_fields = 1;
std::unordered_set<ColumnFamilyType> cfs;
cfs.insert(ColumnFamilyType::Write);
cfs.insert(ColumnFamilyType::Default);

// Parse a TableInfo from `schema_str` to generate data with this schema
TiDB::TableInfoPtr mocked_table_info;
{
ASTPtr columns_ast;
ParserColumnDeclarationList schema_parser;
Tokens tokens(schema_str.data(), schema_str.data() + schema_str.length());
TokenIterator pos(tokens);
Expected expected;
if (!schema_parser.parse(pos, columns_ast, expected))
throw Exception("Invalid TiDB table schema", ErrorCodes::LOGICAL_ERROR);
ColumnsDescription columns
= InterpreterCreateQuery::getColumnsDescription(typeid_cast<const ASTExpressionList &>(*columns_ast), context);
mocked_table_info = MockTiDB::parseColumns(table_name, columns, handle_pk_name, "dt");
}

MockTiDB::TablePtr table = MockTiDB::instance().getTableByName(database_name, table_name);
const auto & table_info = RegionBench::getTableInfo(context, database_name, table_name);
if (table_info.is_common_handle)
throw Exception("Mocking pre handle SST files to DTFiles to a common handle table is not supported", ErrorCodes::LOGICAL_ERROR);

// Mock SST data for the explicitly given handles
const auto region_name = "__snap_snap_" + std::to_string(region_id);
GenMockSSTDataByHandles(*mocked_table_info, table->id(), region_name, handles, test_fields, cfs);

auto & tmt = context.getTMTContext();
auto & kvstore = tmt.getKVStore();
auto old_region = kvstore->getRegion(region_id);

// We may call this function multiple times to mock some situations; try to reuse the region in `GLOBAL_REGION_MAP`
// so that we can collect uncommitted data.
UInt64 index = MockTiKV::instance().getRaftIndex(region_id) + 1;
UInt64 region_start_handle = handles[0];
UInt64 region_end_handle = handles.back() + 10000;
RegionPtr new_region = RegionBench::createRegion(table->id(), region_id, region_start_handle, region_end_handle, index);

// Register some mock SST reading methods so that we can decode data in `MockSSTReader::MockSSTData`
RegionMockTest mock_test(kvstore, new_region);
Expand All @@ -613,11 +776,13 @@ void MockRaftCommand::dbgFuncRegionSnapshotPreHandleDTFiles(Context & context, c

// set block size so that we can test for schema-sync while decoding dt files
FailPointHelper::enableFailPoint(FailPoints::force_set_sst_to_dtfile_block_size);
FailPointHelper::enableFailPoint(FailPoints::force_set_safepoint_when_decode_block);

auto ingest_ids = kvstore->preHandleSnapshotToFiles(
new_region, SSTViewVec{sst_views.data(), sst_views.size()}, index, MockTiKV::instance().getRaftTerm(region_id), tmt);
GLOBAL_REGION_MAP.insertRegionSnap(region_name, {new_region, ingest_ids});

FailPointHelper::disableFailPoint(FailPoints::force_set_safepoint_when_decode_block);
{
std::stringstream ss;
ss << "Generate " << ingest_ids.size() << " files for [region_id=" << region_id << "]";
Expand Down
Loading

0 comments on commit c8fabfb

Please sign in to comment.