Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[21314] Improve resilience against clock adjustments (backport #5018) #5189

Open
wants to merge 5 commits into
base: 2.14.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions include/fastdds/rtps/writer/StatefulWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -515,8 +515,8 @@ class StatefulWriter : public RTPSWriter
bool disable_heartbeat_piggyback_;
//! True to disable positive ACKs
bool disable_positive_acks_;
//! Keep duration for disable positive ACKs QoS, in microseconds
std::chrono::duration<double, std::ratio<1, 1000000>> keep_duration_us_;
//! Keep duration for disable positive ACKs QoS
Duration_t keep_duration_;
//! Last acknowledged cache change (only used if using disable positive ACKs QoS)
SequenceNumber_t last_sequence_number_;
//! Biggest sequence number removed from history
Expand Down
9 changes: 1 addition & 8 deletions src/cpp/fastdds/domain/DomainParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1221,14 +1221,7 @@ bool DomainParticipantImpl::contains_entity(
ReturnCode_t DomainParticipantImpl::get_current_time(
fastrtps::Time_t& current_time) const
{
auto now = std::chrono::system_clock::now();
auto duration = now.time_since_epoch();
auto seconds = std::chrono::duration_cast<std::chrono::seconds>(duration);
duration -= seconds;
auto nanos = std::chrono::duration_cast<std::chrono::nanoseconds>(duration);

current_time.seconds = static_cast<int32_t>(seconds.count());
current_time.nanosec = static_cast<uint32_t>(nanos.count());
fastrtps::Time_t::now(current_time);

return ReturnCode_t::RETCODE_OK;
}
Expand Down
4 changes: 2 additions & 2 deletions src/cpp/fastdds/domain/DomainParticipantImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ class DomainParticipantImpl
DomainParticipantListener* listener,
const std::chrono::seconds timeout = std::chrono::seconds::max())
{
auto time_out = std::chrono::time_point<std::chrono::system_clock>::max();
auto time_out = std::chrono::time_point<std::chrono::steady_clock>::max();
if (timeout < std::chrono::seconds::max())
{
auto now = std::chrono::system_clock::now();
auto now = std::chrono::steady_clock::now();
time_out = now + timeout;
}

Expand Down
36 changes: 12 additions & 24 deletions src/cpp/fastdds/publisher/DataWriterImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include <fastdds/publisher/filtering/DataWriterFilteredChangePool.hpp>
#include <fastdds/publisher/PublisherImpl.hpp>
#include <fastdds/rtps/builtin/liveliness/WLP.h>
#include <fastdds/rtps/common/Time_t.h>
#include <fastdds/rtps/participant/RTPSParticipant.h>
#include <fastdds/rtps/resources/ResourceEvent.h>
#include <fastdds/rtps/resources/TimedEvent.h>
Expand Down Expand Up @@ -1027,7 +1028,7 @@ ReturnCode_t DataWriterImpl::perform_create_new_change(
{
if (!history_.set_next_deadline(
handle,
steady_clock::now() + duration_cast<system_clock::duration>(deadline_duration_us_)))
steady_clock::now() + duration_cast<steady_clock::duration>(deadline_duration_us_)))
{
EPROSIMA_LOG_ERROR(DATA_WRITER, "Could not set the next deadline in the history");
}
Expand Down Expand Up @@ -1499,7 +1500,7 @@ bool DataWriterImpl::deadline_missed()

if (!history_.set_next_deadline(
timer_owner_,
steady_clock::now() + duration_cast<system_clock::duration>(deadline_duration_us_)))
steady_clock::now() + duration_cast<steady_clock::duration>(deadline_duration_us_)))
{
EPROSIMA_LOG_ERROR(DATA_WRITER, "Could not set the next deadline in the history");
return false;
Expand Down Expand Up @@ -1549,39 +1550,26 @@ bool DataWriterImpl::lifespan_expired()
{
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());

fastrtps::Time_t current_ts;
fastrtps::Time_t::now(current_ts);

CacheChange_t* earliest_change;
while (history_.get_earliest_change(&earliest_change))
{
auto source_timestamp = system_clock::time_point() + nanoseconds(earliest_change->sourceTimestamp.to_ns());
auto now = system_clock::now();
fastrtps::Time_t expiration_ts(earliest_change->sourceTimestamp.seconds(),
earliest_change->sourceTimestamp.nanosec());
expiration_ts = expiration_ts + qos_.lifespan().duration;

// Check that the earliest change has expired (the change which started the timer could have been removed from the history)
if (now - source_timestamp < lifespan_duration_us_)
if (current_ts < expiration_ts)
{
auto interval = source_timestamp - now + lifespan_duration_us_;
lifespan_timer_->update_interval_millisec(static_cast<double>(duration_cast<milliseconds>(interval).count()));
fastrtps::Time_t interval = expiration_ts - current_ts;
lifespan_timer_->update_interval_millisec(interval.to_ns() * 1e-6);
return true;
}

// The earliest change has expired
history_.remove_change_pub(earliest_change);

// Set the timer for the next change if there is one
if (!history_.get_earliest_change(&earliest_change))
{
return false;
}

// Calculate when the next change is due to expire and restart
source_timestamp = system_clock::time_point() + nanoseconds(earliest_change->sourceTimestamp.to_ns());
now = system_clock::now();
auto interval = source_timestamp - now + lifespan_duration_us_;

if (interval.count() > 0)
{
lifespan_timer_->update_interval_millisec(static_cast<double>(duration_cast<milliseconds>(interval).count()));
return true;
}
}

return false;
Expand Down
49 changes: 19 additions & 30 deletions src/cpp/fastdds/subscriber/DataReaderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <fastdds/dds/topic/Topic.hpp>
#include <fastdds/dds/topic/TypeSupport.hpp>
#include <fastdds/domain/DomainParticipantImpl.hpp>
#include <fastdds/rtps/common/Time_t.h>
#include <fastdds/rtps/participant/RTPSParticipant.h>
#include <fastdds/rtps/reader/RTPSReader.h>
#include <fastdds/rtps/resources/ResourceEvent.h>
Expand Down Expand Up @@ -1103,7 +1104,7 @@ bool DataReaderImpl::on_new_cache_change_added(
{
if (!history_.set_next_deadline(
change->instanceHandle,
steady_clock::now() + duration_cast<system_clock::duration>(deadline_duration_us_)))
steady_clock::now() + duration_cast<steady_clock::duration>(deadline_duration_us_)))
{
EPROSIMA_LOG_ERROR(SUBSCRIBER, "Could not set next deadline in the history");
}
Expand All @@ -1122,12 +1123,14 @@ bool DataReaderImpl::on_new_cache_change_added(
return true;
}

auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns());
auto now = system_clock::now();
fastrtps::Time_t expiration_ts(change->sourceTimestamp.seconds(), change->sourceTimestamp.nanosec());
expiration_ts = expiration_ts + qos_.lifespan().duration;
fastrtps::Time_t current_ts;
fastrtps::Time_t::now(current_ts);

// The new change could have expired if it arrived too late
// If so, remove it from the history and return false to avoid notifying the listener
if (now - source_timestamp >= lifespan_duration_us_)
if (expiration_ts < current_ts)
{
history_.remove_change_sub(new_change);
return false;
Expand All @@ -1149,11 +1152,10 @@ bool DataReaderImpl::on_new_cache_change_added(
EPROSIMA_LOG_ERROR(SUBSCRIBER, "A change was added to history that could not be retrieved");
}

auto interval = source_timestamp - now + duration_cast<nanoseconds>(lifespan_duration_us_);

// Update and restart the timer
// If the timer is already running this will not have any effect
lifespan_timer_->update_interval_millisec(interval.count() * 1e-6);
fastrtps::Time_t interval = expiration_ts - current_ts;
lifespan_timer_->update_interval_millisec(interval.to_ns() * 1e-6);
lifespan_timer_->restart_timer();
return true;
}
Expand Down Expand Up @@ -1241,7 +1243,7 @@ bool DataReaderImpl::deadline_missed()

if (!history_.set_next_deadline(
timer_owner_,
steady_clock::now() + duration_cast<system_clock::duration>(deadline_duration_us_), true))
steady_clock::now() + duration_cast<steady_clock::duration>(deadline_duration_us_), true))
{
EPROSIMA_LOG_ERROR(SUBSCRIBER, "Could not set next deadline in the history");
return false;
Expand Down Expand Up @@ -1272,41 +1274,28 @@ bool DataReaderImpl::lifespan_expired()
{
std::unique_lock<RecursiveTimedMutex> lock(reader_->getMutex());

fastrtps::Time_t current_ts;
fastrtps::Time_t::now(current_ts);

CacheChange_t* earliest_change;
while (history_.get_earliest_change(&earliest_change))
{
auto source_timestamp = system_clock::time_point() + nanoseconds(earliest_change->sourceTimestamp.to_ns());
auto now = system_clock::now();
fastrtps::Time_t expiration_ts(earliest_change->sourceTimestamp.seconds(),
earliest_change->sourceTimestamp.nanosec());
expiration_ts = expiration_ts + qos_.lifespan().duration;

// Check that the earliest change has expired (the change which started the timer could have been removed from the history)
if (now - source_timestamp < lifespan_duration_us_)
if (current_ts < expiration_ts)
{
auto interval = source_timestamp - now + lifespan_duration_us_;
lifespan_timer_->update_interval_millisec(static_cast<double>(duration_cast<milliseconds>(interval).count()));
fastrtps::Time_t interval = expiration_ts - current_ts;
lifespan_timer_->update_interval_millisec(interval.to_ns() * 1e-6);
return true;
}

// The earliest change has expired
history_.remove_change_sub(earliest_change);

try_notify_read_conditions();

// Set the timer for the next change if there is one
if (!history_.get_earliest_change(&earliest_change))
{
return false;
}

// Calculate when the next change is due to expire and restart
source_timestamp = system_clock::time_point() + nanoseconds(earliest_change->sourceTimestamp.to_ns());
now = system_clock::now();
auto interval = source_timestamp - now + lifespan_duration_us_;

if (interval.count() > 0)
{
lifespan_timer_->update_interval_millisec(static_cast<double>(duration_cast<milliseconds>(interval).count()));
return true;
}
}

return false;
Expand Down
52 changes: 26 additions & 26 deletions src/cpp/rtps/writer/StatefulWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ StatefulWriter::StatefulWriter(
, may_remove_change_(0)
, disable_heartbeat_piggyback_(att.disable_heartbeat_piggyback)
, disable_positive_acks_(att.disable_positive_acks)
, keep_duration_us_(att.keep_duration.to_ns() * 1e-3)
, keep_duration_(att.keep_duration)
, last_sequence_number_()
, biggest_removed_sequence_number_()
, sendBufferSize_(pimpl->get_min_network_send_buffer_size())
Expand Down Expand Up @@ -218,7 +218,7 @@ StatefulWriter::StatefulWriter(
, may_remove_change_(0)
, disable_heartbeat_piggyback_(att.disable_heartbeat_piggyback)
, disable_positive_acks_(att.disable_positive_acks)
, keep_duration_us_(att.keep_duration.to_ns() * 1e-3)
, keep_duration_(att.keep_duration)
, last_sequence_number_()
, biggest_removed_sequence_number_()
, sendBufferSize_(pimpl->get_min_network_send_buffer_size())
Expand Down Expand Up @@ -254,7 +254,7 @@ StatefulWriter::StatefulWriter(
, may_remove_change_(0)
, disable_heartbeat_piggyback_(att.disable_heartbeat_piggyback)
, disable_positive_acks_(att.disable_positive_acks)
, keep_duration_us_(att.keep_duration.to_ns() * 1e-3)
, keep_duration_(att.keep_duration)
, last_sequence_number_()
, biggest_removed_sequence_number_()
, sendBufferSize_(pimpl->get_min_network_send_buffer_size())
Expand Down Expand Up @@ -432,12 +432,13 @@ void StatefulWriter::unsent_change_added_to_history(

if (disable_positive_acks_)
{
auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns());
auto now = system_clock::now();
auto interval = source_timestamp - now + keep_duration_us_;
assert(interval.count() >= 0);
Time_t expiration_ts = change->sourceTimestamp + keep_duration_;
Time_t current_ts;
Time_t::now(current_ts);
assert(expiration_ts >= current_ts);
auto interval = (expiration_ts - current_ts).to_duration_t();

ack_event_->update_interval_millisec((double)duration_cast<milliseconds>(interval).count());
ack_event_->update_interval(interval);
ack_event_->restart_timer(max_blocking_time);
}

Expand Down Expand Up @@ -953,12 +954,13 @@ DeliveryRetCode StatefulWriter::deliver_sample_to_network(
if ( !(ack_event_->getRemainingTimeMilliSec() > 0))
{
// Restart ack_timer
auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns());
auto now = system_clock::now();
auto interval = source_timestamp - now + keep_duration_us_;
assert(interval.count() >= 0);
Time_t expiration_ts = change->sourceTimestamp + keep_duration_;
Time_t current_ts;
Time_t::now(current_ts);
assert(expiration_ts >= current_ts);
auto interval = (expiration_ts - current_ts).to_duration_t();

ack_event_->update_interval_millisec((double)duration_cast<milliseconds>(interval).count());
ack_event_->update_interval(interval);
ack_event_->restart_timer(max_blocking_time);
}
}
Expand Down Expand Up @@ -1670,13 +1672,9 @@ void StatefulWriter::updatePositiveAcks(
const WriterAttributes& att)
{
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
if (keep_duration_us_.count() != (att.keep_duration.to_ns() * 1e-3))
{
// Implicit conversion to microseconds
keep_duration_us_ = std::chrono::nanoseconds {att.keep_duration.to_ns()};
}
keep_duration_ = att.keep_duration;
// Restart ack timer with new duration
ack_event_->update_interval_millisec(keep_duration_us_.count() * 1e-3);
ack_event_->update_interval(keep_duration_);
ack_event_->restart_timer();
}

Expand Down Expand Up @@ -2089,14 +2087,16 @@ bool StatefulWriter::ack_timer_expired()
// The timer has expired so the earliest non-acked change must be marked as acknowledged
// This will be done in the first while iteration, as we start with a negative interval

auto interval = -keep_duration_us_;
Time_t expiration_ts;
Time_t current_ts;
Time_t::now(current_ts);

// On the other hand, we've seen in the tests that if samples are sent very quickly with little
// time between consecutive samples, the timer interval could end up being negative
// In this case, we keep marking changes as acknowledged until the timer is able to keep up, hence the while
// loop

while (interval.count() < 0)
do
{
bool acks_flag = false;
for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_,
Expand Down Expand Up @@ -2135,13 +2135,13 @@ bool StatefulWriter::ack_timer_expired()
return false;
}

auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns());
auto now = system_clock::now();
interval = source_timestamp - now + keep_duration_us_;
Time_t::now(current_ts);
expiration_ts = change->sourceTimestamp + keep_duration_;
}
assert(interval.count() >= 0);
while (expiration_ts < current_ts);

ack_event_->update_interval_millisec((double)duration_cast<milliseconds>(interval).count());
auto interval = (expiration_ts - current_ts).to_duration_t();
ack_event_->update_interval(interval);
return true;
}

Expand Down
8 changes: 4 additions & 4 deletions src/cpp/utils/SystemInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ bool SystemInfo::wait_for_file_closure(
const std::string& filename,
const std::chrono::seconds timeout)
{
auto start = std::chrono::system_clock::now();
auto start = std::chrono::steady_clock::now();

#ifdef _MSC_VER
std::ofstream os;
Expand All @@ -175,7 +175,7 @@ bool SystemInfo::wait_for_file_closure(
os.open(filename, std::ios::out | std::ios::app, _SH_DENYWR);
if (!os.is_open()
// If the file is lock-opened in an external editor do not hang
&& (std::chrono::system_clock::now() - start) < timeout )
&& (std::chrono::steady_clock::now() - start) < timeout )
{
std::this_thread::yield();
}
Expand All @@ -190,7 +190,7 @@ bool SystemInfo::wait_for_file_closure(

while (flock(fd, LOCK_EX | LOCK_NB)
// If the file is lock-opened in an external editor do not hang
&& (std::chrono::system_clock::now() - start) < timeout )
&& (std::chrono::steady_clock::now() - start) < timeout )
{
std::this_thread::yield();
}
Expand All @@ -205,7 +205,7 @@ bool SystemInfo::wait_for_file_closure(
(void)filename;
#endif // ifdef _MSC_VER

return std::chrono::system_clock::now() - start < timeout;
return std::chrono::steady_clock::now() - start < timeout;
}

ReturnCode_t SystemInfo::set_environment_file()
Expand Down
Loading