diff --git a/include/fastdds/rtps/writer/StatefulWriter.h b/include/fastdds/rtps/writer/StatefulWriter.h index 04207f2838b..e1785389252 100644 --- a/include/fastdds/rtps/writer/StatefulWriter.h +++ b/include/fastdds/rtps/writer/StatefulWriter.h @@ -515,8 +515,8 @@ class StatefulWriter : public RTPSWriter bool disable_heartbeat_piggyback_; //! True to disable positive ACKs bool disable_positive_acks_; - //! Keep duration for disable positive ACKs QoS, in microseconds - std::chrono::duration> keep_duration_us_; + //! Keep duration for disable positive ACKs QoS + Duration_t keep_duration_; //! Last acknowledged cache change (only used if using disable positive ACKs QoS) SequenceNumber_t last_sequence_number_; //! Biggest sequence number removed from history diff --git a/src/cpp/fastdds/domain/DomainParticipantImpl.cpp b/src/cpp/fastdds/domain/DomainParticipantImpl.cpp index 56e4023b637..eb47373fa09 100644 --- a/src/cpp/fastdds/domain/DomainParticipantImpl.cpp +++ b/src/cpp/fastdds/domain/DomainParticipantImpl.cpp @@ -1221,14 +1221,7 @@ bool DomainParticipantImpl::contains_entity( ReturnCode_t DomainParticipantImpl::get_current_time( fastrtps::Time_t& current_time) const { - auto now = std::chrono::system_clock::now(); - auto duration = now.time_since_epoch(); - auto seconds = std::chrono::duration_cast(duration); - duration -= seconds; - auto nanos = std::chrono::duration_cast(duration); - - current_time.seconds = static_cast(seconds.count()); - current_time.nanosec = static_cast(nanos.count()); + fastrtps::Time_t::now(current_time); return ReturnCode_t::RETCODE_OK; } diff --git a/src/cpp/fastdds/domain/DomainParticipantImpl.hpp b/src/cpp/fastdds/domain/DomainParticipantImpl.hpp index f2746ed522c..2daf8a40327 100644 --- a/src/cpp/fastdds/domain/DomainParticipantImpl.hpp +++ b/src/cpp/fastdds/domain/DomainParticipantImpl.hpp @@ -111,10 +111,10 @@ class DomainParticipantImpl DomainParticipantListener* listener, const std::chrono::seconds timeout = std::chrono::seconds::max()) { - auto time_out = std::chrono::time_point::max(); + auto time_out = std::chrono::time_point::max(); if (timeout < std::chrono::seconds::max()) { - auto now = std::chrono::system_clock::now(); + auto now = std::chrono::steady_clock::now(); time_out = now + timeout; } diff --git a/src/cpp/fastdds/publisher/DataWriterImpl.cpp b/src/cpp/fastdds/publisher/DataWriterImpl.cpp index 726f899561b..1a49eab3336 100644 --- a/src/cpp/fastdds/publisher/DataWriterImpl.cpp +++ b/src/cpp/fastdds/publisher/DataWriterImpl.cpp @@ -34,6 +34,7 @@ #include #include #include +#include #include #include #include @@ -1027,7 +1028,7 @@ ReturnCode_t DataWriterImpl::perform_create_new_change( { if (!history_.set_next_deadline( handle, - steady_clock::now() + duration_cast(deadline_duration_us_))) + steady_clock::now() + duration_cast(deadline_duration_us_))) { EPROSIMA_LOG_ERROR(DATA_WRITER, "Could not set the next deadline in the history"); } @@ -1499,7 +1500,7 @@ bool DataWriterImpl::deadline_missed() if (!history_.set_next_deadline( timer_owner_, - steady_clock::now() + duration_cast(deadline_duration_us_))) + steady_clock::now() + duration_cast(deadline_duration_us_))) { EPROSIMA_LOG_ERROR(DATA_WRITER, "Could not set the next deadline in the history"); return false; @@ -1549,39 +1550,26 @@ bool DataWriterImpl::lifespan_expired() { std::unique_lock lock(writer_->getMutex()); + fastrtps::Time_t current_ts; + fastrtps::Time_t::now(current_ts); + CacheChange_t* earliest_change; while (history_.get_earliest_change(&earliest_change)) { - auto source_timestamp = system_clock::time_point() + nanoseconds(earliest_change->sourceTimestamp.to_ns()); - auto now = system_clock::now(); + fastrtps::Time_t expiration_ts(earliest_change->sourceTimestamp.seconds(), + earliest_change->sourceTimestamp.nanosec()); + expiration_ts = expiration_ts + qos_.lifespan().duration; // Check that the earliest change has expired (the change which started the timer could have been removed from the history) - if (now - source_timestamp < lifespan_duration_us_) + if (current_ts < expiration_ts) { - auto interval = source_timestamp - now + lifespan_duration_us_; - lifespan_timer_->update_interval_millisec(static_cast(duration_cast(interval).count())); + fastrtps::Time_t interval = expiration_ts - current_ts; + lifespan_timer_->update_interval_millisec(interval.to_ns() * 1e-6); return true; } // The earliest change has expired history_.remove_change_pub(earliest_change); - - // Set the timer for the next change if there is one - if (!history_.get_earliest_change(&earliest_change)) - { - return false; - } - - // Calculate when the next change is due to expire and restart - source_timestamp = system_clock::time_point() + nanoseconds(earliest_change->sourceTimestamp.to_ns()); - now = system_clock::now(); - auto interval = source_timestamp - now + lifespan_duration_us_; - - if (interval.count() > 0) - { - lifespan_timer_->update_interval_millisec(static_cast(duration_cast(interval).count())); - return true; - } } return false; diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp index 194a3b0e860..015b9e79e5d 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp @@ -35,6 +35,7 @@ #include #include #include +#include #include #include #include @@ -1103,7 +1104,7 @@ bool DataReaderImpl::on_new_cache_change_added( { if (!history_.set_next_deadline( change->instanceHandle, - steady_clock::now() + duration_cast(deadline_duration_us_))) + steady_clock::now() + duration_cast(deadline_duration_us_))) { EPROSIMA_LOG_ERROR(SUBSCRIBER, "Could not set next deadline in the history"); } @@ -1122,12 +1123,14 @@ bool DataReaderImpl::on_new_cache_change_added( return true; } - auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns()); - auto now = system_clock::now(); + fastrtps::Time_t expiration_ts(change->sourceTimestamp.seconds(), change->sourceTimestamp.nanosec()); + expiration_ts = expiration_ts + qos_.lifespan().duration; + fastrtps::Time_t current_ts; + fastrtps::Time_t::now(current_ts); // The new change could have expired if it arrived too late // If so, remove it from the history and return false to avoid notifying the listener - if (now - source_timestamp >= lifespan_duration_us_) + if (expiration_ts <= current_ts) { history_.remove_change_sub(new_change); return false; @@ -1149,11 +1152,10 @@ bool DataReaderImpl::on_new_cache_change_added( EPROSIMA_LOG_ERROR(SUBSCRIBER, "A change was added to history that could not be retrieved"); } - auto interval = source_timestamp - now + duration_cast(lifespan_duration_us_); - // Update and restart the timer // If the timer is already running this will not have any effect - lifespan_timer_->update_interval_millisec(interval.count() * 1e-6); + fastrtps::Time_t interval = expiration_ts - current_ts; + lifespan_timer_->update_interval_millisec(interval.to_ns() * 1e-6); lifespan_timer_->restart_timer(); return true; } @@ -1241,7 +1243,7 @@ bool DataReaderImpl::deadline_missed() if (!history_.set_next_deadline( timer_owner_, - steady_clock::now() + duration_cast(deadline_duration_us_), true)) + steady_clock::now() + duration_cast(deadline_duration_us_), true)) { EPROSIMA_LOG_ERROR(SUBSCRIBER, "Could not set next deadline in the history"); return false; @@ -1272,17 +1274,21 @@ bool DataReaderImpl::lifespan_expired() { std::unique_lock lock(reader_->getMutex()); + fastrtps::Time_t current_ts; + fastrtps::Time_t::now(current_ts); + CacheChange_t* earliest_change; while (history_.get_earliest_change(&earliest_change)) { - auto source_timestamp = system_clock::time_point() + nanoseconds(earliest_change->sourceTimestamp.to_ns()); - auto now = system_clock::now(); + fastrtps::Time_t expiration_ts(earliest_change->sourceTimestamp.seconds(), + earliest_change->sourceTimestamp.nanosec()); + expiration_ts = expiration_ts + qos_.lifespan().duration; // Check that the earliest change has expired (the change which started the timer could have been removed from the history) - if (now - source_timestamp < lifespan_duration_us_) + if (current_ts < expiration_ts) { - auto interval = source_timestamp - now + lifespan_duration_us_; - lifespan_timer_->update_interval_millisec(static_cast(duration_cast(interval).count())); + fastrtps::Time_t interval = expiration_ts - current_ts; + lifespan_timer_->update_interval_millisec(interval.to_ns() * 1e-6); return true; } @@ -1290,23 +1296,6 @@ bool DataReaderImpl::lifespan_expired() history_.remove_change_sub(earliest_change); try_notify_read_conditions(); - - // Set the timer for the next change if there is one - if (!history_.get_earliest_change(&earliest_change)) - { - return false; - } - - // Calculate when the next change is due to expire and restart - source_timestamp = system_clock::time_point() + nanoseconds(earliest_change->sourceTimestamp.to_ns()); - now = system_clock::now(); - auto interval = source_timestamp - now + lifespan_duration_us_; - - if (interval.count() > 0) - { - lifespan_timer_->update_interval_millisec(static_cast(duration_cast(interval).count())); - return true; - } } return false; diff --git a/src/cpp/rtps/writer/StatefulWriter.cpp b/src/cpp/rtps/writer/StatefulWriter.cpp index df8e15a1a22..977de168352 100644 --- a/src/cpp/rtps/writer/StatefulWriter.cpp +++ b/src/cpp/rtps/writer/StatefulWriter.cpp @@ -183,7 +183,7 @@ StatefulWriter::StatefulWriter( , may_remove_change_(0) , disable_heartbeat_piggyback_(att.disable_heartbeat_piggyback) , disable_positive_acks_(att.disable_positive_acks) - , keep_duration_us_(att.keep_duration.to_ns() * 1e-3) + , keep_duration_(att.keep_duration) , last_sequence_number_() , biggest_removed_sequence_number_() , sendBufferSize_(pimpl->get_min_network_send_buffer_size()) @@ -218,7 +218,7 @@ StatefulWriter::StatefulWriter( , may_remove_change_(0) , disable_heartbeat_piggyback_(att.disable_heartbeat_piggyback) , disable_positive_acks_(att.disable_positive_acks) - , keep_duration_us_(att.keep_duration.to_ns() * 1e-3) + , keep_duration_(att.keep_duration) , last_sequence_number_() , biggest_removed_sequence_number_() , sendBufferSize_(pimpl->get_min_network_send_buffer_size()) @@ -254,7 +254,7 @@ StatefulWriter::StatefulWriter( , may_remove_change_(0) , disable_heartbeat_piggyback_(att.disable_heartbeat_piggyback) , disable_positive_acks_(att.disable_positive_acks) - , keep_duration_us_(att.keep_duration.to_ns() * 1e-3) + , keep_duration_(att.keep_duration) , last_sequence_number_() , biggest_removed_sequence_number_() , sendBufferSize_(pimpl->get_min_network_send_buffer_size()) @@ -432,12 +432,13 @@ void StatefulWriter::unsent_change_added_to_history( if (disable_positive_acks_) { - auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns()); - auto now = system_clock::now(); - auto interval = source_timestamp - now + keep_duration_us_; - assert(interval.count() >= 0); + Time_t expiration_ts = change->sourceTimestamp + keep_duration_; + Time_t current_ts; + Time_t::now(current_ts); + assert(expiration_ts >= current_ts); + auto interval = (expiration_ts - current_ts).to_duration_t(); - ack_event_->update_interval_millisec((double)duration_cast(interval).count()); + ack_event_->update_interval(interval); ack_event_->restart_timer(max_blocking_time); } @@ -953,12 +954,13 @@ DeliveryRetCode StatefulWriter::deliver_sample_to_network( if ( !(ack_event_->getRemainingTimeMilliSec() > 0)) { // Restart ack_timer - auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns()); - auto now = system_clock::now(); - auto interval = source_timestamp - now + keep_duration_us_; - assert(interval.count() >= 0); + Time_t expiration_ts = change->sourceTimestamp + keep_duration_; + Time_t current_ts; + Time_t::now(current_ts); + assert(expiration_ts >= current_ts); + auto interval = (expiration_ts - current_ts).to_duration_t(); - ack_event_->update_interval_millisec((double)duration_cast(interval).count()); + ack_event_->update_interval(interval); ack_event_->restart_timer(max_blocking_time); } } @@ -1670,13 +1672,9 @@ void StatefulWriter::updatePositiveAcks( const WriterAttributes& att) { std::lock_guard guard(mp_mutex); - if (keep_duration_us_.count() != (att.keep_duration.to_ns() * 1e-3)) - { - // Implicit conversion to microseconds - keep_duration_us_ = std::chrono::nanoseconds {att.keep_duration.to_ns()}; - } + keep_duration_ = att.keep_duration; // Restart ack timer with new duration - ack_event_->update_interval_millisec(keep_duration_us_.count() * 1e-3); + ack_event_->update_interval(keep_duration_); ack_event_->restart_timer(); } @@ -2089,14 +2087,16 @@ bool StatefulWriter::ack_timer_expired() // The timer has expired so the earliest non-acked change must be marked as acknowledged // This will be done in the first while iteration, as we start with a negative interval - auto interval = -keep_duration_us_; + Time_t expiration_ts; + Time_t current_ts; + Time_t::now(current_ts); // On the other hand, we've seen in the tests that if samples are sent very quickly with little // time between consecutive samples, the timer interval could end up being negative // In this case, we keep marking changes as acknowledged until the timer is able to keep up, hence the while // loop - while (interval.count() < 0) + do { bool acks_flag = false; for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_, @@ -2135,13 +2135,13 @@ bool StatefulWriter::ack_timer_expired() return false; } - auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns()); - auto now = system_clock::now(); - interval = source_timestamp - now + keep_duration_us_; + Time_t::now(current_ts); + expiration_ts = change->sourceTimestamp + keep_duration_; } - assert(interval.count() >= 0); + while (expiration_ts < current_ts); - ack_event_->update_interval_millisec((double)duration_cast(interval).count()); + auto interval = (expiration_ts - current_ts).to_duration_t(); + ack_event_->update_interval(interval); return true; } diff --git a/src/cpp/utils/SystemInfo.cpp b/src/cpp/utils/SystemInfo.cpp index 8aab32dbb9d..28c02b27dbd 100644 --- a/src/cpp/utils/SystemInfo.cpp +++ b/src/cpp/utils/SystemInfo.cpp @@ -165,7 +165,7 @@ bool SystemInfo::wait_for_file_closure( const std::string& filename, const std::chrono::seconds timeout) { - auto start = std::chrono::system_clock::now(); + auto start = std::chrono::steady_clock::now(); #ifdef _MSC_VER std::ofstream os; @@ -175,7 +175,7 @@ bool SystemInfo::wait_for_file_closure( os.open(filename, std::ios::out | std::ios::app, _SH_DENYWR); if (!os.is_open() // If the file is lock-opened in an external editor do not hang - && (std::chrono::system_clock::now() - start) < timeout ) + && (std::chrono::steady_clock::now() - start) < timeout ) { std::this_thread::yield(); } @@ -190,7 +190,7 @@ bool SystemInfo::wait_for_file_closure( while (flock(fd, LOCK_EX | LOCK_NB) // If the file is lock-opened in an external editor do not hang - && (std::chrono::system_clock::now() - start) < timeout ) + && (std::chrono::steady_clock::now() - start) < timeout ) { std::this_thread::yield(); } @@ -205,7 +205,7 @@ bool SystemInfo::wait_for_file_closure( (void)filename; #endif // ifdef _MSC_VER - return std::chrono::system_clock::now() - start < timeout; + return std::chrono::steady_clock::now() - start < timeout; } ReturnCode_t SystemInfo::set_environment_file()