Skip to content

Commit

Permalink
uxrce_dds_client: optimizations and instrumentation
Browse files Browse the repository at this point in the history
 - skip ping session if data flowing bidirectionally
 - add perf counters for loop time and interval
 - skip blocking poll if there's input data to read
  • Loading branch information
dagar authored and Peize-Liu committed Mar 24, 2024
1 parent ee58274 commit 1d5f527
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 70 deletions.
163 changes: 95 additions & 68 deletions src/modules/uxrce_dds_client/uxrce_dds_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@

using namespace time_literals;

void on_time(uxrSession *session, int64_t current_time, int64_t received_timestamp, int64_t transmit_timestamp,
int64_t originate_timestamp, void *args)
static void on_time(uxrSession *session, int64_t current_time, int64_t received_timestamp, int64_t transmit_timestamp,
int64_t originate_timestamp, void *args)
{
// latest round trip time (RTT)
int64_t rtt = current_time - originate_timestamp;
Expand All @@ -76,21 +76,15 @@ void on_time(uxrSession *session, int64_t current_time, int64_t received_timesta
}
}

void on_time_no_sync(uxrSession *session, int64_t current_time, int64_t received_timestamp, int64_t transmit_timestamp,
int64_t originate_timestamp, void *args)
static void on_time_no_sync(uxrSession *session, int64_t current_time, int64_t received_timestamp,
int64_t transmit_timestamp,
int64_t originate_timestamp, void *args)
{
session->time_offset = 0;
}


void on_request(
uxrSession *session,
uxrObjectId object_id,
uint16_t request_id,
SampleIdentity *sample_id,
ucdrBuffer *ub,
uint16_t length,
void *args)
static void on_request(uxrSession *session, uxrObjectId object_id, uint16_t request_id, SampleIdentity *sample_id,
ucdrBuffer *ub, uint16_t length, void *args)
{
(void) request_id;
(void) length;
Expand Down Expand Up @@ -229,6 +223,9 @@ UxrceddsClient::~UxrceddsClient()
delete _transport_serial;
}

perf_free(_loop_perf);
perf_free(_loop_interval_perf);

#if defined(UXRCE_DDS_CLIENT_UDP)

if (_transport_udp) {
Expand Down Expand Up @@ -474,9 +471,9 @@ void UxrceddsClient::run()
}

// create VehicleCommand replier
if (num_of_repliers < MAX_NUM_REPLIERS) {
if (_num_of_repliers < MAX_NUM_REPLIERS) {
if (add_replier(new VehicleCommandSrv(&session, reliable_out, reliable_in, participant_id, _client_namespace,
num_of_repliers))) {
_num_of_repliers))) {
PX4_ERR("replier init failed");
return;
}
Expand Down Expand Up @@ -522,52 +519,64 @@ void UxrceddsClient::run()

while (!should_exit() && _connected) {

/* Wait for topic updates for max 1000 ms (1sec) */
int poll = px4_poll(&_subs->fds[0], (sizeof(_subs->fds) / sizeof(_subs->fds[0])), 1000);
perf_begin(_loop_perf);
perf_count(_loop_interval_perf);

/* Handle the poll results */
if (poll == 0) {
/* Timeout, no updates in selected uorbs */
continue;

} else if (poll < 0) {
/* Error */
if (poll_error_counter < 10 || poll_error_counter % 50 == 0) {
/* Prevent flooding */
PX4_ERR("ERROR while polling uorbs: %d", poll);
}
int orb_poll_timeout_ms = 10;

poll_error_counter++;
continue;
int bytes_available = 0;

if (ioctl(_fd, FIONREAD, (unsigned long)&bytes_available) == OK) {
if (bytes_available > 10) {
orb_poll_timeout_ms = 0;
}
}

_subs->update(&session, reliable_out, best_effort_out, participant_id, _client_namespace);
/* Wait for topic updates for max 10 ms */
int poll = px4_poll(_subs->fds, (sizeof(_subs->fds) / sizeof(_subs->fds[0])), orb_poll_timeout_ms);

// check if there are available replies
process_replies();
/* Handle the poll results */
if (poll > 0) {
_subs->update(&session, reliable_out, best_effort_out, participant_id, _client_namespace);

// Run the session until we receive no more data or up to a maximum number of iterations.
// The maximum observed number of iterations was 6 (SITL). If we were to run only once, data starts to get
// delayed, causing registered flight modes to time out.
for (int i = 0; i < 10; ++i) {
const uint32_t prev_num_payload_received = _pubs->num_payload_received;
uxr_run_session_timeout(&session, 0);
} else {
if (poll < 0) {
// poll error
if (poll_error_counter < 10 || poll_error_counter % 50 == 0) {
// prevent flooding
PX4_ERR("ERROR while polling uorbs: %d", poll);
}

if (_pubs->num_payload_received == prev_num_payload_received) {
break;
poll_error_counter++;
}
}

// run session with 0 timeout (non-blocking)
uxr_run_session_timeout(&session, 0);

// check if there are available replies
process_replies();

// time sync session
if (_synchronize_timestamps && hrt_elapsed_time(&last_sync_session) > 1_s) {
if (uxr_sync_session(&session, 100) && _timesync.sync_converged()) {

if (uxr_sync_session(&session, 10) && _timesync.sync_converged()) {
//PX4_INFO("synchronized with time offset %-5" PRId64 "ns", session.time_offset);
last_sync_session = hrt_absolute_time();

if (_param_uxrce_dds_syncc.get() > 0) {
syncSystemClock(&session);
}
}

if (!_timesync_converged && _timesync.sync_converged()) {
PX4_INFO("time sync converged");

} else if (_timesync_converged && !_timesync.sync_converged()) {
PX4_WARN("time sync no longer converged");
}

_timesync_converged = _timesync.sync_converged();
}

handleMessageFormatRequest();
Expand All @@ -589,27 +598,38 @@ void UxrceddsClient::run()
last_status_update = now;
}

// Handle ping
if (now - last_ping > 500_ms) {
// Handle ping, unless we're actively sending & receiving payloads successfully
if ((_last_payload_tx_rate > 0) && (_last_payload_rx_rate > 0)) {
_connected = true;
num_pings_missed = 0;
last_ping = now;

if (had_ping_reply) {
num_pings_missed = 0;
} else {
if (hrt_elapsed_time(&last_ping) > 1_s) {
last_ping = now;

if (had_ping_reply) {
num_pings_missed = 0;

} else {
++num_pings_missed;
}
} else {
++num_pings_missed;
}

uxr_ping_agent_session(&session, 0, 1);
int timeout_ms = 1'000; // 1 second
uint8_t attempts = 1;
uxr_ping_agent_session(&session, timeout_ms, attempts);

had_ping_reply = false;
}
had_ping_reply = false;
}

if (num_pings_missed > 2) {
PX4_INFO("No ping response, disconnecting");
_connected = false;
if (num_pings_missed >= 3) {
PX4_INFO("No ping response, disconnecting");
_connected = false;
}
}

perf_end(_loop_perf);

}

delete_repliers();
Expand Down Expand Up @@ -761,10 +781,10 @@ bool UxrceddsClient::setBaudrate(int fd, unsigned baud)

bool UxrceddsClient::add_replier(SrvBase *replier)
{
if (num_of_repliers < MAX_NUM_REPLIERS) {
repliers_[num_of_repliers] = replier;
if (_num_of_repliers < MAX_NUM_REPLIERS) {
_repliers[_num_of_repliers] = replier;

num_of_repliers++;
_num_of_repliers++;
}

return false;
Expand All @@ -773,30 +793,32 @@ bool UxrceddsClient::add_replier(SrvBase *replier)
void UxrceddsClient::process_requests(uxrObjectId object_id, SampleIdentity *sample_id, ucdrBuffer *ub,
const int64_t time_offset_us)
{
for (uint8_t i = 0; i < num_of_repliers; i++) {
if (object_id.id == repliers_[i]->replier_id_.id
&& object_id.type == repliers_[i]->replier_id_.type) {
repliers_[i]->process_request(ub, time_offset_us);
memcpy(&(repliers_[i]->sample_id_), sample_id, sizeof(repliers_[i]->sample_id_));
for (uint8_t i = 0; i < _num_of_repliers; i++) {
if (object_id.id == _repliers[i]->replier_id_.id
&& object_id.type == _repliers[i]->replier_id_.type) {

_repliers[i]->process_request(ub, time_offset_us);
memcpy(&(_repliers[i]->sample_id_), sample_id, sizeof(_repliers[i]->sample_id_));
break;
}
}
}

void UxrceddsClient::process_replies()
{
for (uint8_t i = 0; i < num_of_repliers; i++) {
repliers_[i]->process_reply();
for (uint8_t i = 0; i < _num_of_repliers; i++) {
_repliers[i]->process_reply();
}
}

void UxrceddsClient::delete_repliers()
{
for (uint8_t i = 0; i < num_of_repliers; i++) {
delete (repliers_[i]);
for (uint8_t i = 0; i < _num_of_repliers; i++) {
delete (_repliers[i]);
_repliers[i] = nullptr;
}

num_of_repliers = 0;
_num_of_repliers = 0;
}

int UxrceddsClient::custom_command(int argc, char *argv[])
Expand Down Expand Up @@ -845,6 +867,11 @@ int UxrceddsClient::print_status()
PX4_INFO("Payload rx: %i B/s", _last_payload_rx_rate);
}

PX4_INFO("timesync converged: %s", _timesync.sync_converged() ? "true" : "false");

perf_print_counter(_loop_perf);
perf_print_counter(_loop_interval_perf);

return 0;
}

Expand Down
11 changes: 9 additions & 2 deletions src/modules/uxrce_dds_client/uxrce_dds_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@

#include <lib/timesync/Timesync.hpp>

#include <lib/perf/perf_counter.h>

#if defined(CONFIG_NET) || defined(__PX4_POSIX)
# define UXRCE_DDS_CLIENT_UDP 1
#endif
Expand Down Expand Up @@ -157,8 +159,8 @@ class UxrceddsClient : public ModuleBase<UxrceddsClient>, public ModuleParams
SendTopicsSubs *_subs{nullptr};
RcvTopicsPubs *_pubs{nullptr};

SrvBase *repliers_[MAX_NUM_REPLIERS];
uint8_t num_of_repliers{0};
SrvBase *_repliers[MAX_NUM_REPLIERS];
uint8_t _num_of_repliers{0};

uxrCommunication *_comm{nullptr};
int _fd{-1};
Expand All @@ -167,8 +169,13 @@ class UxrceddsClient : public ModuleBase<UxrceddsClient>, public ModuleParams
int _last_payload_rx_rate{}; ///< in B/s
bool _connected{false};

bool _timesync_converged{false};

Timesync _timesync{timesync_status_s::SOURCE_PROTOCOL_DDS};

perf_counter_t _loop_perf{perf_alloc(PC_ELAPSED, MODULE_NAME": cycle")};
perf_counter_t _loop_interval_perf{perf_alloc(PC_INTERVAL, MODULE_NAME": cycle interval")};

DEFINE_PARAMETERS(
(ParamInt<px4::params::UXRCE_DDS_DOM_ID>) _param_uxrce_dds_dom_id,
(ParamInt<px4::params::UXRCE_DDS_KEY>) _param_uxrce_key,
Expand Down

0 comments on commit 1d5f527

Please sign in to comment.