Skip to content

Commit

Permalink
feat(cloud): Add File Cache Consistency Check (#41280)
Browse files Browse the repository at this point in the history
Add a feature to verify the consistency of the file cache, checking
whether the disk cache is under the control of the file cache management
system.
  • Loading branch information
yt committed Nov 12, 2024
1 parent 954661c commit f7c7869
Show file tree
Hide file tree
Showing 9 changed files with 428 additions and 2 deletions.
23 changes: 22 additions & 1 deletion be/src/http/action/file_cache_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ constexpr static std::string_view CLEAR = "clear";
constexpr static std::string_view RESET = "reset";
constexpr static std::string_view HASH = "hash";
constexpr static std::string_view LIST_CACHE = "list_cache";
// Operation name: respond with the base paths of all file caches as a JSON array.
constexpr static std::string_view LIST_BASE_PATH = "list_base_path";
// Operation name: run the file cache consistency check for the cache selected by the
// `base_path` request parameter and respond with the inconsistencies as a JSON array.
constexpr static std::string_view CHECK_CONSISTENCY = "check_consistency";
constexpr static std::string_view CAPACITY = "capacity";
constexpr static std::string_view RELEASE = "release";
constexpr static std::string_view BASE_PATH = "base_path";
Expand Down Expand Up @@ -127,6 +129,25 @@ Status FileCacheAction::_handle_header(HttpRequest* req, std::string* json_metri
*json_metrics = json.ToString();
}
}
} else if (operation == LIST_BASE_PATH) {
auto all_cache_base_path = io::FileCacheFactory::instance()->get_base_paths();
EasyJson json;
std::for_each(all_cache_base_path.begin(), all_cache_base_path.end(),
[&json](auto& x) { json.PushBack(std::move(x)); });
*json_metrics = json.ToString();
} else if (operation == CHECK_CONSISTENCY) {
const std::string& cache_base_path = req->param(BASE_PATH.data());
if (cache_base_path.empty()) {
st = Status::InvalidArgument("missing parameter: {} is required", VALUE.data());
} else {
auto* block_file_cache = io::FileCacheFactory::instance()->get_by_path(cache_base_path);
std::vector<std::string> inconsistencies;
RETURN_IF_ERROR(block_file_cache->report_file_cache_inconsistency(inconsistencies));
EasyJson json;
std::for_each(inconsistencies.begin(), inconsistencies.end(),
[&json](auto& x) { json.PushBack(std::move(x)); });
*json_metrics = json.ToString();
}
} else {
st = Status::InternalError("invalid operation: {}", operation);
}
Expand All @@ -145,4 +166,4 @@ void FileCacheAction::handle(HttpRequest* req) {
}
}

} // namespace doris
} // namespace doris
82 changes: 82 additions & 0 deletions be/src/io/cache/block_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

#include "io/cache/block_file_cache.h"

#include <unordered_set>

#include "common/status.h"
#include "cpp/sync_point.h"

Expand Down Expand Up @@ -2084,4 +2086,84 @@ std::map<std::string, double> BlockFileCache::get_stats_unsafe() {
template void BlockFileCache::remove(FileBlockSPtr file_block,
std::lock_guard<std::mutex>& cache_lock,
std::lock_guard<std::mutex>& block_lock, bool sync);

/// Runs check_file_cache_consistency() and renders every inconsistency it
/// records as one human-readable string appended to `results`.
///
/// Each string contains the block info as tracked by the cache manager, the
/// block info as found in storage, and the inconsistency reason(s).
///
/// @param results output vector; existing entries are kept, new ones are appended.
/// @return the error of the consistency check if it fails, otherwise OK.
Status BlockFileCache::report_file_cache_inconsistency(std::vector<std::string>& results) {
    InconsistencyContext inconsistency_context;
    RETURN_IF_ERROR(check_file_cache_consistency(inconsistency_context));
    const auto n = inconsistency_context.types.size();
    // `results` may already hold entries, so reserve room for the appended ones.
    results.reserve(results.size() + n);
    for (size_t i = 0; i < n; i++) {
        std::string result;
        result += "File cache info in manager:\n";
        result += inconsistency_context.infos_in_manager[i].to_string();
        result += "File cache info in storage:\n";
        result += inconsistency_context.infos_in_storage[i].to_string();
        result += inconsistency_context.types[i].to_string();
        result += "\n";
        results.push_back(std::move(result));
    }
    return Status::OK();
}

/// Compares the block metadata reported by the storage layer with the cells
/// tracked in `_files` and records every mismatch in `inconsistency_context`.
///
/// The cache mutex is held for the whole check. The three vectors of the
/// context are filled in lock-step: entry i of each describes one finding.
///
/// @return the storage error if listing the stored blocks fails, otherwise OK.
Status BlockFileCache::check_file_cache_consistency(InconsistencyContext& inconsistency_context) {
    std::lock_guard<std::mutex> cache_lock(_mutex);

    std::vector<FileCacheInfo> stored_infos;
    RETURN_IF_ERROR(_storage->get_file_cache_infos(stored_infos, cache_lock));

    auto& manager_side = inconsistency_context.infos_in_manager;
    auto& storage_side = inconsistency_context.infos_in_storage;
    auto& reasons = inconsistency_context.types;

    // Pass 1: every block found in storage must have a matching cell.
    std::unordered_set<AccessKeyAndOffset, KeyAndOffsetHash> seen_in_storage;
    for (const auto& stored : stored_infos) {
        seen_in_storage.insert({stored.hash, stored.offset});
        auto* cell = get_cell(stored.hash, stored.offset, cache_lock);
        if (cell == nullptr) {
            // Present in storage but unknown to the manager: pair it with an empty info.
            manager_side.emplace_back();
            storage_side.push_back(stored);
            reasons.emplace_back(InconsistencyType::NOT_LOADED);
            continue;
        }

        FileCacheInfo tracked {
                .hash = stored.hash,
                .expiration_time = cell->file_block->expiration_time(),
                .size = cell->size(),
                .offset = stored.offset,
                .is_tmp = cell->file_block->state() == FileBlock::State::DOWNLOADING,
                .cache_type = cell->file_block->cache_type()};

        InconsistencyType reason;
        const bool tmp_state_agrees = stored.is_tmp == tracked.is_tmp;
        if (!tmp_state_agrees) {
            reason |= InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE;
        }

        // A block that is still downloading is compared against the bytes downloaded so far.
        size_t expected_size = 0;
        if (tracked.is_tmp) {
            expected_size = cell->dowloading_size();
        } else {
            expected_size = tracked.size;
        }
        if (stored.size != expected_size) {
            reason |= InconsistencyType::SIZE_INCONSISTENT;
        }

        // The cache type is only compared when both sides agree on the tmp/downloading state.
        if (tmp_state_agrees && stored.cache_type != tracked.cache_type) {
            reason |= InconsistencyType::CACHE_TYPE_INCONSISTENT;
        }
        if (stored.expiration_time != tracked.expiration_time) {
            reason |= InconsistencyType::EXPIRATION_TIME_INCONSISTENT;
        }

        if (reason == InconsistencyType::NONE) {
            continue;
        }
        manager_side.push_back(tracked);
        storage_side.push_back(stored);
        reasons.push_back(reason);
    }

    // Pass 2: every cell that was not matched above has no counterpart in storage.
    for (const auto& [hash, offset_to_cell] : _files) {
        for (const auto& [offset, cell] : offset_to_cell) {
            if (!seen_in_storage.contains({hash, offset})) {
                const auto& block = cell.file_block;
                manager_side.emplace_back(hash, block->expiration_time(), cell.size(), offset,
                                          block->state() == FileBlock::State::DOWNLOADING,
                                          block->cache_type());
                storage_side.emplace_back();
                reasons.emplace_back(InconsistencyType::MISSING_IN_STORAGE);
            }
        }
    }
    return Status::OK();
}

} // namespace doris::io
4 changes: 4 additions & 0 deletions be/src/io/cache/block_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,9 @@ class BlockFileCache {
using QueryFileCacheContextHolderPtr = std::unique_ptr<QueryFileCacheContextHolder>;
QueryFileCacheContextHolderPtr get_query_context_holder(const TUniqueId& query_id);

// Runs check_file_cache_consistency() and appends one human-readable report
// string per detected inconsistency to `results`.
Status report_file_cache_inconsistency(std::vector<std::string>& results);
// Compares the block metadata reported by the storage with the cells tracked by
// this cache and records every mismatch in `inconsistency_context`.
// Holds the cache mutex for the duration of the check.
Status check_file_cache_consistency(InconsistencyContext& inconsistency_context);

private:
struct FileBlockCell {
FileBlockSPtr file_block;
Expand All @@ -349,6 +352,7 @@ class BlockFileCache {
// True when file_block.use_count() is 1, i.e. nothing besides this cell
// currently shares ownership of the block.
bool releasable() const { return file_block.use_count() == 1; }

// Size of the range covered by the block (file_block->_block_range.size()).
size_t size() const { return file_block->_block_range.size(); }
// Current value of the block's _downloaded_size. The consistency check uses it as
// the expected on-disk size of a block that is still in DOWNLOADING state.
// NOTE(review): the name is a misspelling of "downloading"; renaming would change
// the interface, so it is left as is.
size_t dowloading_size() const { return file_block->_downloaded_size; }

FileBlockCell(FileBlockSPtr file_block, std::lock_guard<std::mutex>& cache_lock);
FileBlockCell(FileBlockCell&& other) noexcept
Expand Down
52 changes: 52 additions & 0 deletions be/src/io/cache/file_cache_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,21 @@

namespace doris::io {

/// Maps a FileCacheType to its upper-case name.
/// Values other than INDEX, NORMAL, DISPOSABLE and TTL yield "UNKNOWN".
std::string file_cache_type_to_string(FileCacheType type) {
    const char* label = "UNKNOWN";
    if (type == FileCacheType::INDEX) {
        label = "INDEX";
    } else if (type == FileCacheType::NORMAL) {
        label = "NORMAL";
    } else if (type == FileCacheType::DISPOSABLE) {
        label = "DISPOSABLE";
    } else if (type == FileCacheType::TTL) {
        label = "TTL";
    }
    return label;
}

std::string FileCacheSettings::to_string() const {
std::stringstream ss;
ss << "capacity: " << capacity << ", max_file_block_size: " << max_file_block_size
Expand Down Expand Up @@ -86,4 +101,41 @@ FileBlocksHolderPtr FileCacheAllocatorBuilder::allocate_cache_holder(size_t offs
return std::make_unique<FileBlocksHolder>(std::move(holder));
}

/// Renders the hash, expiration time, offset and cache type of this info,
/// one "Name: value" pair per line. `size` and `is_tmp` are not printed.
std::string FileCacheInfo::to_string() const {
    std::stringstream out;
    out << "Hash: " << hash.to_string() << "\n";
    out << "Expiration Time: " << expiration_time << "\n";
    out << "Offset: " << offset << "\n";
    out << "Cache Type: " << file_cache_type_to_string(cache_type) << "\n";
    return out.str();
}

/// Renders the set flags as "Inconsistency Reason: <names>\n".
/// With no flag set the name list is "NONE"; otherwise the names of all set
/// flags are concatenated in declaration order.
std::string InconsistencyType::to_string() const {
    struct FlagLabel {
        uint32_t bit;
        const char* text;
    };
    // Labels carry their own separator: every name but the last ends with a space.
    static constexpr FlagLabel kFlagLabels[] = {
            {NOT_LOADED, "NOT_LOADED "},
            {MISSING_IN_STORAGE, "MISSING_IN_STORAGE "},
            {SIZE_INCONSISTENT, "SIZE_INCONSISTENT "},
            {CACHE_TYPE_INCONSISTENT, "CACHE_TYPE_INCONSISTENT "},
            {EXPIRATION_TIME_INCONSISTENT, "EXPIRATION_TIME_INCONSISTENT "},
            {TMP_FILE_EXPECT_DOWNLOADING_STATE, "TMP_FILE_EXPECT_DOWNLOADING_STATE"},
    };

    std::string result = "Inconsistency Reason: ";
    if (type == NONE) {
        result += "NONE";
    } else {
        for (const auto& entry : kFlagLabels) {
            if ((type & entry.bit) != 0) {
                result += entry.text;
            }
        }
    }
    result += "\n";
    return result;
}

} // namespace doris::io
40 changes: 40 additions & 0 deletions be/src/io/cache/file_cache_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
// and modified by Doris

#pragma once
#include <cstdint>
#include <vector>

#include "io/io_common.h"
#include "vec/common/uint128.h"

Expand All @@ -38,6 +41,7 @@ enum FileCacheType {
DISPOSABLE = 0,
TTL = 3,
};
// Returns the upper-case name of `type` ("INDEX", "NORMAL", "DISPOSABLE" or "TTL"),
// or "UNKNOWN" for any other value.
std::string file_cache_type_to_string(FileCacheType type);

struct UInt128Wrapper {
uint128_t value_;
Expand Down Expand Up @@ -133,5 +137,41 @@ struct CacheContext {
int64_t expiration_time {0};
bool is_cold_data {false};
};
// Metadata of a single cached block, used to compare what the cache manager
// tracks with what the storage actually holds.
struct FileCacheInfo {
    // Hash that, together with `offset`, identifies the block.
    UInt128Wrapper hash {0};
    // Expiration time of the block; 0 by default.
    uint64_t expiration_time {0};
    // Size of the block in bytes.
    uint64_t size {0};
    // Offset of the block within the cached file.
    size_t offset {0};
    // Whether the block is a temporary file (storage side) or is in the
    // DOWNLOADING state (manager side).
    bool is_tmp {false};
    FileCacheType cache_type {NORMAL};

    // Human-readable multi-line description of this info.
    std::string to_string() const;
};

// Bit set describing how a cached block differs between the cache manager and
// the storage. Several flags may be combined with `|=`.
class InconsistencyType {
    uint32_t type;

public:
    enum : uint32_t {
        NONE = 0,
        NOT_LOADED = 1 << 0,
        MISSING_IN_STORAGE = 1 << 1,
        SIZE_INCONSISTENT = 1 << 2,
        CACHE_TYPE_INCONSISTENT = 1 << 3,
        EXPIRATION_TIME_INCONSISTENT = 1 << 4,
        TMP_FILE_EXPECT_DOWNLOADING_STATE = 1 << 5
    };
    // Intentionally implicit so that a flag value converts to an InconsistencyType.
    InconsistencyType(uint32_t t = 0) : type(t) {}
    // Mutable access to the underlying bits; this is what makes `|=` work.
    operator uint32_t&() { return type; }
    // Read-only access, so const objects and const references can be tested
    // and compared as well.
    operator uint32_t() const { return type; }

    // Renders the set flags as a human-readable line.
    std::string to_string() const;
};

// Result of a file cache consistency check. The three vectors are parallel:
// the entries at index i of each together describe one detected inconsistency.
struct InconsistencyContext {
    // The infos in _files of BlockFileCache. Holds a default-constructed entry
    // when the block exists in storage only.
    std::vector<FileCacheInfo> infos_in_manager;
    // The infos reported by the storage. Holds a default-constructed entry when
    // the block exists in the manager only.
    std::vector<FileCacheInfo> infos_in_storage;
    // The reason flags for the inconsistency at the same index.
    std::vector<InconsistencyType> types;
};

} // namespace doris::io
7 changes: 7 additions & 0 deletions be/src/io/cache/file_cache_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

#pragma once

#include <mutex>

#include "common/status.h"
#include "io/cache/file_cache_common.h"
#include "util/slice.h"

Expand Down Expand Up @@ -67,6 +70,10 @@ class FileCacheStorage {
virtual FileCacheStorageType get_type() = 0;
// get local cached file
virtual std::string get_local_file(const FileCacheKey& key) = 0;
// Collects the metadata of the blocks held by this storage into `infos`.
// The default implementation reports nothing and returns OK; storages that
// can enumerate their blocks override it.
// `cache_lock` is not used here; presumably it shows that the caller holds the
// cache mutex (confirm against callers).
virtual Status get_file_cache_infos(std::vector<FileCacheInfo>& infos,
                                    std::lock_guard<std::mutex>& cache_lock) const {
    return Status::OK();
}
};

} // namespace doris::io
69 changes: 68 additions & 1 deletion be/src/io/cache/fs_file_cache_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@

#include "io/cache/fs_file_cache_storage.h"

#include <fmt/core.h>

#include <exception>
#include <filesystem>
#include <mutex>
#include <system_error>

#include "common/logging.h"
#include "common/status.h"
#include "cpp/sync_point.h"
#include "io/cache/block_file_cache.h"
#include "io/cache/file_block.h"
#include "io/cache/file_cache_common.h"
#include "io/cache/file_cache_storage.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_reader.h"
Expand Down Expand Up @@ -523,7 +527,7 @@ void FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr) const
std::error_code ec;
std::filesystem::directory_iterator offset_it(key_it->path(), ec);
if (ec) [[unlikely]] {
LOG(WARNING) << "filesystem error, failed to remove file, file=" << key_it->path()
LOG(WARNING) << "filesystem error, failed to list dir, dir=" << key_it->path()
<< " error=" << ec.message();
continue;
}
Expand Down Expand Up @@ -605,6 +609,69 @@ void FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr) const
TEST_SYNC_POINT_CALLBACK("BlockFileCache::TmpFile2");
}

/// Walks the cache directory tree under `_cache_base_path` and appends one
/// FileCacheInfo to `infos` for every block file found.
///
/// The traversal expects three levels below the base path: a key-prefix
/// directory, a "<hash>_<expiration time>" directory, and the block files.
/// Entries whose names do not fit that scheme are logged and skipped.
///
/// @param infos      output vector; new entries are appended.
/// @param cache_lock not used by the traversal itself; presumably shows that
///                   the caller holds the cache mutex (confirm against callers).
/// @return InternalError if a directory cannot be listed or a file size cannot
///         be read, the parser's error if a block file name cannot be parsed,
///         otherwise OK.
Status FSFileCacheStorage::get_file_cache_infos(std::vector<FileCacheInfo>& infos,
                                                std::lock_guard<std::mutex>& cache_lock) const {
    std::error_code ec;
    std::filesystem::directory_iterator key_prefix_it {_cache_base_path, ec};
    if (ec) [[unlikely]] {
        LOG(ERROR) << fmt::format("Failed to list dir {}, err={}", _cache_base_path, ec.message());
        return Status::InternalError("Failed to list dir {}, err={}", _cache_base_path,
                                     ec.message());
    }
    const std::filesystem::directory_iterator end_it;
    // Only supports version 2. For more details, refer to 'USE_CACHE_VERSION2'.
    for (; key_prefix_it != end_it; ++key_prefix_it) {
        if (!key_prefix_it->is_directory()) {
            // skip version file
            continue;
        }
        if (key_prefix_it->path().filename().native().size() != KEY_PREFIX_LENGTH) {
            LOG(WARNING) << "Unknown directory " << key_prefix_it->path().native();
            continue;
        }
        std::filesystem::directory_iterator key_it {key_prefix_it->path(), ec};
        if (ec) [[unlikely]] {
            LOG(ERROR) << fmt::format("Failed to list dir {}, err={}",
                                      key_prefix_it->path().filename().native(), ec.message());
            return Status::InternalError("Failed to list dir {}, err={}",
                                         key_prefix_it->path().filename().native(), ec.message());
        }
        for (; key_it != end_it; ++key_it) {
            const auto key_with_suffix = key_it->path().filename().native();
            const auto delim_pos = key_with_suffix.find('_');
            if (delim_pos == std::string::npos) {
                // Not a "<hash>_<expiration time>" entry: skip it instead of
                // feeding a malformed name to the parsers below.
                LOG(WARNING) << "Unknown directory " << key_it->path().native();
                continue;
            }
            const std::string key_str = key_with_suffix.substr(0, delim_pos);
            const std::string expiration_time_str = key_with_suffix.substr(delim_pos + 1);
            long expiration_time = 0;
            try {
                expiration_time = std::stoul(expiration_time_str);
            } catch (const std::exception& e) {
                // std::stoul throws on a non-numeric or out-of-range suffix.
                LOG(WARNING) << "Unknown directory " << key_it->path().native()
                             << ", invalid expiration time: " << e.what();
                continue;
            }
            auto hash = UInt128Wrapper(vectorized::unhex_uint<uint128_t>(key_str.c_str()));
            std::filesystem::directory_iterator offset_it(key_it->path(), ec);
            if (ec) [[unlikely]] {
                LOG(ERROR) << fmt::format("Failed to list dir {}, err={}",
                                          key_it->path().filename().native(), ec.message());
                return Status::InternalError("Failed to list dir {}, err={}",
                                             key_it->path().filename().native(), ec.message());
            }
            for (; offset_it != end_it; ++offset_it) {
                const size_t size = offset_it->file_size(ec);
                if (ec) [[unlikely]] {
                    // Report the block file itself, not its parent directory.
                    LOG(ERROR) << fmt::format("Failed to get file size, file name {}, err={}",
                                              offset_it->path().native(), ec.message());
                    return Status::InternalError("Failed to get file size, file name {}, err={}",
                                                 offset_it->path().native(), ec.message());
                }
                size_t offset = 0;
                bool is_tmp = false;
                FileCacheType cache_type = FileCacheType::NORMAL;
                RETURN_IF_ERROR(this->parse_filename_suffix_to_cache_type(
                        fs, offset_it->path().filename().native(), expiration_time, size, &offset,
                        &is_tmp, &cache_type));
                infos.emplace_back(hash, expiration_time, size, offset, is_tmp, cache_type);
            }
        }
    }
    return Status::OK();
}

void FSFileCacheStorage::load_blocks_directly_unlocked(BlockFileCache* mgr, const FileCacheKey& key,
std::lock_guard<std::mutex>& cache_lock) {
// async load, can't find key, need to check exist.
Expand Down
Loading

0 comments on commit f7c7869

Please sign in to comment.