Skip to content

Commit da35c5c

Browse files
committed
add support for loaned messages and compute throughput stat
1 parent 45eb987 commit da35c5c

File tree

7 files changed

+127
-65
lines changed

7 files changed

+127
-65
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
performance_test_msgs/PerformanceHeader header
2-
int64 data
2+
int64[1] data

performance_metrics/include/performance_metrics/tracker.hpp

+33-24
Original file line numberDiff line numberDiff line change
@@ -46,62 +46,71 @@ class Tracker
4646
const std::string & node_name,
4747
const std::string & topic_srv_name,
4848
const Options & tracking_options)
49-
: _node_name(node_name), _topic_srv_name(topic_srv_name), _tracking_options(tracking_options)
49+
: m_node_name(node_name), m_topic_srv_name(topic_srv_name), m_tracking_options(tracking_options)
5050
{}
5151

5252
void scan(
5353
const performance_test_msgs::msg::PerformanceHeader & header,
5454
const rclcpp::Time & now,
5555
std::shared_ptr<EventsLogger> elog = nullptr);
5656

57-
void add_sample(uint64_t latency_sample);
57+
void add_sample(
58+
const rclcpp::Time & now,
59+
uint64_t latency_sample,
60+
size_t size,
61+
float frequency);
5862

5963
uint32_t get_and_update_tracking_number();
6064

61-
uint64_t lost() const {return _lost_messages;}
65+
uint64_t lost() const {return m_lost_messages;}
6266

63-
uint64_t late() const {return _late_messages;}
67+
uint64_t late() const {return m_late_messages;}
6468

65-
uint64_t too_late() const {return _too_late_messages;}
69+
uint64_t too_late() const {return m_too_late_messages;}
6670

67-
uint64_t received() const {return _received_messages;}
71+
uint64_t received() const {return m_received_messages;}
6872

69-
size_t size() const {return _size;}
73+
size_t size() const {return m_data_size;}
7074

7175
float frequency() const {return m_frequency;}
7276

73-
Stat<uint64_t> stat() const {return _stat;}
77+
Stat<uint64_t> stat() const {return m_stat;}
78+
79+
double throughput() const;
7480

7581
void set_frequency(float f) {m_frequency = f;}
7682

77-
void set_size(size_t s) {_size = s;}
83+
void set_size(size_t s) {m_data_size = s;}
7884

79-
uint64_t last() const {return _last_latency;}
85+
uint64_t last() const {return m_last_latency;}
8086

8187
std::string get_node_name() const
8288
{
83-
return _node_name;
89+
return m_node_name;
8490
}
8591

8692
std::string get_entity_name() const
8793
{
88-
return _topic_srv_name;
94+
return m_topic_srv_name;
8995
}
9096

9197
private:
92-
std::string _node_name;
93-
std::string _topic_srv_name;
94-
95-
uint64_t _last_latency = 0;
96-
uint64_t _lost_messages = 0;
97-
uint64_t _received_messages = 0;
98-
uint64_t _late_messages = 0;
99-
uint64_t _too_late_messages = 0;
100-
size_t _size = 0;
98+
std::string m_node_name;
99+
std::string m_topic_srv_name;
100+
Options m_tracking_options;
101+
102+
uint64_t m_last_latency = 0;
103+
uint64_t m_lost_messages = 0;
104+
uint64_t m_received_messages = 0;
105+
uint64_t m_late_messages = 0;
106+
uint64_t m_too_late_messages = 0;
107+
size_t m_data_size = 0;
101108
float m_frequency = 0;
102-
Stat<uint64_t> _stat;
103-
uint32_t _tracking_number_count = 0;
104-
Options _tracking_options;
109+
Stat<uint64_t> m_stat;
110+
uint32_t m_tracking_number_count = 0;
111+
112+
rclcpp::Time m_first_msg_time;
113+
rclcpp::Time m_last_msg_time;
105114
};
106115

107116
} // namespace performance_metrics

performance_metrics/src/stat_logger.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ void log_latency_all_stats(
8888
stream << std::left << std::setw(narrow_space) << std::setfill(separator) << "min[us]";
8989
stream << std::left << std::setw(narrow_space) << std::setfill(separator) << "max[us]";
9090
stream << std::left << std::setw(narrow_space) << std::setfill(separator) << "freq[hz]";
91+
stream << std::left << std::setw(wide_space) << std::setfill(separator) <<
92+
"throughput[Mb/s]";
9193

9294
stream << std::endl;
9395
};
@@ -119,6 +121,8 @@ void log_latency_all_stats(
119121
std::round(tracker.stat().max());
120122
stream << std::left << std::setw(narrow_space) << std::setfill(separator) <<
121123
tracker.frequency();
124+
stream << std::left << std::setw(wide_space) << std::setfill(separator) <<
125+
(tracker.throughput() / (1024 * 1024));
122126

123127
stream << std::endl;
124128
};

performance_metrics/src/tracker.cpp

+51-29
Original file line numberDiff line numberDiff line change
@@ -21,38 +21,32 @@ void Tracker::scan(
2121
const rclcpp::Time & now,
2222
std::shared_ptr<EventsLogger> elog)
2323
{
24-
// If this is first message received store some info about it
25-
if (stat().n() == 0) {
26-
this->set_size(header.size);
27-
this->set_frequency(header.frequency);
28-
}
29-
3024
// Compute latency
3125
rclcpp::Time stamp(header.stamp.sec, header.stamp.nanosec, RCL_ROS_TIME);
3226
auto lat = std::chrono::nanoseconds((now - stamp).nanoseconds());
3327
uint64_t lat_us = lat.count() / 1000;
3428
// store the last latency to be read from node
35-
_last_latency = lat_us;
29+
m_last_latency = lat_us;
3630

3731
bool late = false;
3832
bool too_late = false;
3933

40-
if (_tracking_options.is_enabled) {
34+
if (m_tracking_options.is_enabled) {
4135
// Check if we received the correct message. The assumption here is
4236
// that the messages arrive in chronological order
43-
if (header.tracking_number == _tracking_number_count) {
44-
_tracking_number_count++;
37+
if (header.tracking_number == m_tracking_number_count) {
38+
m_tracking_number_count++;
4539
} else {
4640
// We missed some mesages...
47-
int64_t n_lost = header.tracking_number - _tracking_number_count;
48-
_lost_messages += n_lost;
49-
_tracking_number_count = header.tracking_number + 1;
41+
int64_t n_lost = header.tracking_number - m_tracking_number_count;
42+
m_lost_messages += n_lost;
43+
m_tracking_number_count = header.tracking_number + 1;
5044

5145
// Log the event
5246
if (elog != nullptr) {
5347
EventsLogger::Event ev;
5448
std::stringstream description;
55-
ev.caller_name = _topic_srv_name + "->" + _node_name;
49+
ev.caller_name = m_topic_srv_name + "->" + m_node_name;
5650
ev.code = EventsLogger::EventCode::lost_messages;
5751

5852
if (n_lost == 1) {
@@ -67,13 +61,13 @@ void Tracker::scan(
6761
}
6862

6963
// Check if the message latency qualifies the message as a lost or late message.
70-
const int period_us = 1000000 / m_frequency;
64+
const int period_us = 1000000 / header.frequency;
7165
const unsigned int latency_late_threshold_us = std::min(
72-
_tracking_options.late_absolute_us,
73-
_tracking_options.late_percentage * period_us / 100);
66+
m_tracking_options.late_absolute_us,
67+
m_tracking_options.late_percentage * period_us / 100);
7468
const unsigned int latency_too_late_threshold_us = std::min(
75-
_tracking_options.too_late_absolute_us,
76-
_tracking_options.too_late_percentage * period_us / 100);
69+
m_tracking_options.too_late_absolute_us,
70+
m_tracking_options.too_late_percentage * period_us / 100);
7771

7872
too_late = lat_us > latency_too_late_threshold_us;
7973
late = lat_us > latency_late_threshold_us && !too_late;
@@ -86,13 +80,13 @@ void Tracker::scan(
8680
lat_us << "us > " << latency_late_threshold_us << "us";
8781

8882
EventsLogger::Event ev;
89-
ev.caller_name = _topic_srv_name + "->" + _node_name;
83+
ev.caller_name = m_topic_srv_name + "->" + m_node_name;
9084
ev.code = EventsLogger::EventCode::late_message;
9185
ev.description = description.str();
9286

9387
elog->write_event(ev);
9488
}
95-
_late_messages++;
89+
m_late_messages++;
9690
}
9791

9892
if (too_late) {
@@ -103,35 +97,63 @@ void Tracker::scan(
10397
lat_us << "us > " << latency_too_late_threshold_us << "us";
10498

10599
EventsLogger::Event ev;
106-
ev.caller_name = _topic_srv_name + "->" + _node_name;
100+
ev.caller_name = m_topic_srv_name + "->" + m_node_name;
107101
ev.code = EventsLogger::EventCode::too_late_message;
108102
ev.description = description.str();
109103

110104
elog->write_event(ev);
111105
}
112-
_too_late_messages++;
106+
m_too_late_messages++;
113107
}
114108
}
115109

116110
if (!too_late) {
117111
// Compute statistics with new sample. Don't add to this the msgs
118112
// that arrived too late.
119-
this->add_sample(lat_us);
113+
this->add_sample(now, lat_us, header.size, header.frequency);
120114
}
121115

122-
_received_messages++;
116+
m_received_messages++;
123117
}
124118

125-
void Tracker::add_sample(uint64_t latency_sample)
119+
void Tracker::add_sample(
120+
const rclcpp::Time & now,
121+
uint64_t latency_sample,
122+
size_t size,
123+
float frequency)
126124
{
127-
_stat.add_sample(latency_sample);
125+
// If this is first message received store some info about it
126+
if (m_stat.n() == 0) {
127+
m_data_size = size;
128+
m_frequency = frequency;
129+
m_first_msg_time = now;
130+
}
131+
132+
m_last_msg_time = now;
133+
m_stat.add_sample(latency_sample);
128134
}
129135

130136
uint32_t Tracker::get_and_update_tracking_number()
131137
{
132-
uint32_t old_number = _tracking_number_count;
133-
_tracking_number_count++;
138+
uint32_t old_number = m_tracking_number_count;
139+
m_tracking_number_count++;
134140
return old_number;
135141
}
136142

143+
double Tracker::throughput() const
144+
{
145+
// We compute max because currently publishers update only n() and not m_received_messages,
146+
// but for subscriptions n() will not include messages received too late.
147+
uint64_t num_msg_received = std::max(m_stat.n(), m_received_messages);
148+
if (num_msg_received < 2) {
149+
return 0.0;
150+
}
151+
152+
rclcpp::Duration msgs_received_interval = m_last_msg_time - m_first_msg_time;
153+
double sample_rate_sec = (num_msg_received - 1) / msgs_received_interval.seconds();
154+
double throughput = sample_rate_sec * m_data_size;
155+
156+
return throughput;
157+
}
158+
137159
} // namespace performance_metrics

performance_test/include/performance_test/communication.hpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
enum msg_pass_by_t
1414
{
1515
PASS_BY_UNIQUE_PTR,
16-
PASS_BY_SHARED_PTR
16+
PASS_BY_SHARED_PTR,
17+
PASS_BY_LOANED_MSG,
1718
};
1819

1920
#endif // PERFORMANCE_TEST__COMMUNICATION_HPP_

performance_test/include/performance_test/performance_node_base_impl.hpp

+35-9
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ void PerformanceNodeBase::add_subscriber(
4545
{
4646
switch (msg_pass_by) {
4747
case PASS_BY_SHARED_PTR:
48+
case PASS_BY_LOANED_MSG:
4849
add_subscriber_by_msg_variant<Msg, typename Msg::ConstSharedPtr>(
4950
topic_name,
5051
tracking_options,
@@ -205,15 +206,16 @@ void PerformanceNodeBase::publish_msg(
205206
float pub_frequency = 1000000.0 / period.count();
206207

207208
auto tracking_number = tracker.get_and_update_tracking_number();
208-
uint64_t pub_time_us = 0;
209+
rclcpp::Time publish_time;
210+
uint64_t pub_duration_us = 0;
209211
size_t msg_size = 0;
210212
switch (msg_pass_by) {
211213
case PASS_BY_SHARED_PTR:
212214
{
213215
// create a message and eventually resize it
214216
auto msg = std::make_shared<Msg>();
215217
msg_size = resize_msg(msg->data, size);
216-
auto publish_time = m_node_interfaces.clock->get_clock()->now();
218+
publish_time = m_node_interfaces.clock->get_clock()->now();
217219

218220
msg->header = create_msg_header(
219221
publish_time,
@@ -224,7 +226,7 @@ void PerformanceNodeBase::publish_msg(
224226
pub->publish(*msg);
225227

226228
auto end_time = m_node_interfaces.clock->get_clock()->now();
227-
pub_time_us = (end_time - publish_time).nanoseconds() / 1000.0f;
229+
pub_duration_us = (end_time - publish_time).nanoseconds() / 1000.0f;
228230

229231
break;
230232
}
@@ -233,7 +235,7 @@ void PerformanceNodeBase::publish_msg(
233235
// create a message and eventually resize it
234236
auto msg = std::make_unique<Msg>();
235237
msg_size = resize_msg(msg->data, size);
236-
auto publish_time = m_node_interfaces.clock->get_clock()->now();
238+
publish_time = m_node_interfaces.clock->get_clock()->now();
237239

238240
msg->header = create_msg_header(
239241
publish_time,
@@ -244,19 +246,43 @@ void PerformanceNodeBase::publish_msg(
244246
pub->publish(std::move(msg));
245247

246248
auto end_time = m_node_interfaces.clock->get_clock()->now();
247-
pub_time_us = (end_time - publish_time).nanoseconds() / 1000.0f;
249+
pub_duration_us = (end_time - publish_time).nanoseconds() / 1000.0f;
250+
251+
break;
252+
}
253+
case PASS_BY_LOANED_MSG:
254+
{
255+
// create a message and eventually resize it
256+
std::allocator<void> allocator;
257+
rclcpp::LoanedMessage<Msg> loaned_msg(*pub, allocator);
258+
auto & msg_ref = loaned_msg.get();
259+
msg_size = resize_msg(msg_ref.data, size);
260+
// Fill the loaned msg with 1s.
261+
// This simulates a real application: users are expected to populate the loaned msg with new data every time.
262+
std::fill(std::begin(msg_ref.data), std::end(msg_ref.data), 1);
263+
264+
publish_time = m_node_interfaces.clock->get_clock()->now();
265+
266+
msg_ref.header = create_msg_header(
267+
publish_time,
268+
pub_frequency,
269+
tracking_number,
270+
msg_size);
271+
272+
pub->publish(std::move(loaned_msg));
273+
274+
auto end_time = m_node_interfaces.clock->get_clock()->now();
275+
pub_duration_us = (end_time - publish_time).nanoseconds() / 1000.0f;
248276

249277
break;
250278
}
251279
}
252280

253-
tracker.set_frequency(pub_frequency);
254-
tracker.set_size(msg_size);
255-
tracker.add_sample(pub_time_us);
281+
tracker.add_sample(publish_time, pub_duration_us, msg_size, pub_frequency);
256282

257283
RCLCPP_DEBUG(
258284
this->get_node_logger(),
259-
"Publishing to %s msg number %d took %lu us", name.c_str(), tracking_number, pub_time_us);
285+
"Publishing to %s msg number %d took %lu us", name.c_str(), tracking_number, pub_duration_us);
260286
}
261287

262288
template<typename DataT>

performance_test_msgs/msg/Sample.msg

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
PerformanceHeader header
2-
byte data
2+
byte[1] data

0 commit comments

Comments
 (0)