Skip to content

Commit

Permalink
Integration of Suspend/Resume related fixes
Browse files Browse the repository at this point in the history
Mutex and flag changes:

Move the endpoint's state_ update to be after the shutdown_and_close, as this function
forces the state_ to be CLOSED. This could lead to reconnections not happening;
Set linger on the tcp and local_tcp server endpoint connections,
as this is no longer done on suspend;
Removed routing_stop_mutex_ (from rmc::stop, rmc::on_net_state_change, rmc::on_message)
if_state_running_ and sd_route_set_ are now atomic;
Removed state_mutex_ and state_ is now atomic.
This was needed as it could cause a lock inversion with the sender_mutex_ and receiver_mutex_;
state_mutex_ was being reused for several other collections, and so the following mutexes were added:
pending_offers_mutex_, requests_mutex_, requests_to_debounce_mutex_,
pending_event_registrations_mutex_, state_condition_mutex_, register_application_timer_mutex_;
pending_sd_offers_mutex_ is now recursive_mutex because it is also needed in the
rmi::set_routing_state, to resolve any offers that were left pending after resuming;
Fix deadlock on rmc::on_net_state_change, which could lock sender_mutex_ and then call
on_disconnect which would lock the same mutex;

Functional changes:

Added function remove_known_client() which erases Known_clients_.
routing_manager_base::remove_local will now call remove_known_client to keep the list updated;
rmi::set_routing_state will now initialize the service info of any remaining pending_sd_offers_.
Before, services that were internally offered while the resume was not ready were not being
offered externally;
Added a new state RS_DELAYED_RESUME. This state will be set instead of RS_RESUMED
if either the if_state_running_ or (when service discovery is enabled) sd_route_set_ are not set.
On state RS_SUSPEND, endpoint_manager_impl::suspend will no longer be called,
as this could lead to sockets getting stuck in ACCEPT4 (after resuming) if the ECU suspended
while vsomeip was still closing sockets;
On state RS_RESUME, in endpoint_manager_impl::resume only the client endpoints will be restarted.
Restarting server endpoints resulted in a service discovery reboot detection being wrongly triggered;
Refactor rmi::on_net_interface_or_route_state_changed, to both simplify the logic and integrate the
new RS_DELAYED_RESUME state;
Created new function rmi::is_external_routing_ready() to simplify checks on if_state_running_
and sd_route_set_.
  • Loading branch information
Rui Graça authored and Duarte Fonseca committed Jan 9, 2025
1 parent 72d7492 commit ade0d62
Show file tree
Hide file tree
Showing 18 changed files with 355 additions and 383 deletions.
27 changes: 14 additions & 13 deletions implementation/endpoints/src/endpoint_manager_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,35 +35,36 @@ endpoint_manager_base::endpoint_manager_base(
}

std::shared_ptr<endpoint> endpoint_manager_base::create_local(client_t _client) {
std::lock_guard<std::mutex> its_lock(local_endpoint_mutex_);
std::scoped_lock its_lock(local_endpoint_mutex_);
return create_local_unlocked(_client);
}

void endpoint_manager_base::remove_local(const client_t _client) {
std::shared_ptr<endpoint> its_endpoint(find_local(_client));
std::scoped_lock its_lock(local_endpoint_mutex_);
VSOMEIP_INFO << "emb::" << __func__ << ": client " << std::hex << _client;
std::shared_ptr<endpoint> its_endpoint(find_local_unlocked(_client));
if (its_endpoint) {
its_endpoint->register_error_handler(nullptr);
its_endpoint->stop();
VSOMEIP_INFO << "Client [" << std::hex << rm_->get_client() << "] is closing connection to ["
<< std::hex << _client << "]" << " endpoint > " << its_endpoint;
std::lock_guard<std::mutex> its_lock(local_endpoint_mutex_);
VSOMEIP_INFO << "Client [" << std::hex << rm_->get_client()
<< "] is closing connection to [" << std::hex << _client << "]"
<< " endpoint > " << its_endpoint;
local_endpoints_.erase(_client);
}
}

std::shared_ptr<endpoint> endpoint_manager_base::find_or_create_local(client_t _client) {
std::shared_ptr<endpoint> its_endpoint {nullptr};
{
std::scoped_lock its_lock {local_endpoint_mutex_};
its_endpoint = find_local_unlocked(_client);
if (!its_endpoint) {
its_endpoint = create_local_unlocked(_client);
}
std::scoped_lock its_lock {local_endpoint_mutex_};
std::shared_ptr<endpoint> its_endpoint {find_local_unlocked(_client)};
if (!its_endpoint) {
VSOMEIP_INFO << "emb::" << __func__ << ": create_client " << std::hex << _client;
its_endpoint = create_local_unlocked(_client);
}
if (its_endpoint) {
its_endpoint->start();
} else {
VSOMEIP_ERROR << __func__ << ": couldn't find or create endpoint for client " << _client;
VSOMEIP_ERROR << "emb::" << __func__ << ": couldn't find or create endpoint for client "
<< std::hex << _client;
}
return its_endpoint;
}
Expand Down
123 changes: 5 additions & 118 deletions implementation/endpoints/src/endpoint_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ std::shared_ptr<endpoint> endpoint_manager_impl::find_or_create_remote_client(
}
}
if (start_endpoint && its_endpoint && configuration_->is_someip(_service, _instance)
&& rm_->get_routing_state() != routing_state_e::RS_SUSPENDED) {
&& !rm_->is_suspended()) {
its_endpoint->start();
}
return its_endpoint;
Expand All @@ -97,7 +97,7 @@ void endpoint_manager_impl::find_or_create_remote_client(
}
}
const bool is_someip {configuration_->is_someip(_service, _instance)};
const bool is_suspended {rm_->get_routing_state() == routing_state_e::RS_SUSPENDED};
const bool is_suspended {rm_->is_suspended()};

if (start_reliable_endpoint && its_reliable_endpoint && is_someip && !is_suspended) {
its_reliable_endpoint->start();
Expand Down Expand Up @@ -289,7 +289,7 @@ endpoint_manager_impl::create_server_endpoint(uint16_t _port, bool _reliable, bo

if (its_server_endpoint) {
server_endpoints_[_port][_reliable] = its_server_endpoint;
if (rm_->get_routing_state() != routing_state_e::RS_SUSPENDED) {
if (!rm_->is_suspended()) {
its_server_endpoint->start();
}
} else {
Expand Down Expand Up @@ -1395,140 +1395,27 @@ bool endpoint_manager_impl::is_used_endpoint(endpoint* const _endpoint) const {
}

void endpoint_manager_impl::suspend() {
client_endpoints_t its_client_endpoints;
server_endpoints_t its_server_endpoints;

{
std::scoped_lock its_lock {endpoint_mutex_};
its_client_endpoints = client_endpoints_;
its_server_endpoints = server_endpoints_;
}

// stop client endpoints
std::set<std::shared_ptr<client_endpoint>> its_suspended_client_endpoints;
for (const auto& [its_address, ports] : its_client_endpoints) {
for (const auto& [its_port, protocols] : ports) {
for (const auto& [its_protocol, partitions] : protocols) {
for (const auto& [its_partition, its_endpoint] : partitions) {
its_endpoint->stop();
auto its_client_endpoint {
std::dynamic_pointer_cast<client_endpoint>(its_endpoint)};
if (its_client_endpoint) {
its_suspended_client_endpoints.insert(its_client_endpoint);
}
}
}
}
}

// start server endpoints
for (const auto& [its_port, protocols] : its_server_endpoints) {
for (const auto& [its_protocol, its_endpoint] : protocols) {
its_endpoint->stop();
}
}
// check that the clients are established again
size_t its_interval {MIN_ENDPOINT_WAIT_INTERVAL};
size_t its_sum {0};
const size_t its_max {SUM_ENDPOINT_WAIT_INTERVAL};
bool is_done;
do {
is_done = true;
std::this_thread::sleep_for(std::chrono::milliseconds(its_interval));
for (auto& its_endpoint : its_suspended_client_endpoints) {
is_done = is_done && its_endpoint->is_closed();
if (!is_done)
break;
}
if (its_interval < MAX_ENDPOINT_WAIT_INTERVAL) {
its_interval <<= 1;
}
its_sum += its_interval;
} while (!is_done && its_sum < its_max);

if (!is_done) {
for (const auto& its_endpoint : its_suspended_client_endpoints) {
if (!its_endpoint->is_closed()) {
boost::asio::ip::address its_address;
(void)its_endpoint->get_remote_address(its_address);

VSOMEIP_WARNING << "endpoint_manager_impl::" << __func__
<< ": Suspending client port [" << std::dec
<< its_endpoint->get_local_port() << "] --> ["
<< its_address.to_string() << ":" << its_endpoint->get_remote_port()
<< "] failed.";
}
}
}
// do nothing
}

void endpoint_manager_impl::resume() {
client_endpoints_t its_client_endpoints;
server_endpoints_t its_server_endpoints;

{
std::scoped_lock its_lock {endpoint_mutex_};
its_client_endpoints = client_endpoints_;
its_server_endpoints = server_endpoints_;
}

// start server endpoints
for (const auto& [its_port, protocols] : its_server_endpoints) {
for (const auto& [its_protocol, its_endpoint] : protocols) {
its_endpoint->restart();
}
}

// start client endpoints
// restart client endpoints
std::set<std::shared_ptr<client_endpoint>> its_resumed_client_endpoints;
for (const auto& [its_address, ports] : its_client_endpoints) {
for (const auto& [its_port, protocols] : ports) {
for (const auto& [its_protocol, partitions] : protocols) {
for (const auto& [its_partition, its_endpoint] : partitions) {
its_endpoint->restart();
auto its_client_endpoint {
std::dynamic_pointer_cast<client_endpoint>(its_endpoint)};
if (its_client_endpoint) {
its_resumed_client_endpoints.insert(its_client_endpoint);
}
}
}
}
}

// check that the clients are established again
size_t its_interval {MIN_ENDPOINT_WAIT_INTERVAL};
size_t its_sum {0};
const size_t its_max {SUM_ENDPOINT_WAIT_INTERVAL};
bool is_done;
do {
is_done = true;
std::this_thread::sleep_for(std::chrono::milliseconds(its_interval));
for (const auto& its_endpoint : its_resumed_client_endpoints) {
is_done = is_done && its_endpoint->is_established();
if (!is_done)
break;
}
if (its_interval < MAX_ENDPOINT_WAIT_INTERVAL) {
its_interval <<= 1;
}
its_sum += its_interval;
} while (!is_done && its_sum < its_max);

if (!is_done) {
for (const auto& its_endpoint : its_resumed_client_endpoints) {
if (!its_endpoint->is_established()) {
boost::asio::ip::address its_address;
(void)its_endpoint->get_remote_address(its_address);

VSOMEIP_WARNING << "endpoint_manager_impl::" << __func__
<< ": Resuming client port [" << std::dec
<< its_endpoint->get_local_port() << "] --> ["
<< its_address.to_string() << ":" << its_endpoint->get_remote_port()
<< "] failed.";
}
}
}
}

} // namespace vsomeip_v3
44 changes: 22 additions & 22 deletions implementation/endpoints/src/local_tcp_client_endpoint_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ void local_tcp_client_endpoint_impl::restart(bool _force) {
if (!_force && state_ == cei_state_e::CONNECTING) {
return;
}
state_ = cei_state_e::CONNECTING;
{
std::lock_guard<std::recursive_mutex> its_lock(mutex_);
sending_blocked_ = false;
Expand All @@ -57,6 +56,7 @@ void local_tcp_client_endpoint_impl::restart(bool _force) {
std::lock_guard<std::mutex> its_lock(socket_mutex_);
shutdown_and_close_socket_unlocked(true);
}
state_ = cei_state_e::CONNECTING;
was_not_connected_ = true;
reconnect_counter_ = 0;
start_connect_timer();
Expand Down Expand Up @@ -119,45 +119,45 @@ void local_tcp_client_endpoint_impl::connect() {
if (its_error) {
VSOMEIP_WARNING << "ltcei::connect: couldn't disable "
<< "Nagle algorithm: " << its_error.message()
<< " remote:" << remote_.port()
<< " endpoint > " << this << " state_ > " << static_cast<int>(state_.load());
<< " remote: " << remote_.port() << " endpoint: " << this
<< " state_: " << static_cast<int>(state_.load());
}
socket_->set_option(boost::asio::socket_base::keep_alive(true), its_error);
if (its_error) {
VSOMEIP_WARNING << "ltcei::connect: couldn't enable "
<< "keep_alive: " << its_error.message()
<< " remote:" << remote_.port()
<< " endpoint > " << this << " state_ > " << static_cast<int>(state_.load());
<< "keep_alive: " << its_error.message() << " remote:" << remote_.port()
<< " endpoint > " << this << " state_ > "
<< static_cast<int>(state_.load());
}
// Setting the linger timeout to 0 seconds forces an RST to always be sent in response to a FIN.
// Since this is an endpoint for internal communication, setting the linger timeout to 5 seconds
// should be enough to ensure the ACK to the FIN arrives at the server endpoint.
socket_->set_option(boost::asio::socket_base::linger(true, 5), its_error);
if (its_error) {
VSOMEIP_WARNING << "ltcei::connect: couldn't enable "
<< "SO_LINGER: " << its_error.message()
<< " remote:" << remote_.port()
<< " endpoint > " << this << " state_ > " << static_cast<int>(state_.load());
<< "SO_LINGER: " << its_error.message() << " remote:" << remote_.port()
<< " endpoint > " << this << " state_ > "
<< static_cast<int>(state_.load());
}
socket_->set_option(boost::asio::socket_base::reuse_address(true), its_error);
if (its_error) {
VSOMEIP_WARNING << "ltcei::" << __func__
<< ": Cannot enable SO_REUSEADDR" << "(" << its_error.message() << ")"
<< " endpoint > " << this << " state_ > " << static_cast<int>(state_.load());
VSOMEIP_WARNING << "ltcei::" << __func__ << ": Cannot enable SO_REUSEADDR" << "("
<< its_error.message() << ")"
<< " endpoint > " << this << " state_ > "
<< static_cast<int>(state_.load());
}
socket_->bind(local_, its_error);
if (its_error) {
VSOMEIP_WARNING << "ltcei::" << __func__
<< ": Cannot bind to client port " << local_.port() << "("
<< its_error.message() << ")"
<< " endpoint > " << this << " state_ > " << static_cast<int>(state_.load());
VSOMEIP_WARNING << "ltcei::" << __func__ << ": Cannot bind to client port "
<< local_.port() << "(" << its_error.message() << ")"
<< " endpoint > " << this << " state_ > "
<< static_cast<int>(state_.load());
try {
strand_.post(
std::bind(&client_endpoint_impl::connect_cbk, shared_from_this(),
its_connect_error));
} catch (const std::exception &e) {
VSOMEIP_ERROR << "ltcei::connect: " << e.what()
<< " endpoint > " << this << " state_ > " << static_cast<int>(state_.load());
strand_.post(std::bind(&client_endpoint_impl::connect_cbk, shared_from_this(),
its_connect_error));
} catch (const std::exception& e) {
VSOMEIP_ERROR << "ltcei::connect: " << e.what() << " endpoint > " << this
<< " state_ > " << static_cast<int>(state_.load());
}
return;
}
Expand Down
25 changes: 15 additions & 10 deletions implementation/endpoints/src/local_tcp_server_endpoint_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,27 +241,34 @@ void local_tcp_server_endpoint_impl::accept_cbk(
// Nagle algorithm off
new_connection_socket.set_option(boost::asio::ip::tcp::no_delay(true), its_error);
if (its_error) {
VSOMEIP_WARNING << "ltsei::accept_cbk: couldn't disable "
VSOMEIP_WARNING << "ltsei::" << __func__ << ": couldn't disable "
<< "Nagle algorithm: " << its_error.message()
<< " endpoint > " << this;
}
new_connection_socket.set_option(boost::asio::socket_base::keep_alive(true), its_error);
if (its_error) {
VSOMEIP_WARNING << "ltsei::accept_cbk: couldn't enable "
VSOMEIP_WARNING << "ltsei::" << __func__ << ": couldn't enable "
<< "keep_alive: " << its_error.message()
<< " endpoint > " << this;
}
// Setting the linger timeout to 0 seconds forces an RST to always be sent in response to a FIN.
// Since this is an endpoint for internal communication, setting the linger timeout to 5 seconds
// should be enough to ensure the ACK to the FIN arrives at the server endpoint.
new_connection_socket.set_option(boost::asio::socket_base::linger(true, 5), its_error);
if (its_error) {
VSOMEIP_WARNING << "ltsei::" << __func__ << ": setting SO_LINGER failed ("
<< its_error.message() << ") " << this;
}
}
}
if (_error != boost::asio::error::bad_descriptor
&& _error != boost::asio::error::operation_aborted
&& _error != boost::asio::error::no_descriptors) {
start();
} else if (_error == boost::asio::error::no_descriptors) {
VSOMEIP_ERROR << "ltsei::accept_cbk: "
<< _error.message() << " (" << std::dec << _error.value()
<< ") Will try to accept again in 1000ms"
<< " endpoint > " << this;
VSOMEIP_ERROR << "ltsei::" << __func__ << ": " << _error.message() << " (" << std::dec
<< _error.value() << ") Will try to accept again in 1000ms"
<< " endpoint > " << this;
auto its_timer =
std::make_shared<boost::asio::steady_timer>(io_,
std::chrono::milliseconds(1000));
Expand Down Expand Up @@ -723,10 +730,8 @@ void local_tcp_server_endpoint_impl::connection::receive_cbk(
} while (recv_buffer_size_ > 0 && found_message);
}

if (is_stopped_
|| _error == boost::asio::error::eof
|| _error == boost::asio::error::connection_reset
|| is_error) {
if (is_stopped_ || _error == boost::asio::error::eof
|| _error == boost::asio::error::connection_reset || is_error) {
shutdown_and_close();
its_server->remove_connection(bound_client_);
its_server->configuration_->get_policy_manager()->remove_client_to_sec_client_mapping(bound_client_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ void local_uds_client_endpoint_impl::restart(bool _force) {
if (!_force && state_ == cei_state_e::CONNECTING) {
return;
}
state_ = cei_state_e::CONNECTING;
{
std::lock_guard<std::recursive_mutex> its_lock(mutex_);
sending_blocked_ = false;
Expand All @@ -60,6 +59,7 @@ void local_uds_client_endpoint_impl::restart(bool _force) {
std::lock_guard<std::mutex> its_lock(socket_mutex_);
shutdown_and_close_socket_unlocked(true);
}
state_ = cei_state_e::CONNECTING;
was_not_connected_ = true;
reconnect_counter_ = 0;
start_connect_timer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -826,10 +826,8 @@ void local_uds_server_endpoint_impl::connection::receive_cbk(
} while (recv_buffer_size_ > 0 && found_message);
}

if (is_stopped_
|| _error == boost::asio::error::eof
|| _error == boost::asio::error::connection_reset
|| is_error) {
if (is_stopped_ || _error == boost::asio::error::eof
|| _error == boost::asio::error::connection_reset || is_error) {
shutdown_and_close();
its_server->remove_connection(bound_client_);
its_server->configuration_->get_policy_manager()->remove_client_to_sec_client_mapping(bound_client_);
Expand Down
Loading

0 comments on commit ade0d62

Please sign in to comment.