diff --git a/CHANGELOG.md b/CHANGELOG.md index 9a49eb15392..aa618064037 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ * Changing the log level on the fly would not affect the core level log output ([#6440](https://github.com/realm/realm-core/issues/6440), since 13.7.0) * `SyncManager::immediately_run_file_actions()` no longer ignores the result of trying to remove a realm. This could have resulted in a client reset action being reported as successful when it actually failed on windows if the `Realm` was still open ([#6050](https://github.com/realm/realm-core/issues/6050)). * Fix a data race in `DB::VersionManager`. If one thread committed a write transaction which increased the number of live versions above the previous highest seen during the current session at the same time as another thread began a read, the reading thread could read from a no-longer-valid memory mapping ([PR #6411](https://github.com/realm/realm-core/pull/6411), since v13.0.0). +* Fix a deadlock when closing a Transaction with a cancelled asynchronous write scheduled while another Transaction holds the write lock ([PR #6413](https://github.com/realm/realm-core/pull/6413), since v11.10.0). ### Breaking changes * None. diff --git a/src/realm/db.cpp b/src/realm/db.cpp index 70240c0d4c6..c5b695185ec 100644 --- a/src/realm/db.cpp +++ b/src/realm/db.cpp @@ -1750,32 +1750,6 @@ void DB::release_all_read_locks() noexcept REALM_ASSERT(m_transaction_count == 0); } -// Note: close() and close_internal() may be called from the DB::~DB(). -// in that case, they will not throw. Throwing can only happen if called -// directly. -void DB::close(bool allow_open_read_transactions) -{ - // make helper thread(s) terminate - m_commit_helper.reset(); - - if (m_fake_read_lock_if_immutable) { - if (!is_attached()) - return; - { - CheckedLockGuard local_lock(m_mutex); - if (!allow_open_read_transactions && m_transaction_count) - throw WrongTransactionState("Closing with open read transactions"); - } - if (m_alloc.is_attached()) - m_alloc.detach(); - m_fake_read_lock_if_immutable.reset(); - } - else { - close_internal(std::unique_lock(m_controlmutex, std::defer_lock), - allow_open_read_transactions); - } -} - void DB::close_internal(std::unique_lock lock, bool allow_open_read_transactions) { if (!is_attached()) @@ -1848,18 +1822,23 @@ bool DB::other_writers_waiting_for_lock() const uint32_t next_ticket = info->next_ticket.load(std::memory_order_relaxed); uint32_t next_served = info->next_served.load(std::memory_order_relaxed); - // When holding the write lock, next_ticket = next_served + 1, hence, if the diference between 'next_ticket' and + // When holding the write lock, next_ticket = next_served + 1, hence, if the difference between 'next_ticket' and // 'next_served' is greater than 1, there is at least one thread waiting to acquire the write lock. return next_ticket > next_served + 1; } -class DB::AsyncCommitHelper { +class DB::AsyncCommitHelper : public std::enable_shared_from_this { public: AsyncCommitHelper(DB* db) : m_db(db) { } ~AsyncCommitHelper() + { + close(); + } + + void close() { { std::unique_lock lg(m_mutex); @@ -1869,10 +1848,19 @@ class DB::AsyncCommitHelper { m_running = false; m_cv_worker.notify_one(); } - m_thread.join(); + + // The last reference to the DB may have been in a callback being invoked + // by the worker thread. If so, we'll be called from within the work + // loop, and have to just unwind rather than joining. + if (m_thread.get_id() != std::this_thread::get_id()) { + m_thread.join(); + } + else { + m_thread.detach(); + } } - void begin_write(util::UniqueFunction fn) + void begin_write(util::UniqueFunction fn) { std::unique_lock lg(m_mutex); start_thread(); @@ -1985,7 +1973,7 @@ class DB::AsyncCommitHelper { std::mutex m_mutex; std::condition_variable m_cv_worker; std::condition_variable m_cv_callers; - std::deque> m_pending_writes; + std::deque> m_pending_writes; util::UniqueFunction m_pending_sync; size_t m_write_lock_claim_ticket = 0; size_t m_write_lock_claim_fulfilled = 0; @@ -2003,7 +1991,9 @@ class DB::AsyncCommitHelper { return; } m_running = true; - m_thread = std::thread([this]() { + // Capture a strong reference to ourself for as long as the thread is + // running as the thread can outlive the parent DB. + m_thread = std::thread([this, self = shared_from_this()]() { main(); }); } @@ -2016,6 +2006,11 @@ class DB::AsyncCommitHelper { void DB::AsyncCommitHelper::main() { + // While we own the write mutex we hold a strong reference onto the DB + // because DB doesn't allow closing mid-write. Note that this is + // intentionally declared before the lock as we have to release this while + // not holding the lock to avoid deadlocks. + DBRef dbref; std::unique_lock lg(m_mutex); while (m_running) { #if 0 // Enable for testing purposes @@ -2041,6 +2036,7 @@ void DB::AsyncCommitHelper::main() m_owns_write_mutex = false; lg.unlock(); + dbref.reset(); m_cv_callers.notify_all(); lg.lock(); continue; @@ -2055,6 +2051,7 @@ void DB::AsyncCommitHelper::main() // deadlock if they ask us to perform the sync. if (!m_waiting_for_write_mutex && has_pending_write_requests()) { lg.unlock(); + dbref = m_db->shared_from_this(); m_db->do_begin_write(); lg.lock(); @@ -2062,7 +2059,9 @@ void DB::AsyncCommitHelper::main() m_has_write_mutex = true; m_owns_write_mutex = true; - // Synchronous transaction requests get priority over async + // Synchronous transaction requests get priority over async, so + // let the next pending synchronous transaction have the lock + // if there are any if (m_write_lock_claim_fulfilled < m_write_lock_claim_ticket) { ++m_write_lock_claim_fulfilled; m_cv_callers.notify_all(); @@ -2070,25 +2069,38 @@ void DB::AsyncCommitHelper::main() } REALM_ASSERT(!m_pending_writes.empty()); - auto callback = std::move(m_pending_writes.front()); - m_pending_writes.pop_front(); - lg.unlock(); - callback(); - // Release things captured by the callback before reacquiring the lock - callback = nullptr; - lg.lock(); + bool did_use_lock = false; + while (!m_pending_writes.empty()) { + auto callback = std::move(m_pending_writes.front()); + m_pending_writes.pop_front(); + lg.unlock(); + did_use_lock = callback(); + // Release things captured by the callback before reacquiring the lock + callback = nullptr; + lg.lock(); + if (did_use_lock) { + break; + } + } + + if (!did_use_lock) { + // All of the queued writes were cancelled, so just go ahead + // and release the write lock + m_pending_mx_release = true; + } continue; } } m_cv_worker.wait(lg); } if (m_has_write_mutex && m_owns_write_mutex) { + REALM_ASSERT(dbref); m_db->do_end_write(); } } -void DB::async_begin_write(util::UniqueFunction fn) +void DB::async_begin_write(util::UniqueFunction fn) { REALM_ASSERT(m_commit_helper); m_commit_helper->begin_write(std::move(fn)); @@ -2106,6 +2118,36 @@ void DB::async_sync_to_disk(util::UniqueFunction fn) m_commit_helper->sync_to_disk(std::move(fn)); } +// Note: close() and close_internal() may be called from the DB::~DB(). +// in that case, they will not throw. Throwing can only happen if called +// directly. +void DB::close(bool allow_open_read_transactions) +{ + // make helper thread terminate + if (m_commit_helper) { + m_commit_helper->close(); + m_commit_helper.reset(); + } + + if (m_fake_read_lock_if_immutable) { + if (!is_attached()) + return; + { + CheckedLockGuard local_lock(m_mutex); + if (!allow_open_read_transactions && m_transaction_count) + throw WrongTransactionState("Closing with open read transactions"); + } + if (m_alloc.is_attached()) + m_alloc.detach(); + m_fake_read_lock_if_immutable.reset(); + } + else { + close_internal(std::unique_lock(m_controlmutex, std::defer_lock), + allow_open_read_transactions); + } +} + + bool DB::has_changed(TransactionRef& tr) { if (m_fake_read_lock_if_immutable) @@ -2687,41 +2729,52 @@ TransactionRef DB::start_write(bool nonblocking) return tr; } -void DB::async_request_write_mutex(TransactionRef& tr, util::UniqueFunction&& when_acquired) +void DB::async_request_write_mutex(const TransactionRef& tr, util::UniqueFunction&& when_acquired) { + REALM_ASSERT(tr && tr->db.get() == this); + std::chrono::steady_clock::time_point request_start; { util::CheckedLockGuard lck(tr->m_async_mutex); REALM_ASSERT(tr->m_async_stage == Transaction::AsyncState::Idle); tr->m_async_stage = Transaction::AsyncState::Requesting; - tr->m_request_time_point = std::chrono::steady_clock::now(); - if (tr->db->m_logger) { - tr->db->m_logger->log(util::Logger::Level::trace, "Async request write lock"); + if (m_logger) { + request_start = std::chrono::steady_clock::now(); + m_logger->log(util::Logger::Level::trace, "Async request write lock"); } } std::weak_ptr weak_tr = tr; - async_begin_write([weak_tr, cb = std::move(when_acquired)]() { - if (auto tr = weak_tr.lock()) { - util::CheckedLockGuard lck(tr->m_async_mutex); - // If a synchronous transaction happened while we were pending - // we may be in HasCommits - if (tr->m_async_stage == Transaction::AsyncState::Requesting) { - tr->m_async_stage = Transaction::AsyncState::HasLock; - } - if (tr->db->m_logger) { - auto t2 = std::chrono::steady_clock::now(); - tr->db->m_logger->log( - util::Logger::Level::trace, "Got write lock in %1 us", - std::chrono::duration_cast(t2 - tr->m_request_time_point).count()); - } - if (tr->m_waiting_for_write_lock) { - tr->m_waiting_for_write_lock = false; - tr->m_async_cv.notify_one(); - } - else if (cb) { - cb(); - } - tr.reset(); // Release pointer while lock is held + async_begin_write([this, weak_tr, cb = std::move(when_acquired), request_start]() { + // Check if the Transaction is still valid and still wants the write + // lock. If may no longer be if it was closed while waiting for the lock, + // in which case we'll either move onto the next waiting Transaction or + // just release the lock + auto tr = weak_tr.lock(); + if (!tr) { + return false; } + util::CheckedLockGuard lck(tr->m_async_mutex); + if (tr->m_async_stage == Transaction::AsyncState::Idle) { + return false; + } + + // If a synchronous transaction happened while we were pending + // we may be in HasCommits + if (tr->m_async_stage == Transaction::AsyncState::Requesting) { + tr->m_async_stage = Transaction::AsyncState::HasLock; + } + if (m_logger) { + auto t2 = std::chrono::steady_clock::now(); + m_logger->log(util::Logger::Level::trace, "Got write lock in %1 us", + std::chrono::duration_cast(t2 - request_start).count()); + } + if (tr->m_waiting_for_write_lock) { + tr->m_waiting_for_write_lock = false; + tr->m_async_cv.notify_one(); + } + else if (cb) { + cb(); + } + return true; }); } @@ -2729,7 +2782,7 @@ inline DB::DB(const DBOptions& options) : m_upgrade_callback(std::move(options.upgrade_callback)) { if (options.enable_async_writes) { - m_commit_helper = std::make_unique(this); + m_commit_helper = std::make_shared(this); } } diff --git a/src/realm/db.hpp b/src/realm/db.hpp index 03b06b011d4..ff780139749 100644 --- a/src/realm/db.hpp +++ b/src/realm/db.hpp @@ -261,7 +261,7 @@ class DB : public std::enable_shared_from_this { // ask for write mutex. Callback takes place when mutex has been acquired. // callback may occur on ANOTHER THREAD. Must not be called if write mutex // has already been acquired. - void async_request_write_mutex(TransactionRef& tr, util::UniqueFunction&& when_acquired); + void async_request_write_mutex(const TransactionRef& tr, util::UniqueFunction&& when_acquired); // report statistics of last commit done on THIS DB. // The free space reported is what can be expected to be freed @@ -500,7 +500,7 @@ class DB : public std::enable_shared_from_this { util::InterprocessCondVar m_pick_next_writer; std::function m_upgrade_callback; std::shared_ptr m_metrics; - std::unique_ptr m_commit_helper; + std::shared_ptr m_commit_helper; std::shared_ptr m_logger; bool m_is_sync_agent = false; @@ -612,7 +612,7 @@ class DB : public std::enable_shared_from_this { void close_internal(std::unique_lock, bool allow_open_read_transactions) REQUIRES(!m_mutex); - void async_begin_write(util::UniqueFunction fn); + void async_begin_write(util::UniqueFunction fn); void async_end_write(); void async_sync_to_disk(util::UniqueFunction fn); diff --git a/src/realm/transaction.cpp b/src/realm/transaction.cpp index d2de3c0adf0..dd2c36cc59f 100644 --- a/src/realm/transaction.cpp +++ b/src/realm/transaction.cpp @@ -766,15 +766,7 @@ void Transaction::prepare_for_close() break; case AsyncState::Requesting: - // We don't have the ability to cancel a wait on the write lock, so - // unfortunately we have to wait for it to be acquired. - REALM_ASSERT(m_transact_stage == DB::transact_Reading); - REALM_ASSERT(!m_oldest_version_not_persisted); - m_waiting_for_write_lock = true; - m_async_cv.wait(lck.native_handle(), [this]() REQUIRES(m_async_mutex) { - return !m_waiting_for_write_lock; - }); - db->end_write_on_correct_thread(); + m_async_stage = AsyncState::Idle; break; case AsyncState::HasLock: diff --git a/src/realm/transaction.hpp b/src/realm/transaction.hpp index 7af3064aa73..3573f8a9a62 100644 --- a/src/realm/transaction.hpp +++ b/src/realm/transaction.hpp @@ -204,7 +204,6 @@ class Transaction : public Group { util::CheckedMutex m_async_mutex; std::condition_variable m_async_cv GUARDED_BY(m_async_mutex); AsyncState m_async_stage GUARDED_BY(m_async_mutex) = AsyncState::Idle; - std::chrono::steady_clock::time_point m_request_time_point; bool m_waiting_for_write_lock GUARDED_BY(m_async_mutex) = false; bool m_waiting_for_sync GUARDED_BY(m_async_mutex) = false; diff --git a/test/object-store/realm.cpp b/test/object-store/realm.cpp index 1b00f7938bc..adaef30a760 100644 --- a/test/object-store/realm.cpp +++ b/test/object-store/realm.cpp @@ -1596,7 +1596,7 @@ TEST_CASE("SharedRealm: async writes") { for (int i = 0; i < 2; ++i) { SECTION(close_function_names[i]) { bool persisted = false; - SECTION("before write lock is acquired") { + SECTION("while another DB instance holds lock") { DBOptions options; options.encryption_key = config.encryption_key.data(); // Acquire the write lock with a different DB instance so that we'll @@ -1608,9 +1608,9 @@ TEST_CASE("SharedRealm: async writes") { sema.add_stone(); // Wait until the main thread is waiting for the lock. - while (!db->other_writers_waiting_for_lock()) { - millisleep(1); - } + timed_sleeping_wait_for([&] { + return db->other_writers_waiting_for_lock(); + }); write->close(); }); @@ -1627,9 +1627,12 @@ TEST_CASE("SharedRealm: async writes") { std::invoke(close_functions[i], *realm); { - // Verify that we released the write lock + // We may not have released the write lock yet when close() + // returns, but it should happen promptly. auto db = DB::create(make_in_realm_history(), config.path, options); - REQUIRE(db->start_write(/* nonblocking */ true)); + timed_wait_for([&] { + return db->start_write(/* nonblocking */ true) != nullptr; + }); } // Verify that the transaction callback never got enqueued @@ -1638,6 +1641,27 @@ TEST_CASE("SharedRealm: async writes") { }); wait_for_done(); } + SECTION("while another Realm instance holds lock") { + auto realm2 = Realm::get_shared_realm(config); + realm2->begin_transaction(); + + auto scheduler = realm->scheduler(); + realm->async_begin_transaction([&] { + // We should never get here as the realm is closed + FAIL(); + }); + + // Doesn't have to wait for the write lock because the DB + // instance already holds it and we were just waiting for our + // turn with it rather than waiting to acquire it + std::invoke(close_functions[i], *realm); + + // Verify that the transaction callback never got enqueued + scheduler->invoke([&] { + done = true; + }); + wait_for_done(); + } SECTION("before async_begin_transaction() callback") { auto scheduler = realm->scheduler(); realm->async_begin_transaction([&] { diff --git a/test/object-store/sync/sync_test_utils.cpp b/test/object-store/sync/sync_test_utils.cpp index dece34092f4..a5300bfd9ce 100644 --- a/test/object-store/sync/sync_test_utils.cpp +++ b/test/object-store/sync/sync_test_utils.cpp @@ -87,29 +87,6 @@ bool ReturnsTrueWithinTimeLimit::match(util::FunctionRef condition) cons return predicate_returned_true; } -void timed_wait_for(util::FunctionRef condition, std::chrono::milliseconds max_ms) -{ - const auto wait_start = std::chrono::steady_clock::now(); - util::EventLoop::main().run_until([&] { - if (std::chrono::steady_clock::now() - wait_start > max_ms) { - throw std::runtime_error(util::format("timed_wait_for exceeded %1 ms", max_ms.count())); - } - return condition(); - }); -} - -void timed_sleeping_wait_for(util::FunctionRef condition, std::chrono::milliseconds max_ms, - std::chrono::milliseconds sleep_ms) -{ - const auto wait_start = std::chrono::steady_clock::now(); - while (!condition()) { - if (std::chrono::steady_clock::now() - wait_start > max_ms) { - throw std::runtime_error(util::format("timed_sleeping_wait_for exceeded %1 ms", max_ms.count())); - } - std::this_thread::sleep_for(sleep_ms); - } -} - auto do_hash = [](const std::string& name) -> std::string { std::array hash; util::sha256(name.data(), name.size(), hash.data()); diff --git a/test/object-store/sync/sync_test_utils.hpp b/test/object-store/sync/sync_test_utils.hpp index 4e2a318c2a3..fc7e7ca61ab 100644 --- a/test/object-store/sync/sync_test_utils.hpp +++ b/test/object-store/sync/sync_test_utils.hpp @@ -28,7 +28,6 @@ #include #include -#include "util/event_loop.hpp" #include "util/test_file.hpp" #include "util/test_utils.hpp" @@ -47,13 +46,6 @@ bool results_contains_user(SyncUserMetadataResults& results, const std::string& const std::string& auth_server); bool results_contains_original_name(SyncFileActionMetadataResults& results, const std::string& original_name); -void timed_wait_for(util::FunctionRef condition, - std::chrono::milliseconds max_ms = std::chrono::milliseconds(5000)); - -void timed_sleeping_wait_for(util::FunctionRef condition, - std::chrono::milliseconds max_ms = std::chrono::seconds(30), - std::chrono::milliseconds sleep_ms = std::chrono::milliseconds(1)); - class ReturnsTrueWithinTimeLimit : public Catch::Matchers::MatcherGenericBase { public: ReturnsTrueWithinTimeLimit(std::chrono::milliseconds max_ms = std::chrono::milliseconds(5000)) diff --git a/test/object-store/util/test_utils.cpp b/test/object-store/util/test_utils.cpp index 4cd18d5769a..3ebb747188d 100644 --- a/test/object-store/util/test_utils.cpp +++ b/test/object-store/util/test_utils.cpp @@ -16,13 +16,13 @@ // //////////////////////////////////////////////////////////////////////////// -#include #include "test_utils.hpp" +#include +#include #include #include #include -#include #include @@ -230,4 +230,27 @@ void chmod(const std::string& path, int permissions) #endif } +void timed_wait_for(util::FunctionRef condition, std::chrono::milliseconds max_ms) +{ + const auto wait_start = std::chrono::steady_clock::now(); + util::EventLoop::main().run_until([&] { + if (std::chrono::steady_clock::now() - wait_start > max_ms) { + throw std::runtime_error(util::format("timed_wait_for exceeded %1 ms", max_ms.count())); + } + return condition(); + }); +} + +void timed_sleeping_wait_for(util::FunctionRef condition, std::chrono::milliseconds max_ms, + std::chrono::milliseconds sleep_ms) +{ + const auto wait_start = std::chrono::steady_clock::now(); + while (!condition()) { + if (std::chrono::steady_clock::now() - wait_start > max_ms) { + throw std::runtime_error(util::format("timed_sleeping_wait_for exceeded %1 ms", max_ms.count())); + } + std::this_thread::sleep_for(sleep_ms); + } +} + } // namespace realm diff --git a/test/object-store/util/test_utils.hpp b/test/object-store/util/test_utils.hpp index 38f3e8f1ef3..973e03abcc5 100644 --- a/test/object-store/util/test_utils.hpp +++ b/test/object-store/util/test_utils.hpp @@ -19,11 +19,13 @@ #ifndef REALM_TEST_UTILS_HPP #define REALM_TEST_UTILS_HPP -#include -#include +#include "util/event_loop.hpp" + #include #include +#include +#include #include #include namespace fs = std::filesystem; @@ -145,6 +147,12 @@ bool chmod_supported(const std::string& path); int get_permissions(const std::string& path); void chmod(const std::string& path, int permissions); +void timed_wait_for(util::FunctionRef condition, + std::chrono::milliseconds max_ms = std::chrono::milliseconds(5000)); + +void timed_sleeping_wait_for(util::FunctionRef condition, + std::chrono::milliseconds max_ms = std::chrono::seconds(30), + std::chrono::milliseconds sleep_ms = std::chrono::milliseconds(1)); } // namespace realm #define REQUIRE_DIR_EXISTS(macro_path) \