From 0561139f52451fda25f921f83c5d12c4c701c752 Mon Sep 17 00:00:00 2001 From: Sylvain Date: Mon, 4 May 2020 21:06:51 -0300 Subject: [PATCH] fix default event port behavior along with FileDescriptorActivity With the introduction of the work() method, which allowed to be more precise on what gets called in which condition, an event port under FileDescriptorActivity would not make updateHook get called anymore. This broke the Rock workflow with file descriptor activities. (Un)fortunately, the effect has been subtle enough to not be noticed readily. Or people started deploying tasks under the normal Activity where before it would work with the FDA (I'm guilty of that). This is a rather convoluted code path (the characters in the play are a FileDescriptorActivity 'FDA', ExecutionEngine 'EE' and a TaskContext 'TC' - the port signal calls FDA::trigger() - FDA::trigger() wakes-up the FDA loop, which calls EE::work(Trigger) - EE::work(Trigger) calls EE::processPortCallbacks but NOT EE::processHooks (expected from the work() refactoring) At that point, we have to remember that TC::addEventPort registers a port callback that by default calls TC::trigger(). The comment at that point in the code itself says "default schedules an updateHook" (which we're indeed expecting with a FDA) - so, EE::processPortCallbacks in the end calls TC::trigger - which, by default calls FDA::timeout And FDA::timeout() was not implemented. --- rtt/extras/FileDescriptorActivity.cpp | 90 ++++++++++++++++++--------- rtt/extras/FileDescriptorActivity.hpp | 13 +++- tests/specialized_activities.cpp | 23 ++++++- 3 files changed, 92 insertions(+), 34 deletions(-) diff --git a/rtt/extras/FileDescriptorActivity.cpp b/rtt/extras/FileDescriptorActivity.cpp index ff890d043..460d8c917 100644 --- a/rtt/extras/FileDescriptorActivity.cpp +++ b/rtt/extras/FileDescriptorActivity.cpp @@ -84,6 +84,7 @@ FileDescriptorActivity::FileDescriptorActivity(int priority, RunnableInterface* , m_has_timeout(false) , m_break_loop(false) , m_trigger(false) + , m_user_timeout(false) , m_update_sets(false) { FD_ZERO(&m_fd_set); @@ -109,6 +110,7 @@ FileDescriptorActivity::FileDescriptorActivity(int scheduler, int priority, Runn , m_has_timeout(false) , m_break_loop(false) , m_trigger(false) + , m_user_timeout(false) , m_update_sets(false) { FD_ZERO(&m_fd_set); @@ -125,6 +127,7 @@ FileDescriptorActivity::FileDescriptorActivity(int scheduler, int priority, Seco , m_has_timeout(false) , m_break_loop(false) , m_trigger(false) + , m_user_timeout(false) , m_update_sets(false) { FD_ZERO(&m_fd_set); @@ -141,6 +144,7 @@ FileDescriptorActivity::FileDescriptorActivity(int scheduler, int priority, Seco , m_has_timeout(false) , m_break_loop(false) , m_trigger(false) + , m_user_timeout(false) , m_update_sets(false) { FD_ZERO(&m_fd_set); @@ -214,8 +218,7 @@ void FileDescriptorActivity::triggerUpdateSets() { RTT::os::MutexLock lock(m_command_mutex); m_update_sets = true; } - int unused; (void)unused; - unused = write(m_interrupt_pipe[1], &CMD_ANY_COMMAND, 1); + writeInterruptPipe(); } bool FileDescriptorActivity::isUpdated(int fd) const { return FD_ISSET(fd, &m_fd_work); } @@ -257,6 +260,7 @@ bool FileDescriptorActivity::start() // reset flags m_break_loop = false; m_trigger = false; + m_user_timeout = false; m_update_sets = false; if (!Activity::start()) @@ -271,13 +275,12 @@ bool FileDescriptorActivity::start() } bool FileDescriptorActivity::trigger() -{ - if (isActive() ) { +{ + if (isActive()) { { RTT::os::MutexLock lock(m_command_mutex); m_trigger = true; } - int unused; (void)unused; - unused = write(m_interrupt_pipe[1], &CMD_ANY_COMMAND, 1); + writeInterruptPipe(); return true; } else return false; @@ -285,7 +288,14 @@ bool FileDescriptorActivity::trigger() bool FileDescriptorActivity::timeout() { - return false; + if (isActive()) { + { RTT::os::MutexLock lock(m_command_mutex); + m_user_timeout = true; + } + writeInterruptPipe(); + return true; + } else + return false; } @@ -294,7 +304,7 @@ struct fd_watch { fd_watch(int& fd) : fd(fd) {} ~fd_watch() { - if (fd != -1) + if (fd != -1) close(fd); fd = -1; } @@ -327,9 +337,9 @@ void FileDescriptorActivity::loop() } else { - static const int USECS_PER_SEC = 1000000; + static const int USECS_PER_SEC = 1000000; timeval timeout = { m_timeout_us / USECS_PER_SEC, - m_timeout_us % USECS_PER_SEC}; + m_timeout_us % USECS_PER_SEC }; ret = select(max_fd + 1, &m_fd_work, NULL, NULL, &timeout); } @@ -337,7 +347,8 @@ void FileDescriptorActivity::loop() m_has_timeout = false; if (ret == -1) { - log(Error) << "FileDescriptorActivity: error in select(), errno = " << errno << endlog(); + log(Error) << "FileDescriptorActivity: error in select(), errno = " + << errno << endlog(); m_has_error = true; } else if (ret == 0) @@ -351,33 +362,27 @@ void FileDescriptorActivity::loop() { // These variables are used in order to loop with select(). See the // while() condition below. - fd_set watch_pipe; - timeval timeout; - char dummy; - do - { - int unused; (void)unused; - unused = read(pipe, &dummy, 1); - - // Initialize the values for the next select() call - FD_ZERO(&watch_pipe); - FD_SET(pipe, &watch_pipe); - timeout.tv_sec = 0; - timeout.tv_usec = 0; - } - while(select(pipe + 1, &watch_pipe, NULL, NULL, &timeout) > 0); + clearInterruptPipe(); } - // We check the flags after the command queue was emptied as we could miss commands otherwise: + // We check the flags after the command queue was emptied as we could + // miss commands otherwise: bool do_trigger = true; bool user_trigger = false; + bool user_timeout = false; { RTT::os::MutexLock lock(m_command_mutex); - // This section should be really fast to not block threads calling trigger(), breakLoop() or watch(). + // This section should be really fast to not block threads calling + // trigger(), breakLoop() or watch(). if (m_trigger) { do_trigger = true; user_trigger = true; m_trigger = false; } + if (m_user_timeout) { + do_trigger = true; + user_timeout = true; + m_user_timeout = false; + } if (m_update_sets) { m_update_sets = false; do_trigger = false; @@ -396,6 +401,8 @@ void FileDescriptorActivity::loop() step(); if (m_has_timeout) work(RunnableInterface::TimeOut); + else if ( user_timeout ) + work(RunnableInterface::TimeOut); else if ( user_trigger ) work(RunnableInterface::Trigger); else @@ -411,13 +418,36 @@ void FileDescriptorActivity::loop() } } +void FileDescriptorActivity::clearInterruptPipe() { + int pipe = m_interrupt_pipe[0]; + + fd_set watch_pipe; + timeval timeout; + char dummy; + do + { + int unused; (void)unused; + unused = read(pipe, &dummy, 1); + + // Initialize the values for the next select() call + FD_ZERO(&watch_pipe); + FD_SET(m_interrupt_pipe[0], &watch_pipe); + timeout.tv_sec = 0; + timeout.tv_usec = 0; + } + while (select(pipe + 1, &watch_pipe, NULL, NULL, &timeout) > 0); +} +void FileDescriptorActivity::writeInterruptPipe() { + int unused; (void)unused; // avoid the "return value not used" warning + unused = write(m_interrupt_pipe[1], &CMD_ANY_COMMAND, 1); +} + bool FileDescriptorActivity::breakLoop() { { RTT::os::MutexLock lock(m_command_mutex); m_break_loop = true; } - int unused; (void)unused; - unused = write(m_interrupt_pipe[1], &CMD_ANY_COMMAND, 1); + writeInterruptPipe(); return true; } diff --git a/rtt/extras/FileDescriptorActivity.hpp b/rtt/extras/FileDescriptorActivity.hpp index 1a67c92ed..d847f6fce 100644 --- a/rtt/extras/FileDescriptorActivity.hpp +++ b/rtt/extras/FileDescriptorActivity.hpp @@ -120,6 +120,7 @@ namespace RTT { namespace extras { RTT::os::Mutex m_command_mutex; bool m_break_loop; bool m_trigger; + bool m_user_timeout; bool m_update_sets; /** Internal method that makes sure loop() takes into account @@ -127,6 +128,16 @@ namespace RTT { namespace extras { */ void triggerUpdateSets(); + /** Internal method that writes on an internal pipe to wake up + * the main loop + */ + void writeInterruptPipe(); + + /** Internal method that clears the interrupt pipe used to wake + * up the main loop + */ + void clearInterruptPipe(); + public: /** * Create a FileDescriptorActivity with a given priority and base::RunnableInterface @@ -278,7 +289,7 @@ namespace RTT { namespace extras { virtual void loop(); virtual bool breakLoop(); virtual bool stop(); - + /** Called by loop() when data is available on the file descriptor. By * default, it calls step() on the associated runner interface (if any) */ diff --git a/tests/specialized_activities.cpp b/tests/specialized_activities.cpp index ad154bb68..15d5d005f 100644 --- a/tests/specialized_activities.cpp +++ b/tests/specialized_activities.cpp @@ -25,12 +25,17 @@ struct TestFDActivity : public FileDescriptorActivity int fd, other_fd, result; bool do_read; + base::RunnableInterface::WorkReason work_reason; RTT::os::Mutex mutex; TestFDActivity() : FileDescriptorActivity(0), step_count(0), count(0), other_count(0), do_read(false) {} + void work(base::RunnableInterface::WorkReason reason) + { + work_reason = reason; + } void step() { RTT::os::MutexLock lock(mutex); @@ -97,6 +102,17 @@ BOOST_AUTO_TEST_CASE( testFileDescriptorActivity ) BOOST_CHECK_EQUAL(0, activity->count); BOOST_CHECK_EQUAL(0, activity->other_count); BOOST_CHECK( !activity->isRunning() && activity->isActive() ); + BOOST_CHECK_EQUAL(base::RunnableInterface::Trigger, activity->work_reason); + + // Check timeout(). Disable reading as there won't be any data on the FD + activity->do_read = false; + BOOST_CHECK( activity->timeout() ); + usleep(USLEEP); + BOOST_CHECK_EQUAL(2, activity->step_count); + BOOST_CHECK_EQUAL(0, activity->count); + BOOST_CHECK_EQUAL(0, activity->other_count); + BOOST_CHECK( !activity->isRunning() && activity->isActive() ); + BOOST_CHECK_EQUAL(base::RunnableInterface::TimeOut, activity->work_reason); // Check normal operations. Re-enable reading. activity->do_read = true; @@ -104,15 +120,16 @@ BOOST_AUTO_TEST_CASE( testFileDescriptorActivity ) result = write(writer, &buffer, 2); BOOST_CHECK( result == 2 ); usleep(USLEEP); - BOOST_CHECK_EQUAL(3, activity->step_count); + BOOST_CHECK_EQUAL(4, activity->step_count); BOOST_CHECK_EQUAL(2, activity->count); BOOST_CHECK_EQUAL(0, activity->other_count); BOOST_CHECK( !activity->isRunning() && activity->isActive() ); + BOOST_CHECK_EQUAL(base::RunnableInterface::IOReady, activity->work_reason); result = write(other_writer, &buffer, 2); BOOST_CHECK( result == 2 ); usleep(USLEEP); - BOOST_CHECK_EQUAL(5, activity->step_count); + BOOST_CHECK_EQUAL(6, activity->step_count); BOOST_CHECK_EQUAL(2, activity->count); BOOST_CHECK_EQUAL(2, activity->other_count); BOOST_CHECK( !activity->isRunning() && activity->isActive() ); @@ -143,11 +160,11 @@ BOOST_AUTO_TEST_CASE( testFileDescriptorActivity ) // step is blocking now // trigger another 65537 times for(std::size_t i = 0; i < 65537; ++i) activity->trigger(); + BOOST_CHECK_EQUAL(base::RunnableInterface::TimeOut, activity->work_reason); activity->mutex.unlock(); sleep(1); BOOST_CHECK_EQUAL(2, activity->step_count); BOOST_CHECK( activity->stop() ); - }