Skip to content

Commit

Permalink
Add partial support for cancelling async write mutex requests
Browse files Browse the repository at this point in the history
While we can't cancel the actual wait on the write mutex, we can dequeue
specific Transactions which are waiting for their turn to write, and only block
when the DB itself is destroyed. This makes it so that individual Transactions
with cancelled async writes can be cleaned up while the write lock is held.

This is done by changing the async write queue in DB::AsyncCommitHelper from
arbitrary callbacks to a queue of Transaction instances. This increases the
coupling between the types, but makes it easier to dequeue specific instances
and relying on the specific details of what Transaction will do simplifies the
locking involved.
  • Loading branch information
tgoyne committed Apr 13, 2023
1 parent 7a17dab commit 2604832
Show file tree
Hide file tree
Showing 20 changed files with 541 additions and 279 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
### Fixed
* Fixed a crash or exception when doing a fulltext search for multiple keywords when the intersection of results is not equal. ([#6465](https://github.com/realm/realm-core/issues/6465) since v13.2.0).
* Fixed issue where build would not succeed when consuming core as an installed dependancy due to missing install headers ([#6479](https://github.com/realm/realm-core/pull/6479) since v13.4.1).
* Fix a deadlock when closing a Transaction with a cancelled asynchronous write scheduled while another Transaction holds the write lock ([PR #6486](https://github.com/realm/realm-core/pull/6486), since v11.10.0)

### Breaking changes
* None.
Expand Down
1 change: 1 addition & 0 deletions src/realm/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ set(REALM_INSTALL_HEADERS
util/random.hpp
util/safe_int_ops.hpp
util/scope_exit.hpp
util/semaphore.hpp
util/serializer.hpp
util/sha_crypto.hpp
util/span.hpp
Expand Down
205 changes: 103 additions & 102 deletions src/realm/db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1861,37 +1861,43 @@ class DB::AsyncCommitHelper {
}
~AsyncCommitHelper()
{
{
std::unique_lock lg(m_mutex);
if (!m_running) {
return;
}
m_running = false;
m_cv_worker.notify_one();
if (!m_running.exchange(false, std::memory_order_relaxed)) {
return;
}
m_cv_worker.notify_one();
m_thread.join();
}

void begin_write(util::UniqueFunction<void()> fn)
void begin_write(Transaction* tr) REQUIRES(!m_mutex)
{
std::unique_lock lg(m_mutex);
start_thread();
m_pending_writes.emplace_back(std::move(fn));
m_cv_worker.notify_one();
{
util::CheckedLockGuard lg(m_mutex);
m_pending_writes.push_back(tr);
}
wake_up_thread();
}

void blocking_begin_write()
bool cancel_begin_write(Transaction* tr) REQUIRES(!m_mutex)
{
std::unique_lock lg(m_mutex);
util::CheckedLockGuard lg(m_mutex);
if (auto it = std::find(m_pending_writes.begin(), m_pending_writes.end(), tr); it != m_pending_writes.end()) {
m_pending_writes.erase(it);
return true;
}
return false;
}

void blocking_begin_write() REQUIRES(!m_mutex)
{
util::CheckedUniqueLock lg(m_mutex);

// If we support unlocking InterprocessMutex from a different thread
// than it was locked on, we can sometimes just begin the write on
// the current thread. This requires that no one is currently waiting
// for the worker thread to acquire the write lock, as we'll deadlock
// if we try to async commit while the worker is waiting for the lock.
bool can_lock_on_caller =
!InterprocessMutex::is_thread_confined && (!m_owns_write_mutex && m_pending_writes.empty() &&
m_write_lock_claim_ticket == m_write_lock_claim_fulfilled);
!InterprocessMutex::is_thread_confined && (!m_owns_write_mutex && !has_pending_write_requests());

// If we support cross-thread unlocking and m_running is false,
// can_lock_on_caller should always be true or we forgot to launch the thread
Expand All @@ -1911,17 +1917,16 @@ class DB::AsyncCommitHelper {

// Otherwise we have to ask the worker thread to acquire it and wait
// for that
start_thread();
size_t ticket = ++m_write_lock_claim_ticket;
m_cv_worker.notify_one();
m_cv_callers.wait(lg, [this, ticket] {
wake_up_thread();
m_cv_callers.wait(lg.native_handle(), [this, ticket]() REQUIRES(m_mutex) {
return ticket == m_write_lock_claim_fulfilled;
});
}

void end_write()
void end_write() REQUIRES(!m_mutex)
{
std::unique_lock lg(m_mutex);
util::CheckedLockGuard lg(m_mutex);
REALM_ASSERT(m_has_write_mutex);
REALM_ASSERT(m_owns_write_mutex || !InterprocessMutex::is_thread_confined);

Expand All @@ -1937,9 +1942,9 @@ class DB::AsyncCommitHelper {
}
}

bool blocking_end_write()
bool blocking_end_write() REQUIRES(!m_mutex)
{
std::unique_lock lg(m_mutex);
util::CheckedUniqueLock lg(m_mutex);
if (!m_has_write_mutex) {
return false;
}
Expand All @@ -1950,7 +1955,7 @@ class DB::AsyncCommitHelper {
if (m_owns_write_mutex) {
m_pending_mx_release = true;
m_cv_worker.notify_one();
m_cv_callers.wait(lg, [this] {
m_cv_callers.wait(lg.native_handle(), [this]() REQUIRES(m_mutex) {
return !m_pending_mx_release;
});
}
Expand All @@ -1969,67 +1974,87 @@ class DB::AsyncCommitHelper {
}


void sync_to_disk(util::UniqueFunction<void()> fn)
void sync_to_disk(Transaction* tr) REQUIRES(!m_mutex)
{
REALM_ASSERT(fn);
std::unique_lock lg(m_mutex);
REALM_ASSERT(tr);
util::CheckedLockGuard lg(m_mutex);
REALM_ASSERT(!m_pending_sync);
start_thread();
m_pending_sync = std::move(fn);
m_cv_worker.notify_one();
REALM_ASSERT(m_has_write_mutex);
m_pending_sync = tr;
wake_up_thread();
}

private:
DB* m_db;
std::thread m_thread;
std::mutex m_mutex;
// A mutex which guards most of the members in this class
util::CheckedMutex m_mutex;
// CV which the worker thread waits on to await work
std::condition_variable m_cv_worker;
// CV which other threads wait on to await the worker thread completing work
std::condition_variable m_cv_callers;
std::deque<util::UniqueFunction<void()>> m_pending_writes;
util::UniqueFunction<void()> m_pending_sync;
size_t m_write_lock_claim_ticket = 0;
size_t m_write_lock_claim_fulfilled = 0;
bool m_pending_mx_release = false;
bool m_running = false;
bool m_has_write_mutex = false;
bool m_owns_write_mutex = false;
bool m_waiting_for_write_mutex = false;

void main();

void start_thread()
// Transactions which are waiting for their turn to write. These are non-owning
// pointers to avoid a retain cycle. Weak pointers would result in the
// Transaction sometimes being destroyed on the worker thread, which results
// in complicated teardown. The non-owning pointers are safe because
// Transaction will unregister itself and/or wait for operations to complete
// before closing.
std::deque<Transaction*> m_pending_writes GUARDED_BY(m_mutex);
// The Transaction which has commits to write to disk.
Transaction* m_pending_sync GUARDED_BY(m_mutex) = nullptr;
// Ticketing system for blocking write transactions. Blocking writes increment
// claim_ticket and then wait on m_cv_callers until claim_fulfilled is equal
// to the value of claim_ticket they saw at the start of the wait. The worker
// thread increments claim_fulfilled instead of calling an async callback
// if it is below claim_ticket, as sync writes take priority over async.
size_t m_write_lock_claim_ticket GUARDED_BY(m_mutex) = 0;
size_t m_write_lock_claim_fulfilled GUARDED_BY(m_mutex) = 0;
bool m_pending_mx_release GUARDED_BY(m_mutex) = false;
std::atomic<bool> m_running = false;
bool m_has_write_mutex GUARDED_BY(m_mutex) = false;
// True if the worker thread specifically owns the write mutex. May be false
// while `m_has_write_mutex` is true if the write mutex was acquired via
// a blocking begin and the mutex supports cross-thread unlocks.
bool m_owns_write_mutex GUARDED_BY(m_mutex) = false;
bool m_waiting_for_write_mutex GUARDED_BY(m_mutex) = false;

void main() REQUIRES(!m_mutex);

void wake_up_thread()
{
if (m_running) {
if (m_running.exchange(true, std::memory_order_relaxed)) {
m_cv_worker.notify_one();
return;
}
m_running = true;
m_thread = std::thread([this]() {
main();
});
}

bool has_pending_write_requests()
bool has_pending_write_requests() REQUIRES(m_mutex)
{
return m_write_lock_claim_fulfilled < m_write_lock_claim_ticket || !m_pending_writes.empty();
}
};

void DB::AsyncCommitHelper::main()
{
std::unique_lock lg(m_mutex);
util::CheckedUniqueLock lg(m_mutex);
while (m_running) {
#if 0 // Enable for testing purposes
std::this_thread::sleep_for(std::chrono::milliseconds(10));
#endif
if (m_has_write_mutex) {
if (auto cb = std::move(m_pending_sync)) {
if (auto tr = m_pending_sync) {
m_pending_sync = nullptr;
// Only one of sync_to_disk(), end_write(), or blocking_end_write()
// should be called, so we should never have both a pending sync
// and pending release.
REALM_ASSERT(!m_pending_mx_release);
lg.unlock();
cb();
cb = nullptr; // Release things captured by the callback before reacquiring the lock
// Release the lock while calling this as it performs the actual
// i/o and we need to not block other threads.
tr->sync_async_commit();
lg.lock();
m_pending_mx_release = true;
}
Expand Down Expand Up @@ -2069,29 +2094,44 @@ void DB::AsyncCommitHelper::main()
continue;
}

REALM_ASSERT(!m_pending_writes.empty());
auto callback = std::move(m_pending_writes.front());
// The request could have been cancelled while we were waiting
// for the lock
if (m_pending_writes.empty()) {
m_pending_mx_release = true;
continue;
}

auto writer = m_pending_writes.front();
m_pending_writes.pop_front();
lg.unlock();
callback();
// Release things captured by the callback before reacquiring the lock
callback = nullptr;
lg.lock();
// Call with lock held: this is safe to do because the function
// does very little work before dispatching the main work to the
// scheduler (and importantly will never call back into us), and
// it's required for Transaction teardown to be safe. We don't
// hold a strong reference to the Transaction to ensure that we
// will never destroy the DB on this thread, so we have to block
// closing the Transaction until we're done with the unowned pointer.
writer->async_write_began();
continue;
}
}
m_cv_worker.wait(lg);
m_cv_worker.wait(lg.native_handle());
}
if (m_has_write_mutex && m_owns_write_mutex) {
m_db->do_end_write();
}
}


void DB::async_begin_write(util::UniqueFunction<void()> fn)
void DB::async_begin_write(Transaction* tr)
{
REALM_ASSERT(m_commit_helper);
m_commit_helper->begin_write(std::move(fn));
m_commit_helper->begin_write(tr);
}

bool DB::cancel_async_begin_write(Transaction* tr)
{
REALM_ASSERT(m_commit_helper);
return m_commit_helper->cancel_begin_write(tr);
}

void DB::async_end_write()
Expand All @@ -2100,10 +2140,10 @@ void DB::async_end_write()
m_commit_helper->end_write();
}

void DB::async_sync_to_disk(util::UniqueFunction<void()> fn)
void DB::async_sync_to_disk(Transaction* tr)
{
REALM_ASSERT(m_commit_helper);
m_commit_helper->sync_to_disk(std::move(fn));
m_commit_helper->sync_to_disk(tr);
}

bool DB::has_changed(TransactionRef& tr)
Expand Down Expand Up @@ -2687,44 +2727,6 @@ TransactionRef DB::start_write(bool nonblocking)
return tr;
}

void DB::async_request_write_mutex(TransactionRef& tr, util::UniqueFunction<void()>&& when_acquired)
{
{
util::CheckedLockGuard lck(tr->m_async_mutex);
REALM_ASSERT(tr->m_async_stage == Transaction::AsyncState::Idle);
tr->m_async_stage = Transaction::AsyncState::Requesting;
tr->m_request_time_point = std::chrono::steady_clock::now();
if (tr->db->m_logger) {
tr->db->m_logger->log(util::Logger::Level::trace, "Async request write lock");
}
}
std::weak_ptr<Transaction> weak_tr = tr;
async_begin_write([weak_tr, cb = std::move(when_acquired)]() {
if (auto tr = weak_tr.lock()) {
util::CheckedLockGuard lck(tr->m_async_mutex);
// If a synchronous transaction happened while we were pending
// we may be in HasCommits
if (tr->m_async_stage == Transaction::AsyncState::Requesting) {
tr->m_async_stage = Transaction::AsyncState::HasLock;
}
if (tr->db->m_logger) {
auto t2 = std::chrono::steady_clock::now();
tr->db->m_logger->log(
util::Logger::Level::trace, "Got write lock in %1 us",
std::chrono::duration_cast<std::chrono::microseconds>(t2 - tr->m_request_time_point).count());
}
if (tr->m_waiting_for_write_lock) {
tr->m_waiting_for_write_lock = false;
tr->m_async_cv.notify_one();
}
else if (cb) {
cb();
}
tr.reset(); // Release pointer while lock is held
}
});
}

inline DB::DB(const DBOptions& options)
: m_upgrade_callback(std::move(options.upgrade_callback))
{
Expand Down Expand Up @@ -2818,7 +2820,6 @@ void DB::do_begin_possibly_async_write()

void DB::end_write_on_correct_thread() noexcept
{
// m_local_write_mutex.unlock();
if (!m_commit_helper || !m_commit_helper->blocking_end_write()) {
do_end_write();
}
Expand Down
10 changes: 3 additions & 7 deletions src/realm/db.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,11 +258,6 @@ class DB : public std::enable_shared_from_this<DB> {
// an invalid TransactionRef is returned.
TransactionRef start_write(bool nonblocking = false) REQUIRES(!m_mutex);

// ask for write mutex. Callback takes place when mutex has been acquired.
// callback may occur on ANOTHER THREAD. Must not be called if write mutex
// has already been acquired.
void async_request_write_mutex(TransactionRef& tr, util::UniqueFunction<void()>&& when_acquired);

// report statistics of last commit done on THIS DB.
// The free space reported is what can be expected to be freed
// by compact(). This may not correspond to the space which is free
Expand Down Expand Up @@ -612,9 +607,10 @@ class DB : public std::enable_shared_from_this<DB> {
void close_internal(std::unique_lock<util::InterprocessMutex>, bool allow_open_read_transactions)
REQUIRES(!m_mutex);

void async_begin_write(util::UniqueFunction<void()> fn);
void async_begin_write(Transaction* tr);
bool cancel_async_begin_write(Transaction* tr);
void async_end_write();
void async_sync_to_disk(util::UniqueFunction<void()> fn);
void async_sync_to_disk(Transaction*);

friend class SlabAlloc;
friend class Transaction;
Expand Down
11 changes: 0 additions & 11 deletions src/realm/object-store/impl/realm_coordinator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1306,14 +1306,3 @@ void RealmCoordinator::write_copy(StringData path, const char* key)
{
m_db->write_copy(path, key);
}

void RealmCoordinator::async_request_write_mutex(Realm& realm)
{
auto tr = Realm::Internal::get_transaction_ref(realm);
m_db->async_request_write_mutex(tr, [realm = realm.shared_from_this()]() mutable {
auto& scheduler = *realm->scheduler();
scheduler.invoke([realm = std::move(realm)] {
Realm::Internal::run_writes(*realm);
});
});
}
Loading

0 comments on commit 2604832

Please sign in to comment.