Skip to content

Commit

Permalink
Avoid blocking in Transaction::close() when there's a cancelled async…
Browse files Browse the repository at this point in the history
… write when possible

We can't cancel the wait for the write lock if another process currently holds
it, but if the DB for the current Transaction holds it and we're just waiting
in the write queue then we can safely close the Transaction immediately and
simply skip over it when it would get its turn to write.
  • Loading branch information
tgoyne committed Mar 29, 2023
1 parent 87ea815 commit 0985a9d
Show file tree
Hide file tree
Showing 10 changed files with 191 additions and 122 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* Fix a stack overflow crash when using the query parser with long chains of AND/OR conditions. ([#6428](https://github.com/realm/realm-core/pull/6428), since v11.7.0)
* Changing the log level on the fly would not affect the core level log output ([#6440](https://github.com/realm/realm-core/issues/6440), since 13.7.0)
* `SyncManager::immediately_run_file_actions()` no longer ignores the result of trying to remove a realm. This could have resulted in a client reset action being reported as successful when it actually failed on windows if the `Realm` was still open ([#6050](https://github.com/realm/realm-core/issues/6050)).
* Fix a deadlock when closing a Transaction with a cancelled asynchronous write scheduled while another Transaction holds the write lock ([PR #6413](https://github.com/realm/realm-core/pull/6413), since v11.10.0).

### Breaking changes
* None.
Expand Down
189 changes: 121 additions & 68 deletions src/realm/db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1668,32 +1668,6 @@ void DB::release_all_read_locks() noexcept
REALM_ASSERT(m_transaction_count == 0);
}

// Note: close() and close_internal() may be called from the DB::~DB().
// in that case, they will not throw. Throwing can only happen if called
// directly.
void DB::close(bool allow_open_read_transactions)
{
// make helper thread terminate
m_commit_helper.reset();

if (m_fake_read_lock_if_immutable) {
if (!is_attached())
return;
{
CheckedLockGuard local_lock(m_mutex);
if (!allow_open_read_transactions && m_transaction_count)
throw WrongTransactionState("Closing with open read transactions");
}
if (m_alloc.is_attached())
m_alloc.detach();
m_fake_read_lock_if_immutable.reset();
}
else {
close_internal(std::unique_lock<InterprocessMutex>(m_controlmutex, std::defer_lock),
allow_open_read_transactions);
}
}

void DB::close_internal(std::unique_lock<InterprocessMutex> lock, bool allow_open_read_transactions)
{
if (!is_attached())
Expand Down Expand Up @@ -1766,18 +1740,23 @@ bool DB::other_writers_waiting_for_lock() const

uint32_t next_ticket = info->next_ticket.load(std::memory_order_relaxed);
uint32_t next_served = info->next_served.load(std::memory_order_relaxed);
// When holding the write lock, next_ticket = next_served + 1, hence, if the diference between 'next_ticket' and
// When holding the write lock, next_ticket = next_served + 1, hence, if the difference between 'next_ticket' and
// 'next_served' is greater than 1, there is at least one thread waiting to acquire the write lock.
return next_ticket > next_served + 1;
}

class DB::AsyncCommitHelper {
class DB::AsyncCommitHelper : public std::enable_shared_from_this<DB::AsyncCommitHelper> {
public:
AsyncCommitHelper(DB* db)
: m_db(db)
{
}
~AsyncCommitHelper()
{
close();
}

void close()
{
{
std::unique_lock lg(m_mutex);
Expand All @@ -1787,10 +1766,19 @@ class DB::AsyncCommitHelper {
m_running = false;
m_cv_worker.notify_one();
}
m_thread.join();

// The last reference to the DB may have been in a callback being invoked
// by the worker thread. If so, we'll be called from within the work
// loop, and have to just unwind rather than joining.
if (m_thread.get_id() != std::this_thread::get_id()) {
m_thread.join();
}
else {
m_thread.detach();
}
}

void begin_write(util::UniqueFunction<void()> fn)
void begin_write(util::UniqueFunction<bool()> fn)
{
std::unique_lock lg(m_mutex);
start_thread();
Expand Down Expand Up @@ -1903,7 +1891,7 @@ class DB::AsyncCommitHelper {
std::mutex m_mutex;
std::condition_variable m_cv_worker;
std::condition_variable m_cv_callers;
std::deque<util::UniqueFunction<void()>> m_pending_writes;
std::deque<util::UniqueFunction<bool()>> m_pending_writes;
util::UniqueFunction<void()> m_pending_sync;
size_t m_write_lock_claim_ticket = 0;
size_t m_write_lock_claim_fulfilled = 0;
Expand All @@ -1921,7 +1909,9 @@ class DB::AsyncCommitHelper {
return;
}
m_running = true;
m_thread = std::thread([this]() {
// Capture a strong reference to ourself for as long as the thread is
// running as the thread can outlive the parent DB.
m_thread = std::thread([this, self = shared_from_this()]() {
main();
});
}
Expand All @@ -1934,6 +1924,11 @@ class DB::AsyncCommitHelper {

void DB::AsyncCommitHelper::main()
{
// While we own the write mutex we hold a strong reference onto the DB
// because DB doesn't allow closing mid-write. Note that this is
// intentionally declared before the lock as we have to release this while
// not holding the lock to avoid deadlocks.
DBRef dbref;
std::unique_lock lg(m_mutex);
while (m_running) {
#if 0 // Enable for testing purposes
Expand All @@ -1959,6 +1954,7 @@ void DB::AsyncCommitHelper::main()
m_owns_write_mutex = false;

lg.unlock();
dbref.reset();
m_cv_callers.notify_all();
lg.lock();
continue;
Expand All @@ -1973,40 +1969,56 @@ void DB::AsyncCommitHelper::main()
// deadlock if they ask us to perform the sync.
if (!m_waiting_for_write_mutex && has_pending_write_requests()) {
lg.unlock();
dbref = m_db->shared_from_this();
m_db->do_begin_write();
lg.lock();

REALM_ASSERT(!m_has_write_mutex);
m_has_write_mutex = true;
m_owns_write_mutex = true;

// Synchronous transaction requests get priority over async
// Synchronous transaction requests get priority over async, so
// let the next pending synchronous transaction have the lock
// if there are any
if (m_write_lock_claim_fulfilled < m_write_lock_claim_ticket) {
++m_write_lock_claim_fulfilled;
m_cv_callers.notify_all();
continue;
}

REALM_ASSERT(!m_pending_writes.empty());
auto callback = std::move(m_pending_writes.front());
m_pending_writes.pop_front();
lg.unlock();
callback();
// Release things captured by the callback before reacquiring the lock
callback = nullptr;
lg.lock();
bool did_use_lock = false;
while (!m_pending_writes.empty()) {
auto callback = std::move(m_pending_writes.front());
m_pending_writes.pop_front();
lg.unlock();
did_use_lock = callback();
// Release things captured by the callback before reacquiring the lock
callback = nullptr;
lg.lock();
if (did_use_lock) {
break;
}
}

if (!did_use_lock) {
// All of the queued writes were cancelled, so just go ahead
// and release the write lock
m_pending_mx_release = true;
}
continue;
}
}
m_cv_worker.wait(lg);
}
if (m_has_write_mutex && m_owns_write_mutex) {
REALM_ASSERT(dbref);
m_db->do_end_write();
}
}


void DB::async_begin_write(util::UniqueFunction<void()> fn)
void DB::async_begin_write(util::UniqueFunction<bool()> fn)
{
REALM_ASSERT(m_commit_helper);
m_commit_helper->begin_write(std::move(fn));
Expand All @@ -2024,6 +2036,36 @@ void DB::async_sync_to_disk(util::UniqueFunction<void()> fn)
m_commit_helper->sync_to_disk(std::move(fn));
}

// Note: close() and close_internal() may be called from the DB::~DB().
// in that case, they will not throw. Throwing can only happen if called
// directly.
void DB::close(bool allow_open_read_transactions)
{
// make helper thread terminate
if (m_commit_helper) {
m_commit_helper->close();
m_commit_helper.reset();
}

if (m_fake_read_lock_if_immutable) {
if (!is_attached())
return;
{
CheckedLockGuard local_lock(m_mutex);
if (!allow_open_read_transactions && m_transaction_count)
throw WrongTransactionState("Closing with open read transactions");
}
if (m_alloc.is_attached())
m_alloc.detach();
m_fake_read_lock_if_immutable.reset();
}
else {
close_internal(std::unique_lock<InterprocessMutex>(m_controlmutex, std::defer_lock),
allow_open_read_transactions);
}
}


bool DB::has_changed(TransactionRef& tr)
{
if (m_fake_read_lock_if_immutable)
Expand Down Expand Up @@ -2604,49 +2646,60 @@ TransactionRef DB::start_write(bool nonblocking)
return tr;
}

void DB::async_request_write_mutex(TransactionRef& tr, util::UniqueFunction<void()>&& when_acquired)
void DB::async_request_write_mutex(const TransactionRef& tr, util::UniqueFunction<void()>&& when_acquired)
{
REALM_ASSERT(tr && tr->db.get() == this);
std::chrono::steady_clock::time_point request_start;
{
util::CheckedLockGuard lck(tr->m_async_mutex);
REALM_ASSERT(tr->m_async_stage == Transaction::AsyncState::Idle);
tr->m_async_stage = Transaction::AsyncState::Requesting;
tr->m_request_time_point = std::chrono::steady_clock::now();
if (tr->db->m_logger) {
tr->db->m_logger->log(util::Logger::Level::trace, "Async request write lock");
if (m_logger) {
request_start = std::chrono::steady_clock::now();
m_logger->log(util::Logger::Level::trace, "Async request write lock");
}
}
std::weak_ptr<Transaction> weak_tr = tr;
async_begin_write([weak_tr, cb = std::move(when_acquired)]() {
if (auto tr = weak_tr.lock()) {
util::CheckedLockGuard lck(tr->m_async_mutex);
// If a synchronous transaction happened while we were pending
// we may be in HasCommits
if (tr->m_async_stage == Transaction::AsyncState::Requesting) {
tr->m_async_stage = Transaction::AsyncState::HasLock;
}
if (tr->db->m_logger) {
auto t2 = std::chrono::steady_clock::now();
tr->db->m_logger->log(
util::Logger::Level::trace, "Got write lock in %1 us",
std::chrono::duration_cast<std::chrono::microseconds>(t2 - tr->m_request_time_point).count());
}
if (tr->m_waiting_for_write_lock) {
tr->m_waiting_for_write_lock = false;
tr->m_async_cv.notify_one();
}
else if (cb) {
cb();
}
tr.reset(); // Release pointer while lock is held
async_begin_write([this, weak_tr, cb = std::move(when_acquired), request_start]() {
// Check if the Transaction is still valid and still wants the write
// lock. If may no longer be if it was closed while waiting for the lock,
// in which case we'll either move onto the next waiting Transaction or
// just release the lock
auto tr = weak_tr.lock();
if (!tr) {
return false;
}
util::CheckedLockGuard lck(tr->m_async_mutex);
if (tr->m_async_stage == Transaction::AsyncState::Idle) {
return false;
}

// If a synchronous transaction happened while we were pending
// we may be in HasCommits
if (tr->m_async_stage == Transaction::AsyncState::Requesting) {
tr->m_async_stage = Transaction::AsyncState::HasLock;
}
if (m_logger) {
auto t2 = std::chrono::steady_clock::now();
m_logger->log(util::Logger::Level::trace, "Got write lock in %1 us",
std::chrono::duration_cast<std::chrono::microseconds>(t2 - request_start).count());
}
if (tr->m_waiting_for_write_lock) {
tr->m_waiting_for_write_lock = false;
tr->m_async_cv.notify_one();
}
else if (cb) {
cb();
}
return true;
});
}

inline DB::DB(const DBOptions& options)
: m_upgrade_callback(std::move(options.upgrade_callback))
{
if (options.enable_async_writes) {
m_commit_helper = std::make_unique<AsyncCommitHelper>(this);
m_commit_helper = std::make_shared<AsyncCommitHelper>(this);
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/realm/db.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ class DB : public std::enable_shared_from_this<DB> {
// ask for write mutex. Callback takes place when mutex has been acquired.
// callback may occur on ANOTHER THREAD. Must not be called if write mutex
// has already been acquired.
void async_request_write_mutex(TransactionRef& tr, util::UniqueFunction<void()>&& when_acquired);
void async_request_write_mutex(const TransactionRef& tr, util::UniqueFunction<void()>&& when_acquired);

// report statistics of last commit done on THIS DB.
// The free space reported is what can be expected to be freed
Expand Down Expand Up @@ -496,7 +496,7 @@ class DB : public std::enable_shared_from_this<DB> {
util::InterprocessCondVar m_pick_next_writer;
std::function<void(int, int)> m_upgrade_callback;
std::shared_ptr<metrics::Metrics> m_metrics;
std::unique_ptr<AsyncCommitHelper> m_commit_helper;
std::shared_ptr<AsyncCommitHelper> m_commit_helper;
std::shared_ptr<util::Logger> m_logger;
bool m_is_sync_agent = false;

Expand Down Expand Up @@ -608,7 +608,7 @@ class DB : public std::enable_shared_from_this<DB> {
void close_internal(std::unique_lock<util::InterprocessMutex>, bool allow_open_read_transactions)
REQUIRES(!m_mutex);

void async_begin_write(util::UniqueFunction<void()> fn);
void async_begin_write(util::UniqueFunction<bool()> fn);
void async_end_write();
void async_sync_to_disk(util::UniqueFunction<void()> fn);

Expand Down
10 changes: 1 addition & 9 deletions src/realm/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -765,15 +765,7 @@ void Transaction::prepare_for_close()
break;

case AsyncState::Requesting:
// We don't have the ability to cancel a wait on the write lock, so
// unfortunately we have to wait for it to be acquired.
REALM_ASSERT(m_transact_stage == DB::transact_Reading);
REALM_ASSERT(!m_oldest_version_not_persisted);
m_waiting_for_write_lock = true;
m_async_cv.wait(lck.native_handle(), [this]() REQUIRES(m_async_mutex) {
return !m_waiting_for_write_lock;
});
db->end_write_on_correct_thread();
m_async_stage = AsyncState::Idle;
break;

case AsyncState::HasLock:
Expand Down
1 change: 0 additions & 1 deletion src/realm/transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ class Transaction : public Group {
util::CheckedMutex m_async_mutex;
std::condition_variable m_async_cv GUARDED_BY(m_async_mutex);
AsyncState m_async_stage GUARDED_BY(m_async_mutex) = AsyncState::Idle;
std::chrono::steady_clock::time_point m_request_time_point;
bool m_waiting_for_write_lock GUARDED_BY(m_async_mutex) = false;
bool m_waiting_for_sync GUARDED_BY(m_async_mutex) = false;

Expand Down
Loading

0 comments on commit 0985a9d

Please sign in to comment.