Skip to content

Commit

Permalink
Merge pull request #214 from lbr-stack/dev-rolling-worker
Browse files Browse the repository at this point in the history
Adds a worker class for re-use in asynchronous FT estimation
  • Loading branch information
mhubii authored Oct 22, 2024
2 parents 2171bd8 + 74afbb8 commit ebe74f6
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 49 deletions.
1 change: 1 addition & 0 deletions lbr_fri_ros2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ add_library(lbr_fri_ros2
src/filters.cpp
src/ft_estimator.cpp
src/kinematics.cpp
src/worker.cpp
)

target_include_directories(lbr_fri_ros2
Expand Down
20 changes: 11 additions & 9 deletions lbr_fri_ros2/include/lbr_fri_ros2/app.hpp
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
#ifndef LBR_FRI_ROS2__APP_HPP_
#define LBR_FRI_ROS2__APP_HPP_

#include <atomic>
#include <memory>
#include <thread>

#include "rclcpp/logger.hpp"
#include "rclcpp/logging.hpp"
#include "realtime_tools/thread_priority.hpp"

#include "friClientApplication.h"
#include "friUdpConnection.h"

#include "lbr_fri_ros2/async_client.hpp"
#include "lbr_fri_ros2/formatting.hpp"
#include "lbr_fri_ros2/worker.hpp"

namespace lbr_fri_ros2 {
class App {
class App : public Worker {
/**
* @brief This clas provides utilities to run the KUKA::FRI::ClientApplication asynchronously.
* Note that the rate at which the application runs is determined by the robot. This is because
* the run_thread_ uses blocking function calls from the FRI client SDK, i.e.
* KUKA::FRI::ClientApplication::step() (this is by KUKA's design).
*
*/
protected:
static constexpr char LOGGER_NAME[] = "lbr_fri_ros2::App";

Expand All @@ -26,15 +31,12 @@ class App {

bool open_udp_socket(const int &port_id = 30200, const char *const remote_host = NULL);
bool close_udp_socket();
void run_async(int rt_prio = 80);
void request_stop();
void run_async(int rt_prio = 80) override;

protected:
void perform_work_() override;
bool valid_port_(const int &port_id);

std::atomic_bool should_stop_, running_;
std::thread run_thread_;

std::shared_ptr<AsyncClient> async_client_ptr_;
std::unique_ptr<KUKA::FRI::UdpConnection> connection_ptr_;
std::unique_ptr<KUKA::FRI::ClientApplication> app_ptr_;
Expand Down
32 changes: 32 additions & 0 deletions lbr_fri_ros2/include/lbr_fri_ros2/worker.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#ifndef LBR_FRI_ROS2__WORKER_HPP_
#define LBR_FRI_ROS2__WORKER_HPP_

#include <atomic>
#include <thread>

#include "rclcpp/logger.hpp"
#include "rclcpp/logging.hpp"
#include "realtime_tools/thread_priority.hpp"

#include "lbr_fri_ros2/formatting.hpp"

namespace lbr_fri_ros2 {
class Worker {
protected:
static constexpr char LOGGER_NAME[] = "lbr_fri_ros2::Worker";

public:
Worker();
~Worker();

virtual void run_async(int rt_prio = 80);
void request_stop();

protected:
virtual void perform_work_() = 0;

std::atomic_bool should_stop_, running_;
std::thread run_thread_;
};
} // namespace lbr_fri_ros2
#endif // LBR_FRI_ROS2__WORKER_HPP_
55 changes: 15 additions & 40 deletions lbr_fri_ros2/src/app.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

namespace lbr_fri_ros2 {
App::App(const std::shared_ptr<AsyncClient> async_client_ptr)
: should_stop_(true), running_(false), async_client_ptr_(nullptr), connection_ptr_(nullptr),
app_ptr_(nullptr) {
: async_client_ptr_(nullptr), connection_ptr_(nullptr), app_ptr_(nullptr) {
async_client_ptr_ = async_client_ptr;
connection_ptr_ = std::make_unique<KUKA::FRI::UdpConnection>();
app_ptr_ = std::make_unique<KUKA::FRI::ClientApplication>(*connection_ptr_, *async_client_ptr_);
Expand Down Expand Up @@ -83,48 +82,24 @@ void App::run_async(int rt_prio) {
ColorScheme::ERROR << "App not configured" << ColorScheme::ENDC);
return;
}
if (running_) {
RCLCPP_WARN_STREAM(rclcpp::get_logger(LOGGER_NAME),
ColorScheme::WARNING << "App already running" << ColorScheme::ENDC);
return;
}
run_thread_ = std::thread([this, rt_prio]() {
if (!realtime_tools::configure_sched_fifo(rt_prio)) {
RCLCPP_WARN_STREAM(rclcpp::get_logger(LOGGER_NAME),
ColorScheme::WARNING
<< "Failed to set FIFO realtime scheduling policy. Refer to "
"[https://control.ros.org/master/doc/ros2_control/"
"controller_manager/doc/userdoc.html]."
<< ColorScheme::ENDC);
} else {
RCLCPP_INFO_STREAM(rclcpp::get_logger(LOGGER_NAME),
ColorScheme::OKGREEN
<< "Realtime scheduling policy set to FIFO with priority '" << rt_prio
<< "'" << ColorScheme::ENDC);
}

RCLCPP_INFO(rclcpp::get_logger(LOGGER_NAME), "Starting run thread");
should_stop_ = false;
bool success = true;
while (rclcpp::ok() && success && !should_stop_) {
success = app_ptr_->step(); // stuck if never connected, we thus detach the thread as join may
// never return
running_ = true;
if (async_client_ptr_->robotState().getSessionState() == KUKA::FRI::ESessionState::IDLE) {
RCLCPP_INFO(rclcpp::get_logger(LOGGER_NAME), "LBR in session state idle, exiting");
break;
}
}
async_client_ptr_->get_state_interface()->uninitialize();
running_ = false;
RCLCPP_INFO(rclcpp::get_logger(LOGGER_NAME), "Exiting run thread");
});
// call base class run_async post checks
Worker::run_async(rt_prio);
run_thread_.detach();
}

void App::request_stop() {
RCLCPP_INFO(rclcpp::get_logger(LOGGER_NAME), "Requesting run thread stop");
should_stop_ = true;
void App::perform_work_() {
bool success = true;
while (rclcpp::ok() && success && !should_stop_) {
success = app_ptr_->step(); // stuck if never connected, we thus detach the run_thread_ as join
// may never return
running_ = true;
if (async_client_ptr_->robotState().getSessionState() == KUKA::FRI::ESessionState::IDLE) {
RCLCPP_INFO(rclcpp::get_logger(LOGGER_NAME), "LBR in session state idle, exiting");
break;
}
}
async_client_ptr_->get_state_interface()->uninitialize();
}

bool App::valid_port_(const int &port_id) {
Expand Down
53 changes: 53 additions & 0 deletions lbr_fri_ros2/src/worker.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#include "lbr_fri_ros2/worker.hpp"

namespace lbr_fri_ros2 {
Worker::Worker() : should_stop_(true), running_(false) {}

Worker::~Worker() {
this->request_stop();
if (run_thread_.joinable()) {
run_thread_.join();
}
}

void Worker::run_async(int rt_prio) {
if (running_) {
RCLCPP_WARN_STREAM(rclcpp::get_logger(LOGGER_NAME),
ColorScheme::WARNING << "Worker already running" << ColorScheme::ENDC);
return;
}
run_thread_ = std::thread([this, rt_prio]() {
if (!realtime_tools::configure_sched_fifo(rt_prio)) {
RCLCPP_WARN_STREAM(rclcpp::get_logger(LOGGER_NAME),
ColorScheme::WARNING
<< "Failed to set FIFO realtime scheduling policy. Refer to "
"[https://control.ros.org/master/doc/ros2_control/"
"controller_manager/doc/userdoc.html]."
<< ColorScheme::ENDC);
} else {
RCLCPP_INFO_STREAM(rclcpp::get_logger(LOGGER_NAME),
ColorScheme::OKGREEN
<< "Realtime scheduling policy set to FIFO with priority '" << rt_prio
<< "'" << ColorScheme::ENDC);
}

RCLCPP_INFO(rclcpp::get_logger(LOGGER_NAME), "Starting run thread");
should_stop_ = false;

// perform work in child-class
this->perform_work_();
// perform work end

running_ = false;
RCLCPP_INFO(rclcpp::get_logger(LOGGER_NAME), "Exiting run thread");
});
}

void Worker::request_stop() {
if (!running_) {
return;
}
RCLCPP_INFO(rclcpp::get_logger(LOGGER_NAME), "Requesting run thread stop");
should_stop_ = true;
}
} // namespace lbr_fri_ros2

0 comments on commit ebe74f6

Please sign in to comment.