From 7113c23e35d75fd67ba9dac8a3dd16d82f7baf1b Mon Sep 17 00:00:00 2001 From: EugenioCollado <121509066+EugenioCollado@users.noreply.github.com> Date: Wed, 18 Dec 2024 09:30:03 +0100 Subject: [PATCH] Arithmetic overflow in fragment size calculations (#5464) * Tests arithmetic overflow in fragment size calculations Signed-off-by: Eugenio Collado * Refs #21814. Fix code in BaseWriter.cpp. Signed-off-by: Miguel Company * Fix corner case overhead==max_data_size Signed-off-by: Eugenio Collado * Refs #21814. Fix code in WriterHistory.cpp. Signed-off-by: Miguel Company * Fix corner case overhead==final_high_mark_for_frag Signed-off-by: Eugenio Collado * Uncrustify Signed-off-by: Eugenio Collado * Fix log error message Signed-off-by: Eugenio Collado * Fix test fragments not been dropped Signed-off-by: Eugenio Collado * Fix corner case RTPSParticipantImpl max_data_size < overhead Signed-off-by: Eugenio Collado * Test refactor for windows compilation Signed-off-by: Eugenio Collado * Fix blackbox test Signed-off-by: Eugenio Collado * Applied review suggestions Signed-off-by: EugenioCollado <121509066+EugenioCollado@users.noreply.github.com> --------- Signed-off-by: Eugenio Collado Signed-off-by: Miguel Company Signed-off-by: EugenioCollado <121509066+EugenioCollado@users.noreply.github.com> Co-authored-by: Miguel Company (cherry picked from commit bfc5a530bd523e01e7f9acaa4aeac62b6481cbfe) --- src/cpp/rtps/history/WriterHistory.cpp | 13 +- .../rtps/participant/RTPSParticipantImpl.cpp | 14 ++- src/cpp/rtps/writer/BaseWriter.cpp | 22 ++-- .../common/DDSBlackboxTestsListeners.cpp | 2 +- test/unittest/rtps/history/CMakeLists.txt | 19 +++ .../rtps/history/WriterHistoryTests.cpp | 119 ++++++++++++++++++ 6 files changed, 172 insertions(+), 17 deletions(-) create mode 100644 test/unittest/rtps/history/WriterHistoryTests.cpp diff --git a/src/cpp/rtps/history/WriterHistory.cpp b/src/cpp/rtps/history/WriterHistory.cpp index a7315451854..0b301d3e7d9 100644 --- a/src/cpp/rtps/history/WriterHistory.cpp +++ b/src/cpp/rtps/history/WriterHistory.cpp @@ -484,9 +484,16 @@ void WriterHistory::set_fragments( // If inlineqos for related_sample_identity is required, then remove its size from the final fragment size. if (0 < inline_qos_size) { - final_high_mark_for_frag -= ( - fastdds::dds::ParameterSerializer::PARAMETER_SENTINEL_SIZE + - inline_qos_size); + uint32_t overhead = fastdds::dds::ParameterSerializer::PARAMETER_SENTINEL_SIZE + inline_qos_size; + constexpr uint32_t min_fragment_size = 4; + if (final_high_mark_for_frag < (overhead + min_fragment_size)) + { + final_high_mark_for_frag = min_fragment_size; + } + else + { + final_high_mark_for_frag -= overhead; + } } // If it is big data, fragment it. diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index baa4deecb0b..c6b53aaf31c 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -2301,20 +2301,22 @@ uint32_t RTPSParticipantImpl::getMaxDataSize() uint32_t RTPSParticipantImpl::calculateMaxDataSize( uint32_t length) { - uint32_t maxDataSize = length; - + // RTPS header + uint32_t overhead = RTPSMESSAGE_HEADER_SIZE; #if HAVE_SECURITY // If there is rtps messsage protection, reduce max size for messages, // because extra data is added on encryption. if (security_attributes_.is_rtps_protected) { - maxDataSize -= m_security_manager.calculate_extra_size_for_rtps_message(); + overhead += m_security_manager.calculate_extra_size_for_rtps_message(); } #endif // if HAVE_SECURITY - // RTPS header - maxDataSize -= RTPSMESSAGE_HEADER_SIZE; - return maxDataSize; + if (length <= overhead) + { + return 0; + } + return length - overhead; } bool RTPSParticipantImpl::networkFactoryHasRegisteredTransports() const diff --git a/src/cpp/rtps/writer/BaseWriter.cpp b/src/cpp/rtps/writer/BaseWriter.cpp index 5402955f052..9e79250b56f 100644 --- a/src/cpp/rtps/writer/BaseWriter.cpp +++ b/src/cpp/rtps/writer/BaseWriter.cpp @@ -189,30 +189,38 @@ uint32_t BaseWriter::calculate_max_payload_size( constexpr uint32_t heartbeat_message_length = 32; uint32_t max_data_size = mp_RTPSParticipant->calculateMaxDataSize(datagram_length); - - max_data_size -= info_dst_message_length + + uint32_t overhead = info_dst_message_length + info_ts_message_length + data_frag_submessage_header_length + heartbeat_message_length; - //TODO(Ricardo) inlineqos in future. - #if HAVE_SECURITY if (getAttributes().security_attributes().is_submessage_protected) { - max_data_size -= mp_RTPSParticipant->security_manager().calculate_extra_size_for_rtps_submessage(m_guid); + overhead += mp_RTPSParticipant->security_manager().calculate_extra_size_for_rtps_submessage(m_guid); } if (getAttributes().security_attributes().is_payload_protected) { - max_data_size -= mp_RTPSParticipant->security_manager().calculate_extra_size_for_encoded_payload(m_guid); + overhead += mp_RTPSParticipant->security_manager().calculate_extra_size_for_encoded_payload(m_guid); } #endif // if HAVE_SECURITY #ifdef FASTDDS_STATISTICS - max_data_size -= eprosima::fastdds::statistics::rtps::statistics_submessage_length; + overhead += eprosima::fastdds::statistics::rtps::statistics_submessage_length; #endif // FASTDDS_STATISTICS + constexpr uint32_t min_fragment_size = 4; + if ((overhead + min_fragment_size) > max_data_size) + { + auto min_datagram_length = overhead + min_fragment_size + 1 + (datagram_length - max_data_size); + EPROSIMA_LOG_ERROR(RTPS_WRITER, "Datagram length '" << datagram_length << "' is too small." << + "At least " << min_datagram_length << " bytes are needed to send a message. Fixing fragments to " << + min_fragment_size << " bytes."); + return min_fragment_size; + } + + max_data_size -= overhead; return max_data_size; } diff --git a/test/blackbox/common/DDSBlackboxTestsListeners.cpp b/test/blackbox/common/DDSBlackboxTestsListeners.cpp index 83f13b811c2..513de562d92 100644 --- a/test/blackbox/common/DDSBlackboxTestsListeners.cpp +++ b/test/blackbox/common/DDSBlackboxTestsListeners.cpp @@ -2984,7 +2984,7 @@ TEST(DDSStatus, sample_rejected_waitset) .disable_heartbeat_piggyback(true) .asynchronously(eprosima::fastdds::dds::PublishModeQosPolicyKind::ASYNCHRONOUS_PUBLISH_MODE) .add_flow_controller_descriptor_to_pparams( // Be sure are sent in separate submessage each DATA. - eprosima::fastdds::rtps::FlowControllerSchedulerPolicy::FIFO, 100, 50) + eprosima::fastdds::rtps::FlowControllerSchedulerPolicy::FIFO, 300, 300) // Be sure the first message is processed before sending the second. .init(); reader.history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS) diff --git a/test/unittest/rtps/history/CMakeLists.txt b/test/unittest/rtps/history/CMakeLists.txt index c5f8fb1401b..1fe07d13ee6 100644 --- a/test/unittest/rtps/history/CMakeLists.txt +++ b/test/unittest/rtps/history/CMakeLists.txt @@ -76,6 +76,8 @@ set(TOPICPAYLOADPOOLTESTS_SOURCE ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPLocator.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/SystemInfo.cpp) +set(WRITERHISTORYTESTS_SOURCE WriterHistoryTests.cpp) + if(WIN32) add_definitions(-D_WIN32_WINNT=0x0601) endif() @@ -164,3 +166,20 @@ target_link_libraries(TopicPayloadPoolTests GTest::gtest ${CMAKE_DL_LIBS}) gtest_discover_tests(TopicPayloadPoolTests) + + + +add_executable(WriterHistoryTests ${WRITERHISTORYTESTS_SOURCE}) +target_compile_definitions(WriterHistoryTests PRIVATE + BOOST_ASIO_STANDALONE + ASIO_STANDALONE + $<$>,$>:__DEBUG> + $<$:__INTERNALDEBUG> # Internal debug activated. + ) +target_link_libraries(WriterHistoryTests + fastcdr + fastdds + foonathan_memory + GTest::gtest + ${CMAKE_DL_LIBS}) +gtest_discover_tests(WriterHistoryTests) diff --git a/test/unittest/rtps/history/WriterHistoryTests.cpp b/test/unittest/rtps/history/WriterHistoryTests.cpp new file mode 100644 index 00000000000..3082120e7d4 --- /dev/null +++ b/test/unittest/rtps/history/WriterHistoryTests.cpp @@ -0,0 +1,119 @@ +// Copyright 2020 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include +#include +#include + + +namespace eprosima { +namespace fastdds { +namespace rtps { + +using namespace testing; + +#define MAX_MESSAGE_SIZE 300 + +void cache_change_fragment( + uint32_t max_message_size, + uint32_t inline_qos_length, + bool expected_fragmentation) +{ + uint32_t domain_id = 0; + uint32_t initial_reserved_caches = 10; + std::string max_message_size_str = std::to_string(max_message_size); + + RTPSParticipantAttributes p_attr; + p_attr.properties.properties().emplace_back("fastdds.max_message_size", max_message_size_str); + RTPSParticipant* participant = RTPSDomain::createParticipant( + domain_id, true, p_attr); + + ASSERT_NE(participant, nullptr); + + HistoryAttributes h_attr; + h_attr.memoryPolicy = DYNAMIC_RESERVE_MEMORY_MODE; + h_attr.initialReservedCaches = initial_reserved_caches; + h_attr.payloadMaxSize = 250; + WriterHistory* history = new WriterHistory(h_attr); + + WriterAttributes w_attr; + RTPSWriter* writer = RTPSDomain::createRTPSWriter(participant, w_attr, history); + + ASSERT_NE(writer, nullptr); + + CacheChange_t* change = history->create_change(ALIVE); + if (expected_fragmentation) + { + change->serializedPayload.length = 3 * max_message_size; + } + else + { + change->serializedPayload.length = max_message_size / 3; + } + change->inline_qos.length = inline_qos_length; + history->add_change(change); + + auto result = change->getFragmentSize(); + std::cout << "Fragment size: " << result << std::endl; + if (expected_fragmentation) + { + ASSERT_NE(result, 0); + } + else + { + ASSERT_EQ(result, 0); + } +} + +/** + * This test checks the get_max_allowed_payload_size() method of the BaseWriter class. + * When setting the RTPS Participant Attribute property fastdds.max_message_size to a value lower than the + * message overhead, if the method does not overflow the fragment size will be set. + * If the max_message_size is big enough for the overhead, inline_qos and serializedPayload, + * then no fragmentation will occur. + */ +TEST(WriterHistoryTests, get_max_allowed_payload_size_overflow) +{ + cache_change_fragment(100, 0, true); + cache_change_fragment(MAX_MESSAGE_SIZE, 0, false); +} + +/** + * This test checks the fragment size calculation for a cache change depending on the inline qos length. + * The change.serializedPayload.length is set to 3 times the max_allowed_payload_size, so the fragment size should always be set. + * In case of an overflow in the attribute high_mark_for_frag_ the fragment size will not be set, which is an error. + */ +TEST(WriterHistoryTests, final_high_mark_for_frag_overflow) +{ + for (uint32_t inline_qos_length = 0; inline_qos_length < MAX_MESSAGE_SIZE; inline_qos_length += 40) + { + cache_change_fragment(MAX_MESSAGE_SIZE, inline_qos_length, true); + } +} + +} // namespace rtps +} // namespace fastdds +} // namespace eprosima + +int main( + int argc, + char** argv) +{ + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}