Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract some duplicated code for sync triggers and timers #7912

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
-----------

### Internals
* None.
* Refactor the implementation of sync triggers and timers to eliminate some duplicated code. ([PR #7912](https://github.com/realm/realm-core/pull/7912))

----------------------------------------------

Expand Down
23 changes: 7 additions & 16 deletions src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -494,10 +494,9 @@ void ClientImpl::register_unactualized_session_wrapper(SessionWrapper* wrapper)
return;
}

REALM_ASSERT(m_actualize_and_finalize);
m_unactualized_session_wrappers.push(util::bind_ptr(wrapper));
}
m_actualize_and_finalize->trigger();
m_actualize_and_finalize.trigger();
}


Expand All @@ -506,7 +505,6 @@ void ClientImpl::register_abandoned_session_wrapper(util::bind_ptr<SessionWrappe
// Thread safety required.
{
util::CheckedLockGuard lock{m_mutex};
REALM_ASSERT(m_actualize_and_finalize);
// The wrapper may have already been finalized before being abandoned
// if we were stopped when it was created.
if (wrapper->mark_abandoned())
Expand All @@ -522,7 +520,7 @@ void ClientImpl::register_abandoned_session_wrapper(util::bind_ptr<SessionWrappe
}
m_abandoned_session_wrappers.push(std::move(wrapper));
}
m_actualize_and_finalize->trigger();
m_actualize_and_finalize.trigger();
}


Expand Down Expand Up @@ -1731,23 +1729,12 @@ ClientImpl::Connection::Connection(ClientImpl& client, connection_ident_type ide
, m_ssl_verify_callback{std::move(ssl_verify_callback)} // DEPRECATED
, m_proxy_config{std::move(proxy_config)} // DEPRECATED
, m_reconnect_info{reconnect_info}
, m_on_idle{m_client, &Connection::on_idle, this}
, m_ident{ident}
, m_server_endpoint{std::move(endpoint)}
, m_authorization_header_name{authorization_header_name} // DEPRECATED
, m_custom_http_headers{custom_http_headers} // DEPRECATED
{
m_on_idle = m_client.create_trigger([this](Status status) {
if (status == ErrorCodes::OperationAborted)
return;
else if (!status.is_ok())
throw Exception(status);

REALM_ASSERT(m_activated);
if (m_state == ConnectionState::disconnected && m_num_active_sessions == 0) {
on_idle(); // Throws
// Connection object may be destroyed now.
}
});
}

inline connection_ident_type ClientImpl::Connection::get_ident() const noexcept
Expand Down Expand Up @@ -1779,6 +1766,10 @@ void ClientImpl::Connection::resume_active_sessions()

void ClientImpl::Connection::on_idle()
{
REALM_ASSERT(m_activated);
if (m_state != ConnectionState::disconnected || m_num_active_sessions != 0)
return;

logger.debug(util::LogCategory::session, "Destroying connection object");
ClientImpl& client = get_client();
client.remove_connection(*this);
Expand Down
104 changes: 26 additions & 78 deletions src/realm/sync/noinst/client_impl_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,8 @@ ClientImpl::ClientImpl(ClientConfig config)
, m_fix_up_object_ids{config.fix_up_object_ids}
, m_roundtrip_time_handler{std::move(config.roundtrip_time_handler)}
, m_socket_provider{std::move(config.socket_provider)}
, m_client_protocol{} // Throws
, m_one_connection_per_session{config.one_connection_per_session}
, m_random{}
, m_actualize_and_finalize{*this, &ClientImpl::actualize_and_finalize_session_wrappers, this}
{
// FIXME: Would be better if seeding was up to the application.
util::seed_prng_nondeterministically(m_random); // Throws
Expand Down Expand Up @@ -213,14 +212,6 @@ ClientImpl::ClientImpl(ClientConfig config)
logger.warn("Testing/debugging feature 'disable_sync_to_disk' enabled - "
"never do this in production");
}

m_actualize_and_finalize = create_trigger([this](Status status) {
if (status == ErrorCodes::OperationAborted)
return;
else if (!status.is_ok())
throw Exception(status);
actualize_and_finalize_session_wrappers(); // Throws
});
}

void ClientImpl::incr_outstanding_posts()
Expand Down Expand Up @@ -290,25 +281,23 @@ void ClientImpl::drain_connections()


SyncSocketProvider::SyncTimer ClientImpl::create_timer(std::chrono::milliseconds delay,
SyncSocketProvider::FunctionHandler&& handler)
util::UniqueFunction<void()>&& handler)
{
REALM_ASSERT(m_socket_provider);
incr_outstanding_posts();
return m_socket_provider->create_timer(delay, [handler = std::move(handler), this](Status status) {
auto decr_guard = util::make_scope_exit([&]() noexcept {
ScopeExit decr_guard([&]() noexcept {
decr_outstanding_posts();
});
handler(status);
if (status == ErrorCodes::OperationAborted)
return;
if (!status.is_ok())
throw Exception(status);
handler();
});
}


ClientImpl::SyncTrigger ClientImpl::create_trigger(SyncSocketProvider::FunctionHandler&& handler)
{
REALM_ASSERT(m_socket_provider);
return std::make_unique<Trigger<ClientImpl>>(this, std::move(handler));
}

Connection::~Connection()
{
if (m_websocket_sentinel) {
Expand All @@ -319,10 +308,9 @@ Connection::~Connection()

void Connection::activate()
{
REALM_ASSERT(m_on_idle);
m_activated = true;
if (m_num_active_sessions == 0)
m_on_idle->trigger();
m_on_idle.trigger();
// We cannot in general connect immediately, because a prior failure to
// connect may require a delay before reconnecting (see `m_reconnect_info`).
initiate_reconnect_wait(); // Throws
Expand Down Expand Up @@ -364,7 +352,7 @@ void Connection::initiate_session_deactivation(Session* sess)
}
if (REALM_UNLIKELY(--m_num_active_sessions == 0)) {
if (m_activated && m_state == ConnectionState::disconnected)
m_on_idle->trigger();
m_on_idle.trigger();
}
}

Expand Down Expand Up @@ -679,22 +667,14 @@ void Connection::initiate_reconnect_wait()
// We create a timer for the reconnect_disconnect timer even if the delay is zero because
// we need it to be cancelable in case the connection is terminated before the timer
// callback is run.
m_reconnect_disconnect_timer = m_client.create_timer(delay, [this](Status status) {
// If the operation is aborted, the connection object may have been
// destroyed.
if (status != ErrorCodes::OperationAborted)
handle_reconnect_wait(status); // Throws
}); // Throws
m_reconnect_disconnect_timer = m_client.create_timer(delay, [this] {
handle_reconnect_wait(); // Throws
}); // Throws
}


void Connection::handle_reconnect_wait(Status status)
void Connection::handle_reconnect_wait()
{
if (!status.is_ok()) {
REALM_ASSERT(status != ErrorCodes::OperationAborted);
throw Exception(status);
}

REALM_ASSERT(m_reconnect_delay_in_progress);
m_reconnect_delay_in_progress = false;

Expand Down Expand Up @@ -806,24 +786,15 @@ void Connection::initiate_connect_wait()
// fully establish the connection (including SSL and WebSocket
// handshakes). Without such a watchdog, connect operations could take very
// long, or even indefinite time.
milliseconds_type time = m_client.m_connect_timeout;

m_connect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
// If the operation is aborted, the connection object may have been
// destroyed.
if (status != ErrorCodes::OperationAborted)
handle_connect_wait(status); // Throws
}); // Throws
std::chrono::milliseconds time(m_client.m_connect_timeout);
m_connect_timer = m_client.create_timer(time, [this] {
handle_connect_wait(); // Throws
}); // Throws
}


void Connection::handle_connect_wait(Status status)
void Connection::handle_connect_wait()
{
if (!status.is_ok()) {
REALM_ASSERT(status != ErrorCodes::OperationAborted);
throw Exception(status);
}

REALM_ASSERT_EX(m_state == ConnectionState::connecting, m_state);
logger.info("Connect timeout"); // Throws
SessionErrorInfo error_info({ErrorCodes::SyncConnectTimeout, "Sync connection was not fully established in time"},
Expand Down Expand Up @@ -917,12 +888,7 @@ void Connection::initiate_ping_delay(milliseconds_type now)

m_ping_delay_in_progress = true;

m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(delay), [this](Status status) {
if (status == ErrorCodes::OperationAborted)
return;
else if (!status.is_ok())
throw Exception(status);

m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(delay), [this] {
handle_ping_delay(); // Throws
}); // Throws
logger.debug("Will emit a ping in %1 milliseconds", delay); // Throws
Expand Down Expand Up @@ -952,12 +918,7 @@ void Connection::initiate_pong_timeout()
m_pong_wait_started_at = monotonic_clock_now();

milliseconds_type time = m_client.m_pong_keepalive_timeout;
m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
if (status == ErrorCodes::OperationAborted)
return;
else if (!status.is_ok())
throw Exception(status);

m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(time), [this] {
handle_pong_timeout(); // Throws
}); // Throws
}
Expand Down Expand Up @@ -1108,23 +1069,15 @@ void Connection::initiate_disconnect_wait()

milliseconds_type time = m_client.m_connection_linger_time;

m_reconnect_disconnect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
// If the operation is aborted, the connection object may have been
// destroyed.
if (status != ErrorCodes::OperationAborted)
handle_disconnect_wait(status); // Throws
}); // Throws
m_reconnect_disconnect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this] {
handle_disconnect_wait(); // Throws
}); // Throws
m_disconnect_delay_in_progress = true;
}


void Connection::handle_disconnect_wait(Status status)
void Connection::handle_disconnect_wait()
{
if (!status.is_ok()) {
REALM_ASSERT(status != ErrorCodes::OperationAborted);
throw Exception(status);
}

m_disconnect_delay_in_progress = false;

REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);
Expand Down Expand Up @@ -2649,12 +2602,7 @@ void Session::begin_resumption_delay(const ProtocolErrorInfo& error_info)
try_again_interval = std::chrono::milliseconds{1000};
}
logger.debug("Will attempt to resume session after %1 milliseconds", try_again_interval.count());
m_try_again_activation_timer = get_client().create_timer(try_again_interval, [this](Status status) {
if (status == ErrorCodes::OperationAborted)
return;
else if (!status.is_ok())
throw Exception(status);

m_try_again_activation_timer = get_client().create_timer(try_again_interval, [this] {
m_try_again_activation_timer.reset();
cancel_resumption_delay();
});
Expand Down
15 changes: 6 additions & 9 deletions src/realm/sync/noinst/client_impl_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,10 +231,8 @@ class ClientImpl {
void post(SyncSocketProvider::FunctionHandler&& handler) REQUIRES(!m_drain_mutex);
void post(util::UniqueFunction<void()>&& handler) REQUIRES(!m_drain_mutex);
SyncSocketProvider::SyncTimer create_timer(std::chrono::milliseconds delay,
SyncSocketProvider::FunctionHandler&& handler)
REQUIRES(!m_drain_mutex);
using SyncTrigger = std::unique_ptr<Trigger<ClientImpl>>;
SyncTrigger create_trigger(SyncSocketProvider::FunctionHandler&& handler);
util::UniqueFunction<void()>&& handler) REQUIRES(!m_drain_mutex);
using SyncTrigger = Trigger<ClientImpl>;

RandomEngine& get_random() noexcept;

Expand Down Expand Up @@ -534,10 +532,10 @@ class ClientImpl::Connection {
std::string get_http_request_path() const;

void initiate_reconnect_wait();
void handle_reconnect_wait(Status status);
void handle_reconnect_wait();
void initiate_reconnect();
void initiate_connect_wait();
void handle_connect_wait(Status status);
void handle_connect_wait();

void handle_connection_established();
void schedule_urgent_ping();
Expand All @@ -553,7 +551,7 @@ class ClientImpl::Connection {
void handle_write_ping();
void handle_message_received(util::Span<const char> data);
void initiate_disconnect_wait();
void handle_disconnect_wait(Status status);
void handle_disconnect_wait();
void close_due_to_protocol_error(Status status);
void close_due_to_client_side_error(Status, IsFatal is_fatal, ConnectionTerminationReason reason);
void close_due_to_transient_error(Status status, ConnectionTerminationReason reason);
Expand Down Expand Up @@ -1227,12 +1225,11 @@ inline void ClientImpl::Connection::involuntary_disconnect(const SessionErrorInf

inline void ClientImpl::Connection::change_state_to_disconnected() noexcept
{
REALM_ASSERT(m_on_idle);
REALM_ASSERT(m_state != ConnectionState::disconnected);
m_state = ConnectionState::disconnected;

if (m_num_active_sessions == 0)
m_on_idle->trigger();
m_on_idle.trigger();

REALM_ASSERT(!m_reconnect_delay_in_progress);
if (m_disconnect_delay_in_progress) {
Expand Down
Loading
Loading