Avoid blocking in Transaction::close() when there's a cancelled async write when possible

We can't cancel the wait for the write lock if another process currently holds
it, but if the DB for the current Transaction holds it and we're just waiting
in the write queue then we can safely close the Transaction immediately and
simply skip over it when it would get its turn to write.
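
The skip-over behaviour described above can be sketched roughly as follows. This is a simplified, single-threaded model and not the actual realm-core implementation: `FakeTransaction`, `WriteQueue`, `run_turn()` and `cancel_request()` are hypothetical names invented for illustration, and the real code's mutexes, worker thread and interprocess lock are omitted. It shows only the central idea: each queued callback returns a `bool` saying whether it actually took the lock, so the queue can skip cancelled requests.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical stand-in for a Transaction's async state.
enum class AsyncState { Idle, Requesting, HasLock };

struct FakeTransaction {
    AsyncState stage = AsyncState::Idle;
    int times_granted = 0;
};

// Hypothetical model of the queue of pending async writes.
class WriteQueue {
public:
    // Queue a request for the write lock on behalf of `tr`.
    void request_write(const std::shared_ptr<FakeTransaction>& tr)
    {
        assert(tr && tr->stage == AsyncState::Idle);
        tr->stage = AsyncState::Requesting;
        std::weak_ptr<FakeTransaction> weak_tr = tr;
        m_pending.push_back([weak_tr]() {
            auto locked = weak_tr.lock();
            // Destroyed or closed while waiting: report that the lock
            // was not used so the next request can be tried.
            if (!locked || locked->stage == AsyncState::Idle)
                return false;
            locked->stage = AsyncState::HasLock;
            ++locked->times_granted;
            return true;
        });
    }

    // Called once the write lock is held. Returns true if some queued
    // request took the lock, or false if every request was cancelled
    // and the caller should simply release the lock.
    bool run_turn()
    {
        while (!m_pending.empty()) {
            auto callback = std::move(m_pending.front());
            m_pending.pop_front();
            if (callback())
                return true;
        }
        return false;
    }

    std::size_t pending() const { return m_pending.size(); }

private:
    std::deque<std::function<bool()>> m_pending;
};

// Closing a transaction that is only queued resets its state; it does
// not wait for its turn with the lock.
inline void cancel_request(FakeTransaction& tr)
{
    if (tr.stage == AsyncState::Requesting)
        tr.stage = AsyncState::Idle;
}
```

In this model a close never blocks: the cancelled entry stays in the queue and is discarded when its turn arrives.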
tgoyne committed Mar 28, 2023
1 parent ea7b5d3 commit 983dc6f
Showing 10 changed files with 177 additions and 120 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -8,7 +8,7 @@
* <How do the end-user experience this issue? what was the impact?> ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?)
* Changing the log level on the fly would not affect the core level log output ([#6440](https://github.com/realm/realm-core/issues/6440), since 13.7.0)
* `SyncManager::immediately_run_file_actions()` no longer ignores the result of trying to remove a realm. This could have resulted in a client reset action being reported as successful when it actually failed on windows if the `Realm` was still open ([#6050](https://github.com/realm/realm-core/issues/6050)).

* Fix a deadlock when closing a Transaction with a cancelled asynchronous write scheduled while another Transaction holds the write lock ([PR #6413](https://github.com/realm/realm-core/pull/6413), since v11.10.0).

### Breaking changes
* None.
178 changes: 110 additions & 68 deletions src/realm/db.cpp
@@ -1668,32 +1668,6 @@ void DB::release_all_read_locks() noexcept
REALM_ASSERT(m_transaction_count == 0);
}

// Note: close() and close_internal() may be called from the DB::~DB().
// in that case, they will not throw. Throwing can only happen if called
// directly.
void DB::close(bool allow_open_read_transactions)
{
// make helper thread terminate
m_commit_helper.reset();

if (m_fake_read_lock_if_immutable) {
if (!is_attached())
return;
{
CheckedLockGuard local_lock(m_mutex);
if (!allow_open_read_transactions && m_transaction_count)
throw WrongTransactionState("Closing with open read transactions");
}
if (m_alloc.is_attached())
m_alloc.detach();
m_fake_read_lock_if_immutable.reset();
}
else {
close_internal(std::unique_lock<InterprocessMutex>(m_controlmutex, std::defer_lock),
allow_open_read_transactions);
}
}

void DB::close_internal(std::unique_lock<InterprocessMutex> lock, bool allow_open_read_transactions)
{
if (!is_attached())
@@ -1766,18 +1740,23 @@ bool DB::other_writers_waiting_for_lock() const

uint32_t next_ticket = info->next_ticket.load(std::memory_order_relaxed);
uint32_t next_served = info->next_served.load(std::memory_order_relaxed);
// When holding the write lock, next_ticket = next_served + 1, hence, if the diference between 'next_ticket' and
// When holding the write lock, next_ticket = next_served + 1, hence, if the difference between 'next_ticket' and
// 'next_served' is greater than 1, there is at least one thread waiting to acquire the write lock.
return next_ticket > next_served + 1;
}

class DB::AsyncCommitHelper {
class DB::AsyncCommitHelper : public std::enable_shared_from_this<DB::AsyncCommitHelper> {
public:
AsyncCommitHelper(DB* db)
: m_db(db)
{
}
~AsyncCommitHelper()
{
close();
}

void close()
{
{
std::unique_lock lg(m_mutex);
@@ -1787,10 +1766,16 @@ class DB::AsyncCommitHelper {
m_running = false;
m_cv_worker.notify_one();
}
m_thread.join();

// The last reference to the DB may have been in a callback being invoked
// by the worker thread. If so, we'll be called from within the work
// loop, and have to just unwind rather than joining.
if (m_thread.get_id() != std::this_thread::get_id()) {
m_thread.join();
}
}

void begin_write(util::UniqueFunction<void()> fn)
void begin_write(util::UniqueFunction<bool()> fn)
{
std::unique_lock lg(m_mutex);
start_thread();
@@ -1903,7 +1888,7 @@ class DB::AsyncCommitHelper {
std::mutex m_mutex;
std::condition_variable m_cv_worker;
std::condition_variable m_cv_callers;
std::deque<util::UniqueFunction<void()>> m_pending_writes;
std::deque<util::UniqueFunction<bool()>> m_pending_writes;
util::UniqueFunction<void()> m_pending_sync;
size_t m_write_lock_claim_ticket = 0;
size_t m_write_lock_claim_fulfilled = 0;
@@ -1921,7 +1906,9 @@ class DB::AsyncCommitHelper {
return;
}
m_running = true;
m_thread = std::thread([this]() {
// Capture a strong reference to ourself for as long as the thread is
// running as the thread can outlive the parent DB.
m_thread = std::thread([this, self = shared_from_this()]() {
main();
});
}
@@ -1980,21 +1967,35 @@ void DB::AsyncCommitHelper::main()
m_has_write_mutex = true;
m_owns_write_mutex = true;

// Synchronous transaction requests get priority over async
// Synchronous transaction requests get priority over async, so
// let the next pending synchronous transaction have the lock
// if there are any
if (m_write_lock_claim_fulfilled < m_write_lock_claim_ticket) {
++m_write_lock_claim_fulfilled;
m_cv_callers.notify_all();
continue;
}

REALM_ASSERT(!m_pending_writes.empty());
auto callback = std::move(m_pending_writes.front());
m_pending_writes.pop_front();
lg.unlock();
callback();
// Release things captured by the callback before reacquiring the lock
callback = nullptr;
lg.lock();
bool did_use_lock = false;
while (!m_pending_writes.empty()) {
auto callback = std::move(m_pending_writes.front());
m_pending_writes.pop_front();
lg.unlock();
did_use_lock = callback();
// Release things captured by the callback before reacquiring the lock
callback = nullptr;
lg.lock();
if (did_use_lock || !m_running) {
break;
}
}

if (!did_use_lock) {
// All of the queued writes were cancelled, so just go ahead
// and release the write lock
m_pending_mx_release = true;
}
continue;
}
}
@@ -2006,7 +2007,7 @@ void DB::AsyncCommitHelper::main()
}


void DB::async_begin_write(util::UniqueFunction<void()> fn)
void DB::async_begin_write(util::UniqueFunction<bool()> fn)
{
REALM_ASSERT(m_commit_helper);
m_commit_helper->begin_write(std::move(fn));
@@ -2024,6 +2025,36 @@ void DB::async_sync_to_disk(util::UniqueFunction<void()> fn)
m_commit_helper->sync_to_disk(std::move(fn));
}

// Note: close() and close_internal() may be called from the DB::~DB().
// in that case, they will not throw. Throwing can only happen if called
// directly.
void DB::close(bool allow_open_read_transactions)
{
// make helper thread terminate
if (m_commit_helper) {
m_commit_helper->close();
m_commit_helper.reset();
}

if (m_fake_read_lock_if_immutable) {
if (!is_attached())
return;
{
CheckedLockGuard local_lock(m_mutex);
if (!allow_open_read_transactions && m_transaction_count)
throw WrongTransactionState("Closing with open read transactions");
}
if (m_alloc.is_attached())
m_alloc.detach();
m_fake_read_lock_if_immutable.reset();
}
else {
close_internal(std::unique_lock<InterprocessMutex>(m_controlmutex, std::defer_lock),
allow_open_read_transactions);
}
}


bool DB::has_changed(TransactionRef& tr)
{
if (m_fake_read_lock_if_immutable)
@@ -2604,49 +2635,60 @@ TransactionRef DB::start_write(bool nonblocking)
return tr;
}

void DB::async_request_write_mutex(TransactionRef& tr, util::UniqueFunction<void()>&& when_acquired)
void DB::async_request_write_mutex(const TransactionRef& tr, util::UniqueFunction<void()>&& when_acquired)
{
REALM_ASSERT(tr && tr->db.get() == this);
std::chrono::steady_clock::time_point request_start;
{
util::CheckedLockGuard lck(tr->m_async_mutex);
REALM_ASSERT(tr->m_async_stage == Transaction::AsyncState::Idle);
tr->m_async_stage = Transaction::AsyncState::Requesting;
tr->m_request_time_point = std::chrono::steady_clock::now();
if (tr->db->m_logger) {
tr->db->m_logger->log(util::Logger::Level::trace, "Async request write lock");
if (m_logger) {
request_start = std::chrono::steady_clock::now();
m_logger->log(util::Logger::Level::trace, "Async request write lock");
}
}
std::weak_ptr<Transaction> weak_tr = tr;
async_begin_write([weak_tr, cb = std::move(when_acquired)]() {
if (auto tr = weak_tr.lock()) {
util::CheckedLockGuard lck(tr->m_async_mutex);
// If a synchronous transaction happened while we were pending
// we may be in HasCommits
if (tr->m_async_stage == Transaction::AsyncState::Requesting) {
tr->m_async_stage = Transaction::AsyncState::HasLock;
}
if (tr->db->m_logger) {
auto t2 = std::chrono::steady_clock::now();
tr->db->m_logger->log(
util::Logger::Level::trace, "Got write lock in %1 us",
std::chrono::duration_cast<std::chrono::microseconds>(t2 - tr->m_request_time_point).count());
}
if (tr->m_waiting_for_write_lock) {
tr->m_waiting_for_write_lock = false;
tr->m_async_cv.notify_one();
}
else if (cb) {
cb();
}
tr.reset(); // Release pointer while lock is held
async_begin_write([this, weak_tr, cb = std::move(when_acquired), request_start]() {
// Check if the Transaction is still valid and still wants the write
// lock. It may no longer be if it was closed while waiting for the lock,
// in which case we'll either move on to the next waiting Transaction or
// just release the lock
auto tr = weak_tr.lock();
if (!tr) {
return false;
}
util::CheckedLockGuard lck(tr->m_async_mutex);
if (tr->m_async_stage == Transaction::AsyncState::Idle) {
return false;
}

// If a synchronous transaction happened while we were pending
// we may be in HasCommits
if (tr->m_async_stage == Transaction::AsyncState::Requesting) {
tr->m_async_stage = Transaction::AsyncState::HasLock;
}
if (m_logger) {
auto t2 = std::chrono::steady_clock::now();
m_logger->log(util::Logger::Level::trace, "Got write lock in %1 us",
std::chrono::duration_cast<std::chrono::microseconds>(t2 - request_start).count());
}
if (tr->m_waiting_for_write_lock) {
tr->m_waiting_for_write_lock = false;
tr->m_async_cv.notify_one();
}
else if (cb) {
cb();
}
return true;
});
}

inline DB::DB(const DBOptions& options)
: m_upgrade_callback(std::move(options.upgrade_callback))
{
if (options.enable_async_writes) {
m_commit_helper = std::make_unique<AsyncCommitHelper>(this);
m_commit_helper = std::make_shared<AsyncCommitHelper>(this);
}
}

6 changes: 3 additions & 3 deletions src/realm/db.hpp
@@ -259,7 +259,7 @@ class DB : public std::enable_shared_from_this<DB> {
// ask for write mutex. Callback takes place when mutex has been acquired.
// callback may occur on ANOTHER THREAD. Must not be called if write mutex
// has already been acquired.
void async_request_write_mutex(TransactionRef& tr, util::UniqueFunction<void()>&& when_acquired);
void async_request_write_mutex(const TransactionRef& tr, util::UniqueFunction<void()>&& when_acquired);

// report statistics of last commit done on THIS DB.
// The free space reported is what can be expected to be freed
@@ -496,7 +496,7 @@ class DB : public std::enable_shared_from_this<DB> {
util::InterprocessCondVar m_pick_next_writer;
std::function<void(int, int)> m_upgrade_callback;
std::shared_ptr<metrics::Metrics> m_metrics;
std::unique_ptr<AsyncCommitHelper> m_commit_helper;
std::shared_ptr<AsyncCommitHelper> m_commit_helper;
std::shared_ptr<util::Logger> m_logger;
bool m_is_sync_agent = false;

@@ -608,7 +608,7 @@ class DB : public std::enable_shared_from_this<DB> {
void close_internal(std::unique_lock<util::InterprocessMutex>, bool allow_open_read_transactions)
REQUIRES(!m_mutex);

void async_begin_write(util::UniqueFunction<void()> fn);
void async_begin_write(util::UniqueFunction<bool()> fn);
void async_end_write();
void async_sync_to_disk(util::UniqueFunction<void()> fn);

10 changes: 1 addition & 9 deletions src/realm/transaction.cpp
@@ -765,15 +765,7 @@ void Transaction::prepare_for_close()
break;

case AsyncState::Requesting:
// We don't have the ability to cancel a wait on the write lock, so
// unfortunately we have to wait for it to be acquired.
REALM_ASSERT(m_transact_stage == DB::transact_Reading);
REALM_ASSERT(!m_oldest_version_not_persisted);
m_waiting_for_write_lock = true;
m_async_cv.wait(lck.native_handle(), [this]() REQUIRES(m_async_mutex) {
return !m_waiting_for_write_lock;
});
db->end_write_on_correct_thread();
m_async_stage = AsyncState::Idle;
break;

case AsyncState::HasLock:
1 change: 0 additions & 1 deletion src/realm/transaction.hpp
@@ -204,7 +204,6 @@ class Transaction : public Group {
util::CheckedMutex m_async_mutex;
std::condition_variable m_async_cv GUARDED_BY(m_async_mutex);
AsyncState m_async_stage GUARDED_BY(m_async_mutex) = AsyncState::Idle;
std::chrono::steady_clock::time_point m_request_time_point;
bool m_waiting_for_write_lock GUARDED_BY(m_async_mutex) = false;
bool m_waiting_for_sync GUARDED_BY(m_async_mutex) = false;

30 changes: 27 additions & 3 deletions test/object-store/realm.cpp
@@ -1596,7 +1596,7 @@ TEST_CASE("SharedRealm: async writes") {
for (int i = 0; i < 2; ++i) {
SECTION(close_function_names[i]) {
bool persisted = false;
SECTION("before write lock is acquired") {
SECTION("while another DB instance holds lock") {
DBOptions options;
options.encryption_key = config.encryption_key.data();
// Acquire the write lock with a different DB instance so that we'll
@@ -1627,9 +1627,12 @@ TEST_CASE("SharedRealm: async writes") {
std::invoke(close_functions[i], *realm);

{
// Verify that we released the write lock
// We may not have released the write lock yet when close()
// returns, but it should happen promptly.
auto db = DB::create(make_in_realm_history(), config.path, options);
REQUIRE(db->start_write(/* nonblocking */ true));
timed_wait_for([&] {
return db->start_write(/* nonblocking */ true) != nullptr;
});
}

// Verify that the transaction callback never got enqueued
@@ -1638,6 +1641,27 @@ TEST_CASE("SharedRealm: async writes") {
});
wait_for_done();
}
SECTION("while another Realm instance holds lock") {
auto realm2 = Realm::get_shared_realm(config);
realm2->begin_transaction();

auto scheduler = realm->scheduler();
realm->async_begin_transaction([&] {
// We should never get here as the realm is closed
FAIL();
});

// Doesn't have to wait for the write lock because the DB
// instance already holds it and we were just waiting for our
// turn with it rather than waiting to acquire it
std::invoke(close_functions[i], *realm);

// Verify that the transaction callback never got enqueued
scheduler->invoke([&] {
done = true;
});
wait_for_done();
}
SECTION("before async_begin_transaction() callback") {
auto scheduler = realm->scheduler();
realm->async_begin_transaction([&] {