diff --git a/clients/roscpp/src/libros/callback_queue.cpp b/clients/roscpp/src/libros/callback_queue.cpp index 165b3357ab..9a3cfa3656 100644 --- a/clients/roscpp/src/libros/callback_queue.cpp +++ b/clients/roscpp/src/libros/callback_queue.cpp @@ -166,9 +166,10 @@ void CallbackQueue::removeByID(uint64_t removal_id) } { - boost::unique_lock rw_lock(id_info->calling_rw_mutex, boost::defer_lock); - if (rw_lock.try_lock()) + // Unless we're removing from callback, we lock the calling mutex to ensure callback is not being executed. + if (tls_->calling_in_this_thread != id_info->id) { + boost::unique_lock rw_lock(id_info->calling_rw_mutex); boost::mutex::scoped_lock lock(mutex_); D_CallbackInfo::iterator it = callbacks_.begin(); for (; it != callbacks_.end();) @@ -186,8 +187,8 @@ void CallbackQueue::removeByID(uint64_t removal_id) } else { - // We failed to acquire the lock, it can be that we are trying to remove something from the callback queue - // while it is being executed. Mark it for removal and let it be cleaned up later. + // Since we're removing from callback, locking twice would deadlock. + // Instead, mark callback for removal and let it be cleaned up later. boost::mutex::scoped_lock lock(mutex_); for (D_CallbackInfo::iterator it = callbacks_.begin(); it != callbacks_.end(); it++) { diff --git a/test/test_roscpp/test/test_callback_queue.cpp b/test/test_roscpp/test/test_callback_queue.cpp index a642e60a7e..2f6607446f 100644 --- a/test/test_roscpp/test/test_callback_queue.cpp +++ b/test/test_roscpp/test/test_callback_queue.cpp @@ -42,6 +42,7 @@ #include #include #include +#include #include using namespace ros; @@ -66,6 +67,26 @@ class CountingCallback : public CallbackInterface }; typedef boost::shared_ptr CountingCallbackPtr; +struct CustomCallback : public CallbackInterface +{ + template + CustomCallback(Func fun) + : function(fun), count(0) + {} + + virtual CallResult call() + { + function(); + ++count; + + return Success; + } + + boost::function function; + size_t count; +}; +typedef boost::shared_ptr CustomCallbackPtr; + void callAvailableThread(CallbackQueue* queue, bool& done) { while (!done) @@ -177,80 +198,90 @@ TEST(CallbackQueue, removeSelf) queue.callOne(); queue.addCallback(cb2, 1); - + queue.callAvailable(); EXPECT_EQ(cb1->count, 1U); EXPECT_EQ(cb2->count, 1U); } -class BlockingCallback : public CallbackInterface -{ -public: - BlockingCallback() - : count(0) - {} - - virtual CallResult call() - { - mutex_.lock(); - ++count; - - return Success; - } - - boost::mutex mutex_; - size_t count; -}; -typedef boost::shared_ptr BlockingCallbackPtr; - - -// This test checks whether removing callbacks by an id doesn't block if one of those callback is being executed. -TEST(CallbackQueue, removeCallbackWhileExecuting) +// This test checks whether self-removing callbacks by an id doesn't block if one of those callback is being executed. +TEST(CallbackQueue, selfRemoveCallbackWhileExecuting) { const uint64_t owner_id = 1; const uint64_t unrelated_id = 2; + boost::mutex external_mtx; + CallbackQueue queue; - BlockingCallbackPtr cb1(boost::make_shared()); + CustomCallbackPtr cb1(boost::make_shared([&]() { + boost::unique_lock external_lock(external_mtx); + boost::this_thread::sleep_for(boost::chrono::milliseconds(300)); + + { + boost::reverse_lock> unlocker(external_lock); + queue.removeByID(owner_id); // external thread blocks here, spinner doesn't + } + })); CountingCallbackPtr cb2(boost::make_shared()); CountingCallbackPtr cb3(boost::make_shared()); - cb1->mutex_.lock(); // lock the mutex to ensure the blocking callback will stall processing of callback queue. - - queue.addCallback(cb1, owner_id); // Add the blocking callback. - - // Now, we need to serve the callback queue from another thread. - bool done = false; - boost::thread t = boost::thread(boost::bind(&callAvailableThread, &queue, boost::ref(done))); - - ros::WallDuration(1.0).sleep(); // Callback 1 should be being served now. - + queue.addCallback(cb1, owner_id); // Add the self-removing callback. queue.addCallback(cb2, unrelated_id); // Add a second callback with different owner. queue.addCallback(cb3, owner_id); // Add a third with same owner, this one should never get executed. - // Now try to remove the callback that's being executed. - queue.removeByID(owner_id); // This should not block because cb1 is being served, it should prevent cb3 from running. + // Let's use an external thread to execute cb function and hold its external lock + boost::thread t1([&]() { cb1->call(); }); + boost::this_thread::sleep_for(boost::chrono::milliseconds(100)); - ros::WallDuration(1.0).sleep(); - - // The removeByID should not block, so now we can unblock the blocking callback. - cb1->mutex_.unlock(); // This allows processing of cb1 to continue. + // Now, we need to serve the callback queue from another thread. + bool done = false; + boost::thread t2([&]() { callAvailableThread(&queue, done); }); while (!queue.isEmpty()) // Wait until the queue is empty. { ros::WallDuration(0.01).sleep(); } - // Properly shut down our callback serving thread. + // Properly shut down our threads. done = true; - t.join(); + t2.join(); + t1.join(); - EXPECT_EQ(cb1->count, 1U); + EXPECT_EQ(cb1->count, 2U); EXPECT_EQ(cb2->count, 1U); EXPECT_EQ(cb3->count, 0U); +} + +// This test checks whether non-spinner thread blocks on removeByID until currently executing callback finishes +TEST(CallbackQueue, removeCallbackWhileExecuting) +{ + const uint64_t cb_id = 1; + boost::barrier barrier(2); + + CallbackQueue queue; + CustomCallbackPtr cb(boost::make_shared([&]() { + barrier.wait(); + barrier.wait(); + })); + queue.addCallback(cb, cb_id); + + // Let's ensure spinner thread executes callback now + bool done = false; + boost::thread t1([&]() { callAvailableThread(&queue, done); }); + barrier.wait(); - cb1->mutex_.unlock(); // Ensure the mutex is unlocked before destruction. + // External removing thread blocks on removeByID + boost::thread t2([&]() { queue.removeByID(cb_id); }); + EXPECT_FALSE(t2.try_join_for(boost::chrono::milliseconds(200))); // removebyID blocks until cb finishes + + // When callback finishes, external thread proceeds + barrier.wait(); + t2.join(); + EXPECT_EQ(cb->count, 1U); + + done = true; + t1.join(); } class RecursiveCallback : public CallbackInterface @@ -549,6 +580,3 @@ int main(int argc, char** argv) testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } - - -