From 983dc6fb9e87f055eb6610a04cf5e5c80c45b969 Mon Sep 17 00:00:00 2001 From: Thomas Goyne Date: Tue, 21 Mar 2023 14:48:33 -0700 Subject: [PATCH] Avoid blocking in Transaction::close() when there's a cancelled async write when possible We can't cancel the wait for the write lock if another process currently holds it, but if the DB for the current Transaction holds it and we're just waiting in the write queue then we can safely close the Transaction immediately and simply skip over it when it would get its turn to write. --- CHANGELOG.md | 2 +- src/realm/db.cpp | 178 +++++++++++++-------- src/realm/db.hpp | 6 +- src/realm/transaction.cpp | 10 +- src/realm/transaction.hpp | 1 - test/object-store/realm.cpp | 30 +++- test/object-store/sync/sync_test_utils.cpp | 23 --- test/object-store/sync/sync_test_utils.hpp | 8 - test/object-store/util/test_utils.cpp | 27 +++- test/object-store/util/test_utils.hpp | 12 +- 10 files changed, 177 insertions(+), 120 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 81cd4d14a06..8b234bf1cb0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,7 +8,7 @@ * ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?) * Changing the log level on the fly would not affect the core level log output ([#6440](https://github.com/realm/realm-core/issues/6440), since 13.7.0) * `SyncManager::immediately_run_file_actions()` no longer ignores the result of trying to remove a realm. This could have resulted in a client reset action being reported as successful when it actually failed on windows if the `Realm` was still open ([#6050](https://github.com/realm/realm-core/issues/6050)). - +* Fix a deadlock when closing a Transaction with a cancelled asynchronous write scheduled while another Transaction holds the write lock ([PR #6413](https://github.com/realm/realm-core/pull/6413), since v11.10.0). ### Breaking changes * None. diff --git a/src/realm/db.cpp b/src/realm/db.cpp index 445baa40955..e9437876915 100644 --- a/src/realm/db.cpp +++ b/src/realm/db.cpp @@ -1668,32 +1668,6 @@ void DB::release_all_read_locks() noexcept REALM_ASSERT(m_transaction_count == 0); } -// Note: close() and close_internal() may be called from the DB::~DB(). -// in that case, they will not throw. Throwing can only happen if called -// directly. -void DB::close(bool allow_open_read_transactions) -{ - // make helper thread terminate - m_commit_helper.reset(); - - if (m_fake_read_lock_if_immutable) { - if (!is_attached()) - return; - { - CheckedLockGuard local_lock(m_mutex); - if (!allow_open_read_transactions && m_transaction_count) - throw WrongTransactionState("Closing with open read transactions"); - } - if (m_alloc.is_attached()) - m_alloc.detach(); - m_fake_read_lock_if_immutable.reset(); - } - else { - close_internal(std::unique_lock(m_controlmutex, std::defer_lock), - allow_open_read_transactions); - } -} - void DB::close_internal(std::unique_lock lock, bool allow_open_read_transactions) { if (!is_attached()) @@ -1766,18 +1740,23 @@ bool DB::other_writers_waiting_for_lock() const uint32_t next_ticket = info->next_ticket.load(std::memory_order_relaxed); uint32_t next_served = info->next_served.load(std::memory_order_relaxed); - // When holding the write lock, next_ticket = next_served + 1, hence, if the diference between 'next_ticket' and + // When holding the write lock, next_ticket = next_served + 1, hence, if the difference between 'next_ticket' and // 'next_served' is greater than 1, there is at least one thread waiting to acquire the write lock. return next_ticket > next_served + 1; } -class DB::AsyncCommitHelper { +class DB::AsyncCommitHelper : public std::enable_shared_from_this { public: AsyncCommitHelper(DB* db) : m_db(db) { } ~AsyncCommitHelper() + { + close(); + } + + void close() { { std::unique_lock lg(m_mutex); @@ -1787,10 +1766,16 @@ class DB::AsyncCommitHelper { m_running = false; m_cv_worker.notify_one(); } - m_thread.join(); + + // The last reference to the DB may have been in a callback being invoked + // by the worker thread. If so, we'll be called from within the work + // loop, and have to just unwind rather than joining. + if (m_thread.get_id() != std::this_thread::get_id()) { + m_thread.join(); + } } - void begin_write(util::UniqueFunction fn) + void begin_write(util::UniqueFunction fn) { std::unique_lock lg(m_mutex); start_thread(); @@ -1903,7 +1888,7 @@ class DB::AsyncCommitHelper { std::mutex m_mutex; std::condition_variable m_cv_worker; std::condition_variable m_cv_callers; - std::deque> m_pending_writes; + std::deque> m_pending_writes; util::UniqueFunction m_pending_sync; size_t m_write_lock_claim_ticket = 0; size_t m_write_lock_claim_fulfilled = 0; @@ -1921,7 +1906,9 @@ class DB::AsyncCommitHelper { return; } m_running = true; - m_thread = std::thread([this]() { + // Capture a strong reference to ourself for as long as the thread is + // running as the thread can outlive the parent DB. + m_thread = std::thread([this, self = shared_from_this()]() { main(); }); } @@ -1980,7 +1967,9 @@ void DB::AsyncCommitHelper::main() m_has_write_mutex = true; m_owns_write_mutex = true; - // Synchronous transaction requests get priority over async + // Synchronous transaction requests get priority over async, so + // let the next pending synchronous transaction have the lock + // if there are any if (m_write_lock_claim_fulfilled < m_write_lock_claim_ticket) { ++m_write_lock_claim_fulfilled; m_cv_callers.notify_all(); @@ -1988,13 +1977,25 @@ void DB::AsyncCommitHelper::main() } REALM_ASSERT(!m_pending_writes.empty()); - auto callback = std::move(m_pending_writes.front()); - m_pending_writes.pop_front(); - lg.unlock(); - callback(); - // Release things captured by the callback before reacquiring the lock - callback = nullptr; - lg.lock(); + bool did_use_lock = false; + while (!m_pending_writes.empty()) { + auto callback = std::move(m_pending_writes.front()); + m_pending_writes.pop_front(); + lg.unlock(); + did_use_lock = callback(); + // Release things captured by the callback before reacquiring the lock + callback = nullptr; + lg.lock(); + if (did_use_lock || !m_running) { + break; + } + } + + if (!did_use_lock) { + // All of the queued writes were cancelled, so just go ahead + // and release the write lock + m_pending_mx_release = true; + } continue; } } @@ -2006,7 +2007,7 @@ void DB::AsyncCommitHelper::main() } -void DB::async_begin_write(util::UniqueFunction fn) +void DB::async_begin_write(util::UniqueFunction fn) { REALM_ASSERT(m_commit_helper); m_commit_helper->begin_write(std::move(fn)); @@ -2024,6 +2025,36 @@ void DB::async_sync_to_disk(util::UniqueFunction fn) m_commit_helper->sync_to_disk(std::move(fn)); } +// Note: close() and close_internal() may be called from the DB::~DB(). +// in that case, they will not throw. Throwing can only happen if called +// directly. +void DB::close(bool allow_open_read_transactions) +{ + // make helper thread terminate + if (m_commit_helper) { + m_commit_helper->close(); + m_commit_helper.reset(); + } + + if (m_fake_read_lock_if_immutable) { + if (!is_attached()) + return; + { + CheckedLockGuard local_lock(m_mutex); + if (!allow_open_read_transactions && m_transaction_count) + throw WrongTransactionState("Closing with open read transactions"); + } + if (m_alloc.is_attached()) + m_alloc.detach(); + m_fake_read_lock_if_immutable.reset(); + } + else { + close_internal(std::unique_lock(m_controlmutex, std::defer_lock), + allow_open_read_transactions); + } +} + + bool DB::has_changed(TransactionRef& tr) { if (m_fake_read_lock_if_immutable) @@ -2604,41 +2635,52 @@ TransactionRef DB::start_write(bool nonblocking) return tr; } -void DB::async_request_write_mutex(TransactionRef& tr, util::UniqueFunction&& when_acquired) +void DB::async_request_write_mutex(const TransactionRef& tr, util::UniqueFunction&& when_acquired) { + REALM_ASSERT(tr && tr->db.get() == this); + std::chrono::steady_clock::time_point request_start; { util::CheckedLockGuard lck(tr->m_async_mutex); REALM_ASSERT(tr->m_async_stage == Transaction::AsyncState::Idle); tr->m_async_stage = Transaction::AsyncState::Requesting; - tr->m_request_time_point = std::chrono::steady_clock::now(); - if (tr->db->m_logger) { - tr->db->m_logger->log(util::Logger::Level::trace, "Async request write lock"); + if (m_logger) { + request_start = std::chrono::steady_clock::now(); + m_logger->log(util::Logger::Level::trace, "Async request write lock"); } } std::weak_ptr weak_tr = tr; - async_begin_write([weak_tr, cb = std::move(when_acquired)]() { - if (auto tr = weak_tr.lock()) { - util::CheckedLockGuard lck(tr->m_async_mutex); - // If a synchronous transaction happened while we were pending - // we may be in HasCommits - if (tr->m_async_stage == Transaction::AsyncState::Requesting) { - tr->m_async_stage = Transaction::AsyncState::HasLock; - } - if (tr->db->m_logger) { - auto t2 = std::chrono::steady_clock::now(); - tr->db->m_logger->log( - util::Logger::Level::trace, "Got write lock in %1 us", - std::chrono::duration_cast(t2 - tr->m_request_time_point).count()); - } - if (tr->m_waiting_for_write_lock) { - tr->m_waiting_for_write_lock = false; - tr->m_async_cv.notify_one(); - } - else if (cb) { - cb(); - } - tr.reset(); // Release pointer while lock is held + async_begin_write([this, weak_tr, cb = std::move(when_acquired), request_start]() { + // Check if the Transaction is still valid and still wants the write + // lock. If may no longer be if it was closed while waiting for the lock, + // in which case we'll either move onto the next waiting Transaction or + // just release the lock + auto tr = weak_tr.lock(); + if (!tr) { + return false; } + util::CheckedLockGuard lck(tr->m_async_mutex); + if (tr->m_async_stage == Transaction::AsyncState::Idle) { + return false; + } + + // If a synchronous transaction happened while we were pending + // we may be in HasCommits + if (tr->m_async_stage == Transaction::AsyncState::Requesting) { + tr->m_async_stage = Transaction::AsyncState::HasLock; + } + if (m_logger) { + auto t2 = std::chrono::steady_clock::now(); + m_logger->log(util::Logger::Level::trace, "Got write lock in %1 us", + std::chrono::duration_cast(t2 - request_start).count()); + } + if (tr->m_waiting_for_write_lock) { + tr->m_waiting_for_write_lock = false; + tr->m_async_cv.notify_one(); + } + else if (cb) { + cb(); + } + return true; }); } @@ -2646,7 +2688,7 @@ inline DB::DB(const DBOptions& options) : m_upgrade_callback(std::move(options.upgrade_callback)) { if (options.enable_async_writes) { - m_commit_helper = std::make_unique(this); + m_commit_helper = std::make_shared(this); } } diff --git a/src/realm/db.hpp b/src/realm/db.hpp index b2323a59fde..26d9c609683 100644 --- a/src/realm/db.hpp +++ b/src/realm/db.hpp @@ -259,7 +259,7 @@ class DB : public std::enable_shared_from_this { // ask for write mutex. Callback takes place when mutex has been acquired. // callback may occur on ANOTHER THREAD. Must not be called if write mutex // has already been acquired. - void async_request_write_mutex(TransactionRef& tr, util::UniqueFunction&& when_acquired); + void async_request_write_mutex(const TransactionRef& tr, util::UniqueFunction&& when_acquired); // report statistics of last commit done on THIS DB. // The free space reported is what can be expected to be freed @@ -496,7 +496,7 @@ class DB : public std::enable_shared_from_this { util::InterprocessCondVar m_pick_next_writer; std::function m_upgrade_callback; std::shared_ptr m_metrics; - std::unique_ptr m_commit_helper; + std::shared_ptr m_commit_helper; std::shared_ptr m_logger; bool m_is_sync_agent = false; @@ -608,7 +608,7 @@ class DB : public std::enable_shared_from_this { void close_internal(std::unique_lock, bool allow_open_read_transactions) REQUIRES(!m_mutex); - void async_begin_write(util::UniqueFunction fn); + void async_begin_write(util::UniqueFunction fn); void async_end_write(); void async_sync_to_disk(util::UniqueFunction fn); diff --git a/src/realm/transaction.cpp b/src/realm/transaction.cpp index 76a1004512f..af633c2f247 100644 --- a/src/realm/transaction.cpp +++ b/src/realm/transaction.cpp @@ -765,15 +765,7 @@ void Transaction::prepare_for_close() break; case AsyncState::Requesting: - // We don't have the ability to cancel a wait on the write lock, so - // unfortunately we have to wait for it to be acquired. - REALM_ASSERT(m_transact_stage == DB::transact_Reading); - REALM_ASSERT(!m_oldest_version_not_persisted); - m_waiting_for_write_lock = true; - m_async_cv.wait(lck.native_handle(), [this]() REQUIRES(m_async_mutex) { - return !m_waiting_for_write_lock; - }); - db->end_write_on_correct_thread(); + m_async_stage = AsyncState::Idle; break; case AsyncState::HasLock: diff --git a/src/realm/transaction.hpp b/src/realm/transaction.hpp index b3cd39e2c04..b515b937c55 100644 --- a/src/realm/transaction.hpp +++ b/src/realm/transaction.hpp @@ -204,7 +204,6 @@ class Transaction : public Group { util::CheckedMutex m_async_mutex; std::condition_variable m_async_cv GUARDED_BY(m_async_mutex); AsyncState m_async_stage GUARDED_BY(m_async_mutex) = AsyncState::Idle; - std::chrono::steady_clock::time_point m_request_time_point; bool m_waiting_for_write_lock GUARDED_BY(m_async_mutex) = false; bool m_waiting_for_sync GUARDED_BY(m_async_mutex) = false; diff --git a/test/object-store/realm.cpp b/test/object-store/realm.cpp index 1b00f7938bc..765b3e8ace1 100644 --- a/test/object-store/realm.cpp +++ b/test/object-store/realm.cpp @@ -1596,7 +1596,7 @@ TEST_CASE("SharedRealm: async writes") { for (int i = 0; i < 2; ++i) { SECTION(close_function_names[i]) { bool persisted = false; - SECTION("before write lock is acquired") { + SECTION("while another DB instance holds lock") { DBOptions options; options.encryption_key = config.encryption_key.data(); // Acquire the write lock with a different DB instance so that we'll @@ -1627,9 +1627,12 @@ TEST_CASE("SharedRealm: async writes") { std::invoke(close_functions[i], *realm); { - // Verify that we released the write lock + // We may not have released the write lock yet when close() + // returns, but it should happen promptly. auto db = DB::create(make_in_realm_history(), config.path, options); - REQUIRE(db->start_write(/* nonblocking */ true)); + timed_wait_for([&] { + return db->start_write(/* nonblocking */ true) != nullptr; + }); } // Verify that the transaction callback never got enqueued @@ -1638,6 +1641,27 @@ TEST_CASE("SharedRealm: async writes") { }); wait_for_done(); } + SECTION("while another Realm instance holds lock") { + auto realm2 = Realm::get_shared_realm(config); + realm2->begin_transaction(); + + auto scheduler = realm->scheduler(); + realm->async_begin_transaction([&] { + // We should never get here as the realm is closed + FAIL(); + }); + + // Doesn't have to wait for the write lock because the DB + // instance already holds it and we were just waiting for our + // turn with it rather than waiting to acquire it + std::invoke(close_functions[i], *realm); + + // Verify that the transaction callback never got enqueued + scheduler->invoke([&] { + done = true; + }); + wait_for_done(); + } SECTION("before async_begin_transaction() callback") { auto scheduler = realm->scheduler(); realm->async_begin_transaction([&] { diff --git a/test/object-store/sync/sync_test_utils.cpp b/test/object-store/sync/sync_test_utils.cpp index dece34092f4..a5300bfd9ce 100644 --- a/test/object-store/sync/sync_test_utils.cpp +++ b/test/object-store/sync/sync_test_utils.cpp @@ -87,29 +87,6 @@ bool ReturnsTrueWithinTimeLimit::match(util::FunctionRef condition) cons return predicate_returned_true; } -void timed_wait_for(util::FunctionRef condition, std::chrono::milliseconds max_ms) -{ - const auto wait_start = std::chrono::steady_clock::now(); - util::EventLoop::main().run_until([&] { - if (std::chrono::steady_clock::now() - wait_start > max_ms) { - throw std::runtime_error(util::format("timed_wait_for exceeded %1 ms", max_ms.count())); - } - return condition(); - }); -} - -void timed_sleeping_wait_for(util::FunctionRef condition, std::chrono::milliseconds max_ms, - std::chrono::milliseconds sleep_ms) -{ - const auto wait_start = std::chrono::steady_clock::now(); - while (!condition()) { - if (std::chrono::steady_clock::now() - wait_start > max_ms) { - throw std::runtime_error(util::format("timed_sleeping_wait_for exceeded %1 ms", max_ms.count())); - } - std::this_thread::sleep_for(sleep_ms); - } -} - auto do_hash = [](const std::string& name) -> std::string { std::array hash; util::sha256(name.data(), name.size(), hash.data()); diff --git a/test/object-store/sync/sync_test_utils.hpp b/test/object-store/sync/sync_test_utils.hpp index 4e2a318c2a3..fc7e7ca61ab 100644 --- a/test/object-store/sync/sync_test_utils.hpp +++ b/test/object-store/sync/sync_test_utils.hpp @@ -28,7 +28,6 @@ #include #include -#include "util/event_loop.hpp" #include "util/test_file.hpp" #include "util/test_utils.hpp" @@ -47,13 +46,6 @@ bool results_contains_user(SyncUserMetadataResults& results, const std::string& const std::string& auth_server); bool results_contains_original_name(SyncFileActionMetadataResults& results, const std::string& original_name); -void timed_wait_for(util::FunctionRef condition, - std::chrono::milliseconds max_ms = std::chrono::milliseconds(5000)); - -void timed_sleeping_wait_for(util::FunctionRef condition, - std::chrono::milliseconds max_ms = std::chrono::seconds(30), - std::chrono::milliseconds sleep_ms = std::chrono::milliseconds(1)); - class ReturnsTrueWithinTimeLimit : public Catch::Matchers::MatcherGenericBase { public: ReturnsTrueWithinTimeLimit(std::chrono::milliseconds max_ms = std::chrono::milliseconds(5000)) diff --git a/test/object-store/util/test_utils.cpp b/test/object-store/util/test_utils.cpp index 4cd18d5769a..3ebb747188d 100644 --- a/test/object-store/util/test_utils.cpp +++ b/test/object-store/util/test_utils.cpp @@ -16,13 +16,13 @@ // //////////////////////////////////////////////////////////////////////////// -#include #include "test_utils.hpp" +#include +#include #include #include #include -#include #include @@ -230,4 +230,27 @@ void chmod(const std::string& path, int permissions) #endif } +void timed_wait_for(util::FunctionRef condition, std::chrono::milliseconds max_ms) +{ + const auto wait_start = std::chrono::steady_clock::now(); + util::EventLoop::main().run_until([&] { + if (std::chrono::steady_clock::now() - wait_start > max_ms) { + throw std::runtime_error(util::format("timed_wait_for exceeded %1 ms", max_ms.count())); + } + return condition(); + }); +} + +void timed_sleeping_wait_for(util::FunctionRef condition, std::chrono::milliseconds max_ms, + std::chrono::milliseconds sleep_ms) +{ + const auto wait_start = std::chrono::steady_clock::now(); + while (!condition()) { + if (std::chrono::steady_clock::now() - wait_start > max_ms) { + throw std::runtime_error(util::format("timed_sleeping_wait_for exceeded %1 ms", max_ms.count())); + } + std::this_thread::sleep_for(sleep_ms); + } +} + } // namespace realm diff --git a/test/object-store/util/test_utils.hpp b/test/object-store/util/test_utils.hpp index 38f3e8f1ef3..973e03abcc5 100644 --- a/test/object-store/util/test_utils.hpp +++ b/test/object-store/util/test_utils.hpp @@ -19,11 +19,13 @@ #ifndef REALM_TEST_UTILS_HPP #define REALM_TEST_UTILS_HPP -#include -#include +#include "util/event_loop.hpp" + #include #include +#include +#include #include #include namespace fs = std::filesystem; @@ -145,6 +147,12 @@ bool chmod_supported(const std::string& path); int get_permissions(const std::string& path); void chmod(const std::string& path, int permissions); +void timed_wait_for(util::FunctionRef condition, + std::chrono::milliseconds max_ms = std::chrono::milliseconds(5000)); + +void timed_sleeping_wait_for(util::FunctionRef condition, + std::chrono::milliseconds max_ms = std::chrono::seconds(30), + std::chrono::milliseconds sleep_ms = std::chrono::milliseconds(1)); } // namespace realm #define REQUIRE_DIR_EXISTS(macro_path) \