From dfa5f3344bfb8e0dc9369a2b980a401b31f56231 Mon Sep 17 00:00:00 2001 From: Thomas Goyne Date: Mon, 29 Apr 2024 16:52:04 -0700 Subject: [PATCH] Extract some duplicated code for sync triggers and timers --- CHANGELOG.md | 2 +- src/realm/sync/client.cpp | 23 ++--- src/realm/sync/noinst/client_impl_base.cpp | 104 ++++++--------------- src/realm/sync/noinst/client_impl_base.hpp | 15 ++- src/realm/sync/noinst/server/server.cpp | 23 ++--- src/realm/sync/trigger.hpp | 74 +++++++++------ test/test_util_network.cpp | 22 ++--- 7 files changed, 101 insertions(+), 162 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b24470d2676..cc54feb8822 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,7 +17,7 @@ ----------- ### Internals -* None. +* Refactor the implementation of sync triggers and timers to eliminate some duplicated code. ([PR #7912](https://github.com/realm/realm-core/pull/7912)) ---------------------------------------------- diff --git a/src/realm/sync/client.cpp b/src/realm/sync/client.cpp index 214a938969c..c9b78d3eb6f 100644 --- a/src/realm/sync/client.cpp +++ b/src/realm/sync/client.cpp @@ -494,10 +494,9 @@ void ClientImpl::register_unactualized_session_wrapper(SessionWrapper* wrapper) return; } - REALM_ASSERT(m_actualize_and_finalize); m_unactualized_session_wrappers.push(util::bind_ptr(wrapper)); } - m_actualize_and_finalize->trigger(); + m_actualize_and_finalize.trigger(); } @@ -506,7 +505,6 @@ void ClientImpl::register_abandoned_session_wrapper(util::bind_ptrmark_abandoned()) @@ -522,7 +520,7 @@ void ClientImpl::register_abandoned_session_wrapper(util::bind_ptrtrigger(); + m_actualize_and_finalize.trigger(); } @@ -1731,23 +1729,12 @@ ClientImpl::Connection::Connection(ClientImpl& client, connection_ident_type ide , m_ssl_verify_callback{std::move(ssl_verify_callback)} // DEPRECATED , m_proxy_config{std::move(proxy_config)} // DEPRECATED , m_reconnect_info{reconnect_info} + , m_on_idle{m_client, &Connection::on_idle, this} , m_ident{ident} , m_server_endpoint{std::move(endpoint)} , m_authorization_header_name{authorization_header_name} // DEPRECATED , m_custom_http_headers{custom_http_headers} // DEPRECATED { - m_on_idle = m_client.create_trigger([this](Status status) { - if (status == ErrorCodes::OperationAborted) - return; - else if (!status.is_ok()) - throw Exception(status); - - REALM_ASSERT(m_activated); - if (m_state == ConnectionState::disconnected && m_num_active_sessions == 0) { - on_idle(); // Throws - // Connection object may be destroyed now. - } - }); } inline connection_ident_type ClientImpl::Connection::get_ident() const noexcept @@ -1779,6 +1766,10 @@ void ClientImpl::Connection::resume_active_sessions() void ClientImpl::Connection::on_idle() { + REALM_ASSERT(m_activated); + if (m_state != ConnectionState::disconnected || m_num_active_sessions != 0) + return; + logger.debug(util::LogCategory::session, "Destroying connection object"); ClientImpl& client = get_client(); client.remove_connection(*this); diff --git a/src/realm/sync/noinst/client_impl_base.cpp b/src/realm/sync/noinst/client_impl_base.cpp index eb57eafd45c..301486e6768 100644 --- a/src/realm/sync/noinst/client_impl_base.cpp +++ b/src/realm/sync/noinst/client_impl_base.cpp @@ -149,9 +149,8 @@ ClientImpl::ClientImpl(ClientConfig config) , m_fix_up_object_ids{config.fix_up_object_ids} , m_roundtrip_time_handler{std::move(config.roundtrip_time_handler)} , m_socket_provider{std::move(config.socket_provider)} - , m_client_protocol{} // Throws , m_one_connection_per_session{config.one_connection_per_session} - , m_random{} + , m_actualize_and_finalize{*this, &ClientImpl::actualize_and_finalize_session_wrappers, this} { // FIXME: Would be better if seeding was up to the application. util::seed_prng_nondeterministically(m_random); // Throws @@ -213,14 +212,6 @@ ClientImpl::ClientImpl(ClientConfig config) logger.warn("Testing/debugging feature 'disable_sync_to_disk' enabled - " "never do this in production"); } - - m_actualize_and_finalize = create_trigger([this](Status status) { - if (status == ErrorCodes::OperationAborted) - return; - else if (!status.is_ok()) - throw Exception(status); - actualize_and_finalize_session_wrappers(); // Throws - }); } void ClientImpl::incr_outstanding_posts() @@ -290,25 +281,23 @@ void ClientImpl::drain_connections() SyncSocketProvider::SyncTimer ClientImpl::create_timer(std::chrono::milliseconds delay, - SyncSocketProvider::FunctionHandler&& handler) + util::UniqueFunction&& handler) { REALM_ASSERT(m_socket_provider); incr_outstanding_posts(); return m_socket_provider->create_timer(delay, [handler = std::move(handler), this](Status status) { - auto decr_guard = util::make_scope_exit([&]() noexcept { + ScopeExit decr_guard([&]() noexcept { decr_outstanding_posts(); }); - handler(status); + if (status == ErrorCodes::OperationAborted) + return; + if (!status.is_ok()) + throw Exception(status); + handler(); }); } -ClientImpl::SyncTrigger ClientImpl::create_trigger(SyncSocketProvider::FunctionHandler&& handler) -{ - REALM_ASSERT(m_socket_provider); - return std::make_unique>(this, std::move(handler)); -} - Connection::~Connection() { if (m_websocket_sentinel) { @@ -319,10 +308,9 @@ Connection::~Connection() void Connection::activate() { - REALM_ASSERT(m_on_idle); m_activated = true; if (m_num_active_sessions == 0) - m_on_idle->trigger(); + m_on_idle.trigger(); // We cannot in general connect immediately, because a prior failure to // connect may require a delay before reconnecting (see `m_reconnect_info`). initiate_reconnect_wait(); // Throws @@ -364,7 +352,7 @@ void Connection::initiate_session_deactivation(Session* sess) } if (REALM_UNLIKELY(--m_num_active_sessions == 0)) { if (m_activated && m_state == ConnectionState::disconnected) - m_on_idle->trigger(); + m_on_idle.trigger(); } } @@ -679,22 +667,14 @@ void Connection::initiate_reconnect_wait() // We create a timer for the reconnect_disconnect timer even if the delay is zero because // we need it to be cancelable in case the connection is terminated before the timer // callback is run. - m_reconnect_disconnect_timer = m_client.create_timer(delay, [this](Status status) { - // If the operation is aborted, the connection object may have been - // destroyed. - if (status != ErrorCodes::OperationAborted) - handle_reconnect_wait(status); // Throws - }); // Throws + m_reconnect_disconnect_timer = m_client.create_timer(delay, [this] { + handle_reconnect_wait(); // Throws + }); // Throws } -void Connection::handle_reconnect_wait(Status status) +void Connection::handle_reconnect_wait() { - if (!status.is_ok()) { - REALM_ASSERT(status != ErrorCodes::OperationAborted); - throw Exception(status); - } - REALM_ASSERT(m_reconnect_delay_in_progress); m_reconnect_delay_in_progress = false; @@ -806,24 +786,15 @@ void Connection::initiate_connect_wait() // fully establish the connection (including SSL and WebSocket // handshakes). Without such a watchdog, connect operations could take very // long, or even indefinite time. - milliseconds_type time = m_client.m_connect_timeout; - - m_connect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) { - // If the operation is aborted, the connection object may have been - // destroyed. - if (status != ErrorCodes::OperationAborted) - handle_connect_wait(status); // Throws - }); // Throws + std::chrono::milliseconds time(m_client.m_connect_timeout); + m_connect_timer = m_client.create_timer(time, [this] { + handle_connect_wait(); // Throws + }); // Throws } -void Connection::handle_connect_wait(Status status) +void Connection::handle_connect_wait() { - if (!status.is_ok()) { - REALM_ASSERT(status != ErrorCodes::OperationAborted); - throw Exception(status); - } - REALM_ASSERT_EX(m_state == ConnectionState::connecting, m_state); logger.info("Connect timeout"); // Throws SessionErrorInfo error_info({ErrorCodes::SyncConnectTimeout, "Sync connection was not fully established in time"}, @@ -917,12 +888,7 @@ void Connection::initiate_ping_delay(milliseconds_type now) m_ping_delay_in_progress = true; - m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(delay), [this](Status status) { - if (status == ErrorCodes::OperationAborted) - return; - else if (!status.is_ok()) - throw Exception(status); - + m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(delay), [this] { handle_ping_delay(); // Throws }); // Throws logger.debug("Will emit a ping in %1 milliseconds", delay); // Throws @@ -952,12 +918,7 @@ void Connection::initiate_pong_timeout() m_pong_wait_started_at = monotonic_clock_now(); milliseconds_type time = m_client.m_pong_keepalive_timeout; - m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) { - if (status == ErrorCodes::OperationAborted) - return; - else if (!status.is_ok()) - throw Exception(status); - + m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(time), [this] { handle_pong_timeout(); // Throws }); // Throws } @@ -1108,23 +1069,15 @@ void Connection::initiate_disconnect_wait() milliseconds_type time = m_client.m_connection_linger_time; - m_reconnect_disconnect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) { - // If the operation is aborted, the connection object may have been - // destroyed. - if (status != ErrorCodes::OperationAborted) - handle_disconnect_wait(status); // Throws - }); // Throws + m_reconnect_disconnect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this] { + handle_disconnect_wait(); // Throws + }); // Throws m_disconnect_delay_in_progress = true; } -void Connection::handle_disconnect_wait(Status status) +void Connection::handle_disconnect_wait() { - if (!status.is_ok()) { - REALM_ASSERT(status != ErrorCodes::OperationAborted); - throw Exception(status); - } - m_disconnect_delay_in_progress = false; REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state); @@ -2649,12 +2602,7 @@ void Session::begin_resumption_delay(const ProtocolErrorInfo& error_info) try_again_interval = std::chrono::milliseconds{1000}; } logger.debug("Will attempt to resume session after %1 milliseconds", try_again_interval.count()); - m_try_again_activation_timer = get_client().create_timer(try_again_interval, [this](Status status) { - if (status == ErrorCodes::OperationAborted) - return; - else if (!status.is_ok()) - throw Exception(status); - + m_try_again_activation_timer = get_client().create_timer(try_again_interval, [this] { m_try_again_activation_timer.reset(); cancel_resumption_delay(); }); diff --git a/src/realm/sync/noinst/client_impl_base.hpp b/src/realm/sync/noinst/client_impl_base.hpp index c6dd8f97ca3..c41919bc88a 100644 --- a/src/realm/sync/noinst/client_impl_base.hpp +++ b/src/realm/sync/noinst/client_impl_base.hpp @@ -231,10 +231,8 @@ class ClientImpl { void post(SyncSocketProvider::FunctionHandler&& handler) REQUIRES(!m_drain_mutex); void post(util::UniqueFunction&& handler) REQUIRES(!m_drain_mutex); SyncSocketProvider::SyncTimer create_timer(std::chrono::milliseconds delay, - SyncSocketProvider::FunctionHandler&& handler) - REQUIRES(!m_drain_mutex); - using SyncTrigger = std::unique_ptr>; - SyncTrigger create_trigger(SyncSocketProvider::FunctionHandler&& handler); + util::UniqueFunction&& handler) REQUIRES(!m_drain_mutex); + using SyncTrigger = Trigger; RandomEngine& get_random() noexcept; @@ -534,10 +532,10 @@ class ClientImpl::Connection { std::string get_http_request_path() const; void initiate_reconnect_wait(); - void handle_reconnect_wait(Status status); + void handle_reconnect_wait(); void initiate_reconnect(); void initiate_connect_wait(); - void handle_connect_wait(Status status); + void handle_connect_wait(); void handle_connection_established(); void schedule_urgent_ping(); @@ -553,7 +551,7 @@ class ClientImpl::Connection { void handle_write_ping(); void handle_message_received(util::Span data); void initiate_disconnect_wait(); - void handle_disconnect_wait(Status status); + void handle_disconnect_wait(); void close_due_to_protocol_error(Status status); void close_due_to_client_side_error(Status, IsFatal is_fatal, ConnectionTerminationReason reason); void close_due_to_transient_error(Status status, ConnectionTerminationReason reason); @@ -1227,12 +1225,11 @@ inline void ClientImpl::Connection::involuntary_disconnect(const SessionErrorInf inline void ClientImpl::Connection::change_state_to_disconnected() noexcept { - REALM_ASSERT(m_on_idle); REALM_ASSERT(m_state != ConnectionState::disconnected); m_state = ConnectionState::disconnected; if (m_num_active_sessions == 0) - m_on_idle->trigger(); + m_on_idle.trigger(); REALM_ASSERT(!m_reconnect_delay_in_progress); if (m_disconnect_delay_in_progress) { diff --git a/src/realm/sync/noinst/server/server.cpp b/src/realm/sync/noinst/server/server.cpp index 2aa26b47209..148589948dd 100644 --- a/src/realm/sync/noinst/server/server.cpp +++ b/src/realm/sync/noinst/server/server.cpp @@ -1093,19 +1093,11 @@ class SyncConnection : public websocket::Config { , m_client_user_agent{std::move(client_user_agent)} , m_remote_endpoint{std::move(remote_endpoint)} , m_appservices_request_id{std::move(appservices_request_id)} + , m_send_trigger{m_server.get_service(), &SyncConnection::send_next_message, this} { // Make the output buffer stream throw std::bad_alloc if it fails to // expand the buffer m_output_buffer.exceptions(std::ios_base::badbit | std::ios_base::failbit); - - network::Service& service = m_server.get_service(); - auto handler = [this](Status status) { - if (!status.is_ok()) - return; - if (!m_is_sending) - send_next_message(); // Throws - }; - m_send_trigger = std::make_unique>(&service, std::move(handler)); // Throws } ~SyncConnection() noexcept; @@ -1347,7 +1339,7 @@ class SyncConnection : public websocket::Config { bool m_send_pong = false; bool m_sending_pong = false; - std::unique_ptr> m_send_trigger; + Trigger m_send_trigger; milliseconds_type m_last_ping_timestamp = 0; @@ -4228,11 +4220,10 @@ void SyncConnection::terminate_if_dead(SteadyTimePoint now) void SyncConnection::enlist_to_send(Session* sess) noexcept { - REALM_ASSERT(m_send_trigger); REALM_ASSERT(!m_is_closing); REALM_ASSERT(!sess->is_enlisted_to_send()); m_sessions_enlisted_to_send.push_back(sess); - m_send_trigger->trigger(); + m_send_trigger.trigger(); } @@ -4442,7 +4433,7 @@ void SyncConnection::send_log_message(util::Logger::Level level, const std::stri std::lock_guard lock(m_log_mutex); m_log_messages.push(std::move(log_msg)); } - m_send_trigger->trigger(); + m_send_trigger.trigger(); } @@ -4490,7 +4481,8 @@ void SyncConnection::handle_ping_received(const char* data, size_t size) void SyncConnection::send_next_message() { - REALM_ASSERT(!m_is_sending); + if (m_is_sending) + return; REALM_ASSERT(!m_sending_pong); if (m_send_pong) { send_pong(m_last_ping_timestamp); @@ -4681,7 +4673,6 @@ void SyncConnection::do_initiate_soft_close(ProtocolError error_code, session_id REALM_ASSERT(is_session_level_error(error_code) == (session_ident != 0)); REALM_ASSERT(!is_session_level_error(error_code)); - REALM_ASSERT(m_send_trigger); REALM_ASSERT(!m_is_closing); m_is_closing = true; @@ -4696,7 +4687,7 @@ void SyncConnection::do_initiate_soft_close(ProtocolError error_code, session_id terminate_sessions(); // Throws - m_send_trigger->trigger(); + m_send_trigger.trigger(); } diff --git a/src/realm/sync/trigger.hpp b/src/realm/sync/trigger.hpp index e076f4bd706..483c563719f 100644 --- a/src/realm/sync/trigger.hpp +++ b/src/realm/sync/trigger.hpp @@ -20,7 +20,6 @@ #include #include -#include namespace realm::sync { @@ -40,7 +39,8 @@ namespace realm::sync { template class Trigger final { public: - Trigger(Service* service, SyncSocketProvider::FunctionHandler&& handler); + template + Trigger(Service& service, Handler&& handler, Args&&... args); ~Trigger() noexcept; Trigger() noexcept = delete; @@ -77,67 +77,79 @@ class Trigger final { void trigger(); private: - Service* m_service; + Service& m_service; - struct HandlerInfo : public util::AtomicRefCountBase { + struct HandlerBase : public util::AtomicRefCountBase { enum class State { Idle, Triggered, Destroyed }; + std::mutex mutex; + State state = State::Idle; + virtual void call() = 0; + }; - HandlerInfo(SyncSocketProvider::FunctionHandler&& handler) - : handler(std::move(handler)) - , state(State::Idle) + template + struct HandlerImpl : HandlerBase { + Handler handler; + std::tuple args; + HandlerImpl(Handler&& h, Args&&... a) + : handler(std::forward(h)) + , args(std::forward(a)...) { } - - SyncSocketProvider::FunctionHandler handler; - util::Mutex mutex; - State state; + void call() override + { + std::apply(handler, args); + } }; - util::bind_ptr m_handler_info; + util::bind_ptr m_handler; }; template -inline Trigger::Trigger(Service* service, SyncSocketProvider::FunctionHandler&& handler) +template +inline Trigger::Trigger(Service& service, H&& handler, A&&... args) : m_service(service) - , m_handler_info(new HandlerInfo(std::move(handler))) + , m_handler(new HandlerImpl(std::forward(handler), std::forward(args)...)) { } template inline Trigger::~Trigger() noexcept { - if (m_handler_info) { - util::LockGuard lock{m_handler_info->mutex}; - REALM_ASSERT(m_handler_info->state != HandlerInfo::State::Destroyed); - m_handler_info->state = HandlerInfo::State::Destroyed; + if (m_handler) { + std::lock_guard lock{m_handler->mutex}; + REALM_ASSERT(m_handler->state != HandlerBase::State::Destroyed); + m_handler->state = HandlerBase::State::Destroyed; } } template inline void Trigger::trigger() { - REALM_ASSERT(m_service); - REALM_ASSERT(m_handler_info); + REALM_ASSERT(m_handler); - util::LockGuard lock{m_handler_info->mutex}; - REALM_ASSERT(m_handler_info->state != HandlerInfo::State::Destroyed); + std::lock_guard lock{m_handler->mutex}; + REALM_ASSERT(m_handler->state != HandlerBase::State::Destroyed); - if (m_handler_info->state == HandlerInfo::State::Triggered) { + if (m_handler->state == HandlerBase::State::Triggered) { return; } - m_handler_info->state = HandlerInfo::State::Triggered; + m_handler->state = HandlerBase::State::Triggered; + + m_service.post([handler = util::bind_ptr(m_handler)](Status status) { + if (status == ErrorCodes::OperationAborted) + return; + if (!status.is_ok()) + throw Exception(status); - auto handler = [handler_info = util::bind_ptr(m_handler_info)](Status status) { { - util::LockGuard lock{handler_info->mutex}; + std::lock_guard lock{handler->mutex}; // Do not execute the handler if the Trigger does not exist anymore. - if (handler_info->state == HandlerInfo::State::Destroyed) { + if (handler->state == HandlerBase::State::Destroyed) { return; } - handler_info->state = HandlerInfo::State::Idle; + handler->state = HandlerBase::State::Idle; } - handler_info->handler(status); - }; - m_service->post(std::move(handler)); + handler->call(); + }); } } // namespace realm::sync diff --git a/test/test_util_network.cpp b/test/test_util_network.cpp index d2f7579bf59..8973b0d6cd4 100644 --- a/test/test_util_network.cpp +++ b/test/test_util_network.cpp @@ -1998,10 +1998,10 @@ TEST(Sync_Trigger_Basics) // Check that triggering works bool was_triggered = false; - auto func = [&](realm::Status) { + auto func = [&] { was_triggered = true; }; - Trigger trigger(&service, std::move(func)); + Trigger trigger(service, std::move(func)); trigger.trigger(); service.run(); CHECK(was_triggered); @@ -2020,7 +2020,7 @@ TEST(Sync_Trigger_Basics) // Check that retriggering from triggered function works realm::util::UniqueFunction func_2; - Trigger trigger_2(&service, [&](realm::Status) { + Trigger trigger_2(service, [&] { func_2(); }); was_triggered = false; @@ -2042,10 +2042,10 @@ TEST(Sync_Trigger_Basics) // object was_triggered = false; { - auto func_3 = [&](realm::Status) { + auto func_3 = [&] { was_triggered = true; }; - Trigger trigger_3(&service, std::move(func_3)); + Trigger trigger_3(service, std::move(func_3)); trigger_3.trigger(); } service.run(); @@ -2054,14 +2054,14 @@ TEST(Sync_Trigger_Basics) // Check that two functions can be triggered in an overlapping fashion bool was_triggered_4 = false; bool was_triggered_5 = false; - auto func_4 = [&](realm::Status) { + auto func_4 = [&] { was_triggered_4 = true; }; - auto func_5 = [&](realm::Status) { + auto func_5 = [&] { was_triggered_5 = true; }; - Trigger trigger_4(&service, std::move(func_4)); - Trigger trigger_5(&service, std::move(func_5)); + Trigger trigger_4(service, std::move(func_4)); + Trigger trigger_5(service, std::move(func_5)); trigger_4.trigger(); trigger_5.trigger(); service.run(); @@ -2077,12 +2077,12 @@ TEST(Sync_Trigger_ThreadSafety) keep_alive.async_wait(std::chrono::hours(10000), [](Status) {}); long n_1 = 0, n_2 = 0; std::atomic flag{false}; - auto func = [&](realm::Status) { + auto func = [&] { ++n_1; if (flag) ++n_2; }; - Trigger trigger(&service, std::move(func)); + Trigger trigger(service, std::move(func)); ThreadWrapper thread; thread.start([&] { service.run();