Skip to content

Commit

Permalink
Avoid blocking in Transaction::close() when there's a cancelled async…
Browse files Browse the repository at this point in the history
… write when possible

We can't cancel the wait for the write lock if another process currently holds
it, but if the DB for the current Transaction holds it and we're just waiting
in the write queue then we can safely close the Transaction immediately and
simply skip over it when it would get its turn to write.
  • Loading branch information
tgoyne committed Mar 23, 2023
1 parent b8aad6b commit ef2145d
Show file tree
Hide file tree
Showing 10 changed files with 128 additions and 87 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
### Fixed
* `SyncSession::pause()` could hold a reference to the database open after shutting down the sync session, preventing users from being able to delete the realm. ([#6372](https://github.com/realm/realm-core/issues/6372), since v13.3.0)
* `Logger::set_default_logger()` did not perform any locking, resulting in data races if it was called while the default logger was being read on another thread ([PR #6398](https://github.com/realm/realm-core/pull/6398), since v13.7.0).
* Fix a deadlock when closing a Transaction with a cancelled asynchronous write scheduled while another Transaction holds the write lock ([PR #6413](https://github.com/realm/realm-core/pull/6413), since v11.10.0).

### Breaking changes
* None.
Expand Down
99 changes: 62 additions & 37 deletions src/realm/db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1792,7 +1792,7 @@ class DB::AsyncCommitHelper {
m_thread.join();
}

void begin_write(util::UniqueFunction<void()> fn)
void begin_write(util::UniqueFunction<bool()> fn)
{
std::unique_lock lg(m_mutex);
start_thread();
Expand Down Expand Up @@ -1905,7 +1905,7 @@ class DB::AsyncCommitHelper {
std::mutex m_mutex;
std::condition_variable m_cv_worker;
std::condition_variable m_cv_callers;
std::deque<util::UniqueFunction<void()>> m_pending_writes;
std::deque<util::UniqueFunction<bool()>> m_pending_writes;
util::UniqueFunction<void()> m_pending_sync;
size_t m_write_lock_claim_ticket = 0;
size_t m_write_lock_claim_fulfilled = 0;
Expand Down Expand Up @@ -1982,21 +1982,35 @@ void DB::AsyncCommitHelper::main()
m_has_write_mutex = true;
m_owns_write_mutex = true;

// Synchronous transaction requests get priority over async
// Synchronous transaction requests get priority over async, so
// let the next pending synchronous transaction have the lock
// if there are any
if (m_write_lock_claim_fulfilled < m_write_lock_claim_ticket) {
++m_write_lock_claim_fulfilled;
m_cv_callers.notify_all();
continue;
}

REALM_ASSERT(!m_pending_writes.empty());
auto callback = std::move(m_pending_writes.front());
m_pending_writes.pop_front();
lg.unlock();
callback();
// Release things captured by the callback before reacquiring the lock
callback = nullptr;
lg.lock();
bool did_use_lock = false;
while (!m_pending_writes.empty()) {
auto callback = std::move(m_pending_writes.front());
m_pending_writes.pop_front();
lg.unlock();
did_use_lock = callback();
// Release things captured by the callback before reacquiring the lock
callback = nullptr;
lg.lock();
if (did_use_lock) {
break;
}
}

if (!did_use_lock) {
// All of the queued writes were cancelled, so just go ahead
// and release the write lock
m_pending_mx_release = true;
}
continue;
}
}
Expand All @@ -2008,7 +2022,7 @@ void DB::AsyncCommitHelper::main()
}


void DB::async_begin_write(util::UniqueFunction<void()> fn)
void DB::async_begin_write(util::UniqueFunction<bool()> fn)
{
REALM_ASSERT(m_commit_helper);
m_commit_helper->begin_write(std::move(fn));
Expand Down Expand Up @@ -2606,41 +2620,52 @@ TransactionRef DB::start_write(bool nonblocking)
return tr;
}

void DB::async_request_write_mutex(TransactionRef& tr, util::UniqueFunction<void()>&& when_acquired)
void DB::async_request_write_mutex(const TransactionRef& tr, util::UniqueFunction<void()>&& when_acquired)
{
REALM_ASSERT(tr && tr->db.get() == this);
std::chrono::steady_clock::time_point request_start;
{
util::CheckedLockGuard lck(tr->m_async_mutex);
REALM_ASSERT(tr->m_async_stage == Transaction::AsyncState::Idle);
tr->m_async_stage = Transaction::AsyncState::Requesting;
tr->m_request_time_point = std::chrono::steady_clock::now();
if (tr->db->m_logger) {
tr->db->m_logger->log(util::Logger::Level::trace, "Async request write lock");
if (m_logger) {
request_start = std::chrono::steady_clock::now();
m_logger->log(util::Logger::Level::trace, "Async request write lock");
}
}
std::weak_ptr<Transaction> weak_tr = tr;
async_begin_write([weak_tr, cb = std::move(when_acquired)]() {
if (auto tr = weak_tr.lock()) {
util::CheckedLockGuard lck(tr->m_async_mutex);
// If a synchronous transaction happened while we were pending
// we may be in HasCommits
if (tr->m_async_stage == Transaction::AsyncState::Requesting) {
tr->m_async_stage = Transaction::AsyncState::HasLock;
}
if (tr->db->m_logger) {
auto t2 = std::chrono::steady_clock::now();
tr->db->m_logger->log(
util::Logger::Level::trace, "Got write lock in %1 us",
std::chrono::duration_cast<std::chrono::microseconds>(t2 - tr->m_request_time_point).count());
}
if (tr->m_waiting_for_write_lock) {
tr->m_waiting_for_write_lock = false;
tr->m_async_cv.notify_one();
}
else if (cb) {
cb();
}
tr.reset(); // Release pointer while lock is held
async_begin_write([this, weak_tr, cb = std::move(when_acquired), request_start]() {
// Check if the Transaction is still valid and still wants the write
// lock. That may no longer be the case if it was closed while waiting for
// the lock, in which case we'll either move on to the next waiting
// Transaction or just release the lock
auto tr = weak_tr.lock();
if (!tr) {
return false;
}
util::CheckedLockGuard lck(tr->m_async_mutex);
if (tr->m_async_stage == Transaction::AsyncState::Idle) {
return false;
}

// If a synchronous transaction happened while we were pending
// we may be in HasCommits
if (tr->m_async_stage == Transaction::AsyncState::Requesting) {
tr->m_async_stage = Transaction::AsyncState::HasLock;
}
if (m_logger) {
auto t2 = std::chrono::steady_clock::now();
m_logger->log(util::Logger::Level::trace, "Got write lock in %1 us",
std::chrono::duration_cast<std::chrono::microseconds>(t2 - request_start).count());
}
if (tr->m_waiting_for_write_lock) {
tr->m_waiting_for_write_lock = false;
tr->m_async_cv.notify_one();
}
else if (cb) {
cb();
}
return true;
});
}

Expand Down
4 changes: 2 additions & 2 deletions src/realm/db.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ class DB : public std::enable_shared_from_this<DB> {
// ask for write mutex. Callback takes place when mutex has been acquired.
// callback may occur on ANOTHER THREAD. Must not be called if write mutex
// has already been acquired.
void async_request_write_mutex(TransactionRef& tr, util::UniqueFunction<void()>&& when_acquired);
void async_request_write_mutex(const TransactionRef& tr, util::UniqueFunction<void()>&& when_acquired);

// report statistics of last commit done on THIS DB.
// The free space reported is what can be expected to be freed
Expand Down Expand Up @@ -608,7 +608,7 @@ class DB : public std::enable_shared_from_this<DB> {
void close_internal(std::unique_lock<util::InterprocessMutex>, bool allow_open_read_transactions)
REQUIRES(!m_mutex);

void async_begin_write(util::UniqueFunction<void()> fn);
void async_begin_write(util::UniqueFunction<bool()> fn);
void async_end_write();
void async_sync_to_disk(util::UniqueFunction<void()> fn);

Expand Down
10 changes: 1 addition & 9 deletions src/realm/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -765,15 +765,7 @@ void Transaction::prepare_for_close()
break;

case AsyncState::Requesting:
// We don't have the ability to cancel a wait on the write lock, so
// unfortunately we have to wait for it to be acquired.
REALM_ASSERT(m_transact_stage == DB::transact_Reading);
REALM_ASSERT(!m_oldest_version_not_persisted);
m_waiting_for_write_lock = true;
m_async_cv.wait(lck.native_handle(), [this]() REQUIRES(m_async_mutex) {
return !m_waiting_for_write_lock;
});
db->end_write_on_correct_thread();
m_async_stage = AsyncState::Idle;
break;

case AsyncState::HasLock:
Expand Down
1 change: 0 additions & 1 deletion src/realm/transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ class Transaction : public Group {
util::CheckedMutex m_async_mutex;
std::condition_variable m_async_cv GUARDED_BY(m_async_mutex);
AsyncState m_async_stage GUARDED_BY(m_async_mutex) = AsyncState::Idle;
std::chrono::steady_clock::time_point m_request_time_point;
bool m_waiting_for_write_lock GUARDED_BY(m_async_mutex) = false;
bool m_waiting_for_sync GUARDED_BY(m_async_mutex) = false;

Expand Down
30 changes: 27 additions & 3 deletions test/object-store/realm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1596,7 +1596,7 @@ TEST_CASE("SharedRealm: async writes") {
for (int i = 0; i < 2; ++i) {
SECTION(close_function_names[i]) {
bool persisted = false;
SECTION("before write lock is acquired") {
SECTION("while another DB instance holds lock") {
DBOptions options;
options.encryption_key = config.encryption_key.data();
// Acquire the write lock with a different DB instance so that we'll
Expand Down Expand Up @@ -1627,9 +1627,12 @@ TEST_CASE("SharedRealm: async writes") {
std::invoke(close_functions[i], *realm);

{
// Verify that we released the write lock
// We may not have released the write lock yet when close()
// returns, but it should happen promptly.
auto db = DB::create(make_in_realm_history(), config.path, options);
REQUIRE(db->start_write(/* nonblocking */ true));
timed_wait_for([&] {
return db->start_write(/* nonblocking */ true) != nullptr;
});
}

// Verify that the transaction callback never got enqueued
Expand All @@ -1638,6 +1641,27 @@ TEST_CASE("SharedRealm: async writes") {
});
wait_for_done();
}
SECTION("while another Realm instance holds lock") {
auto realm2 = Realm::get_shared_realm(config);
realm2->begin_transaction();

auto scheduler = realm->scheduler();
realm->async_begin_transaction([&] {
// We should never get here as the realm is closed
FAIL();
});

// Doesn't have to wait for the write lock because the DB
// instance already holds it and we were just waiting for our
// turn with it rather than waiting to acquire it
std::invoke(close_functions[i], *realm);

// Verify that the transaction callback never got enqueued
scheduler->invoke([&] {
done = true;
});
wait_for_done();
}
SECTION("before async_begin_transaction() callback") {
auto scheduler = realm->scheduler();
realm->async_begin_transaction([&] {
Expand Down
23 changes: 0 additions & 23 deletions test/object-store/sync/sync_test_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,29 +87,6 @@ bool ReturnsTrueWithinTimeLimit::match(util::FunctionRef<bool()> condition) cons
return predicate_returned_true;
}

void timed_wait_for(util::FunctionRef<bool()> condition, std::chrono::milliseconds max_ms)
{
const auto wait_start = std::chrono::steady_clock::now();
util::EventLoop::main().run_until([&] {
if (std::chrono::steady_clock::now() - wait_start > max_ms) {
throw std::runtime_error(util::format("timed_wait_for exceeded %1 ms", max_ms.count()));
}
return condition();
});
}

void timed_sleeping_wait_for(util::FunctionRef<bool()> condition, std::chrono::milliseconds max_ms,
std::chrono::milliseconds sleep_ms)
{
const auto wait_start = std::chrono::steady_clock::now();
while (!condition()) {
if (std::chrono::steady_clock::now() - wait_start > max_ms) {
throw std::runtime_error(util::format("timed_sleeping_wait_for exceeded %1 ms", max_ms.count()));
}
std::this_thread::sleep_for(sleep_ms);
}
}

auto do_hash = [](const std::string& name) -> std::string {
std::array<unsigned char, 32> hash;
util::sha256(name.data(), name.size(), hash.data());
Expand Down
8 changes: 0 additions & 8 deletions test/object-store/sync/sync_test_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
#include <realm/util/functional.hpp>
#include <realm/util/function_ref.hpp>

#include "util/event_loop.hpp"
#include "util/test_file.hpp"
#include "util/test_utils.hpp"

Expand All @@ -47,13 +46,6 @@ bool results_contains_user(SyncUserMetadataResults& results, const std::string&
const std::string& auth_server);
bool results_contains_original_name(SyncFileActionMetadataResults& results, const std::string& original_name);

void timed_wait_for(util::FunctionRef<bool()> condition,
std::chrono::milliseconds max_ms = std::chrono::milliseconds(5000));

void timed_sleeping_wait_for(util::FunctionRef<bool()> condition,
std::chrono::milliseconds max_ms = std::chrono::seconds(30),
std::chrono::milliseconds sleep_ms = std::chrono::milliseconds(1));

class ReturnsTrueWithinTimeLimit : public Catch::Matchers::MatcherGenericBase {
public:
ReturnsTrueWithinTimeLimit(std::chrono::milliseconds max_ms = std::chrono::milliseconds(5000))
Expand Down
27 changes: 25 additions & 2 deletions test/object-store/util/test_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
//
////////////////////////////////////////////////////////////////////////////

#include <realm/object-store/impl/realm_coordinator.hpp>
#include "test_utils.hpp"

#include <realm/object-store/impl/realm_coordinator.hpp>
#include <realm/string_data.hpp>
#include <realm/util/base64.hpp>
#include <realm/util/demangle.hpp>
#include <realm/util/file.hpp>
#include <realm/string_data.hpp>

#include <external/json/json.hpp>

Expand Down Expand Up @@ -227,4 +227,27 @@ void chmod(const std::string& path, int permissions)
#endif
}

// Drives the main test event loop until `condition` returns true, failing
// with an exception if more than `max_ms` elapses first.
//
// condition: predicate polled through util::EventLoop::main().run_until().
// max_ms:    maximum time to wait, measured on std::chrono::steady_clock from
//            the moment this function is entered.
//
// Throws std::runtime_error("timed_wait_for exceeded N ms") once the time
// limit has been exceeded.
//
// NOTE(review): presumably run_until() keeps processing event-loop work and
// re-invoking the predicate until it returns true -- confirm against
// util/event_loop.hpp.
void timed_wait_for(util::FunctionRef<bool()> condition, std::chrono::milliseconds max_ms)
{
const auto wait_start = std::chrono::steady_clock::now();
util::EventLoop::main().run_until([&] {
// The timeout is checked before the condition, so once the limit has
// passed we throw without evaluating `condition` again.
if (std::chrono::steady_clock::now() - wait_start > max_ms) {
throw std::runtime_error(util::format("timed_wait_for exceeded %1 ms", max_ms.count()));
}
return condition();
});
}

// Polls `condition` on the calling thread, sleeping between attempts, until
// it returns true or more than `max_ms` has elapsed.
//
// condition: predicate to poll; it is evaluated first, so if it is already
//            true the function returns without sleeping.
// max_ms:    maximum time to wait, measured on std::chrono::steady_clock from
//            the moment this function is entered.
// sleep_ms:  how long to sleep between consecutive polls.
//
// Throws std::runtime_error("timed_sleeping_wait_for exceeded N ms") if the
// condition is still false after the time limit has been exceeded.
//
// Unlike timed_wait_for(), this does not go through util::EventLoop; it only
// sleeps the current thread between polls.
void timed_sleeping_wait_for(util::FunctionRef<bool()> condition, std::chrono::milliseconds max_ms,
std::chrono::milliseconds sleep_ms)
{
const auto wait_start = std::chrono::steady_clock::now();
while (!condition()) {
// Only reached when the condition was false: give up if we are out of
// time, otherwise back off before polling again.
if (std::chrono::steady_clock::now() - wait_start > max_ms) {
throw std::runtime_error(util::format("timed_sleeping_wait_for exceeded %1 ms", max_ms.count()));
}
std::this_thread::sleep_for(sleep_ms);
}
}

} // namespace realm
12 changes: 10 additions & 2 deletions test/object-store/util/test_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
#ifndef REALM_TEST_UTILS_HPP
#define REALM_TEST_UTILS_HPP

#include <catch2/catch_all.hpp>
#include <catch2/matchers/catch_matchers_all.hpp>
#include "util/event_loop.hpp"

#include <realm/util/file.hpp>
#include <realm/util/optional.hpp>

#include <catch2/catch_all.hpp>
#include <catch2/matchers/catch_matchers_all.hpp>
#include <functional>
#include <filesystem>
namespace fs = std::filesystem;
Expand Down Expand Up @@ -144,6 +146,12 @@ bool chmod_supported(const std::string& path);
int get_permissions(const std::string& path);
void chmod(const std::string& path, int permissions);

void timed_wait_for(util::FunctionRef<bool()> condition,
std::chrono::milliseconds max_ms = std::chrono::milliseconds(5000));

void timed_sleeping_wait_for(util::FunctionRef<bool()> condition,
std::chrono::milliseconds max_ms = std::chrono::seconds(30),
std::chrono::milliseconds sleep_ms = std::chrono::milliseconds(1));
} // namespace realm

#define REQUIRE_DIR_EXISTS(macro_path) \
Expand Down

0 comments on commit ef2145d

Please sign in to comment.