From 54b8494ef183275371a7053ff3a5aa6b7c951873 Mon Sep 17 00:00:00 2001 From: Javier Gil Aviles Date: Wed, 11 Dec 2024 09:33:10 +0100 Subject: [PATCH] Test CI for benchmark example and issue fixing Signed-off-by: Javier Gil Aviles --- examples/cpp/benchmark/Benchmark.hpp | 8 +- examples/cpp/benchmark/Benchmark_big.hpp | 15 +- examples/cpp/benchmark/Benchmark_medium.hpp | 15 +- examples/cpp/benchmark/Benchmark_small.hpp | 15 +- examples/cpp/benchmark/CLIParser.hpp | 13 +- examples/cpp/benchmark/PublisherApp.cpp | 290 +++++++++++++------- examples/cpp/benchmark/PublisherApp.hpp | 6 +- examples/cpp/benchmark/README.md | 105 ++++++- examples/cpp/benchmark/SubscriberApp.cpp | 98 ++++++- test/examples/benchmark.compose.yml | 46 ++++ test/examples/test_benchmark.py | 80 ++++++ 11 files changed, 540 insertions(+), 151 deletions(-) create mode 100644 test/examples/benchmark.compose.yml create mode 100644 test/examples/test_benchmark.py diff --git a/examples/cpp/benchmark/Benchmark.hpp b/examples/cpp/benchmark/Benchmark.hpp index 97dc5abba21..3600c5947e4 100644 --- a/examples/cpp/benchmark/Benchmark.hpp +++ b/examples/cpp/benchmark/Benchmark.hpp @@ -22,7 +22,7 @@ #ifndef FAST_DDS_GENERATED__BENCHMARK_HPP #define FAST_DDS_GENERATED__BENCHMARK_HPP -#include +#include #include #if defined(_WIN32) @@ -78,7 +78,7 @@ class BenchMark eProsima_user_DllExport BenchMark( const BenchMark& x) { - m_index = x.m_index; + m_index = x.m_index; } @@ -100,7 +100,7 @@ class BenchMark const BenchMark& x) { - m_index = x.m_index; + m_index = x.m_index; return *this; } @@ -165,8 +165,6 @@ class BenchMark return m_index; } - - private: uint32_t m_index{0}; diff --git a/examples/cpp/benchmark/Benchmark_big.hpp b/examples/cpp/benchmark/Benchmark_big.hpp index 6f21494eb1d..f76f126878b 100644 --- a/examples/cpp/benchmark/Benchmark_big.hpp +++ b/examples/cpp/benchmark/Benchmark_big.hpp @@ -22,7 +22,7 @@ #ifndef FAST_DDS_GENERATED__BENCHMARK_BIG_HPP #define FAST_DDS_GENERATED__BENCHMARK_BIG_HPP -#include +#include #include #include @@ -79,9 +79,9 @@ class BenchMarkBig eProsima_user_DllExport BenchMarkBig( const BenchMarkBig& x) { - m_data = x.m_data; + m_data = x.m_data; - m_index = x.m_index; + m_index = x.m_index; } @@ -104,9 +104,9 @@ class BenchMarkBig const BenchMarkBig& x) { - m_data = x.m_data; + m_data = x.m_data; - m_index = x.m_index; + m_index = x.m_index; return *this; } @@ -132,7 +132,7 @@ class BenchMarkBig const BenchMarkBig& x) const { return (m_data == x.m_data && - m_index == x.m_index); + m_index == x.m_index); } /*! @@ -183,7 +183,6 @@ class BenchMarkBig return m_data; } - /*! * @brief This function sets a value in member index * @param _index New value for member index @@ -212,8 +211,6 @@ class BenchMarkBig return m_index; } - - private: std::array m_data{0}; diff --git a/examples/cpp/benchmark/Benchmark_medium.hpp b/examples/cpp/benchmark/Benchmark_medium.hpp index 36b437c68c1..5721540209f 100644 --- a/examples/cpp/benchmark/Benchmark_medium.hpp +++ b/examples/cpp/benchmark/Benchmark_medium.hpp @@ -22,7 +22,7 @@ #ifndef FAST_DDS_GENERATED__BENCHMARK_MEDIUM_HPP #define FAST_DDS_GENERATED__BENCHMARK_MEDIUM_HPP -#include +#include #include #include @@ -79,9 +79,9 @@ class BenchMarkMedium eProsima_user_DllExport BenchMarkMedium( const BenchMarkMedium& x) { - m_data = x.m_data; + m_data = x.m_data; - m_index = x.m_index; + m_index = x.m_index; } @@ -104,9 +104,9 @@ class BenchMarkMedium const BenchMarkMedium& x) { - m_data = x.m_data; + m_data = x.m_data; - m_index = x.m_index; + m_index = x.m_index; return *this; } @@ -132,7 +132,7 @@ class BenchMarkMedium const BenchMarkMedium& x) const { return (m_data == x.m_data && - m_index == x.m_index); + m_index == x.m_index); } /*! @@ -183,7 +183,6 @@ class BenchMarkMedium return m_data; } - /*! * @brief This function sets a value in member index * @param _index New value for member index @@ -212,8 +211,6 @@ class BenchMarkMedium return m_index; } - - private: std::array m_data{0}; diff --git a/examples/cpp/benchmark/Benchmark_small.hpp b/examples/cpp/benchmark/Benchmark_small.hpp index 087beb44aca..10e48606836 100644 --- a/examples/cpp/benchmark/Benchmark_small.hpp +++ b/examples/cpp/benchmark/Benchmark_small.hpp @@ -22,7 +22,7 @@ #ifndef FAST_DDS_GENERATED__BENCHMARK_SMALL_HPP #define FAST_DDS_GENERATED__BENCHMARK_SMALL_HPP -#include +#include #include #include @@ -79,9 +79,9 @@ class BenchMarkSmall eProsima_user_DllExport BenchMarkSmall( const BenchMarkSmall& x) { - m_array = x.m_array; + m_array = x.m_array; - m_index = x.m_index; + m_index = x.m_index; } @@ -104,9 +104,9 @@ class BenchMarkSmall const BenchMarkSmall& x) { - m_array = x.m_array; + m_array = x.m_array; - m_index = x.m_index; + m_index = x.m_index; return *this; } @@ -132,7 +132,7 @@ class BenchMarkSmall const BenchMarkSmall& x) const { return (m_array == x.m_array && - m_index == x.m_index); + m_index == x.m_index); } /*! @@ -183,7 +183,6 @@ class BenchMarkSmall return m_array; } - /*! * @brief This function sets a value in member index * @param _index New value for member index @@ -212,8 +211,6 @@ class BenchMarkSmall return m_index; } - - private: std::array m_array{0}; diff --git a/examples/cpp/benchmark/CLIParser.hpp b/examples/cpp/benchmark/CLIParser.hpp index e5466b5bc7f..bb39f6cc980 100644 --- a/examples/cpp/benchmark/CLIParser.hpp +++ b/examples/cpp/benchmark/CLIParser.hpp @@ -42,6 +42,7 @@ class CLIParser std::string topic_name = "benchmark_topic"; eprosima::fastdds::rtps::BuiltinTransports transport = eprosima::fastdds::rtps::BuiltinTransports::DEFAULT; ReliabilityQosPolicyKind reliability = ReliabilityQosPolicyKind::BEST_EFFORT_RELIABILITY_QOS; + DurabilityQosPolicyKind durability = DurabilityQosPolicyKind::VOLATILE_DURABILITY_QOS; }; public: @@ -70,8 +71,8 @@ class CLIParser struct publisher_config : public entity_config { uint16_t wait = 1000; - uint32_t interval = 100; - uint32_t end = 10000; + uint16_t interval = 100; + uint16_t end = 10000; CLIParser::MsgSizeKind msg_size = CLIParser::MsgSizeKind::NONE; }; @@ -113,6 +114,8 @@ class CLIParser std::cout << " (Default: benchmark_topic)" << std::endl; std::cout << " -r, --reliable Set Reliability QoS as reliable" << std::endl; std::cout << " (Default: best effort)" << std::endl; + std::cout << " --transient-local Set Durability QoS as transient local" << std::endl; + std::cout << " (Default: volatile)" << std::endl; std::cout << " -m , --msg-size Size of the message" << std::endl; std::cout << " · NONE: Only an int value" << std::endl; std::cout << " · SMALL: int value + array of 16Kb" << std::endl; @@ -493,6 +496,12 @@ class CLIParser print_help(EXIT_FAILURE); } } + else if (arg == "--transient-local") + { + config.pub_config.durability = DurabilityQosPolicyKind::TRANSIENT_LOCAL_DURABILITY_QOS; + config.sub_config.durability = DurabilityQosPolicyKind::TRANSIENT_LOCAL_DURABILITY_QOS; + + } else { EPROSIMA_LOG_ERROR(CLI_PARSER, "parsing argument: " + arg); diff --git a/examples/cpp/benchmark/PublisherApp.cpp b/examples/cpp/benchmark/PublisherApp.cpp index 74a1cbbd46a..886e898affa 100644 --- a/examples/cpp/benchmark/PublisherApp.cpp +++ b/examples/cpp/benchmark/PublisherApp.cpp @@ -67,6 +67,7 @@ PublisherApp::PublisherApp( , count(0) , vSamples(0) , startTime(std::chrono::steady_clock::now()) + , sent(0) { // Create the participant @@ -90,22 +91,22 @@ PublisherApp::PublisherApp( } // Register and set up the data type with initial values - if(msg_size_ == CLIParser::MsgSizeKind::NONE) + if (msg_size_ == CLIParser::MsgSizeKind::NONE) { benchmark_.index(0); type_ = TypeSupport(new BenchMarkPubSubType()); } - else if(msg_size_ == CLIParser::MsgSizeKind::SMALL) + else if (msg_size_ == CLIParser::MsgSizeKind::SMALL) { benchmark_small_.index(0); type_ = TypeSupport(new BenchMarkSmallPubSubType()); } - else if(msg_size_ == CLIParser::MsgSizeKind::MEDIUM) + else if (msg_size_ == CLIParser::MsgSizeKind::MEDIUM) { benchmark_medium_.index(0); type_ = TypeSupport(new BenchMarkMediumPubSubType()); } - else if(msg_size_ == CLIParser::MsgSizeKind::BIG) + else if (msg_size_ == CLIParser::MsgSizeKind::BIG) { benchmark_big_.index(0); type_ = TypeSupport(new BenchMarkBigPubSubType()); @@ -138,6 +139,7 @@ PublisherApp::PublisherApp( // Create the data writer DataWriterQos writer_qos = DATAWRITER_QOS_DEFAULT; writer_qos.reliability().kind = config.reliability; + writer_qos.durability().kind = config.durability; publisher_->get_default_datawriter_qos(writer_qos); writer_ = publisher_->create_datawriter(topic_pub_, writer_qos, this, StatusMask::all()); if (writer_ == nullptr) @@ -158,6 +160,7 @@ PublisherApp::PublisherApp( DataReaderQos reader_qos = DATAREADER_QOS_DEFAULT; subscriber_->get_default_datareader_qos(reader_qos); reader_qos.reliability().kind = config.reliability; + reader_qos.durability().kind = config.durability; reader_ = subscriber_->create_datareader(topic_sub_, reader_qos, this, StatusMask::all()); if (reader_ == nullptr) { @@ -183,13 +186,9 @@ void PublisherApp::on_publication_matched( { if (info.current_count_change == 1) { - if(matched_ == 0) - { - std::cout << "Publisher matched. Test starts..." << std::endl; - startTime = std::chrono::steady_clock::now(); - } - matched_ = static_cast(info.current_count); + startTime = std::chrono::steady_clock::now(); std::cout << "Publisher matched." << std::endl; + matched_++; cv_.notify_one(); } else if (info.current_count_change == -1) @@ -211,6 +210,8 @@ void PublisherApp::on_subscription_matched( if (info.current_count_change == 1) { std::cout << "Subscriber matched." << std::endl; + matched_++; + cv_.notify_one(); } else if (info.current_count_change == -1) { @@ -224,139 +225,204 @@ void PublisherApp::on_subscription_matched( } void PublisherApp::on_data_available( - DataReader* reader) + DataReader* reader) { SampleInfo info; + + auto actualTime = std::chrono::steady_clock::now(); + auto elapsed = std::chrono::duration_cast(actualTime - startTime); + switch (msg_size_) { - case CLIParser::MsgSizeKind::NONE: - while ((!is_stopped()) && (RETCODE_OK == reader->take_next_sample(&benchmark_, &info))) - { - if ((info.instance_state == ALIVE_INSTANCE_STATE) && info.valid_data) + case CLIParser::MsgSizeKind::NONE: + while ((!is_stopped()) && (RETCODE_OK == reader->take_next_sample(&benchmark_, &info))) { - if (benchmark_.index() > count) - { - benchmark_.index(count); - } - else + if ((info.instance_state == ALIVE_INSTANCE_STATE) && info.valid_data) { - benchmark_.index(benchmark_.index() + 1); + std::cout << "Sample with index: '" << + benchmark_.index() << "' (0 Bytes) RECEIVED" << std::endl; + sent = 0; + if (elapsed.count() >= end_) + { + cv_.notify_one(); + return; + } + if (benchmark_.index() > count) + { + benchmark_.index(count); + } + else + { + benchmark_.index(benchmark_.index() + 1); + } + + count = benchmark_.index() + 1; + if ((RETCODE_OK == writer_->write(&benchmark_)) == true) + { + sent = 1; + std::cout << "Sample with index: '" << + benchmark_.index() << "' (0 Bytes) SENT" << std::endl; + } } - - count = benchmark_.index() + 1; - writer_->write(&benchmark_); } - } - break; + break; - case CLIParser::MsgSizeKind::SMALL: - while ((!is_stopped()) && (RETCODE_OK == reader->take_next_sample(&benchmark_small_, &info))) - { - if ((info.instance_state == ALIVE_INSTANCE_STATE) && info.valid_data) + case CLIParser::MsgSizeKind::SMALL: + while ((!is_stopped()) && (RETCODE_OK == reader->take_next_sample(&benchmark_small_, &info))) { - if (benchmark_small_.index() > count) + if ((info.instance_state == ALIVE_INSTANCE_STATE) && info.valid_data) { - benchmark_small_.index(count); + std::cout << "Sample with index: '" << + benchmark_small_.index() << "' (" << static_cast(benchmark_small_.array().size()) << + " Bytes) RECEIVED" << std::endl; + sent = 0; + if (elapsed.count() >= end_) + { + cv_.notify_one(); + return; + } + if (benchmark_small_.index() > count) + { + benchmark_small_.index(count); + } + else + { + benchmark_small_.index(benchmark_small_.index() + 1); + } + + count = benchmark_small_.index() + 1; + if ((RETCODE_OK == writer_->write(&benchmark_small_)) == true) + { + sent = 1; + std::cout << "Sample with index: '" << + benchmark_small_.index() << "' (" << static_cast(benchmark_small_.array().size()) << + " Bytes) SENT" << std::endl; + } } - else - { - benchmark_small_.index(benchmark_small_.index() + 1); - } - - count = benchmark_small_.index() + 1; - writer_->write(&benchmark_small_); } - } - break; + break; - case CLIParser::MsgSizeKind::MEDIUM: - while ((!is_stopped()) && (RETCODE_OK == reader->take_next_sample(&benchmark_medium_, &info))) - { - if ((info.instance_state == ALIVE_INSTANCE_STATE) && info.valid_data) + case CLIParser::MsgSizeKind::MEDIUM: + while ((!is_stopped()) && (RETCODE_OK == reader->take_next_sample(&benchmark_medium_, &info))) { - if (benchmark_medium_.index() > count) - { - benchmark_medium_.index(count); - } - else + if ((info.instance_state == ALIVE_INSTANCE_STATE) && info.valid_data) { - benchmark_medium_.index(benchmark_medium_.index() + 1); + std::cout << "Sample with index: '" << + benchmark_medium_.index() << "' (" << static_cast(benchmark_medium_.data().size()) << + " Bytes) RECEIVED" << std::endl; + sent = 0; + if (elapsed.count() >= end_) + { + cv_.notify_one(); + return; + } + if (benchmark_medium_.index() > count) + { + benchmark_medium_.index(count); + } + else + { + benchmark_medium_.index(benchmark_medium_.index() + 1); + } + + count = benchmark_medium_.index() + 1; + if ((RETCODE_OK == writer_->write(&benchmark_medium_)) == true) + { + sent = 1; + std::cout << "Sample with index: '" << + benchmark_medium_.index() << "' (" << static_cast(benchmark_medium_.data().size()) << + " Bytes) SENT" << std::endl; + } } - - count = benchmark_medium_.index() + 1; - writer_->write(&benchmark_medium_); } - } - break; + break; - case CLIParser::MsgSizeKind::BIG: - while ((!is_stopped()) && (RETCODE_OK == reader->take_next_sample(&benchmark_big_, &info))) - { - if ((info.instance_state == ALIVE_INSTANCE_STATE) && info.valid_data) + case CLIParser::MsgSizeKind::BIG: + while ((!is_stopped()) && (RETCODE_OK == reader->take_next_sample(&benchmark_big_, &info))) { - if (benchmark_big_.index() > count) - { - benchmark_big_.index(count); - } - else + if ((info.instance_state == ALIVE_INSTANCE_STATE) && info.valid_data) { - benchmark_big_.index(benchmark_big_.index() + 1); + std::cout << "Sample with index: '" << + benchmark_big_.index() << "' (" << static_cast(benchmark_big_.data().size()) << + " Bytes) RECEIVED" << std::endl; + sent = 0; + if (elapsed.count() >= end_) + { + cv_.notify_one(); + return; + } + if (benchmark_big_.index() > count) + { + benchmark_big_.index(count); + } + else + { + benchmark_big_.index(benchmark_big_.index() + 1); + } + + count = benchmark_big_.index() + 1; + if ((RETCODE_OK == writer_->write(&benchmark_big_)) == true) + { + sent = 1; + std::cout << "Sample with index: '" << + benchmark_big_.index() << "' (" << static_cast(benchmark_big_.data().size()) << + " Bytes) SENT" << std::endl; + } } - - count = benchmark_big_.index() + 1; - writer_->write(&benchmark_big_); } - } - break; + break; - default: - throw std::runtime_error("Type invalid"); + default: + throw std::runtime_error("Type invalid"); } } void PublisherApp::run() { - int prevCount = 0; + uint16_t prevCount = 0; while (!is_stopped() && !publish()) { // Wait for period std::unique_lock initial_lock(mutex_); auto check = cv_.wait_for(initial_lock, std::chrono::milliseconds(1), [&]() - { - return is_stopped(); - }); - if (check){ + { + return is_stopped(); + }); + if (check) + { return; } } { - // Wait for period - std::unique_lock wait_lock(mutex_); - auto check = cv_.wait_for(wait_lock, std::chrono::milliseconds(wait_), [&]() + // Wait for period + std::unique_lock wait_lock(mutex_); + auto check = cv_.wait_for(wait_lock, std::chrono::milliseconds(wait_), [&]() + { + return is_stopped(); + }); + if (check) { - return is_stopped(); - }); - if (check){ - return; - } - count = 0; + return; + } + count = 0; } auto actualTime = std::chrono::steady_clock::now(); auto elapsed = std::chrono::duration_cast(actualTime - startTime); while (!is_stopped() && elapsed.count() < end_) { - vSamples.push_back(count - prevCount); - prevCount = count; // Wait for period or stop event std::unique_lock periodic_lock(mutex_); auto check = cv_.wait_for(periodic_lock, std::chrono::milliseconds(period_ms_), [&]() - { - return is_stopped(); - }); - if (check){ + { + return is_stopped(); + }); + if (check) + { return; } + vSamples.push_back(count - prevCount); + prevCount = count; actualTime = std::chrono::steady_clock::now(); elapsed = std::chrono::duration_cast(actualTime - startTime); } @@ -389,6 +455,13 @@ void PublisherApp::run() std::cout << vSamples[i] << ","; } std::cout << std::endl; + + // Wait in case a response is still nedded + std::unique_lock final_lock(mutex_); + cv_.wait(final_lock, [&]() + { + return is_stopped() || (sent == 0); + }); } bool PublisherApp::publish() @@ -397,10 +470,10 @@ bool PublisherApp::publish() // Wait for the data endpoints discovery std::unique_lock matched_lock(mutex_); cv_.wait(matched_lock, [&]() - { - // at least one has been discovered - return ((matched_ > 0) || is_stopped()); - }); + { + // at least one has been discovered + return ((matched_ == 2) || is_stopped()); + }); if (!is_stopped()) { switch (msg_size_) @@ -408,21 +481,44 @@ bool PublisherApp::publish() case CLIParser::MsgSizeKind::NONE: benchmark_.index(0); ret = (RETCODE_OK == writer_->write(&benchmark_)); + if (ret == true) + { + std::cout << "First Sample with index: '" + << benchmark_.index() << "'(0 Bytes) SENT" << std::endl; + } break; case CLIParser::MsgSizeKind::SMALL: benchmark_small_.index(0); ret = (RETCODE_OK == writer_->write(&benchmark_small_)); + if (ret == true) + { + std::cout << "First Sample with index: '" + << benchmark_small_.index() << "' (" << static_cast(benchmark_small_.array().size()) + << " Bytes) SENT" << std::endl; + } break; case CLIParser::MsgSizeKind::MEDIUM: benchmark_medium_.index(0); ret = (RETCODE_OK == writer_->write(&benchmark_medium_)); + if (ret == true) + { + std::cout << "First Sample with index: '" + << benchmark_medium_.index() << "' (" << static_cast(benchmark_medium_.data().size()) + << " Bytes) SENT" << std::endl; + } break; case CLIParser::MsgSizeKind::BIG: benchmark_big_.index(0); ret = (RETCODE_OK == writer_->write(&benchmark_big_)); + if (ret == true) + { + std::cout << "First Sample with index: '" + << benchmark_big_.index() << "' (" << static_cast(benchmark_big_.data().size()) + << " Bytes) SENT" << std::endl; + } break; default: diff --git a/examples/cpp/benchmark/PublisherApp.hpp b/examples/cpp/benchmark/PublisherApp.hpp index 7240e3632f6..fb224610e9e 100644 --- a/examples/cpp/benchmark/PublisherApp.hpp +++ b/examples/cpp/benchmark/PublisherApp.hpp @@ -108,9 +108,9 @@ class PublisherApp : public Application, public DataWriterListener, public DataR std::mutex mutex_; - uint32_t period_ms_; + uint16_t period_ms_; - int16_t wait_; + uint16_t wait_; uint16_t end_; @@ -125,6 +125,8 @@ class PublisherApp : public Application, public DataWriterListener, public DataR std::vector vSamples; std::chrono::time_point startTime; + + uint8_t sent; }; } // namespace benchmark diff --git a/examples/cpp/benchmark/README.md b/examples/cpp/benchmark/README.md index 29f3a0a51ad..3842a96307d 100644 --- a/examples/cpp/benchmark/README.md +++ b/examples/cpp/benchmark/README.md @@ -9,6 +9,7 @@ In this case, the *benchmark* example allows to select between different message * [Description of the example](#description-of-the-example) * [Run the example](#run-the-example) * [Configuration](#configuration) +* [XML profile playground](#xml-profile-playground) ## Description of the example @@ -61,28 +62,53 @@ All the example available flags can be queried running the executable with the ` ### Expected output -Regardless of which application is run first, since the publisher will not start sending data until a subscriber is discovered, the expected output both for publishers and subscribers is a first displayed message acknowledging the match, followed by the amount of samples sent or received until Ctrl+C is pressed. +Regardless of which application is run first, since the publisher will not start sending data until a subscriber is discovered, the expected output both for publishers and subscribers is a first displayed message acknowledging the match, followed by the sent and received data in each application, and finishing showing in the publisher the amount of data shared in each sample time during the running time. ### Benchmark publisher ```shell -Publisher running for 2792 milliseconds. Please press Ctrl+C to stop the Publisher at any time. +Publisher running for 1000 milliseconds. Please press Ctrl+C to stop the Publisher at any time. Subscriber matched. -Publisher matched. Test starts... Publisher matched. -RESULTS after 2792 milliseconds: -COUNT: 970 -SAMPLES: 0,51,54,52,52,54,52,52,54,54,54,56,54,54,54,54,54,56, +First Sample with index: '0' (8388608 Bytes) SENT +Sample with index: '1' (8388608 Bytes) RECEIVED +Sample with index: '0' (8388608 Bytes) SENT +Sample with index: '1' (8388608 Bytes) RECEIVED +Sample with index: '0' (8388608 Bytes) SENT +Sample with index: '1' (8388608 Bytes) RECEIVED +Sample with index: '2' (8388608 Bytes) SENT +Sample with index: '3' (8388608 Bytes) RECEIVED +Sample with index: '4' (8388608 Bytes) SENT +Sample with index: '5' (8388608 Bytes) RECEIVED +... +Sample with index: '646' (8388608 Bytes) SENT +Sample with index: '647' (8388608 Bytes) RECEIVED +RESULTS after 1014 milliseconds: +COUNT: 647 +SAMPLES: 37,46,52,94,86,94,84,62,52,40, ... ``` ### Benchmark subscriber ```shell -Subscriber running. Please press Ctrl+C to stop the Subscriber at any time. -Subscriber matched. Publisher matched. - +Subscriber matched. +Subscriber running. Please press Ctrl+C to stop the Subscriber at any time. +Sample with index: '0' (8388608 Bytes) RECEIVED +Sample with index: '1' (8388608 Bytes) SENT +Sample with index: '0' (8388608 Bytes) RECEIVED +Sample with index: '1' (8388608 Bytes) SENT +Sample with index: '0' (8388608 Bytes) RECEIVED +Sample with index: '1' (8388608 Bytes) SENT +Sample with index: '2' (8388608 Bytes) RECEIVED +Sample with index: '3' (8388608 Bytes) SENT +Sample with index: '4' (8388608 Bytes) RECEIVED +Sample with index: '5' (8388608 Bytes) SENT +... +Sample with index: '646' (8388608 Bytes) RECEIVED +Sample with index: '647' (8388608 Bytes) SENT +Publisher unmatched. ... ``` @@ -93,7 +119,6 @@ The following is a possible output of the publisher application when stopping th ... Publisher running for 10000 milliseconds. Please press Ctrl+C to stop the Publisher at any time. Subscriber matched. -Publisher matched. Test starts... Publisher matched. Publisher unmatched. Subscriber unmatched. @@ -152,6 +177,40 @@ The following table represents the compatibility matrix (compatible ✔️ vs in +### Durability QoS + +Using argument **`--transient-local`** will configure the corresponding endpoint with **`TRANSIENT_LOCAL`** durability QoS. +If the argument is not provided, by default it is configured as **`VOLATILE`**. + +Whereas **`VOLATILE`** does not store samples for late-joining subscribers, **`TRANSIENT_LOCAL`** ensures that samples are stored and delivered to any late-joining subscribers. + +**Note**: **`TRANSIENT_LOCAL`** option may require additional resources to store the samples until they are acknowledged by all subscribers. + +Moreover, there is a compatibility rule between data readers and data writers, where the durability QoS kind is checked to ensure the expected behavior. +The following table represents the compatibility matrix (compatible ✔️ vs incompatible ✖️): + + + + + + + + + + + + + + + + + + + + + +
Data writer durability QoS kind
VolatileTransient Local
Data reader
durability QoS kind
Volatile✔️✔️
Transient Local✖️✔️
+ ### Message Size Using argument **`-m`** `` or **`--msg-size`** `` configures the size of the message payload. @@ -200,3 +259,29 @@ This parameter controls how long the publisher or subscriber remains active. - **Range**: `[1 <= <= 4294967]` - **Default**: `10000` (10 seconds) + +## XML profile playground + +The *eProsima Fast DDS* entities can be configured through an XML profile from the environment. +This is accomplished by setting the environment variable ``FASTDDS_DEFAULT_PROFILES_FILE`` to path to the XML profiles file: + +* Ubuntu ( / MacOS ) + + ```shell + user@machine:example_path$ export FASTDDS_DEFAULT_PROFILES_FILE=hello_world_profile.xml + ``` + +* Windows + + ```powershell + example_path> set FASTDDS_DEFAULT_PROFILES_FILE=hello_world_profile.xml + ``` + +The example provides with an XML profiles files with certain QoS: + +- Reliable reliability: avoid sample loss. +- Transient local durability: enable late-join subscriber applications to receive previous samples. +- Keep-last history with high depth: ensure certain amount of previous samples for late-joiners. + +Applying different configurations to the entities will change to a greater or lesser extent how the application behaves in relation to sample management. +Even when these settings affect the behavior of the sample management, the applications' output will be the similar. diff --git a/examples/cpp/benchmark/SubscriberApp.cpp b/examples/cpp/benchmark/SubscriberApp.cpp index 7721ef27b01..2d45bbdaedb 100644 --- a/examples/cpp/benchmark/SubscriberApp.cpp +++ b/examples/cpp/benchmark/SubscriberApp.cpp @@ -84,22 +84,22 @@ SubscriberApp::SubscriberApp( } // Register and set up the data type with initial values - if(msg_size_ == CLIParser::MsgSizeKind::NONE) + if (msg_size_ == CLIParser::MsgSizeKind::NONE) { benchmark_.index(0); type_ = TypeSupport(new BenchMarkPubSubType()); } - else if(msg_size_ == CLIParser::MsgSizeKind::SMALL) + else if (msg_size_ == CLIParser::MsgSizeKind::SMALL) { benchmark_small_.index(0); type_ = TypeSupport(new BenchMarkSmallPubSubType()); } - else if(msg_size_ == CLIParser::MsgSizeKind::MEDIUM) + else if (msg_size_ == CLIParser::MsgSizeKind::MEDIUM) { benchmark_medium_.index(0); type_ = TypeSupport(new BenchMarkMediumPubSubType()); } - else if(msg_size_ == CLIParser::MsgSizeKind::BIG) + else if (msg_size_ == CLIParser::MsgSizeKind::BIG) { benchmark_big_.index(0); type_ = TypeSupport(new BenchMarkBigPubSubType()); @@ -132,6 +132,7 @@ SubscriberApp::SubscriberApp( // Create the data writer DataWriterQos writer_qos = DATAWRITER_QOS_DEFAULT; writer_qos.reliability().kind = config.reliability; + writer_qos.durability().kind = config.durability; publisher_->get_default_datawriter_qos(writer_qos); writer_ = publisher_->create_datawriter(topic_pub_, writer_qos, this, StatusMask::all()); if (writer_ == nullptr) @@ -152,6 +153,7 @@ SubscriberApp::SubscriberApp( DataReaderQos reader_qos = DATAREADER_QOS_DEFAULT; subscriber_->get_default_datareader_qos(reader_qos); reader_qos.reliability().kind = config.reliability; + reader_qos.durability().kind = config.durability; reader_ = subscriber_->create_datareader(topic_sub_, reader_qos, this, StatusMask::all()); if (reader_ == nullptr) { @@ -185,6 +187,7 @@ void SubscriberApp::on_publication_matched( { matched_ = static_cast(info.current_count); std::cout << "Publisher unmatched." << std::endl; + stop(); } else { @@ -204,6 +207,7 @@ void SubscriberApp::on_subscription_matched( else if (info.current_count_change == -1) { std::cout << "Subscriber unmatched." << std::endl; + stop(); } else { @@ -223,8 +227,26 @@ void SubscriberApp::on_data_available( { if ((info.instance_state == ALIVE_INSTANCE_STATE) && info.valid_data) { + std::cout << "Sample with index: '" << + benchmark_.index() << "' (0 Bytes) RECEIVED" << std::endl; benchmark_.index(benchmark_.index() + 1); - writer_->write(&benchmark_); + while (matched_ == 0) + { + std::unique_lock initial_lock(mutex_); + auto check = cv_.wait_for(initial_lock, std::chrono::milliseconds(1), [&]() + { + return is_stopped(); + }); + if (check) + { + return; + } + } + if ((RETCODE_OK == writer_->write(&benchmark_)) == true) + { + std::cout << "Sample with index: '" << + benchmark_.index() << "' (0 Bytes) SENT" << std::endl; + } } } break; @@ -234,8 +256,28 @@ void SubscriberApp::on_data_available( { if ((info.instance_state == ALIVE_INSTANCE_STATE) && info.valid_data) { + std::cout << "Sample with index: '" << + benchmark_small_.index() << "' (" << static_cast(benchmark_small_.array().size()) << + " Bytes) RECEIVED" << std::endl; benchmark_small_.index(benchmark_small_.index() + 1); - writer_->write(&benchmark_small_); + while (matched_ == 0) + { + std::unique_lock initial_lock(mutex_); + auto check = cv_.wait_for(initial_lock, std::chrono::milliseconds(1), [&]() + { + return is_stopped(); + }); + if (check) + { + return; + } + } + if ((RETCODE_OK == writer_->write(&benchmark_small_)) == true) + { + std::cout << "Sample with index: '" << + benchmark_small_.index() << "' (" << static_cast(benchmark_small_.array().size()) << + " Bytes) SENT" << std::endl; + } } } break; @@ -245,8 +287,28 @@ void SubscriberApp::on_data_available( { if ((info.instance_state == ALIVE_INSTANCE_STATE) && info.valid_data) { + std::cout << "Sample with index: '" << + benchmark_medium_.index() << "' (" << static_cast(benchmark_medium_.data().size()) << + " Bytes) RECEIVED" << std::endl; benchmark_medium_.index(benchmark_medium_.index() + 1); - writer_->write(&benchmark_medium_); + while (matched_ == 0) + { + std::unique_lock initial_lock(mutex_); + auto check = cv_.wait_for(initial_lock, std::chrono::milliseconds(1), [&]() + { + return is_stopped(); + }); + if (check) + { + return; + } + } + if ((RETCODE_OK == writer_->write(&benchmark_medium_)) == true) + { + std::cout << "Sample with index: '" << + benchmark_medium_.index() << "' (" << static_cast(benchmark_medium_.data().size()) << + " Bytes) SENT" << std::endl; + } } } break; @@ -256,8 +318,28 @@ void SubscriberApp::on_data_available( { if ((info.instance_state == ALIVE_INSTANCE_STATE) && info.valid_data) { + std::cout << "Sample with index: '" << + benchmark_big_.index() << "' (" << static_cast(benchmark_big_.data().size()) << + " Bytes) RECEIVED" << std::endl; benchmark_big_.index(benchmark_big_.index() + 1); - writer_->write(&benchmark_big_); + while (matched_ == 0) + { + std::unique_lock initial_lock(mutex_); + auto check = cv_.wait_for(initial_lock, std::chrono::milliseconds(1), [&]() + { + return is_stopped(); + }); + if (check) + { + return; + } + } + if ((RETCODE_OK == writer_->write(&benchmark_big_)) == true) + { + std::cout << "Sample with index: '" << + benchmark_big_.index() << "' (" << static_cast(benchmark_big_.data().size()) << + " Bytes) SENT" << std::endl; + } } } break; diff --git a/test/examples/benchmark.compose.yml b/test/examples/benchmark.compose.yml new file mode 100644 index 00000000000..5b5fcda7529 --- /dev/null +++ b/test/examples/benchmark.compose.yml @@ -0,0 +1,46 @@ +# Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +services: + subscriber: + image: @DOCKER_IMAGE_NAME@ + network_mode: host + ipc: host + volumes: + - @PROJECT_BINARY_DIR_COMPOSE_VOLUME@ + - @fastcdr_LIB_DIR_COMPOSE_VOLUME@ + - @CMAKE_INSTALL_PREFIX_COMPOSE_VOLUME@ + @TINYXML2_LIB_DIR_COMPOSE_VOLUME@ + environment: + @PATH_ENVIRONMENT_VARIABLE_COMPOSE@ + EXAMPLE_DIR: @EXAMPLE_PREFIX_DIR_COMPOSE@/benchmark/@EXAMPLE_SUFFIX_DIR_COMPOSE@ + FASTDDS_DEFAULT_PROFILES_FILE: @FASTDDS_DEFAULT_PROFILES_FILE_PREFIX_COMPOSE@/benchmark/benchmark_profile.xml + SUBSCRIBER_ADDITIONAL_ARGUMENTS: ${SUB_ARGS} + command: @SHELL_EXECUTABLE@ -c "@COMMAND_EXAMPLE_DIR_PREFIX_COMPOSE@/benchmark@FILE_EXTENSION@ subscriber $${SUBSCRIBER_ADDITIONAL_ARGUMENTS}" + + publisher: + image: @DOCKER_IMAGE_NAME@ + network_mode: host + ipc: host + volumes: + - @PROJECT_BINARY_DIR_COMPOSE_VOLUME@ + - @fastcdr_LIB_DIR_COMPOSE_VOLUME@ + - @CMAKE_INSTALL_PREFIX_COMPOSE_VOLUME@ + @TINYXML2_LIB_DIR_COMPOSE_VOLUME@ + environment: + @PATH_ENVIRONMENT_VARIABLE_COMPOSE@ + EXAMPLE_DIR: @EXAMPLE_PREFIX_DIR_COMPOSE@/benchmark/@EXAMPLE_SUFFIX_DIR_COMPOSE@ + FASTDDS_DEFAULT_PROFILES_FILE: @FASTDDS_DEFAULT_PROFILES_FILE_PREFIX_COMPOSE@/benchmark/benchmark_profile.xml + PUBLISHER_ADDITIONAL_ARGUMENTS: ${PUB_ARGS} + command: @SHELL_EXECUTABLE@ -c "@COMMAND_EXAMPLE_DIR_PREFIX_COMPOSE@/benchmark@FILE_EXTENSION@ publisher $${PUBLISHER_ADDITIONAL_ARGUMENTS}" diff --git a/test/examples/test_benchmark.py b/test/examples/test_benchmark.py new file mode 100644 index 00000000000..b6704d06ba6 --- /dev/null +++ b/test/examples/test_benchmark.py @@ -0,0 +1,80 @@ +# Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import subprocess +import pytest +import re + +config_test_cases = [ + ('--transport DEFAULT --msg-size NONE', '--transport DEFAULT --msg-size NONE'), # Builtin transports + ('--transport DEFAULT --msg-size SMALL', '--transport DEFAULT --msg-size SMALL'), + ('--transport DEFAULT --msg-size MEDIUM', '--transport DEFAULT --msg-size MEDIUM'), + ('--transport DEFAULT --msg-size BIG', '--transport DEFAULT --msg-size BIG'), + ('--transport DEFAULT --msg-size NONE', '--transport UDPv4 --msg-size NONE'), + ('--transport DEFAULT --msg-size SMALL', '--transport UDPv4 --msg-size SMALL'), + ('--transport DEFAULT --msg-size MEDIUM', '--transport UDPv4 --msg-size MEDIUM'), + ('--transport DEFAULT --msg-size BIG', '--transport UDPv4 --msg-size BIG'), + ('--transport SHM --msg-size NONE', '--transport SHM --msg-size NONE'), + ('--transport SHM --msg-size SMALL', '--transport SHM --msg-size SMALL'), + ('--transport SHM --msg-size MEDIUM', '--transport SHM --msg-size MEDIUM'), + ('--transport SHM --msg-size BIG', '--transport SHM --msg-size BIG'), + ('--transport UDPv4 --msg-size NONE', '--transport UDPv4 --msg-size NONE'), + ('--transport UDPv4 --msg-size SMALL', '--transport UDPv4 --msg-size SMALL'), + ('--transport UDPv4 --msg-size MEDIUM', '--transport UDPv4 --msg-size MEDIUM'), + ('--transport UDPv4 --msg-size BIG', '--transport UDPv4 --msg-size BIG'), + ('--transport LARGE_DATA --msg-size NONE', '--transport LARGE_DATA --msg-size NONE'), + ('--transport LARGE_DATA --msg-size SMALL', '--transport LARGE_DATA --msg-size SMALL'), + ('--transport LARGE_DATA --msg-size MEDIUM', '--transport LARGE_DATA --msg-size MEDIUM'), + ('--transport LARGE_DATA --msg-size BIG', '--transport LARGE_DATA --msg-size BIG'), +] + +@pytest.mark.parametrize("pub_args, sub_args", config_test_cases) +def test_benchmark(pub_args, sub_args): + """.""" + ret = False + out = '' + pub_requirements = '--reliable --transient-local -w 0 -e 1000' + sub_requirements = '--reliable --transient-local' + + command_prerequisites = 'PUB_ARGS="' + pub_requirements + ' ' + pub_args + '" SUB_ARGS="' + sub_requirements + ' ' + sub_args + '" ' + + try: + out = subprocess.check_output(command_prerequisites + '@DOCKER_EXECUTABLE@ compose -f benchmark.compose.yml up', + stderr=subprocess.STDOUT, + shell=True, + timeout=5 + ).decode().split('\n') + + sent = 0 + received = 0 + for line in out: + if 'SENT' in line: + sent += 1 + + if 'RECEIVED' in line: + received += 1 + + if sent != 0 and received != 0 and sent == received: + ret = True + else: + print ('ERROR: sent: ' + str(sent) + ', but received: ' + str(received) + ' (expected: ' + str(sent) + ')') + raise subprocess.CalledProcessError(1, out.decode()) + + except subprocess.CalledProcessError as e: + print (e.output) + except subprocess.TimeoutExpired: + print('TIMEOUT') + print(out) + + assert(ret)