From 7ac4f36504324e2064a60d6758bad68894891038 Mon Sep 17 00:00:00 2001 From: Antonio SJ Musumeci Date: Wed, 2 Aug 2023 18:15:36 -0500 Subject: [PATCH] Add readdir policies --- README.md | 49 +++-- src/config.cpp | 4 +- src/config.hpp | 6 +- src/fs_devid.hpp | 13 ++ src/fs_dirfd.hpp | 4 +- src/fs_inode.cpp | 13 ++ src/fs_inode.hpp | 6 +- src/fuse_readdir.cpp | 75 ++++--- src/fuse_readdir.hpp | 33 ++- src/fuse_readdir_base.hpp | 34 +++ src/fuse_readdir_cor.cpp | 194 ++++++++++++++++++ ...onfig_readdir.cpp => fuse_readdir_cor.hpp} | 48 ++--- src/fuse_readdir_cosr.cpp | 173 ++++++++++++++++ src/fuse_readdir_cosr.hpp | 39 ++++ src/fuse_readdir_factory.cpp | 75 +++++++ ...g_readdir.hpp => fuse_readdir_factory.hpp} | 18 +- src/fuse_readdir_plus.cpp | 30 +-- ...readdir_posix.cpp => fuse_readdir_seq.cpp} | 43 ++-- ...readdir_posix.hpp => fuse_readdir_seq.hpp} | 19 +- src/unbounded_queue.hpp | 161 +++++++++++++++ src/unbounded_thread_pool.hpp | 124 +++++++++++ 21 files changed, 1010 insertions(+), 151 deletions(-) create mode 100644 src/fuse_readdir_base.hpp create mode 100644 src/fuse_readdir_cor.cpp rename src/{config_readdir.cpp => fuse_readdir_cor.hpp} (57%) create mode 100644 src/fuse_readdir_cosr.cpp create mode 100644 src/fuse_readdir_cosr.hpp create mode 100644 src/fuse_readdir_factory.cpp rename src/{config_readdir.hpp => fuse_readdir_factory.hpp} (74%) rename src/{fuse_readdir_posix.cpp => fuse_readdir_seq.cpp} (79%) rename src/{fuse_readdir_posix.hpp => fuse_readdir_seq.hpp} (77%) create mode 100644 src/unbounded_queue.hpp create mode 100644 src/unbounded_thread_pool.hpp diff --git a/README.md b/README.md index 5aca5ce2e..a8ab7a4e5 100644 --- a/README.md +++ b/README.md @@ -256,6 +256,9 @@ These options are the same regardless of whether you use them with the concatenated together with the longest common prefix removed. * **func.FUNC=POLICY**: Sets the specific FUSE function's policy. See below for the list of value types. Example: **func.getattr=newest** +* **func.readdir=seq|cosr|cor|cosr:INT|cor:INT**: Sets `readdir` + policy. INT value sets the number of threads to use for + concurrency. (default: seq) * **category.action=POLICY**: Sets policy of all FUSE functions in the action category. (default: epall) * **category.create=POLICY**: Sets policy of all FUSE functions in the @@ -682,9 +685,8 @@ rather than file paths, which were created by `open` or `create`. That said many times the current FUSE kernel driver will not always provide the file handle when a client calls `fgetattr`, `fchown`, `fchmod`, `futimens`, `ftruncate`, etc. This means it will call the regular, -path based, versions. `readdir` has no real need for a policy given -the purpose is merely to return a list of entries in a -directory. `statfs`'s behavior can be modified via other options. +path based, versions. `statfs`'s behavior can be modified via other +options. When using policies which are based on a branch's available space the base path provided is used. Not the full path to the file in @@ -715,8 +717,7 @@ In cases where something may be searched for (such as a path to clone) ### Policies A policy is the algorithm used to choose a branch or branches for a -function to work on. Think of them as ways to filter and sort -branches. +function to work on or generally how the function behaves. Any function in the `create` category will clone the relative path if needed. Some other functions (`rename`,`link`,`ioctl`) have special @@ -725,12 +726,11 @@ requirements or behaviors which you can read more about below. #### Filtering -Policies basically search branches and create a list of files / paths +Most policies basically search branches and create a list of files / paths for functions to work on. The policy is responsible for filtering and sorting the branches. Filters include **minfreespace**, whether or not a branch is mounted read-only, and the branch tagging -(RO,NC,RW). These filters are applied across all policies unless -otherwise noted. +(RO,NC,RW). These filters are applied across most policies. * No **search** function policies filter. * All **action** function policies filter out branches which are @@ -823,6 +823,26 @@ policies is not appropriate. | search | ff | +#### func.readdir + +examples: `fuse.readdir=seq`, `fuse.readdir=cor:4` + +`readdir` has policies to control how it manages reading directory +content. + +| Policy | Description | +|--------|-------------| +| seq | "sequential" : Iterate over branches in the order defined. This is the default and traditional behavior found prior to the readdir policy introduction. | +| cosr | "concurrent open, sequential read" : Concurrently open branch directories using a thread pool and process them in order of definition. This keeps memory and CPU usage low while also reducing the time spent waiting on branches to respond. Number of threads defaults to the number of logical cores. Can be overwritten via the syntax `fuse.readdir=cosr:N` where `N` is the number of threads. | +| cor | "concurrent open and read" : Concurrently open branch directories and immediately start reading their contents using a thread pool. This will result in slightly higher memory and CPU usage but reduced latency. Particularly when using higher latency / slower speed network filesystem branches. Unlike `seq` and `cosr` the order of files could change due the async nature of the thread pool. Number of threads defaults to the number of logical cores. Can be overwritten via the syntax `fuse.readdir=cor:N` where `N` is the number of threads. + +Keep in mind that `readdir` mostly just provides a list of file names +in a directory and possibly some basic metadata about said files. To +know details about the files, as one would see from commands like +`find` or `ls`, it is required to call `stat` on the file which is +controlled by `fuse.getattr`. + + #### ioctl When `ioctl` is used with an open file then it will use the file @@ -891,19 +911,6 @@ returned but it will still be possible. **link** uses the same strategy but without the removals. -#### readdir #### - -[readdir](http://linux.die.net/man/3/readdir) is different from all -other filesystem functions. While it could have its own set of -policies to tweak its behavior at this time it provides a simple union -of files and directories found. Remember that any action or -information queried about these files and directories come from the -respective function. For instance: an **ls** is a **readdir** and for -each file/directory returned **getattr** is called. Meaning the policy -of **getattr** is responsible for choosing the file/directory which is -the source of the metadata you see in an **ls**. - - #### statfs / statvfs #### [statvfs](http://linux.die.net/man/2/statvfs) normalizes the source diff --git a/src/config.cpp b/src/config.cpp index 16a5999d8..2cb64238b 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -112,7 +112,7 @@ Config::Config() pid(::getpid()), posix_acl(false), readahead(0), - readdir(ReadDir::ENUM::POSIX), + readdir("seq"), readdirplus(false), rename_exdev(RenameEXDEV::ENUM::PASSTHROUGH), scheduling_priority(-10), @@ -162,6 +162,7 @@ Config::Config() _map["func.mkdir"] = &func.mkdir; _map["func.mknod"] = &func.mknod; _map["func.open"] = &func.open; + _map["func.readdir"] = &readdir; _map["func.readlink"] = &func.readlink; _map["func.removexattr"] = &func.removexattr; _map["func.rename"] = &func.rename; @@ -189,7 +190,6 @@ Config::Config() _map["pin-threads"] = &fuse_pin_threads; _map["posix_acl"] = &posix_acl; _map["readahead"] = &readahead; - // _map["readdir"] = &readdir; _map["readdirplus"] = &readdirplus; _map["rename-exdev"] = &rename_exdev; _map["scheduling-priority"] = &scheduling_priority; diff --git a/src/config.hpp b/src/config.hpp index 387489b70..6f0300eb4 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -25,15 +25,15 @@ #include "config_log_metrics.hpp" #include "config_moveonenospc.hpp" #include "config_nfsopenhack.hpp" -#include "config_readdir.hpp" #include "config_rename_exdev.hpp" +#include "config_set.hpp" #include "config_statfs.hpp" #include "config_statfsignore.hpp" #include "config_xattr.hpp" -#include "config_set.hpp" #include "enum.hpp" #include "errno.hpp" #include "funcs.hpp" +#include "fuse_readdir.hpp" #include "policy.hpp" #include "rwlock.hpp" #include "tofrom_wrapper.hpp" @@ -135,7 +135,7 @@ class Config ConfigUINT64 pid; ConfigBOOL posix_acl; ConfigUINT64 readahead; - ReadDir readdir; + FUSE::ReadDir readdir; ConfigBOOL readdirplus; RenameEXDEV rename_exdev; ConfigINT scheduling_priority; diff --git a/src/fs_devid.hpp b/src/fs_devid.hpp index d75bcf3e5..2ad50954d 100644 --- a/src/fs_devid.hpp +++ b/src/fs_devid.hpp @@ -19,6 +19,7 @@ #pragma once #include "fs_fstat.hpp" +#include "fs_dirfd.hpp" namespace fs @@ -37,4 +38,16 @@ namespace fs return st.st_dev; } + + static + inline + dev_t + devid(DIR *dh_) + { + int dirfd; + + dirfd = fs::dirfd(dh_); + + return fs::devid(dirfd); + } } diff --git a/src/fs_dirfd.hpp b/src/fs_dirfd.hpp index 767d66850..06aca2edf 100644 --- a/src/fs_dirfd.hpp +++ b/src/fs_dirfd.hpp @@ -27,8 +27,8 @@ namespace fs static inline int - dirfd(DIR *dirp_) + dirfd(DIR *dh_) { - return ::dirfd(dirp_); + return ::dirfd(dh_); } } diff --git a/src/fs_inode.cpp b/src/fs_inode.cpp index b0330c776..19098440c 100644 --- a/src/fs_inode.cpp +++ b/src/fs_inode.cpp @@ -208,6 +208,19 @@ namespace fs return g_func(fusepath_,fusepath_len_,mode_,dev_,ino_); } + uint64_t + calc(std::string const &fusepath_, + const mode_t mode_, + const dev_t dev_, + const ino_t ino_) + { + return calc(fusepath_.c_str(), + fusepath_.size(), + mode_, + dev_, + ino_); + } + void calc(const char *fusepath_, const uint64_t fusepath_len_, diff --git a/src/fs_inode.hpp b/src/fs_inode.hpp index baf7d718c..11e772c83 100644 --- a/src/fs_inode.hpp +++ b/src/fs_inode.hpp @@ -37,7 +37,11 @@ namespace fs const uint64_t fusepath_len, const mode_t mode, const dev_t dev, - const ino_t ion); + const ino_t ino); + uint64_t calc(std::string const &fusepath, + mode_t const mode, + dev_t const dev, + ino_t ino); void calc(const char *fusepath, const uint64_t fusepath_len, struct stat *st); diff --git a/src/fuse_readdir.cpp b/src/fuse_readdir.cpp index 2ef0b3e47..47392db53 100644 --- a/src/fuse_readdir.cpp +++ b/src/fuse_readdir.cpp @@ -16,35 +16,64 @@ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ -#include "fuse_readdir_posix.hpp" -#include "fuse_readdir_linux.hpp" +#include "fuse_readdir.hpp" +#include "fuse_readdir_factory.hpp" #include "config.hpp" -#include "dirinfo.hpp" -#include "rwlock.hpp" -#include "ugid.hpp" -#include "fuse.h" +int +FUSE::readdir(const fuse_file_info_t *ffi_, + fuse_dirents_t *buf_) +{ + Config::Write cfg; + + return cfg->readdir(ffi_,buf_); +} + +FUSE::ReadDir::ReadDir(std::string const s_) +{ + from_string(s_); + assert(_readdir); +} -namespace FUSE +std::string +FUSE::ReadDir::to_string() const { - int - readdir(const fuse_file_info_t *ffi_, - fuse_dirents_t *buf_) + std::lock_guard lg(_mutex); + + return _type; +} + +int +FUSE::ReadDir::from_string(std::string const &str_) +{ + std::shared_ptr tmp; + + tmp = FUSE::ReadDirFactory::make(str_); + if(!tmp) + return -EINVAL; + + { + std::lock_guard lg(_mutex); + + _type = str_; + _readdir = tmp; + } + + return 0; +} + +int +FUSE::ReadDir::operator()(fuse_file_info_t const *ffi_, + fuse_dirents_t *buf_) +{ + std::shared_ptr readdir; + { - Config::Read cfg; - DirInfo *di = reinterpret_cast(ffi_->fh); - const fuse_context *fc = fuse_get_context(); - const ugid::Set ugid(fc->uid,fc->gid); - - switch(cfg->readdir) - { - case ReadDir::ENUM::LINUX: - return FUSE::readdir_linux(cfg->branches,di->fusepath.c_str(),buf_); - default: - case ReadDir::ENUM::POSIX: - return FUSE::readdir_posix(cfg->branches,di->fusepath.c_str(),buf_); - } + std::lock_guard lg(_mutex); + readdir = _readdir; } + + return (*readdir)(ffi_,buf_); } diff --git a/src/fuse_readdir.hpp b/src/fuse_readdir.hpp index c82937c38..20cc1b30f 100644 --- a/src/fuse_readdir.hpp +++ b/src/fuse_readdir.hpp @@ -17,11 +17,38 @@ #pragma once #include "fuse.h" +#include "tofrom_string.hpp" +#include "fuse_readdir_base.hpp" + +#include +#include + +#include namespace FUSE { - int - readdir(const fuse_file_info_t *ffi, - fuse_dirents_t *buf); + int readdir(fuse_file_info_t const *ffi, + fuse_dirents_t *buf); + + class ReadDir : public ToFromString + { + public: + ReadDir(std::string const s_); + + public: + std::string to_string() const; + int from_string(const std::string &); + + public: + int operator()(fuse_file_info_t const *ffi, + fuse_dirents_t *buf); + + private: + mutable std::mutex _mutex; + + private: + std::string _type; + std::shared_ptr _readdir; + }; } diff --git a/src/fuse_readdir_base.hpp b/src/fuse_readdir_base.hpp new file mode 100644 index 000000000..dbdffba5a --- /dev/null +++ b/src/fuse_readdir_base.hpp @@ -0,0 +1,34 @@ +/* + Copyright (c) 2023, Antonio SJ Musumeci + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ + +#pragma once + +#include "fuse.h" + + +namespace FUSE +{ + class ReadDirBase + { + public: + ReadDirBase() {}; + virtual ~ReadDirBase() {}; + + public: + virtual int operator()(fuse_file_info_t const *ffi, + fuse_dirents_t *buf) = 0; + }; +} diff --git a/src/fuse_readdir_cor.cpp b/src/fuse_readdir_cor.cpp new file mode 100644 index 000000000..9036cd4ab --- /dev/null +++ b/src/fuse_readdir_cor.cpp @@ -0,0 +1,194 @@ +/* + Copyright (c) 2023, Antonio SJ Musumeci + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ + +#include "fuse_readdir_cor.hpp" + +#include "config.hpp" +#include "dirinfo.hpp" +#include "errno.hpp" +#include "fs_closedir.hpp" +#include "fs_devid.hpp" +#include "fs_inode.hpp" +#include "fs_opendir.hpp" +#include "fs_path.hpp" +#include "fs_readdir.hpp" +#include "hashset.hpp" +#include "ugid.hpp" + +#include "fuse_dirents.h" + + +FUSE::ReadDirCOR::ReadDirCOR(unsigned concurrency_) + : _tp(concurrency_) +{ + +} + +FUSE::ReadDirCOR::~ReadDirCOR() +{ + +} + +namespace l +{ + static + inline + uint64_t + dirent_exact_namelen(const struct dirent *d_) + { +#ifdef _D_EXACT_NAMLEN + return _D_EXACT_NAMLEN(d_); +#elif defined _DIRENT_HAVE_D_NAMLEN + return d_->d_namlen; +#else + return strlen(d_->d_name); +#endif + } + + static + inline + int + readdir(std::string basepath_, + HashSet &names_, + std::mutex &names_mutex_, + fuse_dirents_t *buf_, + std::mutex &dirents_mutex_) + { + int rv; + int err; + DIR *dh; + dev_t dev; + std::string filepath; + + dh = fs::opendir(basepath_); + if(dh == NULL) + return -errno; + + dev = fs::devid(dh); + + rv = 0; + err = 0; + for(struct dirent *de = fs::readdir(dh); de && !rv; de = fs::readdir(dh)) + { + std::uint64_t namelen; + + namelen = l::dirent_exact_namelen(de); + + { + std::lock_guard lk(names_mutex_); + rv = names_.put(de->d_name,namelen); + if(rv == 0) + continue; + } + + filepath = fs::path::make(basepath_,de->d_name); + de->d_ino = fs::inode::calc(filepath, + DTTOIF(de->d_type), + dev, + de->d_ino); + + { + std::lock_guard lk(dirents_mutex_); + rv = fuse_dirents_add(buf_,de,namelen); + if(rv == 0) + continue; + } + + err = -ENOMEM; + } + + fs::closedir(dh); + + return err; + } + + static + std::vector + concurrent_readdir(ThreadPool &tp_, + const Branches::CPtr &branches_, + const char *dirname_, + fuse_dirents_t *buf_) + { + HashSet names; + std::mutex names_mutex; + std::mutex dirents_mutex; + std::vector rv; + std::vector> futures; + + for(auto const &branch : *branches_) + { + auto func = [&]() + { + std::string basepath = fs::path::make(branch.path,dirname_); + + return l::readdir(basepath,names,names_mutex,buf_,dirents_mutex); + }; + + auto rv = tp_.enqueue_task(func); + + futures.emplace_back(std::move(rv)); + } + + for(auto &future : futures) + rv.push_back(future.get()); + + return rv; + } + + static + int + calc_rv(std::vector rvs_) + { + for(auto rv : rvs_) + { + if(rv == 0) + return 0; + } + + if(rvs_.empty()) + return -ENOENT; + + return rvs_[0]; + } + + static + int + readdir(ThreadPool &tp_, + const Branches::CPtr &branches_, + const char *dirname_, + fuse_dirents_t *buf_) + { + std::vector rvs; + + fuse_dirents_reset(buf_); + + rvs = l::concurrent_readdir(tp_,branches_,dirname_,buf_); + + return l::calc_rv(rvs); + } +} + +int +FUSE::ReadDirCOR::operator()(fuse_file_info_t const *ffi_, + fuse_dirents_t *buf_) +{ + Config::Read cfg; + DirInfo *di = reinterpret_cast(ffi_->fh); + const fuse_context *fc = fuse_get_context(); + const ugid::Set ugid(fc->uid,fc->gid); + + return l::readdir(_tp,cfg->branches,di->fusepath.c_str(),buf_); +} diff --git a/src/config_readdir.cpp b/src/fuse_readdir_cor.hpp similarity index 57% rename from src/config_readdir.cpp rename to src/fuse_readdir_cor.hpp index f03b2a777..872352ff3 100644 --- a/src/config_readdir.cpp +++ b/src/fuse_readdir_cor.hpp @@ -1,7 +1,7 @@ /* ISC License - Copyright (c) 2020, Antonio SJ Musumeci + Copyright (c) 2023, Antonio SJ Musumeci Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby granted, provided that the above @@ -16,36 +16,24 @@ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ -#include "config_readdir.hpp" -#include "ef.hpp" -#include "errno.hpp" +#pragma once +#include "fuse_readdir_base.hpp" +#include "unbounded_thread_pool.hpp" -template<> -int -ReadDir::from_string(const std::string &s_) +// concurrent open & read +namespace FUSE { - if(s_ == "posix") - _data = ReadDir::ENUM::POSIX; - ef(s_ == "linux") - _data = ReadDir::ENUM::LINUX; - else - return -EINVAL; - - return 0; -} - -template<> -std::string -ReadDir::to_string(void) const -{ - switch(_data) - { - case ReadDir::ENUM::POSIX: - return "posix"; - case ReadDir::ENUM::LINUX: - return "linux"; - } - - return "invalid"; + class ReadDirCOR final : public FUSE::ReadDirBase + { + public: + ReadDirCOR(unsigned concurrency); + ~ReadDirCOR(); + + int operator()(fuse_file_info_t const *ffi, + fuse_dirents_t *buf); + + private: + ThreadPool _tp; + }; } diff --git a/src/fuse_readdir_cosr.cpp b/src/fuse_readdir_cosr.cpp new file mode 100644 index 000000000..f0750f56a --- /dev/null +++ b/src/fuse_readdir_cosr.cpp @@ -0,0 +1,173 @@ +/* + Copyright (c) 2023, Antonio SJ Musumeci + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ + +#include "fuse_readdir_cosr.hpp" + +#include "config.hpp" +#include "dirinfo.hpp" +#include "errno.hpp" +#include "fs_closedir.hpp" +#include "fs_devid.hpp" +#include "fs_dirfd.hpp" +#include "fs_inode.hpp" +#include "fs_opendir.hpp" +#include "fs_path.hpp" +#include "fs_readdir.hpp" +#include "fs_stat.hpp" +#include "hashset.hpp" +#include "ugid.hpp" + +#include "fuse_dirents.h" + + +FUSE::ReadDirCOSR::ReadDirCOSR(unsigned concurrency_) + : _tp(concurrency_) +{ + +} + +FUSE::ReadDirCOSR::~ReadDirCOSR() +{ + +} + +namespace l +{ + static + inline + uint64_t + dirent_exact_namelen(const struct dirent *d_) + { +#ifdef _D_EXACT_NAMLEN + return _D_EXACT_NAMLEN(d_); +#elif defined _DIRENT_HAVE_D_NAMLEN + return d_->d_namlen; +#else + return strlen(d_->d_name); +#endif + } + + static + inline + std::vector> + opendir(ThreadPool &tp_, + const Branches::CPtr &branches_, + char const *dirname_) + { + std::vector> futures; + + for(auto const &branch : *branches_) + { + auto func = [&branch,dirname_]() + { + std::string basepath = fs::path::make(branch.path,dirname_); + + return fs::opendir(basepath); + }; + + auto rv = tp_.enqueue_task(func); + + futures.emplace_back(std::move(rv)); + } + + return futures; + } + + static + inline + int + readdir(std::vector> &dh_futures_, + char const *dirname_, + fuse_dirents_t *buf_) + { + int err; + HashSet names; + std::string fullpath; + + err = 0; + for(auto &dh_future : dh_futures_) + { + int rv; + DIR *dh; + dev_t dev; + + dh = dh_future.get(); + if(dh == NULL) + continue; + + dev = fs::devid(dh); + + rv = 0; + for(struct dirent *de = fs::readdir(dh); de && !rv; de = fs::readdir(dh)) + { + std::uint64_t namelen; + + namelen = l::dirent_exact_namelen(de); + + rv = names.put(de->d_name,namelen); + if(rv == 0) + continue; + + fullpath = fs::path::make(dirname_,de->d_name); + de->d_ino = fs::inode::calc(fullpath, + DTTOIF(de->d_type), + dev, + de->d_ino); + + rv = fuse_dirents_add(buf_,de,namelen); + if(rv == 0) + continue; + + err = -ENOMEM; + } + + fs::closedir(dh); + } + + return err; + } + + static + inline + int + readdir(ThreadPool &tp_, + const Branches::CPtr &branches_, + const char *dirname_, + fuse_dirents_t *buf_) + { + int rv; + std::vector> dh_futures; + + fuse_dirents_reset(buf_); + + dh_futures = l::opendir(tp_,branches_,dirname_); + rv = l::readdir(dh_futures,dirname_,buf_); + + return rv; + } +} + +int +FUSE::ReadDirCOSR::operator()(fuse_file_info_t const *ffi_, + fuse_dirents_t *buf_) +{ + Config::Read cfg; + DirInfo *di = reinterpret_cast(ffi_->fh); + const fuse_context *fc = fuse_get_context(); + const ugid::Set ugid(fc->uid,fc->gid); + + return l::readdir(_tp,cfg->branches,di->fusepath.c_str(),buf_); +} diff --git a/src/fuse_readdir_cosr.hpp b/src/fuse_readdir_cosr.hpp new file mode 100644 index 000000000..caf6c13a6 --- /dev/null +++ b/src/fuse_readdir_cosr.hpp @@ -0,0 +1,39 @@ +/* + ISC License + + Copyright (c) 2023, Antonio SJ Musumeci + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ + +#pragma once + +#include "fuse_readdir_base.hpp" +#include "unbounded_thread_pool.hpp" + +// concurrent open, sequential read +namespace FUSE +{ + class ReadDirCOSR final : public FUSE::ReadDirBase + { + public: + ReadDirCOSR(unsigned concurrency); + ~ReadDirCOSR(); + + int operator()(fuse_file_info_t const *ffi, + fuse_dirents_t *buf); + + private: + ThreadPool _tp; + }; +} diff --git a/src/fuse_readdir_factory.cpp b/src/fuse_readdir_factory.cpp new file mode 100644 index 000000000..663188249 --- /dev/null +++ b/src/fuse_readdir_factory.cpp @@ -0,0 +1,75 @@ +/* + ISC License + + Copyright (c) 2023, Antonio SJ Musumeci + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ + +#include "fuse_readdir_factory.hpp" + +#include "fuse_readdir_cor.hpp" +#include "fuse_readdir_cosr.hpp" +#include "fuse_readdir_seq.hpp" + +#include +#include +#include +#include + + +namespace l +{ + static + void + read_cfg(std::string const cfgstr_, + std::string &type_, + int &concurrency_) + { + char type[16]; + int concurrency; + + concurrency = 0; + std::sscanf(cfgstr_.c_str(),"%15[a-z]:%u",type,&concurrency); + + if(concurrency == 0) + concurrency = std::thread::hardware_concurrency(); + else if(concurrency < 0) + concurrency = (std::thread::hardware_concurrency() / std::abs(concurrency)); + + if(concurrency == 0) + concurrency = 1; + + type_ = type; + concurrency_ = concurrency; + } +} + +std::shared_ptr +FUSE::ReadDirFactory::make(std::string const cfgstr_) +{ + int concurrency; + std::string type; + + l::read_cfg(cfgstr_,type,concurrency); + assert(concurrency); + + if(type == "seq") + return std::make_shared(); + if(type == "cosr") + return std::make_shared(concurrency); + if(type == "cor") + return std::make_shared(concurrency); + + return {}; +} diff --git a/src/config_readdir.hpp b/src/fuse_readdir_factory.hpp similarity index 74% rename from src/config_readdir.hpp rename to src/fuse_readdir_factory.hpp index 96767f202..c99b9fabc 100644 --- a/src/config_readdir.hpp +++ b/src/fuse_readdir_factory.hpp @@ -1,7 +1,7 @@ /* ISC License - Copyright (c) 2020, Antonio SJ Musumeci + Copyright (c) 2023, Antonio SJ Musumeci Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby granted, provided that the above @@ -18,13 +18,17 @@ #pragma once -#include "enum.hpp" +#include "fuse_readdir_base.hpp" +#include +#include -enum class ReadDirEnum + +namespace FUSE +{ + class ReadDirFactory { - POSIX, - LINUX + public: + static std::shared_ptr make(std::string const cfgstr); }; - -typedef Enum ReadDir; +} diff --git a/src/fuse_readdir_plus.cpp b/src/fuse_readdir_plus.cpp index e9870fa2b..3b913fba9 100644 --- a/src/fuse_readdir_plus.cpp +++ b/src/fuse_readdir_plus.cpp @@ -16,13 +16,7 @@ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ -#include "fuse_readdir_plus_linux.hpp" -#include "fuse_readdir_plus_posix.hpp" - -#include "config.hpp" -#include "dirinfo.hpp" -#include "rwlock.hpp" -#include "ugid.hpp" +#include "errno.hpp" #include "fuse.h" @@ -33,26 +27,6 @@ namespace FUSE readdir_plus(const fuse_file_info_t *ffi_, fuse_dirents_t *buf_) { - Config::Read cfg; - DirInfo *di = reinterpret_cast(ffi_->fh); - const fuse_context *fc = fuse_get_context(); - const ugid::Set ugid(fc->uid,fc->gid); - - switch(cfg->readdir) - { - case ReadDir::ENUM::LINUX: - return FUSE::readdir_plus_linux(cfg->branches, - di->fusepath.c_str(), - cfg->cache_entry, - cfg->cache_attr, - buf_); - default: - case ReadDir::ENUM::POSIX: - return FUSE::readdir_plus_posix(cfg->branches, - di->fusepath.c_str(), - cfg->cache_entry, - cfg->cache_attr, - buf_); - } + return -ENOTSUP; } } diff --git a/src/fuse_readdir_posix.cpp b/src/fuse_readdir_seq.cpp similarity index 79% rename from src/fuse_readdir_posix.cpp rename to src/fuse_readdir_seq.cpp index 0e2000738..67d2f43e9 100644 --- a/src/fuse_readdir_posix.cpp +++ b/src/fuse_readdir_seq.cpp @@ -16,7 +16,11 @@ #define _DEFAULT_SOURCE +#include "fuse_readdir_seq.hpp" + #include "branches.hpp" +#include "config.hpp" +#include "dirinfo.hpp" #include "errno.hpp" #include "fs_closedir.hpp" #include "fs_devid.hpp" @@ -27,6 +31,7 @@ #include "fs_readdir.hpp" #include "fs_stat.hpp" #include "hashset.hpp" +#include "ugid.hpp" #include "fuse.h" #include "fuse_dirents.h" @@ -34,11 +39,6 @@ #include #include -#include - -using std::string; -using std::vector; - namespace l { @@ -61,19 +61,17 @@ namespace l const char *dirname_, fuse_dirents_t *buf_) { - dev_t dev; HashSet names; - string basepath; - string fullpath; - uint64_t namelen; + std::string basepath; + std::string fullpath; fuse_dirents_reset(buf_); for(const auto &branch : *branches_) { int rv; - int dirfd; DIR *dh; + dev_t dev; basepath = fs::path::make(branch.path,dirname_); @@ -81,12 +79,13 @@ namespace l if(!dh) continue; - dirfd = fs::dirfd(dh); - dev = fs::devid(dirfd); + dev = fs::devid(dh); rv = 0; for(struct dirent *de = fs::readdir(dh); de && !rv; de = fs::readdir(dh)) { + std::uint64_t namelen; + namelen = l::dirent_exact_namelen(de); rv = names.put(de->d_name,namelen); @@ -94,8 +93,7 @@ namespace l continue; fullpath = fs::path::make(dirname_,de->d_name); - de->d_ino = fs::inode::calc(fullpath.c_str(), - fullpath.size(), + de->d_ino = fs::inode::calc(fullpath, DTTOIF(de->d_type), dev, de->d_ino); @@ -112,13 +110,14 @@ namespace l } } -namespace FUSE +int +FUSE::ReadDirSeq::operator()(fuse_file_info_t const *ffi_, + fuse_dirents_t *buf_) { - int - readdir_posix(const Branches::CPtr &branches_, - const char *dirname_, - fuse_dirents_t *buf_) - { - return l::readdir(branches_,dirname_,buf_); - } + Config::Read cfg; + DirInfo *di = reinterpret_cast(ffi_->fh); + const fuse_context *fc = fuse_get_context(); + const ugid::Set ugid(fc->uid,fc->gid); + + return l::readdir(cfg->branches,di->fusepath.c_str(),buf_); } diff --git a/src/fuse_readdir_posix.hpp b/src/fuse_readdir_seq.hpp similarity index 77% rename from src/fuse_readdir_posix.hpp rename to src/fuse_readdir_seq.hpp index 36d08dbb3..f89b87794 100644 --- a/src/fuse_readdir_posix.hpp +++ b/src/fuse_readdir_seq.hpp @@ -18,17 +18,18 @@ #pragma once -#include "branches.hpp" - -#include "fuse.h" - -#include +#include "fuse_readdir_base.hpp" namespace FUSE { - int - readdir_posix(const Branches::CPtr &branches, - const char *dirname, - fuse_dirents_t *buf); + class ReadDirSeq final : public FUSE::ReadDirBase + { + public: + ReadDirSeq() {} + ~ReadDirSeq() {} + + int operator()(fuse_file_info_t const *ffi, + fuse_dirents_t *buf); + }; } diff --git a/src/unbounded_queue.hpp b/src/unbounded_queue.hpp new file mode 100644 index 000000000..1e527b78e --- /dev/null +++ b/src/unbounded_queue.hpp @@ -0,0 +1,161 @@ +#pragma once + +#include +#include +#include +#include + + +template +class UnboundedQueue +{ +public: + explicit + UnboundedQueue(bool block_ = true) + : _block(block_) + { + } + + void + push(const T& item_) + { + { + std::lock_guard guard(_queue_lock); + _queue.push(item_); + } + _condition.notify_one(); + } + + void + push(T&& item_) + { + { + std::lock_guard guard(_queue_lock); + _queue.push(std::move(item_)); + } + + _condition.notify_one(); + } + + template + void + emplace(Args&&... args_) + { + { + std::lock_guard guard(_queue_lock); + _queue.emplace(std::forward(args_)...); + } + + _condition.notify_one(); + } + + bool + try_push(const T& item_) + { + { + std::unique_lock lock(_queue_lock, std::try_to_lock); + if(!lock) + return false; + _queue.push(item_); + } + + _condition.notify_one(); + + return true; + } + + bool + try_push(T&& item_) + { + { + std::unique_lock lock(_queue_lock, std::try_to_lock); + if(!lock) + return false; + _queue.push(std::move(item_)); + } + + _condition.notify_one(); + + return true; + } + + //TODO: push multiple T at once + + bool + pop(T& item_) + { + std::unique_lock guard(_queue_lock); + + _condition.wait(guard, [&]() { return !_queue.empty() || !_block; }); + if(_queue.empty()) + return false; + + item_ = std::move(_queue.front()); + _queue.pop(); + + return true; + } + + bool + try_pop(T& item_) + { + std::unique_lock lock(_queue_lock, std::try_to_lock); + if(!lock || _queue.empty()) + return false; + + item_ = std::move(_queue.front()); + _queue.pop(); + + return true; + } + + std::size_t + size() const + { + std::lock_guard guard(_queue_lock); + + return _queue.size(); + } + + bool + empty() const + { + std::lock_guard guard(_queue_lock); + + return _queue.empty(); + } + + void + block() + { + std::lock_guard guard(_queue_lock); + _block = true; + } + + void + unblock() + { + { + std::lock_guard guard(_queue_lock); + _block = false; + } + + _condition.notify_all(); + } + + bool + blocking() const + { + std::lock_guard guard(_queue_lock); + + return _block; + } + +private: + mutable std::mutex _queue_lock; + +private: + bool _block; + std::queue _queue; + std::condition_variable _condition; +}; diff --git a/src/unbounded_thread_pool.hpp b/src/unbounded_thread_pool.hpp new file mode 100644 index 000000000..4905853ce --- /dev/null +++ b/src/unbounded_thread_pool.hpp @@ -0,0 +1,124 @@ +#pragma once + +#include "unbounded_queue.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +class ThreadPool +{ +public: + explicit + ThreadPool(const std::size_t thread_count_ = std::thread::hardware_concurrency()) + : _queues(thread_count_), + _count(thread_count_) + { + auto worker = [this](std::size_t i) + { + while(true) + { + Proc f; + + for(std::size_t n = 0; n < (_count * K); ++n) + { + if(_queues[(i + n) % _count].try_pop(f)) + break; + } + + if(!f && !_queues[i].pop(f)) + break; + + f(); + } + }; + + _threads.reserve(thread_count_); + for(std::size_t i = 0; i < thread_count_; ++i) + _threads.emplace_back(worker, i); + } + + ~ThreadPool() + { + for(auto& queue : _queues) + queue.unblock(); + for(auto& thread : _threads) + thread.join(); + } + + template + void + enqueue_work(F&& f_) + { + auto i = _index++; + + for(std::size_t n = 0; n < (_count * K); ++n) + { + if(_queues[(i + n) % _count].try_push(f_)) + return; + } + + _queues[i % _count].push(std::move(f_)); + } + + template + [[nodiscard]] + std::future::type> + enqueue_task(F&& f_) + { + using TaskReturnType = typename std::result_of::type; + using Promise = std::promise; + + auto i = _index++; + auto promise = std::make_shared(); + auto future = promise->get_future(); + auto work = [=]() { + auto rv = f_(); + promise->set_value(rv); + }; + + for(std::size_t n = 0; n < (_count * K); ++n) + { + if(_queues[(i + n) % _count].try_push(work)) + return future; + } + + _queues[i % _count].push(std::move(work)); + + return future; + } + +public: + std::vector + threads() + { + std::vector rv; + + for(auto &thread : _threads) + rv.push_back(thread.native_handle()); + + return rv; + } + +private: + using Proc = std::function; + using Queue = UnboundedQueue; + using Queues = std::vector; + Queues _queues; + +private: + std::vector _threads; + +private: + const std::size_t _count; + std::atomic_uint _index; + + static const unsigned int K = 2; +};