Skip to content

cli: reuse env KVEngine in dump-* #145

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions src/client/cli/admin/DumpDirEntries.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ namespace {
auto getParser() {
argparse::ArgumentParser parser("dump-dentries");
parser.add_argument("-n", "--num-dentries-perfile").default_value(uint32_t{10'000'000}).scan<'u', uint32_t>();
parser.add_argument("-f", "--fdb-cluster-file").default_value(std::string{"./fdb.cluster"});
parser.add_argument("-d", "--dentry-dir").default_value(std::string{"dentries"});
parser.add_argument("-t", "--threads").default_value(uint32_t(4)).scan<'u', uint32_t>();
return parser;
Expand All @@ -37,14 +36,13 @@ CoTryTask<Dispatcher::OutputTable> dumpDirEntries(IEnv &ienv,
auto &env = dynamic_cast<AdminEnv &>(ienv);
ENSURE_USAGE(args.empty());
ENSURE_USAGE(env.mgmtdClientGetter);
ENSURE_USAGE(env.kvEngineGetter);

const auto &fdbClusterFile = parser.get<std::string>("fdb-cluster-file");
const auto &numEntriesPerFile = parser.get<uint32_t>("num-dentries-perfile");
const auto &dentryDir = parser.get<std::string>("dentry-dir");
const auto threads = parser.get<uint32_t>("threads");

ENSURE_USAGE(threads > 0);
ENSURE_USAGE(!fdbClusterFile.empty());
ENSURE_USAGE(!dentryDir.empty());
ENSURE_USAGE(numEntriesPerFile > 0);

Expand All @@ -55,13 +53,13 @@ CoTryTask<Dispatcher::OutputTable> dumpDirEntries(IEnv &ienv,

Dispatcher::OutputTable table;
auto dumpRes =
co_await dumpDirEntriesFromFdb(fdbClusterFile, numEntriesPerFile, dentryDir, std::max(uint32_t(1), threads));
co_await dumpDirEntriesFromFdb(env.kvEngineGetter(), numEntriesPerFile, dentryDir, std::max(uint32_t(1), threads));
if (!dumpRes) co_return makeError(dumpRes.error());
co_return table;
}
} // namespace

CoTryTask<Void> dumpDirEntriesFromFdb(const std::string fdbClusterFile,
CoTryTask<Void> dumpDirEntriesFromFdb(std::shared_ptr<kv::IKVEngine> kvEngine,
const uint32_t numEntriesPerDir,
const std::string dentryDir,
const uint32_t threads) {
Expand All @@ -77,10 +75,9 @@ CoTryTask<Void> dumpDirEntriesFromFdb(const std::string fdbClusterFile,
XLOGF(CRITICAL, "Saving directory entries to directory: {}", dentryDir);

meta::server::MetaScan::Options options;
options.fdb_cluster_file = fdbClusterFile;
options.threads = 8;
options.coroutines = 32;
auto scan = std::make_unique<meta::server::MetaScan>(options);
auto scan = std::make_unique<meta::server::MetaScan>(options, kvEngine);
auto exec = std::make_unique<folly::CPUThreadPoolExecutor>(16);

time_t timestamp = UtcClock::secondsSinceEpoch();
Expand Down
3 changes: 2 additions & 1 deletion src/client/cli/admin/DumpDirEntries.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <cstdint>
#include <vector>

#include "common/kv/IKVEngine.h"
#include "common/serde/Serde.h"
#include "fbs/meta/Schema.h"
namespace hf3fs::client::cli {
Expand All @@ -21,7 +22,7 @@ struct DirEntryTable {
};
static_assert(serde::Serializable<DirEntryTable>);

CoTryTask<Void> dumpDirEntriesFromFdb(const std::string fdbClusterFile,
CoTryTask<Void> dumpDirEntriesFromFdb(std::shared_ptr<kv::IKVEngine> kvEngine,
const uint32_t numEntriesPerDir,
const std::string dentryDir,
const uint32_t threads);
Expand Down
11 changes: 4 additions & 7 deletions src/client/cli/admin/DumpInodes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ namespace {
auto getParser() {
argparse::ArgumentParser parser("dump-inodes");
parser.add_argument("-n", "--num-inodes-perfile").default_value(uint32_t{10'000'000}).scan<'u', uint32_t>();
parser.add_argument("-f", "--fdb-cluster-file").default_value(std::string{"./fdb.cluster"});
parser.add_argument("-i", "--inode-dir").default_value(std::string{"inodes"});
parser.add_argument("-q", "--parquet-format").default_value(false).implicit_value(true);
parser.add_argument("-a", "--all-inodes").default_value(false).implicit_value(true);
Expand All @@ -44,24 +43,23 @@ CoTryTask<Dispatcher::OutputTable> dumpInodes(IEnv &ienv,
auto &env = dynamic_cast<AdminEnv &>(ienv);
ENSURE_USAGE(args.empty());
ENSURE_USAGE(env.mgmtdClientGetter);
ENSURE_USAGE(env.kvEngineGetter);

const auto &fdbClusterFile = parser.get<std::string>("fdb-cluster-file");
const auto &numInodesPerFile = parser.get<uint32_t>("num-inodes-perfile");
const auto &inodeDir = parser.get<std::string>("inode-dir");
const auto &parquetFormat = parser.get<bool>("parquet-format");
const auto &allInodes = parser.get<bool>("all-inodes");
const auto &threads = parser.get<uint32_t>("threads");

ENSURE_USAGE(threads > 0);
ENSURE_USAGE(!fdbClusterFile.empty());

if (boost::filesystem::exists(inodeDir)) {
XLOGF(CRITICAL, "Output directory for inodes already exists: {}", inodeDir);
co_return makeError(StatusCode::kInvalidArg);
}

Dispatcher::OutputTable table;
auto dumpRes = co_await dumpInodesFromFdb(fdbClusterFile,
auto dumpRes = co_await dumpInodesFromFdb(env.kvEngineGetter(),
numInodesPerFile,
inodeDir,
parquetFormat,
Expand All @@ -73,7 +71,7 @@ CoTryTask<Dispatcher::OutputTable> dumpInodes(IEnv &ienv,

} // namespace

CoTryTask<Void> dumpInodesFromFdb(const std::string fdbClusterFile,
CoTryTask<Void> dumpInodesFromFdb(std::shared_ptr<kv::IKVEngine> kvEngine,
const uint32_t numInodesPerFile,
const std::string inodeDir,
const bool parquetFormat,
Expand All @@ -90,10 +88,9 @@ CoTryTask<Void> dumpInodesFromFdb(const std::string fdbClusterFile,
XLOGF(CRITICAL, "Saving inodes to directory: {}", inodeDir);

meta::server::MetaScan::Options options;
options.fdb_cluster_file = fdbClusterFile;
options.threads = 8;
options.coroutines = 32;
auto scan = std::make_unique<meta::server::MetaScan>(options);
auto scan = std::make_unique<meta::server::MetaScan>(options, kvEngine);
auto exec = std::make_unique<folly::CPUThreadPoolExecutor>(16);

time_t timestamp = UtcClock::secondsSinceEpoch();
Expand Down
3 changes: 2 additions & 1 deletion src/client/cli/admin/DumpInodes.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "common/kv/IKVEngine.h"
#include "common/serde/Serde.h"
#include "common/utils/Coroutine.h"
#include "fbs/meta/Schema.h"
Expand All @@ -25,7 +26,7 @@ static_assert(serde::Serializable<InodeTable>);

std::vector<Path> listFilesFromPath(const Path path);

CoTryTask<Void> dumpInodesFromFdb(const std::string fdbClusterFile,
CoTryTask<Void> dumpInodesFromFdb(std::shared_ptr<kv::IKVEngine> kvEngine,
const uint32_t numInodesPerFile,
const std::string inodeDir,
const bool parquetFormat = false,
Expand Down
5 changes: 2 additions & 3 deletions src/client/cli/admin/RemoveChunks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ namespace {
auto getParser() {
argparse::ArgumentParser parser("remove-chunks");
parser.add_argument("-n", "--num-inodes-perfile").default_value(uint32_t{10'000'000}).scan<'u', uint32_t>();
parser.add_argument("-f", "--fdb-cluster-file").default_value(std::string{"./fdb.cluster"});
parser.add_argument("-i", "--inode-dir").default_value(std::string{"inodes2"});
parser.add_argument("-o", "--orphaned-path").default_value(std::string{"orphaned"});
parser.add_argument("-r", "--do-remove").default_value(false).implicit_value(true);
Expand All @@ -30,11 +29,11 @@ CoTryTask<Dispatcher::OutputTable> removeChunks(IEnv &ienv,
auto &env = dynamic_cast<AdminEnv &>(ienv);
ENSURE_USAGE(args.empty());
ENSURE_USAGE(env.mgmtdClientGetter);
ENSURE_USAGE(env.kvEngineGetter);
Dispatcher::OutputTable table;

// dump latest inodes

const auto &fdbClusterFile = parser.get<std::string>("fdb-cluster-file");
const auto &numInodesPerFile = parser.get<uint32_t>("num-inodes-perfile");
const auto &inodeDir = parser.get<std::string>("inode-dir");
const auto &parquetFormat = parser.get<bool>("parquet-format");
Expand All @@ -44,7 +43,7 @@ CoTryTask<Dispatcher::OutputTable> removeChunks(IEnv &ienv,
co_return makeError(StatusCode::kInvalidArg);
}

auto dumpRes = co_await dumpInodesFromFdb(fdbClusterFile, numInodesPerFile, inodeDir, parquetFormat);
auto dumpRes = co_await dumpInodesFromFdb(env.kvEngineGetter(), numInodesPerFile, inodeDir, parquetFormat);
if (!dumpRes) co_return makeError(dumpRes.error());

// load the inode dump
Expand Down
24 changes: 2 additions & 22 deletions src/meta/event/Scan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,13 @@ MetaScan::MetaScan(Options options, std::shared_ptr<kv::IKVEngine> kvEngine)
if (options_.threads < 0 || options_.coroutines < 0) {
throw std::runtime_error("Invalid options, thread < 0 or coroutines < 0");
}
if (!kvEngine && options_.fdb_cluster_file.empty()) {
throw std::runtime_error("Should set kvEngine or fdb cluster file");
if (!kvEngine) {
throw std::runtime_error("kvEngine is required");
}
if (!options_.logging.empty()) {
XLOGF(INFO, "Setup log: {}", options_.logging);
logging::initOrDie(options_.logging);
}

createKVEngine();
}

MetaScan::~MetaScan() {
Expand All @@ -98,24 +96,6 @@ MetaScan::~MetaScan() {
scanDirEntryTask_->cancel.requestCancellation();
}
exec_.stop();
if (fdbNetwork_) {
kv::fdb::DB::stopNetwork();
fdbNetwork_->join();
}
}

void MetaScan::createKVEngine() {
if (kvEngine_) {
return;
}

kv::fdb::DB::selectAPIVersion(FDB_API_VERSION);
auto error = kv::fdb::DB::setupNetwork();
if (error) {
throw std::runtime_error(fmt::format("Failed to setup fdb network, error {}", kv::fdb::DB::errorMsg(error)));
}
fdbNetwork_ = std::jthread([&]() { kv::fdb::DB::runNetwork(); });
kvEngine_ = std::make_shared<kv::FDBKVEngine>(kv::fdb::DB(options_.fdb_cluster_file, true /* readonly */));
}

std::vector<Inode> MetaScan::getInodes() {
Expand Down
6 changes: 1 addition & 5 deletions src/meta/event/Scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,9 @@ class MetaScan {
double backoff_total_wait = 60; // 60s
// log level
std::string logging;
// create FDB client with given config path
std::string fdb_cluster_file;
};

MetaScan(Options options,
std::shared_ptr<kv::IKVEngine> kvEngine = {} /* create new fdb client if kvEngine is not set */);
MetaScan(Options options, std::shared_ptr<kv::IKVEngine> kvEngine);
~MetaScan();

std::vector<Inode> getInodes();
Expand Down Expand Up @@ -105,7 +102,6 @@ class MetaScan {

std::mutex mutex_;
Options options_;
std::optional<std::jthread> fdbNetwork_;
std::shared_ptr<kv::IKVEngine> kvEngine_;
folly::CPUThreadPoolExecutor exec_;
std::optional<BackgroundTask<Inode>> scanInodeTask_;
Expand Down
108 changes: 0 additions & 108 deletions tests/meta/event/TestScan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,112 +162,4 @@ TYPED_TEST(TestScan, Exit) {
ASSERT_FALSE(entries.empty());
}

// Command-line flags consumed by the DISABLED_ TestScanFDB cases in this file.
// Cluster file path; copied into MetaScan::Options::fdb_cluster_file.
DEFINE_string(fdb_cluster, "", "fdb cluster file path");
// Copied into MetaScan::Options::threads.
DEFINE_uint32(scan_threads, 4, "scan threads");
// Copied into MetaScan::Options::coroutines.
DEFINE_uint32(scan_coroutines, 8, "scan coroutines");
// When false, each scan loop in DISABLED_FDB stops after its first batch.
DEFINE_bool(scan_all, false, "scan all inodes and direntries");
// When true, DISABLED_FDB records directory inode ids and reports entry parents not among them.
DEFINE_bool(scan_check, false, "check parent exists");

// Disabled-by-default scan driver configured from --fdb_cluster, --scan_threads and
// --scan_coroutines.
// Phase 1 pulls inode batches from MetaScan, phase 2 pulls directory-entry batches; each phase
// logs its elapsed time and the number of items received. Unless --scan_all is set, a phase
// stops after its first batch. With --scan_check, batches that do not end a phase contribute
// directory inode ids (phase 1) and are checked for parents missing from that set (phase 2);
// missing parents are reported in a WARNING log.
TEST(TestScanFDB, DISABLED_FDB) {
  MetaScan::Options scanOptions;
  scanOptions.fdb_cluster_file = FLAGS_fdb_cluster;
  scanOptions.threads = FLAGS_scan_threads;
  scanOptions.coroutines = FLAGS_scan_coroutines;
  MetaScan scan(scanOptions);

  std::set<InodeId> knownDirs;
  std::set<InodeId> orphanParents;

  // Milliseconds elapsed since `start`, measured on SteadyClock.
  const auto elapsedSince = [](auto start) {
    return std::chrono::duration_cast<std::chrono::milliseconds>(SteadyClock::now() - start);
  };

  // Phase 1: inodes.
  auto phaseStart = SteadyClock::now();
  size_t inodeCount = 0;
  for (;;) {
    auto batch = scan.getInodes();
    inodeCount += batch.size();
    const bool keepScanning = FLAGS_scan_all && !batch.empty();
    if (!keepScanning) {
      break;
    }
    if (FLAGS_scan_check) {
      for (auto &inode : batch) {
        if (!inode.isDirectory()) {
          continue;
        }
        knownDirs.insert(inode.id);
      }
    }
    XLOGF(DBG, "MetaScan get {} inodes", batch.size());
  }
  XLOGF(INFO,
        "MetaScan scan Inode finished, duration {}, get {} inodes\n",
        elapsedSince(phaseStart),
        inodeCount);

  // Phase 2: directory entries.
  phaseStart = SteadyClock::now();
  size_t entryCount = 0;
  for (;;) {
    auto batch = scan.getDirEntries();
    entryCount += batch.size();
    const bool keepScanning = FLAGS_scan_all && !batch.empty();
    if (!keepScanning) {
      break;
    }
    if (FLAGS_scan_check) {
      for (auto &entry : batch) {
        if (knownDirs.contains(entry.parent)) {
          continue;
        }
        orphanParents.insert(entry.parent);
      }
    }
    XLOGF(DBG, "MetaScan get {} entries", batch.size());
  }
  XLOGF(INFO,
        "MetaScan scan DirEntry finished, duration {}, get {} entries",
        elapsedSince(phaseStart),
        entryCount);
  XLOGF_IF(WARNING,
           !orphanParents.empty(),
           "Missing parent: {}",
           fmt::join(orphanParents.begin(), orphanParents.end(), ", "));
}

// Id of the inode that DISABLED_PrintInode loads and prints (converted via InodeId(FLAGS_inode_id)).
DEFINE_int64(inode_id, 0, "inode id");

// Disabled-by-default diagnostic that prints what is stored for the inode given by --inode_id:
//   1. the inode itself (or a notice that it does not exist),
//   2. every directory entry listed under that id, together with the inode each entry refers to,
//   3. for a directory inode, the directory entry returned by snapshotLoadDirEntry().
// A MetaScan is built from --fdb_cluster / --scan_threads / --scan_coroutines only to obtain its
// KV engine; all reads are snapshot loads issued on read-only transactions.
TEST(TestScanFDB, DISABLED_PrintInode) {
  MetaScan::Options options;
  options.fdb_cluster_file = FLAGS_fdb_cluster;
  options.threads = FLAGS_scan_threads;
  options.coroutines = FLAGS_scan_coroutines;
  MetaScan scan(options);
  auto &kv = scan.kvEngine();

  folly::coro::blockingWait([&]() -> CoTask<void> {
    auto inodeId = InodeId(FLAGS_inode_id);

    // 1. The inode itself.
    auto inodeTxn = kv.createReadonlyTransaction();
    auto inode = co_await Inode::snapshotLoad(*inodeTxn, inodeId);
    CO_ASSERT_OK(inode);
    if (inode->has_value()) {
      fmt::print("inode: {}\n", **inode);
    } else {
      // Terminate the line so the separator below is not glued onto this message.
      fmt::print("inode {} doesn't exist!\n", inodeId);
    }
    fmt::print("============\n");

    // 2. Entries under the inode, fetched page by page; `prev` carries the last printed name
    //    into the next DirEntryList::snapshotLoad call.
    std::string prev;
    while (true) {
      auto listTxn = kv.createReadonlyTransaction();
      auto entries = co_await DirEntryList::snapshotLoad(*listTxn, inodeId, prev, -1);
      CO_ASSERT_OK(entries);
      for (const auto &entry : entries->entries) {
        // Snapshot reads never write, so use a read-only transaction like every other load in
        // this test; a fresh one per entry keeps the original one-transaction-per-lookup shape.
        auto entryTxn = kv.createReadonlyTransaction();
        auto entryInode = co_await Inode::snapshotLoad(*entryTxn, entry.id);
        CO_ASSERT_OK(entryInode);
        fmt::print("entry: {} inode: {}\n",
                   entry,
                   entryInode->has_value() ? fmt::format("{}", **entryInode) : "");
        prev = entry.name;
      }
      if (!entries->more) break;
    }
    fmt::print("============\n");

    // 3. For directories, the entry loaded through the inode itself.
    if (inode->has_value() && inode->value().isDirectory()) {
      auto dirEntryTxn = kv.createReadonlyTransaction();
      auto dirEntry = co_await inode->value().snapshotLoadDirEntry(*dirEntryTxn);
      CO_ASSERT_OK(dirEntry);
      fmt::print("entry points to {}: {}\n", inodeId, *dirEntry);
    }
  }());
}

} // namespace hf3fs::meta::server