Skip to content

Commit

Permalink
Improve resilience against clock adjustments (#5018)
Browse files Browse the repository at this point in the history
* Use steady_clock instead of high_resolution_clock for status checks (high_resolution_clock might not be steady depending on STL impl)

Signed-off-by: Matthias Schneider <[email protected]>

* Use steady_clock instead for system_clock for calculating timeouts

Signed-off-by: Matthias Schneider <[email protected]>

* Use correct clock's duration for duration_cast

Signed-off-by: Matthias Schneider <[email protected]>

* Use Time_t::now()

Signed-off-by: Matthias Schneider <[email protected]>

* Fix build.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21314. Refactor on DataWriterImpl.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21314. Refactor on DataReaderImpl.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21314. Refactor on StatefulWriter.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21314. Protect current_time_since_unix_epoch against clock adjustments.

Signed-off-by: Miguel Company <[email protected]>

* Revert "Use steady_clock instead of high_resolution_clock for status checks (high_resolution_clock might not be steady depending on STL impl)"

This reverts commit d69eb91.

---------

Signed-off-by: Matthias Schneider <[email protected]>
Signed-off-by: Miguel Company <[email protected]>
Co-authored-by: Miguel Company <[email protected]>
(cherry picked from commit ccc690c)

# Conflicts:
#	include/fastdds/rtps/writer/StatefulWriter.h
#	src/cpp/fastdds/publisher/DataWriterImpl.cpp
#	src/cpp/fastdds/subscriber/DataReaderImpl.cpp
#	src/cpp/utils/time_t_helpers.hpp
  • Loading branch information
ma30002000 authored and mergify[bot] committed Sep 2, 2024
1 parent 6ec6574 commit 0eed014
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 75 deletions.
6 changes: 6 additions & 0 deletions include/fastdds/rtps/writer/StatefulWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -515,9 +515,15 @@ class StatefulWriter : public RTPSWriter
bool disable_heartbeat_piggyback_;
//! True to disable positive ACKs
bool disable_positive_acks_;
<<<<<<< HEAD:include/fastdds/rtps/writer/StatefulWriter.h
//! Keep duration for disable positive ACKs QoS, in microseconds
std::chrono::duration<double, std::ratio<1, 1000000>> keep_duration_us_;
//! Last acknowledged cache change (only used if using disable positive ACKs QoS)
=======
/// Keep duration for disable positive ACKs QoS
fastdds::dds::Duration_t keep_duration_;
/// Last acknowledged cache change (only used if using disable positive ACKs QoS)
>>>>>>> ccc690c97 (Improve resilience against clock adjustments (#5018)):src/cpp/rtps/writer/StatefulWriter.hpp
SequenceNumber_t last_sequence_number_;
//! Biggest sequence number removed from history
SequenceNumber_t biggest_removed_sequence_number_;
Expand Down
9 changes: 1 addition & 8 deletions src/cpp/fastdds/domain/DomainParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1221,14 +1221,7 @@ bool DomainParticipantImpl::contains_entity(
ReturnCode_t DomainParticipantImpl::get_current_time(
fastrtps::Time_t& current_time) const
{
auto now = std::chrono::system_clock::now();
auto duration = now.time_since_epoch();
auto seconds = std::chrono::duration_cast<std::chrono::seconds>(duration);
duration -= seconds;
auto nanos = std::chrono::duration_cast<std::chrono::nanoseconds>(duration);

current_time.seconds = static_cast<int32_t>(seconds.count());
current_time.nanosec = static_cast<uint32_t>(nanos.count());
fastdds::dds::Time_t::now(current_time);

return ReturnCode_t::RETCODE_OK;
}
Expand Down
4 changes: 2 additions & 2 deletions src/cpp/fastdds/domain/DomainParticipantImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ class DomainParticipantImpl
DomainParticipantListener* listener,
const std::chrono::seconds timeout = std::chrono::seconds::max())
{
auto time_out = std::chrono::time_point<std::chrono::system_clock>::max();
auto time_out = std::chrono::time_point<std::chrono::steady_clock>::max();
if (timeout < std::chrono::seconds::max())
{
auto now = std::chrono::system_clock::now();
auto now = std::chrono::steady_clock::now();
time_out = now + timeout;
}

Expand Down
28 changes: 21 additions & 7 deletions src/cpp/fastdds/publisher/DataWriterImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <fastdds/domain/DomainParticipantImpl.hpp>
#include <fastdds/publisher/filtering/DataWriterFilteredChangePool.hpp>
#include <fastdds/publisher/PublisherImpl.hpp>
<<<<<<< HEAD
#include <fastdds/rtps/builtin/liveliness/WLP.h>
#include <fastdds/rtps/participant/RTPSParticipant.h>
#include <fastdds/rtps/resources/ResourceEvent.h>
Expand All @@ -43,6 +44,13 @@
#include <fastrtps/attributes/TopicAttributes.h>
#include <fastrtps/config.h>
#include <fastrtps/utils/TimeConversion.h>
=======
#include <fastdds/rtps/builtin/data/TopicDescription.hpp>
#include <fastdds/rtps/common/Time_t.hpp>
#include <fastdds/rtps/participant/RTPSParticipant.hpp>
#include <fastdds/rtps/RTPSDomain.hpp>
#include <fastdds/rtps/writer/RTPSWriter.hpp>
>>>>>>> ccc690c97 (Improve resilience against clock adjustments (#5018))

#include <rtps/DataSharing/DataSharingPayloadPool.hpp>
#include <rtps/history/CacheChangePool.h>
Expand Down Expand Up @@ -1027,7 +1035,7 @@ ReturnCode_t DataWriterImpl::perform_create_new_change(
{
if (!history_.set_next_deadline(
handle,
steady_clock::now() + duration_cast<system_clock::duration>(deadline_duration_us_)))
steady_clock::now() + duration_cast<steady_clock::duration>(deadline_duration_us_)))
{
EPROSIMA_LOG_ERROR(DATA_WRITER, "Could not set the next deadline in the history");
}
Expand Down Expand Up @@ -1499,7 +1507,7 @@ bool DataWriterImpl::deadline_missed()

if (!history_.set_next_deadline(
timer_owner_,
steady_clock::now() + duration_cast<system_clock::duration>(deadline_duration_us_)))
steady_clock::now() + duration_cast<steady_clock::duration>(deadline_duration_us_)))
{
EPROSIMA_LOG_ERROR(DATA_WRITER, "Could not set the next deadline in the history");
return false;
Expand Down Expand Up @@ -1549,21 +1557,24 @@ bool DataWriterImpl::lifespan_expired()
{
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());

fastdds::rtps::Time_t current_ts;
fastdds::rtps::Time_t::now(current_ts);

CacheChange_t* earliest_change;
while (history_.get_earliest_change(&earliest_change))
{
auto source_timestamp = system_clock::time_point() + nanoseconds(earliest_change->sourceTimestamp.to_ns());
auto now = system_clock::now();
fastdds::rtps::Time_t expiration_ts = earliest_change->sourceTimestamp + qos_.lifespan().duration;

// Check that the earliest change has expired (the change which started the timer could have been removed from the history)
if (now - source_timestamp < lifespan_duration_us_)
if (current_ts < expiration_ts)
{
auto interval = source_timestamp - now + lifespan_duration_us_;
lifespan_timer_->update_interval_millisec(static_cast<double>(duration_cast<milliseconds>(interval).count()));
fastdds::rtps::Time_t interval = expiration_ts - current_ts;
lifespan_timer_->update_interval_millisec(interval.to_ns() * 1e-6);
return true;
}

// The earliest change has expired
<<<<<<< HEAD
history_.remove_change_pub(earliest_change);

// Set the timer for the next change if there is one
Expand All @@ -1582,6 +1593,9 @@ bool DataWriterImpl::lifespan_expired()
lifespan_timer_->update_interval_millisec(static_cast<double>(duration_cast<milliseconds>(interval).count()));
return true;
}
=======
history_->remove_change_pub(earliest_change);
>>>>>>> ccc690c97 (Improve resilience against clock adjustments (#5018))
}

return false;
Expand Down
54 changes: 24 additions & 30 deletions src/cpp/fastdds/subscriber/DataReaderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,20 @@
#include <fastdds/dds/topic/Topic.hpp>
#include <fastdds/dds/topic/TypeSupport.hpp>
#include <fastdds/domain/DomainParticipantImpl.hpp>
<<<<<<< HEAD
#include <fastdds/rtps/participant/RTPSParticipant.h>
#include <fastdds/rtps/reader/RTPSReader.h>
#include <fastdds/rtps/resources/ResourceEvent.h>
#include <fastdds/rtps/resources/TimedEvent.h>
#include <fastdds/rtps/RTPSDomain.h>
=======
#include <fastdds/rtps/builtin/data/TopicDescription.hpp>
#include <fastdds/rtps/common/Time_t.hpp>
#include <fastdds/rtps/participant/RTPSParticipant.hpp>
#include <fastdds/rtps/reader/RTPSReader.hpp>
#include <fastdds/rtps/RTPSDomain.hpp>
#include <fastdds/subscriber/DataReaderImpl.hpp>
>>>>>>> ccc690c97 (Improve resilience against clock adjustments (#5018))
#include <fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp>
#include <fastdds/subscriber/DataReaderImpl/StateFilter.hpp>
#include <fastdds/subscriber/ReadConditionImpl.hpp>
Expand Down Expand Up @@ -1103,7 +1112,7 @@ bool DataReaderImpl::on_new_cache_change_added(
{
if (!history_.set_next_deadline(
change->instanceHandle,
steady_clock::now() + duration_cast<system_clock::duration>(deadline_duration_us_)))
steady_clock::now() + duration_cast<steady_clock::duration>(deadline_duration_us_)))
{
EPROSIMA_LOG_ERROR(SUBSCRIBER, "Could not set next deadline in the history");
}
Expand All @@ -1122,12 +1131,13 @@ bool DataReaderImpl::on_new_cache_change_added(
return true;
}

auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns());
auto now = system_clock::now();
fastdds::rtps::Time_t expiration_ts = change->sourceTimestamp + qos_.lifespan().duration;
fastdds::rtps::Time_t current_ts;
fastdds::rtps::Time_t::now(current_ts);

// The new change could have expired if it arrived too late
// If so, remove it from the history and return false to avoid notifying the listener
if (now - source_timestamp >= lifespan_duration_us_)
if (expiration_ts < current_ts)
{
history_.remove_change_sub(new_change);
return false;
Expand All @@ -1149,11 +1159,10 @@ bool DataReaderImpl::on_new_cache_change_added(
EPROSIMA_LOG_ERROR(SUBSCRIBER, "A change was added to history that could not be retrieved");
}

auto interval = source_timestamp - now + duration_cast<nanoseconds>(lifespan_duration_us_);

// Update and restart the timer
// If the timer is already running this will not have any effect
lifespan_timer_->update_interval_millisec(interval.count() * 1e-6);
fastdds::rtps::Time_t interval = expiration_ts - current_ts;
lifespan_timer_->update_interval_millisec(interval.to_ns() * 1e-6);
lifespan_timer_->restart_timer();
return true;
}
Expand Down Expand Up @@ -1241,7 +1250,7 @@ bool DataReaderImpl::deadline_missed()

if (!history_.set_next_deadline(
timer_owner_,
steady_clock::now() + duration_cast<system_clock::duration>(deadline_duration_us_), true))
steady_clock::now() + duration_cast<steady_clock::duration>(deadline_duration_us_), true))
{
EPROSIMA_LOG_ERROR(SUBSCRIBER, "Could not set next deadline in the history");
return false;
Expand Down Expand Up @@ -1272,41 +1281,26 @@ bool DataReaderImpl::lifespan_expired()
{
std::unique_lock<RecursiveTimedMutex> lock(reader_->getMutex());

fastdds::rtps::Time_t current_ts;
fastdds::rtps::Time_t::now(current_ts);

CacheChange_t* earliest_change;
while (history_.get_earliest_change(&earliest_change))
{
auto source_timestamp = system_clock::time_point() + nanoseconds(earliest_change->sourceTimestamp.to_ns());
auto now = system_clock::now();
fastdds::rtps::Time_t expiration_ts = earliest_change->sourceTimestamp + qos_.lifespan().duration;

// Check that the earliest change has expired (the change which started the timer could have been removed from the history)
if (now - source_timestamp < lifespan_duration_us_)
if (current_ts < expiration_ts)
{
auto interval = source_timestamp - now + lifespan_duration_us_;
lifespan_timer_->update_interval_millisec(static_cast<double>(duration_cast<milliseconds>(interval).count()));
fastdds::rtps::Time_t interval = expiration_ts - current_ts;
lifespan_timer_->update_interval_millisec(interval.to_ns() * 1e-6);
return true;
}

// The earliest change has expired
history_.remove_change_sub(earliest_change);

try_notify_read_conditions();

// Set the timer for the next change if there is one
if (!history_.get_earliest_change(&earliest_change))
{
return false;
}

// Calculate when the next change is due to expire and restart
source_timestamp = system_clock::time_point() + nanoseconds(earliest_change->sourceTimestamp.to_ns());
now = system_clock::now();
auto interval = source_timestamp - now + lifespan_duration_us_;

if (interval.count() > 0)
{
lifespan_timer_->update_interval_millisec(static_cast<double>(duration_cast<milliseconds>(interval).count()));
return true;
}
}

return false;
Expand Down
48 changes: 24 additions & 24 deletions src/cpp/rtps/writer/StatefulWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ StatefulWriter::StatefulWriter(
, may_remove_change_(0)
, disable_heartbeat_piggyback_(att.disable_heartbeat_piggyback)
, disable_positive_acks_(att.disable_positive_acks)
, keep_duration_us_(att.keep_duration.to_ns() * 1e-3)
, keep_duration_(att.keep_duration)
, last_sequence_number_()
, biggest_removed_sequence_number_()
, sendBufferSize_(pimpl->get_min_network_send_buffer_size())
Expand Down Expand Up @@ -431,12 +431,13 @@ void StatefulWriter::unsent_change_added_to_history(

if (disable_positive_acks_)
{
auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns());
auto now = system_clock::now();
auto interval = source_timestamp - now + keep_duration_us_;
assert(interval.count() >= 0);
Time_t expiration_ts = change->sourceTimestamp + keep_duration_;
Time_t current_ts;
Time_t::now(current_ts);
assert(expiration_ts >= current_ts);
auto interval = (expiration_ts - current_ts).to_duration_t();

ack_event_->update_interval_millisec((double)duration_cast<milliseconds>(interval).count());
ack_event_->update_interval(interval);
ack_event_->restart_timer(max_blocking_time);
}

Expand Down Expand Up @@ -951,12 +952,13 @@ DeliveryRetCode StatefulWriter::deliver_sample_to_network(
if ( !(ack_event_->getRemainingTimeMilliSec() > 0))
{
// Restart ack_timer
auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns());
auto now = system_clock::now();
auto interval = source_timestamp - now + keep_duration_us_;
assert(interval.count() >= 0);
Time_t expiration_ts = change->sourceTimestamp + keep_duration_;
Time_t current_ts;
Time_t::now(current_ts);
assert(expiration_ts >= current_ts);
auto interval = (expiration_ts - current_ts).to_duration_t();

ack_event_->update_interval_millisec((double)duration_cast<milliseconds>(interval).count());
ack_event_->update_interval(interval);
ack_event_->restart_timer(max_blocking_time);
}
}
Expand Down Expand Up @@ -1668,13 +1670,9 @@ void StatefulWriter::updatePositiveAcks(
const WriterAttributes& att)
{
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
if (keep_duration_us_.count() != (att.keep_duration.to_ns() * 1e-3))
{
// Implicit conversion to microseconds
keep_duration_us_ = std::chrono::nanoseconds {att.keep_duration.to_ns()};
}
keep_duration_ = att.keep_duration;
// Restart ack timer with new duration
ack_event_->update_interval_millisec(keep_duration_us_.count() * 1e-3);
ack_event_->update_interval(keep_duration_);
ack_event_->restart_timer();
}

Expand Down Expand Up @@ -2087,14 +2085,16 @@ bool StatefulWriter::ack_timer_expired()
// The timer has expired so the earliest non-acked change must be marked as acknowledged
// This will be done in the first while iteration, as we start with a negative interval

auto interval = -keep_duration_us_;
Time_t expiration_ts;
Time_t current_ts;
Time_t::now(current_ts);

// On the other hand, we've seen in the tests that if samples are sent very quickly with little
// time between consecutive samples, the timer interval could end up being negative
// In this case, we keep marking changes as acknowledged until the timer is able to keep up, hence the while
// loop

while (interval.count() < 0)
do
{
bool acks_flag = false;
for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_,
Expand Down Expand Up @@ -2133,13 +2133,13 @@ bool StatefulWriter::ack_timer_expired()
return false;
}

auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns());
auto now = system_clock::now();
interval = source_timestamp - now + keep_duration_us_;
Time_t::now(current_ts);
expiration_ts = change->sourceTimestamp + keep_duration_;
}
assert(interval.count() >= 0);
while (expiration_ts < current_ts);

ack_event_->update_interval_millisec((double)duration_cast<milliseconds>(interval).count());
auto interval = (expiration_ts - current_ts).to_duration_t();
ack_event_->update_interval(interval);
return true;
}

Expand Down
8 changes: 4 additions & 4 deletions src/cpp/utils/SystemInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ bool SystemInfo::wait_for_file_closure(
const std::string& filename,
const std::chrono::seconds timeout)
{
auto start = std::chrono::system_clock::now();
auto start = std::chrono::steady_clock::now();

#ifdef _MSC_VER
std::ofstream os;
Expand All @@ -175,7 +175,7 @@ bool SystemInfo::wait_for_file_closure(
os.open(filename, std::ios::out | std::ios::app, _SH_DENYWR);
if (!os.is_open()
// If the file is lock-opened in an external editor do not hang
&& (std::chrono::system_clock::now() - start) < timeout )
&& (std::chrono::steady_clock::now() - start) < timeout )
{
std::this_thread::yield();
}
Expand All @@ -190,7 +190,7 @@ bool SystemInfo::wait_for_file_closure(

while (flock(fd, LOCK_EX | LOCK_NB)
// If the file is lock-opened in an external editor do not hang
&& (std::chrono::system_clock::now() - start) < timeout )
&& (std::chrono::steady_clock::now() - start) < timeout )
{
std::this_thread::yield();
}
Expand All @@ -205,7 +205,7 @@ bool SystemInfo::wait_for_file_closure(
(void)filename;
#endif // ifdef _MSC_VER

return std::chrono::system_clock::now() - start < timeout;
return std::chrono::steady_clock::now() - start < timeout;
}

ReturnCode_t SystemInfo::set_environment_file()
Expand Down
Loading

0 comments on commit 0eed014

Please sign in to comment.