From f5389751b7379b8751dc9674096f63279da96e5e Mon Sep 17 00:00:00 2001 From: Brandon Allard Date: Sun, 21 Apr 2024 16:47:25 -0400 Subject: [PATCH] core: add label to io_threaded_fallbacks to categorize operations --- src/core/file.cc | 18 +++++------ src/core/reactor.cc | 59 ++++++++++++++++++++++--------------- src/core/reactor_backend.cc | 2 +- src/core/thread_pool.hh | 37 +++++++++++++++++++---- 4 files changed, 77 insertions(+), 39 deletions(-) diff --git a/src/core/file.cc b/src/core/file.cc index a82554efced..62a492d1e0b 100644 --- a/src/core/file.cc +++ b/src/core/file.cc @@ -214,7 +214,7 @@ posix_file_impl::stat() noexcept { struct stat st; auto ret = ::fstat(fd, &st); return wrap_syscall(ret, st); - }).then([] (syscall_result_extra ret) { + }, submit_reason::file_operation).then([] (syscall_result_extra ret) { ret.throw_if_error(); return make_ready_future(ret.extra); }); @@ -224,7 +224,7 @@ future<> posix_file_impl::truncate(uint64_t length) noexcept { return engine()._thread_pool->submit>([this, length] { return wrap_syscall(::ftruncate(_fd, length)); - }).then([] (syscall_result sr) { + }, submit_reason::file_operation).then([] (syscall_result sr) { sr.throw_if_error(); return make_ready_future<>(); }); @@ -234,7 +234,7 @@ future posix_file_impl::ioctl(uint64_t cmd, void* argp) noexcept { return engine()._thread_pool->submit>([this, cmd, argp] () mutable { return wrap_syscall(::ioctl(_fd, cmd, argp)); - }).then([] (syscall_result sr) { + }, submit_reason::file_operation).then([] (syscall_result sr) { sr.throw_if_error(); // Some ioctls require to return a positive integer back. return make_ready_future(sr.result); @@ -255,7 +255,7 @@ future posix_file_impl::fcntl(int op, uintptr_t arg) noexcept { return engine()._thread_pool->submit>([this, op, arg] () mutable { return wrap_syscall(::fcntl(_fd, op, arg)); - }).then([] (syscall_result sr) { + }, submit_reason::file_operation).then([] (syscall_result sr) { sr.throw_if_error(); // Some fcntls require to return a positive integer back. return make_ready_future(sr.result); @@ -277,7 +277,7 @@ posix_file_impl::discard(uint64_t offset, uint64_t length) noexcept { return engine()._thread_pool->submit>([this, offset, length] () mutable { return wrap_syscall(::fallocate(_fd, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE, offset, length)); - }).then([] (syscall_result sr) { + }, submit_reason::file_operation).then([] (syscall_result sr) { sr.throw_if_error(); return make_ready_future<>(); }); @@ -298,7 +298,7 @@ posix_file_impl::allocate(uint64_t position, uint64_t length) noexcept { supported = false; // Racy, but harmless. At most we issue an extra call or two. } return wrap_syscall(ret); - }).then([] (syscall_result sr) { + }, submit_reason::file_operation).then([] (syscall_result sr) { sr.throw_if_error(); return make_ready_future<>(); }); @@ -340,7 +340,7 @@ posix_file_impl::close() noexcept { try { return engine()._thread_pool->submit>([fd] { return wrap_syscall(::close(fd)); - }); + }, submit_reason::file_operation); } catch (...) { report_exception("Running ::close() in reactor thread, submission failed with exception", std::current_exception()); return make_ready_future>(wrap_syscall(::close(fd))); @@ -362,7 +362,7 @@ blockdev_file_impl::size() noexcept { uint64_t size; int ret = ::ioctl(_fd, BLKGETSIZE64, &size); return wrap_syscall(ret, size); - }).then([] (syscall_result_extra ret) { + }, submit_reason::file_operation).then([] (syscall_result_extra ret) { ret.throw_if_error(); return make_ready_future(ret.extra); }); @@ -662,7 +662,7 @@ blockdev_file_impl::discard(uint64_t offset, uint64_t length) noexcept { return engine()._thread_pool->submit>([this, offset, length] () mutable { uint64_t range[2] { offset, length }; return wrap_syscall(::ioctl(_fd, BLKDISCARD, &range)); - }).then([] (syscall_result sr) { + }, submit_reason::file_operation).then([] (syscall_result sr) { sr.throw_if_error(); return make_ready_future<>(); }); diff --git a/src/core/reactor.cc b/src/core/reactor.cc index aea56cd0148..cb0dff8bba8 100644 --- a/src/core/reactor.cc +++ b/src/core/reactor.cc @@ -2013,7 +2013,7 @@ reactor::open_file_dma(std::string_view nameref, open_flags flags, file_open_opt } close_fd.cancel(); return wrap_syscall(fd, st); - }).then([&options, name = std::move(name), &open_flags] (syscall_result_extra sr) { + }, submit_reason::file_operation).then([&options, name = std::move(name), &open_flags] (syscall_result_extra sr) { sr.throw_fs_exception_if_error("open failed", name); return make_file_impl(sr.result, options, open_flags, sr.extra); }).then([] (shared_ptr impl) { @@ -2028,7 +2028,7 @@ reactor::remove_file(std::string_view pathname) noexcept { return futurize_invoke([this, pathname] { return _thread_pool->submit>([pathname = sstring(pathname)] { return wrap_syscall(::remove(pathname.c_str())); - }).then([pathname = sstring(pathname)] (syscall_result sr) { + }, submit_reason::file_operation).then([pathname = sstring(pathname)] (syscall_result sr) { sr.throw_fs_exception_if_error("remove failed", pathname); return make_ready_future<>(); }); @@ -2041,7 +2041,8 @@ reactor::rename_file(std::string_view old_pathname, std::string_view new_pathnam return futurize_invoke([this, old_pathname, new_pathname] { return _thread_pool->submit>([old_pathname = sstring(old_pathname), new_pathname = sstring(new_pathname)] { return wrap_syscall(::rename(old_pathname.c_str(), new_pathname.c_str())); - }).then([old_pathname = sstring(old_pathname), new_pathname = sstring(new_pathname)] (syscall_result sr) { + }, submit_reason::file_operation + ).then([old_pathname = sstring(old_pathname), new_pathname = sstring(new_pathname)] (syscall_result sr) { sr.throw_fs_exception_if_error("rename failed", old_pathname, new_pathname); return make_ready_future<>(); }); @@ -2054,7 +2055,7 @@ reactor::link_file(std::string_view oldpath, std::string_view newpath) noexcept return futurize_invoke([this, oldpath, newpath] { return _thread_pool->submit>([oldpath = sstring(oldpath), newpath = sstring(newpath)] { return wrap_syscall(::link(oldpath.c_str(), newpath.c_str())); - }).then([oldpath = sstring(oldpath), newpath = sstring(newpath)] (syscall_result sr) { + }, submit_reason::file_operation).then([oldpath = sstring(oldpath), newpath = sstring(newpath)] (syscall_result sr) { sr.throw_fs_exception_if_error("link failed", oldpath, newpath); return make_ready_future<>(); }); @@ -2068,7 +2069,7 @@ reactor::chmod(std::string_view name, file_permissions permissions) noexcept { return futurize_invoke([name, mode, this] { return _thread_pool->submit>([name = sstring(name), mode] { return wrap_syscall(::chmod(name.c_str(), mode)); - }).then([name = sstring(name), mode] (syscall_result sr) { + }, submit_reason::file_operation).then([name = sstring(name), mode] (syscall_result sr) { if (sr.result == -1) { auto reason = format("chmod(0{:o}) failed", mode); sr.throw_fs_exception(reason, fs::path(name)); @@ -2112,7 +2113,7 @@ reactor::file_type(std::string_view name, follow_symlink follow) noexcept { auto stat_syscall = follow ? stat : lstat; auto ret = stat_syscall(name.c_str(), &st); return wrap_syscall(ret, st); - }).then([name = sstring(name)] (syscall_result_extra sr) { + }, submit_reason::file_operation).then([name = sstring(name)] (syscall_result_extra sr) { if (long(sr.result) == -1) { if (sr.error != ENOENT && sr.error != ENOTDIR) { sr.throw_fs_exception_if_error("stat failed", name); @@ -2142,7 +2143,7 @@ future reactor::read_directory(int fd, char* buffer, size_t buffer_size) return _thread_pool->submit>([fd, buffer, buffer_size] () { auto ret = ::syscall(__NR_getdents64, fd, reinterpret_cast(buffer), buffer_size); return wrap_syscall(ret); - }).then([] (syscall_result ret) { + }, submit_reason::file_operation).then([] (syscall_result ret) { ret.throw_if_error(); return make_ready_future(ret.result); }); @@ -2155,7 +2156,7 @@ reactor::inotify_add_watch(int fd, std::string_view path, uint32_t flags) { return _thread_pool->submit>([fd, path = sstring(path), flags] { auto ret = ::inotify_add_watch(fd, path.c_str(), flags); return wrap_syscall(ret); - }).then([] (syscall_result ret) { + }, submit_reason::file_operation).then([] (syscall_result ret) { ret.throw_if_error(); return make_ready_future(ret.result); }); @@ -2257,7 +2258,7 @@ reactor::spawn(std::string_view pathname, return wrap_syscall(::posix_spawn(&child_pid, pathname.c_str(), &actions, &attr, const_cast(argv.data()), const_cast(env.data()))); - }); + }, submit_reason::process_operation); }).finally([&actions, &attr] { posix_spawn_file_actions_destroy(&actions); posix_spawnattr_destroy(&attr); @@ -2295,7 +2296,7 @@ static auto next_waitpid_timeout(std::chrono::milliseconds this_timeout) { future reactor::waitpid(pid_t pid) { return _thread_pool->submit>([pid] { return wrap_syscall(syscall(__NR_pidfd_open, pid, O_NONBLOCK)); - }).then([pid, this] (syscall_result pidfd) { + }, submit_reason::process_operation).then([pid, this] (syscall_result pidfd) { if (pidfd.result == -1) { // pidfd_open() was introduced in linux 5.3, so the pidfd.error could be ENOSYS on // older kernels. But it could be other error like EMFILE or ENFILE. anyway, we @@ -2308,7 +2309,7 @@ future reactor::waitpid(pid_t pid) { &wait_timeout] { return _thread_pool->submit>([pid, &wstatus] { return wrap_syscall(::waitpid(pid, &wstatus, WNOHANG)); - }).then([&wstatus, &wait_timeout] (syscall_result ret) mutable { + }, submit_reason::process_operation).then([&wstatus, &wait_timeout] (syscall_result ret) mutable { if (ret.result == 0) { wait_timeout = next_waitpid_timeout(wait_timeout); return ::seastar::sleep(wait_timeout).then([] { @@ -2328,7 +2329,7 @@ future reactor::waitpid(pid_t pid) { return pidfd.readable().then([pid, &wstatus, this] { return _thread_pool->submit>([pid, &wstatus] { return wrap_syscall(::waitpid(pid, &wstatus, WNOHANG)); - }); + }, submit_reason::process_operation); }).then([&wstatus] (syscall_result ret) { ret.throw_if_error(); assert(ret.result > 0); @@ -2353,7 +2354,7 @@ reactor::file_stat(std::string_view pathname, follow_symlink follow) noexcept { auto stat_syscall = follow ? stat : lstat; auto ret = stat_syscall(pathname.c_str(), &st); return wrap_syscall(ret, st); - }).then([pathname = sstring(pathname)] (syscall_result_extra sr) { + }, submit_reason::file_operation).then([pathname = sstring(pathname)] (syscall_result_extra sr) { sr.throw_fs_exception_if_error("stat failed", pathname); struct stat& st = sr.extra; stat_data sd; @@ -2391,7 +2392,7 @@ reactor::file_accessible(std::string_view pathname, access_flags flags) noexcept auto aflags = std::underlying_type_t(flags); auto ret = ::access(pathname.c_str(), aflags); return wrap_syscall(ret); - }).then([pathname = sstring(pathname), flags] (syscall_result sr) { + }, submit_reason::file_operation).then([pathname = sstring(pathname), flags] (syscall_result sr) { if (sr.result < 0) { if ((sr.error == ENOENT && flags == access_flags::exists) || (sr.error == EACCES && flags != access_flags::exists)) { @@ -2413,7 +2414,7 @@ reactor::file_system_at(std::string_view pathname) noexcept { struct statfs st; auto ret = statfs(pathname.c_str(), &st); return wrap_syscall(ret, st); - }).then([pathname = sstring(pathname)] (syscall_result_extra sr) { + }, submit_reason::file_operation).then([pathname = sstring(pathname)] (syscall_result_extra sr) { static std::unordered_map type_mapper = { { internal::fs_magic::xfs, fs_type::xfs }, { internal::fs_magic::ext2, fs_type::ext2 }, @@ -2440,7 +2441,7 @@ reactor::fstatfs(int fd) noexcept { struct statfs st; auto ret = ::fstatfs(fd, &st); return wrap_syscall(ret, st); - }).then([] (syscall_result_extra sr) { + }, submit_reason::file_operation).then([] (syscall_result_extra sr) { sr.throw_if_error(); struct statfs st = sr.extra; return make_ready_future(std::move(st)); @@ -2455,7 +2456,7 @@ reactor::statvfs(std::string_view pathname) noexcept { struct statvfs st; auto ret = ::statvfs(pathname.c_str(), &st); return wrap_syscall(ret, st); - }).then([pathname = sstring(pathname)] (syscall_result_extra sr) { + }, submit_reason::file_operation).then([pathname = sstring(pathname)] (syscall_result_extra sr) { sr.throw_fs_exception_if_error("statvfs failed", pathname); struct statvfs st = sr.extra; return make_ready_future(std::move(st)); @@ -2479,7 +2480,7 @@ reactor::open_directory(std::string_view name) noexcept { } } return wrap_syscall(fd, st); - }).then([name = sstring(name), oflags] (syscall_result_extra sr) { + }, submit_reason::file_operation).then([name = sstring(name), oflags] (syscall_result_extra sr) { sr.throw_fs_exception_if_error("open failed", name); return make_file_impl(sr.result, file_open_options(), oflags, sr.extra); }).then([] (shared_ptr file_impl) { @@ -2495,7 +2496,7 @@ reactor::make_directory(std::string_view name, file_permissions permissions) noe return _thread_pool->submit>([name = sstring(name), permissions] { auto mode = static_cast(permissions); return wrap_syscall(::mkdir(name.c_str(), mode)); - }).then([name = sstring(name)] (syscall_result sr) { + }, submit_reason::file_operation).then([name = sstring(name)] (syscall_result sr) { sr.throw_fs_exception_if_error("mkdir failed", name); }); }); @@ -2508,7 +2509,7 @@ reactor::touch_directory(std::string_view name, file_permissions permissions) no return _thread_pool->submit>([name = sstring(name), permissions] { auto mode = static_cast(permissions); return wrap_syscall(::mkdir(name.c_str(), mode)); - }).then([name = sstring(name)] (syscall_result sr) { + }, submit_reason::file_operation).then([name = sstring(name)] (syscall_result sr) { if (sr.result == -1 && sr.error != EEXIST) { sr.throw_fs_exception("mkdir failed", fs::path(name)); } @@ -2553,7 +2554,7 @@ reactor::fdatasync(int fd) noexcept { } return _thread_pool->submit>([fd] { return wrap_syscall(::fdatasync(fd)); - }).then([] (syscall_result sr) { + }, submit_reason::file_operation).then([] (syscall_result sr) { sr.throw_if_error(); return make_ready_future<>(); }); @@ -2700,6 +2701,12 @@ void reactor::register_metrics() { namespace sm = seastar::metrics; + auto io_fallback_counter = [this](const sstring& reason_str, submit_reason r) { + static auto reason_label = sm::label("reason"); + return sm::make_counter("io_threaded_fallbacks", std::bind(&thread_pool::count, _thread_pool.get(), r), + sm::description("Total number of io-threaded-fallbacks operations"), { reason_label(reason_str), }); + }; + _metric_groups.add_group("reactor", { sm::make_gauge("tasks_pending", std::bind(&reactor::pending_task_count, this), sm::description("Number of pending tasks in the queue")), // total_operations value:DERIVE:0:U @@ -2725,9 +2732,13 @@ void reactor::register_metrics() { // total_operations value:DERIVE:0:U sm::make_counter("fsyncs", _fsyncs, sm::description("Total number of fsync operations")), // total_operations value:DERIVE:0:U - sm::make_counter("io_threaded_fallbacks", std::bind(&thread_pool::operation_count, _thread_pool.get()), - sm::description("Total number of io-threaded-fallbacks operations")), - + io_fallback_counter("aio_fallback", submit_reason::aio_fallback), + // total_operations value:DERIVE:0:U + io_fallback_counter("file_operation", submit_reason::file_operation), + // total_operations value:DERIVE:0:U + io_fallback_counter("process_operation", submit_reason::process_operation), + // total_operations value:DERIVE:0:U + io_fallback_counter("unknown", submit_reason::unknown), }); _metric_groups.add_group("memory", { diff --git a/src/core/reactor_backend.cc b/src/core/reactor_backend.cc index 5ceb273edae..d9c09313b40 100644 --- a/src/core/reactor_backend.cc +++ b/src/core/reactor_backend.cc @@ -260,7 +260,7 @@ void aio_storage_context::schedule_retry() { return _r._thread_pool->submit>([this] () mutable { auto r = io_submit(_io_context, _aio_retries.size(), _aio_retries.data()); return wrap_syscall(r); - }).then_wrapped([this] (future> f) { + }, submit_reason::aio_fallback).then_wrapped([this] (future> f) { // If submit failed, just log the error and exit the loop. // The next call to submit_work will call schedule_retry again. if (f.failed()) { diff --git a/src/core/thread_pool.hh b/src/core/thread_pool.hh index 51820922d00..fa858930f52 100644 --- a/src/core/thread_pool.hh +++ b/src/core/thread_pool.hh @@ -27,9 +27,36 @@ namespace seastar { class reactor; +// Reasons for why a function had to be submitted to the thread_pool +enum class submit_reason : size_t { + // Used for aio operations what would block in `io_submit`. + aio_fallback, + // Used for file operations that don't have non-blocking alternatives. + file_operation, + // Used for process operations that don't have non-blocking alternatives. + process_operation, + // Used by default when a caller doesn't specify a submission reason. + unknown, +}; + +class submit_metrics { + uint64_t _counters[static_cast(submit_reason::unknown) + 1]{}; + +public: + void record_reason(submit_reason reason) { + reason = std::min(reason, submit_reason::unknown); + ++_counters[static_cast(reason)]; + } + + uint64_t count_for(submit_reason reason) const { + reason = std::min(reason, submit_reason::unknown); + return _counters[static_cast(reason)]; + } +}; + class thread_pool { reactor& _reactor; - uint64_t _aio_threaded_fallbacks = 0; + submit_metrics metrics; #ifndef HAVE_OSV syscall_work_queue inter_thread_wq; posix_thread _worker_thread; @@ -39,11 +66,11 @@ public: explicit thread_pool(reactor& r, sstring thread_name); ~thread_pool(); template - future submit(Func func) noexcept { - ++_aio_threaded_fallbacks; + future submit(Func func, submit_reason reason = submit_reason::unknown) noexcept { + metrics.record_reason(reason); return inter_thread_wq.submit(std::move(func)); } - uint64_t operation_count() const { return _aio_threaded_fallbacks; } + uint64_t count(submit_reason r) const { return metrics.count_for(r); } unsigned complete() { return inter_thread_wq.complete(); } // Before we enter interrupt mode, we must make sure that the syscall thread will properly @@ -61,7 +88,7 @@ public: #else public: template - future submit(Func func) { std::cerr << "thread_pool not yet implemented on osv\n"; abort(); } + future submit(Func func, submit_reason reason = submit_reason::unknown) { std::cerr << "thread_pool not yet implemented on osv\n"; abort(); } #endif private: void work(sstring thread_name);