Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid blocking in Transaction::close() when there's a cancelled async write when possible #6413

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* Changing the log level on the fly would not affect the core level log output ([#6440](https://github.com/realm/realm-core/issues/6440), since 13.7.0)
* `SyncManager::immediately_run_file_actions()` no longer ignores the result of trying to remove a realm. This could have resulted in a client reset action being reported as successful when it actually failed on windows if the `Realm` was still open ([#6050](https://github.com/realm/realm-core/issues/6050)).
* Fix a data race in `DB::VersionManager`. If one thread committed a write transaction which increased the number of live versions above the previous highest seen during the current session at the same time as another thread began a read, the reading thread could read from a no-longer-valid memory mapping ([PR #6411](https://github.com/realm/realm-core/pull/6411), since v13.0.0).
* Fix a deadlock when closing a Transaction with a cancelled asynchronous write scheduled while another Transaction holds the write lock ([PR #6413](https://github.com/realm/realm-core/pull/6413), since v11.10.0).

### Breaking changes
* None.
Expand Down
189 changes: 121 additions & 68 deletions src/realm/db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1750,32 +1750,6 @@ void DB::release_all_read_locks() noexcept
REALM_ASSERT(m_transaction_count == 0);
}

// Note: close() and close_internal() may be called from the DB::~DB().
// in that case, they will not throw. Throwing can only happen if called
// directly.
void DB::close(bool allow_open_read_transactions)
{
// make helper thread(s) terminate
m_commit_helper.reset();

if (m_fake_read_lock_if_immutable) {
if (!is_attached())
return;
{
CheckedLockGuard local_lock(m_mutex);
if (!allow_open_read_transactions && m_transaction_count)
throw WrongTransactionState("Closing with open read transactions");
}
if (m_alloc.is_attached())
m_alloc.detach();
m_fake_read_lock_if_immutable.reset();
}
else {
close_internal(std::unique_lock<InterprocessMutex>(m_controlmutex, std::defer_lock),
allow_open_read_transactions);
}
}

void DB::close_internal(std::unique_lock<InterprocessMutex> lock, bool allow_open_read_transactions)
{
if (!is_attached())
Expand Down Expand Up @@ -1848,18 +1822,23 @@ bool DB::other_writers_waiting_for_lock() const

uint32_t next_ticket = info->next_ticket.load(std::memory_order_relaxed);
uint32_t next_served = info->next_served.load(std::memory_order_relaxed);
// When holding the write lock, next_ticket = next_served + 1, hence, if the diference between 'next_ticket' and
// When holding the write lock, next_ticket = next_served + 1, hence, if the difference between 'next_ticket' and
// 'next_served' is greater than 1, there is at least one thread waiting to acquire the write lock.
return next_ticket > next_served + 1;
}

class DB::AsyncCommitHelper {
class DB::AsyncCommitHelper : public std::enable_shared_from_this<DB::AsyncCommitHelper> {
public:
AsyncCommitHelper(DB* db)
: m_db(db)
{
}
~AsyncCommitHelper()
{
close();
}

void close()
{
{
std::unique_lock lg(m_mutex);
Expand All @@ -1869,10 +1848,19 @@ class DB::AsyncCommitHelper {
m_running = false;
m_cv_worker.notify_one();
}
m_thread.join();

// The last reference to the DB may have been in a callback being invoked
// by the worker thread. If so, we'll be called from within the work
// loop, and have to just unwind rather than joining.
if (m_thread.get_id() != std::this_thread::get_id()) {
m_thread.join();
}
else {
m_thread.detach();
}
}

void begin_write(util::UniqueFunction<void()> fn)
void begin_write(util::UniqueFunction<bool()> fn)
{
std::unique_lock lg(m_mutex);
start_thread();
Expand Down Expand Up @@ -1985,7 +1973,7 @@ class DB::AsyncCommitHelper {
std::mutex m_mutex;
std::condition_variable m_cv_worker;
std::condition_variable m_cv_callers;
std::deque<util::UniqueFunction<void()>> m_pending_writes;
std::deque<util::UniqueFunction<bool()>> m_pending_writes;
util::UniqueFunction<void()> m_pending_sync;
size_t m_write_lock_claim_ticket = 0;
size_t m_write_lock_claim_fulfilled = 0;
Expand All @@ -2003,7 +1991,9 @@ class DB::AsyncCommitHelper {
return;
}
m_running = true;
m_thread = std::thread([this]() {
// Capture a strong reference to ourself for as long as the thread is
// running as the thread can outlive the parent DB.
m_thread = std::thread([this, self = shared_from_this()]() {
main();
});
}
Expand All @@ -2016,6 +2006,11 @@ class DB::AsyncCommitHelper {

void DB::AsyncCommitHelper::main()
{
// While we own the write mutex we hold a strong reference onto the DB
// because DB doesn't allow closing mid-write. Note that this is
// intentionally declared before the lock as we have to release this while
// not holding the lock to avoid deadlocks.
DBRef dbref;
std::unique_lock lg(m_mutex);
while (m_running) {
#if 0 // Enable for testing purposes
Expand All @@ -2041,6 +2036,7 @@ void DB::AsyncCommitHelper::main()
m_owns_write_mutex = false;

lg.unlock();
dbref.reset();
m_cv_callers.notify_all();
lg.lock();
continue;
Expand All @@ -2055,40 +2051,56 @@ void DB::AsyncCommitHelper::main()
// deadlock if they ask us to perform the sync.
if (!m_waiting_for_write_mutex && has_pending_write_requests()) {
lg.unlock();
dbref = m_db->shared_from_this();
m_db->do_begin_write();
lg.lock();

REALM_ASSERT(!m_has_write_mutex);
m_has_write_mutex = true;
m_owns_write_mutex = true;

// Synchronous transaction requests get priority over async
// Synchronous transaction requests get priority over async, so
// let the next pending synchronous transaction have the lock
// if there are any
if (m_write_lock_claim_fulfilled < m_write_lock_claim_ticket) {
++m_write_lock_claim_fulfilled;
m_cv_callers.notify_all();
continue;
}

REALM_ASSERT(!m_pending_writes.empty());
auto callback = std::move(m_pending_writes.front());
m_pending_writes.pop_front();
lg.unlock();
callback();
// Release things captured by the callback before reacquiring the lock
callback = nullptr;
lg.lock();
bool did_use_lock = false;
while (!m_pending_writes.empty()) {
auto callback = std::move(m_pending_writes.front());
m_pending_writes.pop_front();
lg.unlock();
did_use_lock = callback();
// Release things captured by the callback before reacquiring the lock
callback = nullptr;
lg.lock();
if (did_use_lock) {
break;
}
}

if (!did_use_lock) {
// All of the queued writes were cancelled, so just go ahead
// and release the write lock
m_pending_mx_release = true;
}
continue;
}
}
m_cv_worker.wait(lg);
}
if (m_has_write_mutex && m_owns_write_mutex) {
REALM_ASSERT(dbref);
m_db->do_end_write();
}
}


void DB::async_begin_write(util::UniqueFunction<void()> fn)
void DB::async_begin_write(util::UniqueFunction<bool()> fn)
{
REALM_ASSERT(m_commit_helper);
m_commit_helper->begin_write(std::move(fn));
Expand All @@ -2106,6 +2118,36 @@ void DB::async_sync_to_disk(util::UniqueFunction<void()> fn)
m_commit_helper->sync_to_disk(std::move(fn));
}

// Note: close() and close_internal() may be called from the DB::~DB().
// in that case, they will not throw. Throwing can only happen if called
// directly.
void DB::close(bool allow_open_read_transactions)
{
// make helper thread terminate
if (m_commit_helper) {
m_commit_helper->close();
m_commit_helper.reset();
}

if (m_fake_read_lock_if_immutable) {
if (!is_attached())
return;
{
CheckedLockGuard local_lock(m_mutex);
if (!allow_open_read_transactions && m_transaction_count)
throw WrongTransactionState("Closing with open read transactions");
}
if (m_alloc.is_attached())
m_alloc.detach();
m_fake_read_lock_if_immutable.reset();
}
else {
close_internal(std::unique_lock<InterprocessMutex>(m_controlmutex, std::defer_lock),
allow_open_read_transactions);
}
}


bool DB::has_changed(TransactionRef& tr)
{
if (m_fake_read_lock_if_immutable)
Expand Down Expand Up @@ -2687,49 +2729,60 @@ TransactionRef DB::start_write(bool nonblocking)
return tr;
}

void DB::async_request_write_mutex(TransactionRef& tr, util::UniqueFunction<void()>&& when_acquired)
void DB::async_request_write_mutex(const TransactionRef& tr, util::UniqueFunction<void()>&& when_acquired)
{
REALM_ASSERT(tr && tr->db.get() == this);
std::chrono::steady_clock::time_point request_start;
{
util::CheckedLockGuard lck(tr->m_async_mutex);
REALM_ASSERT(tr->m_async_stage == Transaction::AsyncState::Idle);
tr->m_async_stage = Transaction::AsyncState::Requesting;
tr->m_request_time_point = std::chrono::steady_clock::now();
if (tr->db->m_logger) {
tr->db->m_logger->log(util::Logger::Level::trace, "Async request write lock");
if (m_logger) {
request_start = std::chrono::steady_clock::now();
m_logger->log(util::Logger::Level::trace, "Async request write lock");
}
}
std::weak_ptr<Transaction> weak_tr = tr;
async_begin_write([weak_tr, cb = std::move(when_acquired)]() {
if (auto tr = weak_tr.lock()) {
util::CheckedLockGuard lck(tr->m_async_mutex);
// If a synchronous transaction happened while we were pending
// we may be in HasCommits
if (tr->m_async_stage == Transaction::AsyncState::Requesting) {
tr->m_async_stage = Transaction::AsyncState::HasLock;
}
if (tr->db->m_logger) {
auto t2 = std::chrono::steady_clock::now();
tr->db->m_logger->log(
util::Logger::Level::trace, "Got write lock in %1 us",
std::chrono::duration_cast<std::chrono::microseconds>(t2 - tr->m_request_time_point).count());
}
if (tr->m_waiting_for_write_lock) {
tr->m_waiting_for_write_lock = false;
tr->m_async_cv.notify_one();
}
else if (cb) {
cb();
}
tr.reset(); // Release pointer while lock is held
async_begin_write([this, weak_tr, cb = std::move(when_acquired), request_start]() {
// Check if the Transaction is still valid and still wants the write
// lock. If may no longer be if it was closed while waiting for the lock,
// in which case we'll either move onto the next waiting Transaction or
// just release the lock
auto tr = weak_tr.lock();
if (!tr) {
return false;
}
util::CheckedLockGuard lck(tr->m_async_mutex);
if (tr->m_async_stage == Transaction::AsyncState::Idle) {
return false;
}

// If a synchronous transaction happened while we were pending
// we may be in HasCommits
if (tr->m_async_stage == Transaction::AsyncState::Requesting) {
tr->m_async_stage = Transaction::AsyncState::HasLock;
}
if (m_logger) {
auto t2 = std::chrono::steady_clock::now();
m_logger->log(util::Logger::Level::trace, "Got write lock in %1 us",
std::chrono::duration_cast<std::chrono::microseconds>(t2 - request_start).count());
}
if (tr->m_waiting_for_write_lock) {
tr->m_waiting_for_write_lock = false;
tr->m_async_cv.notify_one();
}
else if (cb) {
cb();
}
return true;
});
}

inline DB::DB(const DBOptions& options)
: m_upgrade_callback(std::move(options.upgrade_callback))
{
if (options.enable_async_writes) {
m_commit_helper = std::make_unique<AsyncCommitHelper>(this);
m_commit_helper = std::make_shared<AsyncCommitHelper>(this);
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/realm/db.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ class DB : public std::enable_shared_from_this<DB> {
// ask for write mutex. Callback takes place when mutex has been acquired.
// callback may occur on ANOTHER THREAD. Must not be called if write mutex
// has already been acquired.
void async_request_write_mutex(TransactionRef& tr, util::UniqueFunction<void()>&& when_acquired);
void async_request_write_mutex(const TransactionRef& tr, util::UniqueFunction<void()>&& when_acquired);

// report statistics of last commit done on THIS DB.
// The free space reported is what can be expected to be freed
Expand Down Expand Up @@ -500,7 +500,7 @@ class DB : public std::enable_shared_from_this<DB> {
util::InterprocessCondVar m_pick_next_writer;
std::function<void(int, int)> m_upgrade_callback;
std::shared_ptr<metrics::Metrics> m_metrics;
std::unique_ptr<AsyncCommitHelper> m_commit_helper;
std::shared_ptr<AsyncCommitHelper> m_commit_helper;
std::shared_ptr<util::Logger> m_logger;
bool m_is_sync_agent = false;

Expand Down Expand Up @@ -612,7 +612,7 @@ class DB : public std::enable_shared_from_this<DB> {
void close_internal(std::unique_lock<util::InterprocessMutex>, bool allow_open_read_transactions)
REQUIRES(!m_mutex);

void async_begin_write(util::UniqueFunction<void()> fn);
void async_begin_write(util::UniqueFunction<bool()> fn);
void async_end_write();
void async_sync_to_disk(util::UniqueFunction<void()> fn);

Expand Down
10 changes: 1 addition & 9 deletions src/realm/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -766,15 +766,7 @@ void Transaction::prepare_for_close()
break;

case AsyncState::Requesting:
// We don't have the ability to cancel a wait on the write lock, so
// unfortunately we have to wait for it to be acquired.
REALM_ASSERT(m_transact_stage == DB::transact_Reading);
REALM_ASSERT(!m_oldest_version_not_persisted);
m_waiting_for_write_lock = true;
m_async_cv.wait(lck.native_handle(), [this]() REQUIRES(m_async_mutex) {
return !m_waiting_for_write_lock;
});
db->end_write_on_correct_thread();
m_async_stage = AsyncState::Idle;
break;

case AsyncState::HasLock:
Expand Down
1 change: 0 additions & 1 deletion src/realm/transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ class Transaction : public Group {
util::CheckedMutex m_async_mutex;
std::condition_variable m_async_cv GUARDED_BY(m_async_mutex);
AsyncState m_async_stage GUARDED_BY(m_async_mutex) = AsyncState::Idle;
std::chrono::steady_clock::time_point m_request_time_point;
bool m_waiting_for_write_lock GUARDED_BY(m_async_mutex) = false;
bool m_waiting_for_sync GUARDED_BY(m_async_mutex) = false;

Expand Down
Loading