Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix default event port behavior along with FileDescriptorActivity #320

Merged
merged 2 commits into from
May 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 60 additions & 30 deletions rtt/extras/FileDescriptorActivity.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ FileDescriptorActivity::FileDescriptorActivity(int priority, RunnableInterface*
, m_has_timeout(false)
, m_break_loop(false)
, m_trigger(false)
, m_user_timeout(false)
, m_update_sets(false)
{
FD_ZERO(&m_fd_set);
Expand All @@ -109,6 +110,7 @@ FileDescriptorActivity::FileDescriptorActivity(int scheduler, int priority, Runn
, m_has_timeout(false)
, m_break_loop(false)
, m_trigger(false)
, m_user_timeout(false)
, m_update_sets(false)
{
FD_ZERO(&m_fd_set);
Expand All @@ -125,6 +127,7 @@ FileDescriptorActivity::FileDescriptorActivity(int scheduler, int priority, Seco
, m_has_timeout(false)
, m_break_loop(false)
, m_trigger(false)
, m_user_timeout(false)
, m_update_sets(false)
{
FD_ZERO(&m_fd_set);
Expand All @@ -141,6 +144,7 @@ FileDescriptorActivity::FileDescriptorActivity(int scheduler, int priority, Seco
, m_has_timeout(false)
, m_break_loop(false)
, m_trigger(false)
, m_user_timeout(false)
, m_update_sets(false)
{
FD_ZERO(&m_fd_set);
Expand Down Expand Up @@ -214,8 +218,7 @@ void FileDescriptorActivity::triggerUpdateSets()
{ RTT::os::MutexLock lock(m_command_mutex);
m_update_sets = true;
}
int unused; (void)unused;
unused = write(m_interrupt_pipe[1], &CMD_ANY_COMMAND, 1);
writeInterruptPipe();
}
bool FileDescriptorActivity::isUpdated(int fd) const
{ return FD_ISSET(fd, &m_fd_work); }
Expand Down Expand Up @@ -257,6 +260,7 @@ bool FileDescriptorActivity::start()
// reset flags
m_break_loop = false;
m_trigger = false;
m_user_timeout = false;
m_update_sets = false;

if (!Activity::start())
Expand All @@ -271,21 +275,27 @@ bool FileDescriptorActivity::start()
}

bool FileDescriptorActivity::trigger()
{
if (isActive() ) {
{
if (isActive()) {
{ RTT::os::MutexLock lock(m_command_mutex);
m_trigger = true;
}
int unused; (void)unused;
unused = write(m_interrupt_pipe[1], &CMD_ANY_COMMAND, 1);
writeInterruptPipe();
return true;
} else
return false;
}

bool FileDescriptorActivity::timeout()
{
return false;
if (isActive()) {
{ RTT::os::MutexLock lock(m_command_mutex);
m_user_timeout = true;
}
writeInterruptPipe();
return true;
} else
return false;
}


Expand All @@ -294,7 +304,7 @@ struct fd_watch {
fd_watch(int& fd) : fd(fd) {}
~fd_watch()
{
if (fd != -1)
if (fd != -1)
close(fd);
fd = -1;
}
Expand Down Expand Up @@ -327,17 +337,18 @@ void FileDescriptorActivity::loop()
}
else
{
static const int USECS_PER_SEC = 1000000;
static const int USECS_PER_SEC = 1000000;
timeval timeout = { m_timeout_us / USECS_PER_SEC,
m_timeout_us % USECS_PER_SEC};
m_timeout_us % USECS_PER_SEC };
ret = select(max_fd + 1, &m_fd_work, NULL, NULL, &timeout);
}

m_has_error = false;
m_has_timeout = false;
if (ret == -1)
{
log(Error) << "FileDescriptorActivity: error in select(), errno = " << errno << endlog();
log(Error) << "FileDescriptorActivity: error in select(), errno = "
<< errno << endlog();
m_has_error = true;
}
else if (ret == 0)
Expand All @@ -351,33 +362,27 @@ void FileDescriptorActivity::loop()
{
// These variables are used in order to loop with select(). See the
// while() condition below.
fd_set watch_pipe;
timeval timeout;
char dummy;
do
{
int unused; (void)unused;
unused = read(pipe, &dummy, 1);

// Initialize the values for the next select() call
FD_ZERO(&watch_pipe);
FD_SET(pipe, &watch_pipe);
timeout.tv_sec = 0;
timeout.tv_usec = 0;
}
while(select(pipe + 1, &watch_pipe, NULL, NULL, &timeout) > 0);
clearInterruptPipe();
}

// We check the flags after the command queue was emptied as we could miss commands otherwise:
// We check the flags after the command queue was emptied as we could
// miss commands otherwise:
bool do_trigger = true;
bool user_trigger = false;
bool user_timeout = false;
{ RTT::os::MutexLock lock(m_command_mutex);
// This section should be really fast to not block threads calling trigger(), breakLoop() or watch().
// This section should be really fast to not block threads calling
// trigger(), breakLoop() or watch().
if (m_trigger) {
do_trigger = true;
user_trigger = true;
m_trigger = false;
}
if (m_user_timeout) {
do_trigger = true;
user_timeout = true;
m_user_timeout = false;
}
if (m_update_sets) {
m_update_sets = false;
do_trigger = false;
Expand All @@ -396,6 +401,8 @@ void FileDescriptorActivity::loop()
step();
if (m_has_timeout)
work(RunnableInterface::TimeOut);
else if ( user_timeout )
work(RunnableInterface::TimeOut);
else if ( user_trigger )
work(RunnableInterface::Trigger);
else
Expand All @@ -411,13 +418,36 @@ void FileDescriptorActivity::loop()
}
}

void FileDescriptorActivity::clearInterruptPipe() {
int pipe = m_interrupt_pipe[0];

fd_set watch_pipe;
timeval timeout;
char dummy;
do
{
int unused; (void)unused;
unused = read(pipe, &dummy, 1);

// Initialize the values for the next select() call
FD_ZERO(&watch_pipe);
FD_SET(m_interrupt_pipe[0], &watch_pipe);
timeout.tv_sec = 0;
timeout.tv_usec = 0;
}
while (select(pipe + 1, &watch_pipe, NULL, NULL, &timeout) > 0);
}
void FileDescriptorActivity::writeInterruptPipe() {
int unused; (void)unused; // avoid the "return value not used" warning
unused = write(m_interrupt_pipe[1], &CMD_ANY_COMMAND, 1);
}

bool FileDescriptorActivity::breakLoop()
{
{ RTT::os::MutexLock lock(m_command_mutex);
m_break_loop = true;
}
int unused; (void)unused;
unused = write(m_interrupt_pipe[1], &CMD_ANY_COMMAND, 1);
writeInterruptPipe();
return true;
}

Expand Down
54 changes: 46 additions & 8 deletions rtt/extras/FileDescriptorActivity.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,24 @@ namespace RTT { namespace extras {
RTT::os::Mutex m_command_mutex;
bool m_break_loop;
bool m_trigger;
bool m_user_timeout;
bool m_update_sets;

/** Internal method that makes sure loop() takes into account
* modifications in the set of watched FDs
*/
void triggerUpdateSets();

/** Internal method that writes on an internal pipe to wake up
* the main loop
*/
void writeInterruptPipe();

/** Internal method that clears the interrupt pipe used to wake
* up the main loop
*/
void clearInterruptPipe();

public:
/**
* Create a FileDescriptorActivity with a given priority and base::RunnableInterface
Expand Down Expand Up @@ -274,28 +285,55 @@ namespace RTT { namespace extras {
*/
int getTimeout_us() const;

/** Start the underlying thread and make it call \c loop
*/
virtual bool start();

/** The main loop, listening to various wake-up events
*
* The loop can be broken by calling \c breakLoop. \c timeout and \c trigger
* will wake it up and make it call work with resp. a TimeOut and Trigger
* reason. Available I/O on watched file descriptors will wake it up and
* make it call \c work with a IOReady reason
*/
virtual void loop();

/** Wake-up \c loop and make it return */
virtual bool breakLoop();
virtual bool stop();

/** Called by loop() when data is available on the file descriptor. By
* default, it calls step() on the associated runner interface (if any)

/** @deprecated does nothing, FileDescriptorActivity uses the \c work interface
*/
virtual void step();

/** Called by loop() when data is available on the file descriptor. By
* default, it calls step() on the associated runner interface (if any)
/** Called by loop() when it is woken up
*
* The reason parameter allows to know why the loop was woken up:
* - TimeOut if the activity time out has been reached or if \c timeout
* has been called. In the former case, \c hasTimeout will return true.
* - IOReady is some I/O has been received on the watched file descriptors
* - Trigger if trigger() has been called
*
* Calls runner->work with the same reason. By default \c
* ExecutionEngine will call all messages, port callbacks, functions
* and hooks in IOReady and TimeOut, but only messages and port
* callbacks in Trigger
*/
virtual void work(base::RunnableInterface::WorkReason reason);

/** Force calling step() even if no data is available on the file
* descriptor, and returns true if the signalling was successful
/**
* Wake up the main thread (in \c loop) and call \c work with Trigger as reason
*
* @return true if the activity is active, that is if it is started and
* will process the trigger. false otherwise.
*/
virtual bool trigger();

/**
* Always returns false.
* Wake up the main thread (in \c loop) and call \c work with TimeOut as reason
*
* @return true if the activity is active, that is if it is started and
* will process the trigger. false otherwise.
*/
virtual bool timeout();
};
Expand Down
23 changes: 20 additions & 3 deletions tests/specialized_activities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,17 @@ struct TestFDActivity : public FileDescriptorActivity
int fd, other_fd, result;

bool do_read;
base::RunnableInterface::WorkReason work_reason;

RTT::os::Mutex mutex;

TestFDActivity()
: FileDescriptorActivity(0), step_count(0), count(0), other_count(0), do_read(false) {}

void work(base::RunnableInterface::WorkReason reason)
{
work_reason = reason;
}
void step()
{
RTT::os::MutexLock lock(mutex);
Expand Down Expand Up @@ -97,22 +102,34 @@ BOOST_AUTO_TEST_CASE( testFileDescriptorActivity )
BOOST_CHECK_EQUAL(0, activity->count);
BOOST_CHECK_EQUAL(0, activity->other_count);
BOOST_CHECK( !activity->isRunning() && activity->isActive() );
BOOST_CHECK_EQUAL(base::RunnableInterface::Trigger, activity->work_reason);

// Check timeout(). Disable reading as there won't be any data on the FD
activity->do_read = false;
BOOST_CHECK( activity->timeout() );
usleep(USLEEP);
BOOST_CHECK_EQUAL(2, activity->step_count);
BOOST_CHECK_EQUAL(0, activity->count);
BOOST_CHECK_EQUAL(0, activity->other_count);
BOOST_CHECK( !activity->isRunning() && activity->isActive() );
BOOST_CHECK_EQUAL(base::RunnableInterface::TimeOut, activity->work_reason);

// Check normal operations. Re-enable reading.
activity->do_read = true;
int buffer, result;
result = write(writer, &buffer, 2);
BOOST_CHECK( result == 2 );
usleep(USLEEP);
BOOST_CHECK_EQUAL(3, activity->step_count);
BOOST_CHECK_EQUAL(4, activity->step_count);
BOOST_CHECK_EQUAL(2, activity->count);
BOOST_CHECK_EQUAL(0, activity->other_count);
BOOST_CHECK( !activity->isRunning() && activity->isActive() );
BOOST_CHECK_EQUAL(base::RunnableInterface::IOReady, activity->work_reason);

result = write(other_writer, &buffer, 2);
BOOST_CHECK( result == 2 );
usleep(USLEEP);
BOOST_CHECK_EQUAL(5, activity->step_count);
BOOST_CHECK_EQUAL(6, activity->step_count);
BOOST_CHECK_EQUAL(2, activity->count);
BOOST_CHECK_EQUAL(2, activity->other_count);
BOOST_CHECK( !activity->isRunning() && activity->isActive() );
Expand Down Expand Up @@ -143,11 +160,11 @@ BOOST_AUTO_TEST_CASE( testFileDescriptorActivity )
// step is blocking now
// trigger another 65537 times
for(std::size_t i = 0; i < 65537; ++i) activity->trigger();
BOOST_CHECK_EQUAL(base::RunnableInterface::TimeOut, activity->work_reason);
activity->mutex.unlock();
sleep(1);
BOOST_CHECK_EQUAL(2, activity->step_count);
BOOST_CHECK( activity->stop() );

}


Expand Down