From f6f7ddf5c893ebf79636b97fcc898a1ce606f075 Mon Sep 17 00:00:00 2001 From: jparisu Date: Wed, 23 Nov 2022 13:21:52 +0100 Subject: [PATCH] Implementation of new Thread Pool Signed-off-by: jparisu --- cmake_utils/cmake/test/test_target.cmake | 27 +- .../threading/connector/OneShotConnector.hpp | 49 +++ .../threading/connector/SlotConnector.hpp | 63 ++++ .../connector/impl/OneShotConnector.ipp | 73 ++++ .../connector/impl/SlotConnector.ipp | 60 +++ .../threading/manager/AsyncManager.hpp | 66 ++++ .../cpp_utils/threading/manager/IManager.hpp | 38 ++ .../threading/manager/StdThreadPool.hpp | 84 +++++ .../threading/manager/SyncManager.hpp | 47 +++ .../threading/task/ArgsOwnedTask.hpp | 107 ++++++ .../cpp_utils/threading/task/ITask.hpp | 36 ++ .../cpp_utils/threading/task/OwnedTask.hpp | 49 +++ .../threading/task/ReferenceTask.hpp | 47 +++ .../threading/task/impl/ArgsOwnedTask.ipp | 50 +++ .../threading/thread/CustomThread.hpp | 45 +++ .../cpp_utils/wait/ConsumerWaitHandler.hpp | 22 +- .../cpp_utils/wait/DBQueueWaitHandler.hpp | 6 - .../wait/impl/ConsumerWaitHandler.ipp | 5 +- .../wait/impl/DBQueueWaitHandler.ipp | 18 +- .../cpp/threading/manager/AsyncManager.cpp | 73 ++++ .../cpp/threading/manager/StdThreadPool.cpp | 134 +++++++ .../src/cpp/threading/manager/SyncManager.cpp | 33 ++ .../src/cpp/threading/task/OwnedTask.cpp | 45 +++ .../src/cpp/threading/task/ReferenceTask.cpp | 43 +++ cpp_utils/test/unittest/CMakeLists.txt | 1 + .../test/unittest/threading/CMakeLists.txt | 18 + .../threading/connector/CMakeLists.txt | 80 ++++ .../connector/one_shot_connector_test.cpp | 351 ++++++++++++++++++ .../connector/slot_connector_test.cpp | 335 +++++++++++++++++ .../unittest/threading/manager/CMakeLists.txt | 90 +++++ .../manager/manager_interface_test.cpp | 153 ++++++++ .../manager/std_thread_pool_test.cpp | 168 +++++++++ .../unittest/threading/task/CMakeLists.txt | 48 +++ .../threading/task/task_interface_test.cpp | 159 ++++++++ cpp_utils/test/unittest/wait/CMakeLists.txt | 3 +- .../unittest/wait/DBQueueWaitHandlerTest.cpp | 26 +- 36 files changed, 2577 insertions(+), 75 deletions(-) create mode 100644 cpp_utils/include/cpp_utils/threading/connector/OneShotConnector.hpp create mode 100644 cpp_utils/include/cpp_utils/threading/connector/SlotConnector.hpp create mode 100644 cpp_utils/include/cpp_utils/threading/connector/impl/OneShotConnector.ipp create mode 100644 cpp_utils/include/cpp_utils/threading/connector/impl/SlotConnector.ipp create mode 100644 cpp_utils/include/cpp_utils/threading/manager/AsyncManager.hpp create mode 100644 cpp_utils/include/cpp_utils/threading/manager/IManager.hpp create mode 100644 cpp_utils/include/cpp_utils/threading/manager/StdThreadPool.hpp create mode 100644 cpp_utils/include/cpp_utils/threading/manager/SyncManager.hpp create mode 100644 cpp_utils/include/cpp_utils/threading/task/ArgsOwnedTask.hpp create mode 100644 cpp_utils/include/cpp_utils/threading/task/ITask.hpp create mode 100644 cpp_utils/include/cpp_utils/threading/task/OwnedTask.hpp create mode 100644 cpp_utils/include/cpp_utils/threading/task/ReferenceTask.hpp create mode 100644 cpp_utils/include/cpp_utils/threading/task/impl/ArgsOwnedTask.ipp create mode 100644 cpp_utils/include/cpp_utils/threading/thread/CustomThread.hpp create mode 100644 cpp_utils/src/cpp/threading/manager/AsyncManager.cpp create mode 100644 cpp_utils/src/cpp/threading/manager/StdThreadPool.cpp create mode 100644 cpp_utils/src/cpp/threading/manager/SyncManager.cpp create mode 100644 cpp_utils/src/cpp/threading/task/OwnedTask.cpp create mode 100644 cpp_utils/src/cpp/threading/task/ReferenceTask.cpp create mode 100644 cpp_utils/test/unittest/threading/CMakeLists.txt create mode 100644 cpp_utils/test/unittest/threading/connector/CMakeLists.txt create mode 100644 cpp_utils/test/unittest/threading/connector/one_shot_connector_test.cpp create mode 100644 cpp_utils/test/unittest/threading/connector/slot_connector_test.cpp create mode 100644 cpp_utils/test/unittest/threading/manager/CMakeLists.txt create mode 100644 cpp_utils/test/unittest/threading/manager/manager_interface_test.cpp create mode 100644 cpp_utils/test/unittest/threading/manager/std_thread_pool_test.cpp create mode 100644 cpp_utils/test/unittest/threading/task/CMakeLists.txt create mode 100644 cpp_utils/test/unittest/threading/task/task_interface_test.cpp diff --git a/cmake_utils/cmake/test/test_target.cmake b/cmake_utils/cmake/test/test_target.cmake index 82162512..5645b95b 100644 --- a/cmake_utils/cmake/test/test_target.cmake +++ b/cmake_utils/cmake/test/test_target.cmake @@ -58,15 +58,24 @@ function(add_test_executable TEST_EXECUTABLE_NAME TEST_SOURCES TEST_NAME TEST_LI get_win32_path_dependencies(${TEST_EXECUTABLE_NAME} TEST_FRIENDLY_PATH) - foreach(test_name ${TEST_LIST}) - add_test(NAME ${TEST_NAME}.${test_name} - COMMAND ${TEST_EXECUTABLE_NAME} - --gtest_filter=${TEST_NAME}.${test_name}:**/${TEST_NAME}.${test_name}/**) - - if(TEST_FRIENDLY_PATH) - set_tests_properties(${TEST_NAME}.${test_name} PROPERTIES ENVIRONMENT "PATH=${TEST_FRIENDLY_PATH}") - endif(TEST_FRIENDLY_PATH) - endforeach() + if( TEST_LIST ) + # If list of tests is not empty, add each test separatly + foreach(test_name ${TEST_LIST}) + add_test(NAME ${TEST_NAME}.${test_name} + COMMAND ${TEST_EXECUTABLE_NAME} + --gtest_filter=${TEST_NAME}**.${test_name}:**/${TEST_NAME}**.${test_name}/**) + + if(TEST_FRIENDLY_PATH) + set_tests_properties(${TEST_NAME}.${test_name} PROPERTIES ENVIRONMENT "PATH=${TEST_FRIENDLY_PATH}") + endif(TEST_FRIENDLY_PATH) + endforeach() + else() + # If no tests are provided, create a single test + message(STATUS "Creating general test ${TEST_NAME}.") + add_test(NAME ${TEST_NAME} + COMMAND ${TEST_EXECUTABLE_NAME}) + endif( TEST_LIST ) + target_compile_definitions(${TEST_EXECUTABLE_NAME} PRIVATE FASTDDS_ENFORCE_LOG_INFO diff --git a/cpp_utils/include/cpp_utils/threading/connector/OneShotConnector.hpp b/cpp_utils/include/cpp_utils/threading/connector/OneShotConnector.hpp new file mode 100644 index 00000000..1bc7d974 --- /dev/null +++ b/cpp_utils/include/cpp_utils/threading/connector/OneShotConnector.hpp @@ -0,0 +1,49 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file OwnedTask.hpp + * + * This file contains class Task definition. + */ + +#pragma once + +#include + +#include +#include + +namespace eprosima { +namespace utils { +namespace threading { + +template +class OneShotConnector +{ +public: + + static void execute(IManager* tp, const std::function& callback, Args... args); + + static void execute(IManager* tp, std::function&& callback, Args... args); + +}; +using SimpleOneShotConnector = OneShotConnector<>; + +} /* namespace threading */ +} /* namespace utils */ +} /* namespace eprosima */ + +// Include implementation template file +#include diff --git a/cpp_utils/include/cpp_utils/threading/connector/SlotConnector.hpp b/cpp_utils/include/cpp_utils/threading/connector/SlotConnector.hpp new file mode 100644 index 00000000..669963ba --- /dev/null +++ b/cpp_utils/include/cpp_utils/threading/connector/SlotConnector.hpp @@ -0,0 +1,63 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file SlotConnector.hpp + * + * This file contains class SlotConnector definition. + */ + +#pragma once + +#include + +#include +#include + +namespace eprosima { +namespace utils { +namespace threading { + +template +class SlotConnector +{ +public: + + SlotConnector( + IManager* manager, + const std::function& callback); + + SlotConnector( + IManager* manager, + std::function&& callback); + + ~SlotConnector() = default; + + void execute(Args...); + +protected: + + IManager* manager_; + + std::function callback_; + +}; +using SimpleSlotConnector = SlotConnector<>; + +} /* namespace threading */ +} /* namespace utils */ +} /* namespace eprosima */ + +// Include implementation template file +#include diff --git a/cpp_utils/include/cpp_utils/threading/connector/impl/OneShotConnector.ipp b/cpp_utils/include/cpp_utils/threading/connector/impl/OneShotConnector.ipp new file mode 100644 index 00000000..bb4655a5 --- /dev/null +++ b/cpp_utils/include/cpp_utils/threading/connector/impl/OneShotConnector.ipp @@ -0,0 +1,73 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file OneShotConnector.hpp + * + * This file contains class OneShotConnector implementation. + */ + +#pragma once + +#include + +namespace eprosima { +namespace utils { +namespace threading { + +template +void OneShotConnector::execute( + IManager* manager, + const std::function& callback, + Args... args) +{ + manager->execute( + std::make_unique>( + callback, + args... + ) + ); +} + +template +void OneShotConnector::execute( + IManager* manager, + std::function&& callback, + Args... args) +{ + manager->execute( + std::make_unique>( + std::move(callback), + args... + ) + ); +} + +// template +// void OneShotConnector::execute( +// IManager* manager, +// std::function callback, +// Args... args) +// { +// manager->execute( +// std::make_unique>( +// callback, +// args... +// ) +// ); +// } + +} /* namespace thread */ +} /* namespace event */ +} /* namespace eprosima */ diff --git a/cpp_utils/include/cpp_utils/threading/connector/impl/SlotConnector.ipp b/cpp_utils/include/cpp_utils/threading/connector/impl/SlotConnector.ipp new file mode 100644 index 00000000..cfb4c242 --- /dev/null +++ b/cpp_utils/include/cpp_utils/threading/connector/impl/SlotConnector.ipp @@ -0,0 +1,60 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file SlotConnector.hpp + * + * This file contains class SlotConnector implementation. + */ + +#pragma once + +#include + +namespace eprosima { +namespace utils { +namespace threading { + +template +SlotConnector::SlotConnector( + IManager* manager, + const std::function& callback) + : manager_(manager) + , callback_(callback) +{ +} + +template +SlotConnector::SlotConnector( + IManager* manager, + std::function&& callback) + : manager_(manager) + , callback_(std::move(callback)) +{ +} + +template +void SlotConnector::execute(Args... args) +{ + manager_->execute( + std::make_unique>( + callback_, + args... + ) + ); +} + +} /* namespace thread */ +} /* namespace event */ +} /* namespace eprosima */ diff --git a/cpp_utils/include/cpp_utils/threading/manager/AsyncManager.hpp b/cpp_utils/include/cpp_utils/threading/manager/AsyncManager.hpp new file mode 100644 index 00000000..419f1214 --- /dev/null +++ b/cpp_utils/include/cpp_utils/threading/manager/AsyncManager.hpp @@ -0,0 +1,66 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file AsyncManager.hpp + * + * This file contains class AsyncManager definition. + */ + +#pragma once + +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace eprosima { +namespace utils { +namespace threading { + +using TasksCollectionType = + Atomicable< + std::vector< + std::pair< + std::unique_ptr, + std::unique_ptr>>>; + +/** + * TODO + */ +class AsyncManager : public IManager +{ +public: + + AsyncManager() = default; + + ~AsyncManager(); + + virtual void execute(std::unique_ptr&& task) override; + + void clean_threads(); + +protected: + + TasksCollectionType tasks_running_; +}; + +} /* namespace threading */ +} /* namespace utils */ +} /* namespace eprosima */ diff --git a/cpp_utils/include/cpp_utils/threading/manager/IManager.hpp b/cpp_utils/include/cpp_utils/threading/manager/IManager.hpp new file mode 100644 index 00000000..085dd73b --- /dev/null +++ b/cpp_utils/include/cpp_utils/threading/manager/IManager.hpp @@ -0,0 +1,38 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file IManager.hpp + * + * This file contains class SlotThreadPool definition. + */ + +#pragma once + +#include + +namespace eprosima { +namespace utils { +namespace threading { + +class IManager +{ +public: + virtual ~IManager() {}; + virtual void execute(std::unique_ptr&& task) = 0; +}; + +} /* namespace threading */ +} /* namespace utils */ +} /* namespace eprosima */ diff --git a/cpp_utils/include/cpp_utils/threading/manager/StdThreadPool.hpp b/cpp_utils/include/cpp_utils/threading/manager/StdThreadPool.hpp new file mode 100644 index 00000000..722a9a47 --- /dev/null +++ b/cpp_utils/include/cpp_utils/threading/manager/StdThreadPool.hpp @@ -0,0 +1,84 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file StdThreadPool.hpp + * + * This file contains class StdThreadPool definition. + */ + +#pragma once + +#include +#include +#include + +#include +#include +#include +#include + +namespace eprosima { +namespace utils { +namespace threading { + +/** + * TODO + */ +class StdThreadPool : public IManager +{ +public: + + StdThreadPool( + unsigned int n_threads, + bool start_running = true); + + virtual ~StdThreadPool(); + + void start(); + + void stop(); + + virtual void execute(std::unique_ptr&& task) override; + +protected: + + void thread_routine_(); + + /** + * @brief Double Queue Wait Handler to store task ids + * + * This double queue implement methods \c produce , to add tasks to the queue, and \c consume to wait until any + * task is available, and return the next task available. + * + * It will retrieve tasks in FIFO order. + * Produce and consume methods are not reciprocally blocking. + */ + event::DBQueueWaitHandler> task_queue_; + + /** + * @brief Threads container + * + * @note \c CustomThread are used instead of \c std::thread so some extra logic could be added to threads + * in future implementation (e.g. performance info). + */ + std::vector threads_; + + const unsigned int n_threads_; + +}; + +} /* namespace threading */ +} /* namespace utils */ +} /* namespace eprosima */ diff --git a/cpp_utils/include/cpp_utils/threading/manager/SyncManager.hpp b/cpp_utils/include/cpp_utils/threading/manager/SyncManager.hpp new file mode 100644 index 00000000..9e21dd6b --- /dev/null +++ b/cpp_utils/include/cpp_utils/threading/manager/SyncManager.hpp @@ -0,0 +1,47 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file SyncManager.hpp + * + * This file contains class SyncManager definition. + */ + +#pragma once + +#include +#include +#include + +#include +#include + +namespace eprosima { +namespace utils { +namespace threading { + +/** + * TODO + */ +class SyncManager : public IManager +{ +public: + // virtual void execute(const ITask& task) override; + // virtual void execute(ITask&& task) override; + virtual void execute(std::unique_ptr&& task) override; +}; + +} /* namespace threading */ +} /* namespace utils */ +} /* namespace eprosima */ diff --git a/cpp_utils/include/cpp_utils/threading/task/ArgsOwnedTask.hpp b/cpp_utils/include/cpp_utils/threading/task/ArgsOwnedTask.hpp new file mode 100644 index 00000000..4feeb456 --- /dev/null +++ b/cpp_utils/include/cpp_utils/threading/task/ArgsOwnedTask.hpp @@ -0,0 +1,107 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file ArgsOwnedTask.hpp + * + * This file contains class Task definition. + */ + +#pragma once + +#include +#include + +#include + +namespace eprosima { +namespace utils { +namespace threading { + +namespace helper +{ + template + struct index {}; + + template + struct gen_seq : gen_seq {}; + + template + struct gen_seq<0, Ts...> : index {}; +} + +template +class ArgsOwnedTask : public ITask +{ +public: + + ArgsOwnedTask( + std::function callback, + const Args&... args); + + void operator()() noexcept override; + +protected: + + template + void call_internal_callback_(helper::index) + { + callback_(std::get(args_)...); + } + + std::function callback_; + std::tuple args_; + +}; + + +// template +// class ArgsOwnedTask : public ITask +// { +// public: + +// ArgsOwnedTask( +// const std::function& callback, +// const Args&... args); + +// // ArgsOwnedTask( +// // std::function&& callback, +// // Args... args); + +// virtual ~ArgsOwnedTask() = default; + +// template +// void call_(helper::index) +// { +// callback_(std::get(args_)...); +// } + +// virtual void operator()() noexcept override +// { +// call_(helper::gen_seq{}); +// } + +// protected: + +// std::function callback_; + +// std::tuple args_; +// }; + +} /* namespace threading */ +} /* namespace utils */ +} /* namespace eprosima */ + +// Include implementation template file +#include diff --git a/cpp_utils/include/cpp_utils/threading/task/ITask.hpp b/cpp_utils/include/cpp_utils/threading/task/ITask.hpp new file mode 100644 index 00000000..e3daa7fa --- /dev/null +++ b/cpp_utils/include/cpp_utils/threading/task/ITask.hpp @@ -0,0 +1,36 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file ITask.hpp + * + * This file contains class Task definition. + */ + +#pragma once + +namespace eprosima { +namespace utils { +namespace threading { + +class ITask +{ +public: + virtual ~ITask() {}; + virtual void operator()() noexcept = 0; +}; + +} /* namespace threading */ +} /* namespace utils */ +} /* namespace eprosima */ diff --git a/cpp_utils/include/cpp_utils/threading/task/OwnedTask.hpp b/cpp_utils/include/cpp_utils/threading/task/OwnedTask.hpp new file mode 100644 index 00000000..d60387cd --- /dev/null +++ b/cpp_utils/include/cpp_utils/threading/task/OwnedTask.hpp @@ -0,0 +1,49 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file OwnedTask.hpp + * + * This file contains class Task definition. + */ + +#pragma once + +#include + +#include + +namespace eprosima { +namespace utils { +namespace threading { + +class OwnedTask : public ITask +{ +public: + + OwnedTask(const std::function& callback); + + OwnedTask(std::function&& callback); + + virtual ~OwnedTask() = default; + + virtual void operator()() noexcept override; + + const std::function callback; +}; + + +} /* namespace threading */ +} /* namespace utils */ +} /* namespace eprosima */ diff --git a/cpp_utils/include/cpp_utils/threading/task/ReferenceTask.hpp b/cpp_utils/include/cpp_utils/threading/task/ReferenceTask.hpp new file mode 100644 index 00000000..4761f147 --- /dev/null +++ b/cpp_utils/include/cpp_utils/threading/task/ReferenceTask.hpp @@ -0,0 +1,47 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file ReferenceTask.hpp + * + * This file contains class Task definition. + */ + +#pragma once + +#include + +#include + +namespace eprosima { +namespace utils { +namespace threading { + +class ReferenceTask : public ITask +{ +public: + + ReferenceTask(const std::function* callback_ptr); + + virtual ~ReferenceTask() = default; + + virtual void operator()() noexcept override; + + const std::function* callback_ptr; + +}; + +} /* namespace threading */ +} /* namespace utils */ +} /* namespace eprosima */ diff --git a/cpp_utils/include/cpp_utils/threading/task/impl/ArgsOwnedTask.ipp b/cpp_utils/include/cpp_utils/threading/task/impl/ArgsOwnedTask.ipp new file mode 100644 index 00000000..fbea3d80 --- /dev/null +++ b/cpp_utils/include/cpp_utils/threading/task/impl/ArgsOwnedTask.ipp @@ -0,0 +1,50 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file ArgsOwnedTask.ipp + * + * This file contains class OneShotConnector implementation. + */ + +#pragma once + +namespace eprosima { +namespace utils { +namespace threading { + +template +ArgsOwnedTask::ArgsOwnedTask( + std::function callback, + const Args&... args) + : callback_(callback) + , args_(args...) +{ +} + +template +void ArgsOwnedTask::operator()() noexcept +{ + call_internal_callback_(helper::gen_seq{}); +} + +// template +// void ArgsOwnedTask::call_internal_callback_(helper::index) +// { +// callback_(std::get(args_)...); +// } + +} /* namespace threading */ +} /* namespace utils */ +} /* namespace eprosima */ diff --git a/cpp_utils/include/cpp_utils/threading/thread/CustomThread.hpp b/cpp_utils/include/cpp_utils/threading/thread/CustomThread.hpp new file mode 100644 index 00000000..4d5875b4 --- /dev/null +++ b/cpp_utils/include/cpp_utils/threading/thread/CustomThread.hpp @@ -0,0 +1,45 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file CustomThread.hpp + * + * This file contains class CustomThread definition. + */ + +#pragma once + +#include + +#include + +namespace eprosima { +namespace utils { +namespace threading { + +/** + * This class represents a thread that can be executed by a Thread Pool. + * + * @note this first implementation only uses this class as a \c std::thread for simplicity. + * In future implementations, this could be a more complex class. + */ +class CustomThread : public std::thread +{ +public: + using std::thread::thread; +}; + +} /* namespace threading */ +} /* namespace utils */ +} /* namespace eprosima */ diff --git a/cpp_utils/include/cpp_utils/wait/ConsumerWaitHandler.hpp b/cpp_utils/include/cpp_utils/wait/ConsumerWaitHandler.hpp index 4d13b683..23168965 100644 --- a/cpp_utils/include/cpp_utils/wait/ConsumerWaitHandler.hpp +++ b/cpp_utils/include/cpp_utils/wait/ConsumerWaitHandler.hpp @@ -18,7 +18,7 @@ #pragma once -#include +#include namespace eprosima { namespace utils { @@ -73,7 +73,6 @@ class ConsumerWaitHandler : protected CounterWaitHandler void produce( T&& value); - /** * @brief Add a new value to the collection. Use copy constructor. * @@ -122,18 +121,11 @@ class ConsumerWaitHandler : protected CounterWaitHandler virtual void add_value_( T&& value) = 0; - - /** - * @brief Method that adds a new value in the collection. Use copy constructor. - * - * This method must be reimplemented in child classes specialized to the internal collection. - * - * This method is called without any mutex taken and afterwards the internal counter is increased by 1. - * - * @param value new value + /* + * NOTE: + * Function add_value_ called with const reference is not available because of weird behaviour of override methods + * in template classes. */ - virtual void add_value_( - const T& value) = 0; /** * @brief Method that gets next available value from the collection @@ -159,6 +151,4 @@ class ConsumerWaitHandler : protected CounterWaitHandler } /* namespace eprosima */ // Include implementation template file -#include - - +#include diff --git a/cpp_utils/include/cpp_utils/wait/DBQueueWaitHandler.hpp b/cpp_utils/include/cpp_utils/wait/DBQueueWaitHandler.hpp index 7140b43d..4da53cdc 100644 --- a/cpp_utils/include/cpp_utils/wait/DBQueueWaitHandler.hpp +++ b/cpp_utils/include/cpp_utils/wait/DBQueueWaitHandler.hpp @@ -54,10 +54,6 @@ class DBQueueWaitHandler : public ConsumerWaitHandler void add_value_( T&& value) override; - //! Override of ConsumerWaitHandler method to copy a new value into the queue - void add_value_( - const T& value) override; - /** * @brief Override of \c ConsumerWaitHandler method to remove a value from the queue * @@ -83,5 +79,3 @@ class DBQueueWaitHandler : public ConsumerWaitHandler // Include implementation template file #include - - diff --git a/cpp_utils/include/cpp_utils/wait/impl/ConsumerWaitHandler.ipp b/cpp_utils/include/cpp_utils/wait/impl/ConsumerWaitHandler.ipp index 45041de8..69069a68 100644 --- a/cpp_utils/include/cpp_utils/wait/impl/ConsumerWaitHandler.ipp +++ b/cpp_utils/include/cpp_utils/wait/impl/ConsumerWaitHandler.ipp @@ -52,7 +52,8 @@ template void ConsumerWaitHandler::produce( const T& value) { - add_value_(value); + T dummy_copied_value__(value); + add_value_(std::move(dummy_copied_value__)); this->operator ++(); } @@ -81,5 +82,3 @@ T ConsumerWaitHandler::consume( } /* namespace event */ } /* namespace utils */ } /* namespace eprosima */ - - diff --git a/cpp_utils/include/cpp_utils/wait/impl/DBQueueWaitHandler.ipp b/cpp_utils/include/cpp_utils/wait/impl/DBQueueWaitHandler.ipp index a44c6bc3..82b0880a 100644 --- a/cpp_utils/include/cpp_utils/wait/impl/DBQueueWaitHandler.ipp +++ b/cpp_utils/include/cpp_utils/wait/impl/DBQueueWaitHandler.ipp @@ -15,10 +15,10 @@ * @file DBQueueWaitHandler.ipp */ -#include - #pragma once +#include + namespace eprosima { namespace utils { namespace event { @@ -31,14 +31,6 @@ void DBQueueWaitHandler::add_value_( queue_.Push(std::move(value)); } -template -void DBQueueWaitHandler::add_value_( - const T& value) -{ - logDebug(UTILS_WAIT_DBQUEUE, "Copying element to DBQueue."); - queue_.Push(value); -} - template T DBQueueWaitHandler::get_next_value_() { @@ -58,8 +50,8 @@ T DBQueueWaitHandler::get_next_value_() throw utils::InconsistencyException("Empty DBQueue, impossible to get value."); } - // TODO: Do it without copy - auto value = queue_.Front(); + // TODO: Do it with front and pop without copy + auto value = std::move(queue_.Front()); queue_.Pop(); return value; @@ -68,5 +60,3 @@ T DBQueueWaitHandler::get_next_value_() } /* namespace event */ } /* namespace utils */ } /* namespace eprosima */ - - diff --git a/cpp_utils/src/cpp/threading/manager/AsyncManager.cpp b/cpp_utils/src/cpp/threading/manager/AsyncManager.cpp new file mode 100644 index 00000000..2ba25356 --- /dev/null +++ b/cpp_utils/src/cpp/threading/manager/AsyncManager.cpp @@ -0,0 +1,73 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file AsyncManager.cpp + * + */ + +#include + +#include +#include + +namespace eprosima { +namespace utils { +namespace threading { + +AsyncManager::~AsyncManager() +{ + logDebug(DDSROUTER_THREAD_ASYNCMANAGER, "Closing Async Manager."); + clean_threads(); + logDebug(DDSROUTER_THREAD_ASYNCMANAGER, "Async Manager closed."); +} + +void AsyncManager::execute(std::unique_ptr&& task) +{ + // Lock mutex + std::unique_lock lock(tasks_running_); + + // Get reference to task + ITask* task_reference = task.get(); + + // Create and Insert task in new index + // Being indexed in map the unique ptr will not be erased + tasks_running_.push_back( + std::make_pair( + std::make_unique( + [task_reference](){ + task_reference->operator()(); + } + ), + std::move(task) + ) + ); + + logDebug(DDSROUTER_THREAD_ASYNCMANAGER, "New thread executing task."); +} + +void AsyncManager::clean_threads() +{ + std::unique_lock lock(tasks_running_); + for (auto& task : tasks_running_) + { + task.first->join(); + logDebug(DDSROUTER_THREAD_ASYNCMANAGER, "Thread finished, removing task and thread."); + } + tasks_running_.clear(); +} + +} /* namespace threading */ +} /* namespace utils */ +} /* namespace eprosima */ diff --git a/cpp_utils/src/cpp/threading/manager/StdThreadPool.cpp b/cpp_utils/src/cpp/threading/manager/StdThreadPool.cpp new file mode 100644 index 00000000..00cba3f0 --- /dev/null +++ b/cpp_utils/src/cpp/threading/manager/StdThreadPool.cpp @@ -0,0 +1,134 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file StdThreadPool.cpp + * + */ + +#include + +#include +#include + +namespace eprosima { +namespace utils { +namespace threading { + +StdThreadPool::StdThreadPool( + unsigned int n_threads, + bool start_running /* = true */) + : task_queue_(0, false) + , threads_() + , n_threads_(n_threads) +{ + if (start_running) + { + start(); + } +} + +StdThreadPool::~StdThreadPool() +{ + stop(); +} + +void StdThreadPool::start() +{ + if (!task_queue_.enabled()) + { + logDebug(DDSROUTER_STDTHREADPOOL, "Starting thread pool."); + + task_queue_.enable(); + + // Execute all threads + for (unsigned int i=0; i&& task) +{ + task_queue_.produce(std::move(task)); +} + +void StdThreadPool::thread_routine_() +{ + logDebug(DDSROUTER_STDTHREADPOOL, "Starting thread routine: " << std::this_thread::get_id() << "."); + + try + { + while (true) + { + logDebug( + DDSROUTER_STDTHREADPOOL, + "Thread: " << std::this_thread::get_id() << " free, getting new callback."); + + // Wait till there is a new task available + auto task = task_queue_.consume(); + + logDebug( + DDSROUTER_STDTHREADPOOL, + "Thread: " << std::this_thread::get_id() << " executing callback."); + + // Executing callback + task->operator()(); + + // NOTE: at this point task would not be further referenced and it will be destroyed. + } + } + catch (const utils::DisabledException& e) + { + logDebug(DDSROUTER_STDTHREADPOOL, "Stopping thread: " << std::this_thread::get_id() << "."); + } +} + +} /* namespace threading */ +} /* namespace utils */ +} /* namespace eprosima */ diff --git a/cpp_utils/src/cpp/threading/manager/SyncManager.cpp b/cpp_utils/src/cpp/threading/manager/SyncManager.cpp new file mode 100644 index 00000000..ff77a32c --- /dev/null +++ b/cpp_utils/src/cpp/threading/manager/SyncManager.cpp @@ -0,0 +1,33 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file SyncManager.cpp + * + */ + +#include + +namespace eprosima { +namespace utils { +namespace threading { + +void SyncManager::execute(std::unique_ptr&& task) +{ + task->operator()(); +} + +} /* namespace threading */ +} /* namespace utils */ +} /* namespace eprosima */ diff --git a/cpp_utils/src/cpp/threading/task/OwnedTask.cpp b/cpp_utils/src/cpp/threading/task/OwnedTask.cpp new file mode 100644 index 00000000..d3e6bc66 --- /dev/null +++ b/cpp_utils/src/cpp/threading/task/OwnedTask.cpp @@ -0,0 +1,45 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file OwnedTask.cpp + * + */ + +#include + +namespace eprosima { +namespace utils { +namespace threading { + +OwnedTask::OwnedTask(const std::function& callback) + : callback(callback) +{ + +} + +OwnedTask::OwnedTask(std::function&& callback) + : callback(std::move(callback)) +{ + +} + +void OwnedTask::operator()() noexcept +{ + callback.operator()(); +} + +} /* namespace threading */ +} /* namespace utils */ +} /* namespace eprosima */ diff --git a/cpp_utils/src/cpp/threading/task/ReferenceTask.cpp b/cpp_utils/src/cpp/threading/task/ReferenceTask.cpp new file mode 100644 index 00000000..a0212732 --- /dev/null +++ b/cpp_utils/src/cpp/threading/task/ReferenceTask.cpp @@ -0,0 +1,43 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file ReferenceTask.cpp + * + */ + +#include +#include + +namespace eprosima { +namespace utils { +namespace threading { + +ReferenceTask::ReferenceTask(const std::function* callback_ptr) + : callback_ptr(callback_ptr) +{ + if (!callback_ptr) + { + throw InitializationException(STR_ENTRY << "ReferenceTask must be initialized with a valid ptr."); + } +} + +void ReferenceTask::operator()() noexcept +{ + callback_ptr->operator()(); +} + +} /* namespace threading */ +} /* namespace utils */ +} /* namespace eprosima */ diff --git a/cpp_utils/test/unittest/CMakeLists.txt b/cpp_utils/test/unittest/CMakeLists.txt index 501322b5..cfd8e7bd 100644 --- a/cpp_utils/test/unittest/CMakeLists.txt +++ b/cpp_utils/test/unittest/CMakeLists.txt @@ -22,6 +22,7 @@ add_subdirectory(math/random) add_subdirectory(memory) add_subdirectory(return_code) add_subdirectory(testing) +add_subdirectory(threading) add_subdirectory(thread_pool) add_subdirectory(time) add_subdirectory(utils) diff --git a/cpp_utils/test/unittest/threading/CMakeLists.txt b/cpp_utils/test/unittest/threading/CMakeLists.txt new file mode 100644 index 00000000..6d2cd196 --- /dev/null +++ b/cpp_utils/test/unittest/threading/CMakeLists.txt @@ -0,0 +1,18 @@ +# Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Add test subdirectories +add_subdirectory(connector) +add_subdirectory(manager) +add_subdirectory(task) diff --git a/cpp_utils/test/unittest/threading/connector/CMakeLists.txt b/cpp_utils/test/unittest/threading/connector/CMakeLists.txt new file mode 100644 index 00000000..d8414b63 --- /dev/null +++ b/cpp_utils/test/unittest/threading/connector/CMakeLists.txt @@ -0,0 +1,80 @@ + +################################### +# One Shot Connector Test +################################### + +set(TEST_NAME + OneShotConnectorTest) + +set(TEST_SOURCES + one_shot_connector_test.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/exception/Exception.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/Formatter.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/math/math.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/threading/task/OwnedTask.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/threading/manager/StdThreadPool.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/time/time_utils.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/time/Timer.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/utils.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/wait/CounterWaitHandler.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/wait/IntWaitHandler.cpp + ) + +set(TEST_LIST + one_shot_test_no_params + one_shot_test_int + one_shot_test_string + one_shot_test_bool_int_string + one_shot_test_complex_args + ) + +set(TEST_EXTRA_LIBRARIES + ${MODULE_DEPENDENCIES} + ) + +add_unittest_executable( + "${TEST_NAME}" + "${TEST_SOURCES}" + "${TEST_LIST}" + "${TEST_EXTRA_LIBRARIES}" + ) + +################################### +# Slot Connector Test +################################### + +set(TEST_NAME + SlotConnectorTest) + +set(TEST_SOURCES + slot_connector_test.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/exception/Exception.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/Formatter.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/math/math.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/threading/task/OwnedTask.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/threading/manager/StdThreadPool.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/time/time_utils.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/time/Timer.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/utils.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/wait/CounterWaitHandler.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/wait/IntWaitHandler.cpp + ) + +set(TEST_LIST + slot_test_no_params + slot_test_int + slot_test_string + slot_test_bool_int_string + slot_test_complex_args + ) + +set(TEST_EXTRA_LIBRARIES + ${MODULE_DEPENDENCIES} + ) + +add_unittest_executable( + "${TEST_NAME}" + "${TEST_SOURCES}" + "${TEST_LIST}" + "${TEST_EXTRA_LIBRARIES}" + ) diff --git a/cpp_utils/test/unittest/threading/connector/one_shot_connector_test.cpp b/cpp_utils/test/unittest/threading/connector/one_shot_connector_test.cpp new file mode 100644 index 00000000..5e2b3bca --- /dev/null +++ b/cpp_utils/test/unittest/threading/connector/one_shot_connector_test.cpp @@ -0,0 +1,351 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include +#include +#include +#include +#include +#include + +using namespace eprosima; + +namespace test { + +utils::Duration_ms DEFAULT_TIME_TEST = 20u; +unsigned int DEFAULT_TIME_REPETITIONS = 20u; +unsigned int DEFAULT_THREADS = 3u; + +/** + * @brief Function that increases \c counter in \c increase so can be checked that has been successfully done. + * + * @param counter Wait Handler that holds the number of increases done to the same variable + * @param increase value to increase \c counter + */ +void test_lambda_increase_waiter( + utils::event::IntWaitHandler& counter, + const int increase = 1) +{ + std::this_thread::sleep_for(std::chrono::milliseconds(DEFAULT_TIME_TEST)); + for (int i = 0; i < increase; ++i) + { + ++counter; + } +} + +void test_lambda_increase_waiter_add_string( + utils::event::IntWaitHandler& counter, + utils::Atomicable& bucket, + std::string string_to_add, + int increase = 1, + bool append_string = true) +{ + std::this_thread::sleep_for(std::chrono::milliseconds(DEFAULT_TIME_TEST)); + + // Lock string that will be modified + if (append_string) + { + std::unique_lock> lock(bucket); + bucket.append(string_to_add); + } + + for (int i = 0; i < increase; ++i) + { + ++counter; + } +} + +/** + * Manager type to use. + * Using \c StdThreadPool because it is the one that will be used the most. + */ +using ManagerType = utils::threading::StdThreadPool; + +utils::threading::IManager* create_manager() +{ + return new ManagerType(DEFAULT_THREADS, true); +} + +} /* namespace eprosima */ + +/** + * Construct a StdThreadPool and uses OneShotConnector to send executions without parameters + * + * STEPS: + * - Create Manager + * - Call a OneShotConnector by copying an already existing std::function + * - Call OneShotConnector N times with a new created lambda each time + * - Check that the final value is the expected + */ +TEST(OneShotConnectorTest, one_shot_test_no_params) +{ + // Waiter to check result + utils::event::IntWaitHandler counter(0); + + // Manager object + utils::threading::IManager* manager = test::create_manager(); + + // Execute lambda by copy increasing in 1 + std::function lambda = [&counter](){ test::test_lambda_increase_waiter(counter, 1); }; + utils::threading::SimpleOneShotConnector::execute( + manager, + lambda); + + // Wait for lambda to be called required times + counter.wait_equal(1); + // Check that lambda has been called only that amount of times + ASSERT_EQ(counter.get_value(), 1); + // Reset counter + counter.set_value(0); + + // Execute lambda N times by moving increasing in n + for (unsigned int i = 1; i <= test::DEFAULT_TIME_REPETITIONS; ++i) + { + utils::threading::SimpleOneShotConnector::execute( + manager, + [&counter, i](){ test::test_lambda_increase_waiter(counter, i); }); + } + + // Wait for lambda to be called required times + utils::event::IntWaitHandlerType target_value = + utils::arithmetic_progression_sum(1, 1, test::DEFAULT_TIME_REPETITIONS); + counter.wait_equal(target_value); + // Check that lambda has been called only that amount of times + ASSERT_EQ(counter.get_value(), target_value); + + // Erase Manager + delete manager; +} + +TEST(OneShotConnectorTest, one_shot_test_int) +{ + // Waiter to check result + utils::event::IntWaitHandler counter(0); + + // Manager object + utils::threading::IManager* manager = test::create_manager(); + + // Execute lambda by moving increasing in 1 + std::function lambda_move = [&counter](int x){ test::test_lambda_increase_waiter(counter, x); }; + utils::threading::OneShotConnector::execute( + manager, + std::move(lambda_move), + 1); + + // Wait for lambda to be called required times + counter.wait_equal(1); + // Check that lambda has been called only that amount of times + ASSERT_EQ(counter.get_value(), 1); + // Reset counter + counter.set_value(0); + + // Execute lambda N times by copy increasing in 1 + std::function lambda = [&counter](int x){ test::test_lambda_increase_waiter(counter, x); }; + for (unsigned int i = 1; i <= test::DEFAULT_TIME_REPETITIONS; ++i) + { + utils::threading::OneShotConnector::execute( + manager, + lambda, + static_cast(i)); + } + + // Wait for lambda to be called required times + utils::event::IntWaitHandlerType target_value = + utils::arithmetic_progression_sum(1, 1, test::DEFAULT_TIME_REPETITIONS); + counter.wait_equal(target_value); + // Check that lambda has been called only that amount of times + ASSERT_EQ(counter.get_value(), target_value); + + // Erase Manager + delete manager; +} + +TEST(OneShotConnectorTest, one_shot_test_string) +{ + // Waiter to check result + utils::event::IntWaitHandler counter(0); + // String to check result + utils::Atomicable bucket; + + // Manager object + utils::threading::IManager* manager = test::create_manager(); + + // Execute lambda by moving increasing in 1 + std::function lambda_move = + [&counter, &bucket](std::string s){ test::test_lambda_increase_waiter_add_string(counter, bucket, s, 1); }; + utils::threading::OneShotConnector::execute( + manager, + std::move(lambda_move), + "Hello"); + + // Wait for lambda to be called required times + counter.wait_equal(1); + // Check that lambda has been called only that amount of times and the string result is the correct + ASSERT_EQ(counter.get_value(), 1); + ASSERT_EQ(bucket, "Hello"); // It does not require mutex as the modification in test has already been done + // Reset counter + counter.set_value(0); + bucket.erase(); + + // Execute lambda N times by copy increasing in 1 + std::function lambda = + [&counter, &bucket](std::string s){ test::test_lambda_increase_waiter_add_string(counter, bucket, s, 1); }; + for (unsigned int i = 1; i <= test::DEFAULT_TIME_REPETITIONS; ++i) + { + // Call execute with a character adding 'a' + i (-1 to start from 'a') + utils::threading::OneShotConnector::execute( + manager, + lambda, + std::string(1, static_cast('a' + i - 1))); + } + + // Wait for lambda to be called required times + utils::event::IntWaitHandlerType target_value = + test::DEFAULT_TIME_REPETITIONS; + counter.wait_equal(target_value); + // Check that lambda has been called only that amount of times + ASSERT_EQ(counter.get_value(), target_value); + + // Check the result string. It may not be in the order expected as the order of threads is not deterministic + // Thus, check that every char from 'a' to 'a' + N is in the string + for (char c = 'a'; c < static_cast('a' + test::DEFAULT_TIME_REPETITIONS); ++c) + { + ASSERT_NE(bucket.find(c), std::string::npos) << c; + } + + // Erase Manager + delete manager; +} + +TEST(OneShotConnectorTest, one_shot_test_bool_int_string) +{ + // Waiter to check result + utils::event::IntWaitHandler counter(0); + // String to check result + utils::Atomicable bucket; + + // Manager object + utils::threading::IManager* manager = test::create_manager(); + + // Execute lambda by moving increasing in 1 + std::function lambda_move = + [&counter, &bucket] + (bool b, int i, std::string s) + { test::test_lambda_increase_waiter_add_string(counter, bucket, s, i, b); }; + + utils::threading::OneShotConnector::execute( + manager, + std::move(lambda_move), + true, + 1, + "Hello"); + + // Wait for lambda to be called required times + counter.wait_equal(1); + // Check that lambda has been called only that amount of times and the string result is the correct + ASSERT_EQ(counter.get_value(), 1); + ASSERT_EQ(bucket, "Hello"); // It does not require mutex as the modification in test has already been done + // Reset counter + counter.set_value(0); + bucket.erase(); + + // Execute lambda N times by copy increasing in 1 + std::function lambda = + [&counter, &bucket] + (bool b, int i, std::string s) + { test::test_lambda_increase_waiter_add_string(counter, bucket, s, i, b); }; + + for (unsigned int i = 1; i <= test::DEFAULT_TIME_REPETITIONS; ++i) + { + // Whether it should add the char. Only add odd number chars + char c = static_cast('a' + i - 1); + bool append_char = static_cast(static_cast(c) % 2); + + // Call execute with a character adding 'a' + i (-1 to start from 'a') + utils::threading::OneShotConnector::execute( + manager, + lambda, + append_char, + i, + std::string(1, c)); + } + + // Wait for lambda to be called required times + utils::event::IntWaitHandlerType target_value = + utils::arithmetic_progression_sum(1, 1, test::DEFAULT_TIME_REPETITIONS); + counter.wait_equal(target_value); + // Check that lambda has been called only that amount of times + ASSERT_EQ(counter.get_value(), target_value); + + // Check the result string. It may not be in the order expected as the order of threads is not deterministic + // Thus, check that every char from 'a' to 'a' + N is in the string + for (char c = 'a'; c < static_cast('a' + test::DEFAULT_TIME_REPETITIONS); ++c) + { + bool append_char = static_cast(static_cast(c) % 2); + if (append_char) + { + ASSERT_NE(bucket.find(c), std::string::npos); + } + else + { + ASSERT_EQ(bucket.find(c), std::string::npos); + } + } + + // Erase Manager + delete manager; +} + +TEST(OneShotConnectorTest, one_shot_test_complex_args) +{ + // Waiter to check result + utils::event::IntWaitHandler counter(0); + + // Manager object + utils::threading::IManager* manager = test::create_manager(); + + // Use a function reference and already created values to call in the Pool + for (unsigned int i = 1; i <= test::DEFAULT_TIME_REPETITIONS; ++i) + { + utils::threading::OneShotConnector::execute( + manager, + test::test_lambda_increase_waiter, + counter, + static_cast(i)); + } + + // Wait for lambda to be called required times + utils::event::IntWaitHandlerType target_value = + utils::arithmetic_progression_sum(1, 1, test::DEFAULT_TIME_REPETITIONS); + counter.wait_equal(target_value); + + // Check that lambda has been called only that amount of times and the string result is the correct + ASSERT_EQ(counter.get_value(), target_value); + + delete manager; +} + +int main( + int argc, + char** argv) +{ + // utils::Log::SetVerbosity(utils::Log::Kind::Info); + // utils::Log::Flush(); + + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/cpp_utils/test/unittest/threading/connector/slot_connector_test.cpp b/cpp_utils/test/unittest/threading/connector/slot_connector_test.cpp new file mode 100644 index 00000000..20fc62f9 --- /dev/null +++ b/cpp_utils/test/unittest/threading/connector/slot_connector_test.cpp @@ -0,0 +1,335 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include +#include +#include +#include +#include +#include + +using namespace eprosima; + +namespace test { + +utils::Duration_ms DEFAULT_TIME_TEST = 20u; +unsigned int DEFAULT_TIME_REPETITIONS = 20u; +unsigned int DEFAULT_THREADS = 3u; + +/** + * @brief Function that increases \c counter in \c increase so can be checked that has been successfully done. + * + * @param counter Wait Handler that holds the number of increases done to the same variable + * @param increase value to increase \c counter + */ +void test_lambda_increase_waiter( + utils::event::IntWaitHandler& counter, + int increase = 1) +{ + std::this_thread::sleep_for(std::chrono::milliseconds(DEFAULT_TIME_TEST)); + for (int i = 0; i < increase; ++i) + { + ++counter; + } +} + +void test_lambda_increase_waiter_add_string( + utils::event::IntWaitHandler& counter, + utils::Atomicable& bucket, + std::string string_to_add, + int increase = 1, + bool append_string = true) +{ + std::this_thread::sleep_for(std::chrono::milliseconds(DEFAULT_TIME_TEST)); + + // Lock string that will be modified + if (append_string) + { + std::unique_lock> lock(bucket); + bucket.append(string_to_add); + } + + for (int i = 0; i < increase; ++i) + { + ++counter; + } +} + +/** + * Manager type to use. + * Using \c StdThreadPool because it is the one that will be used the most. + */ +using ManagerType = utils::threading::StdThreadPool; + +utils::threading::IManager* create_manager() +{ + return new ManagerType(DEFAULT_THREADS, true); +} + +} /* namespace eprosima */ + +/** + * Construct a StdThreadPool and uses SlotConnector to send executions without parameters + * + * STEPS: + * - Create Manager + * - Call a SlotConnector by copying an already existing std::function + * - Call SlotConnector N times with a new created lambda each time + * - Check that the final value is the expected + */ +TEST(SlotConnectorTest, slot_test_no_params) +{ + // Waiter to check result + utils::event::IntWaitHandler counter(0); + + // Manager object + utils::threading::IManager* manager = test::create_manager(); + + // Create lambda increasing in 1 + std::function lambda = [&counter](){ test::test_lambda_increase_waiter(counter, 1); }; + // Create slot by copy + utils::threading::SimpleSlotConnector once_slot(manager, lambda); + + // Execute slot + once_slot.execute(); + + // Wait for lambda to be called required times + counter.wait_equal(1); + // Check that lambda has been called only that amount of times + ASSERT_EQ(counter.get_value(), 1); + // Reset counter + counter.set_value(0); + + // Execute lambda N times by moving increasing in 1 by moving + utils::threading::SimpleSlotConnector move_slot(manager, std::move(lambda)); + for (unsigned int i = 1; i <= test::DEFAULT_TIME_REPETITIONS; ++i) + { + move_slot.execute(); + } + + // Wait for lambda to be called required times + utils::event::IntWaitHandlerType target_value = test::DEFAULT_TIME_REPETITIONS; + counter.wait_equal(target_value); + // Check that lambda has been called only that amount of times + ASSERT_EQ(counter.get_value(), target_value); + + // Erase Manager + delete manager; +} + +TEST(SlotConnectorTest, slot_test_int) +{ + // Waiter to check result + utils::event::IntWaitHandler counter(0); + + // Manager object + utils::threading::IManager* manager = test::create_manager(); + + // Create lambda increasing in 1 + std::function lambda = [&counter](int x){ test::test_lambda_increase_waiter(counter, x); }; + // Create slot by copy + utils::threading::SlotConnector once_slot(manager, lambda); + + // Execute slot + once_slot.execute(1); + + // Wait for lambda to be called required times + counter.wait_equal(1); + // Check that lambda has been called only that amount of times + ASSERT_EQ(counter.get_value(), 1); + // Reset counter + counter.set_value(0); + + // Execute lambda N times by moving increasing in 1 by moving + utils::threading::SlotConnector move_slot(manager, std::move(lambda)); + for (unsigned int i = 1; i <= test::DEFAULT_TIME_REPETITIONS; ++i) + { + move_slot.execute(i); + } + + // Wait for lambda to be called required times + utils::event::IntWaitHandlerType target_value = utils::arithmetic_progression_sum(1, 1, test::DEFAULT_TIME_REPETITIONS); + counter.wait_equal(target_value); + // Check that lambda has been called only that amount of times + ASSERT_EQ(counter.get_value(), target_value); + + // Erase Manager + delete manager; +} + +TEST(SlotConnectorTest, slot_test_string) +{ + // Waiter to check result + utils::event::IntWaitHandler counter(0); + // String to check result + utils::Atomicable bucket; + + // Manager object + utils::threading::IManager* manager = test::create_manager(); + + // Create lambda increasing in 1 + std::function lambda = + [&counter, &bucket](std::string st){ test::test_lambda_increase_waiter_add_string(counter, bucket, st, 1); }; + // Create slot by copy + utils::threading::SlotConnector once_slot(manager, lambda); + + // Execute slot + once_slot.execute("Hello"); + + // Wait for lambda to be called required times + counter.wait_equal(1); + // Check that lambda has been called only that amount of times and the string result is the correct + ASSERT_EQ(counter.get_value(), 1); + ASSERT_EQ(bucket, "Hello"); // It does not require mutex as the modification in test has already been done + // Reset counter + counter.set_value(0); + bucket.erase(); + + // Execute lambda N times by moving increasing in 1 by moving + utils::threading::SlotConnector move_slot(manager, std::move(lambda)); + for (unsigned int i = 1; i <= test::DEFAULT_TIME_REPETITIONS; ++i) + { + move_slot.execute(std::string(1, static_cast('a' + i - 1))); + } + + // Wait for lambda to be called required times + utils::event::IntWaitHandlerType target_value = test::DEFAULT_TIME_REPETITIONS; + counter.wait_equal(target_value); + // Check that lambda has been called only that amount of times + ASSERT_EQ(counter.get_value(), target_value); + + // Check the result string. It may not be in the order expected as the order of threads is not deterministic + // Thus, check that every char from 'a' to 'a' + N is in the string + for (char c = 'a'; c < static_cast('a' + test::DEFAULT_TIME_REPETITIONS); ++c) + { + ASSERT_NE(bucket.find(c), std::string::npos) << c; + } + + // Erase Manager + delete manager; +} + +TEST(SlotConnectorTest, slot_test_bool_int_string) +{ + // Waiter to check result + utils::event::IntWaitHandler counter(0); + // String to check result + utils::Atomicable bucket; + + // Manager object + utils::threading::IManager* manager = test::create_manager(); + + // Create lambda increasing in n + std::function lambda = + [&counter, &bucket] + (bool b, int i, std::string s) + { test::test_lambda_increase_waiter_add_string(counter, bucket, s, i, b); }; + + // Create slot by copy + utils::threading::SlotConnector once_slot(manager, lambda); + + // Execute slot + once_slot.execute(true, 1, "Hello"); + + // Wait for lambda to be called required times + counter.wait_equal(1); + // Check that lambda has been called only that amount of times and the string result is the correct + ASSERT_EQ(counter.get_value(), 1); + ASSERT_EQ(bucket, "Hello"); // It does not require mutex as the modification in test has already been done + // Reset counter + counter.set_value(0); + bucket.erase(); + + // Execute lambda N times by moving increasing in n by moving + utils::threading::SlotConnector move_slot(manager, std::move(lambda)); + for (unsigned int i = 1; i <= test::DEFAULT_TIME_REPETITIONS; ++i) + { + // Whether it should add the char. Only add odd number chars + char c = static_cast('a' + i - 1); + bool append_char = static_cast(static_cast(c) % 2); + + move_slot.execute( + append_char, + i, + std::string(1, c)); + } + + // Wait for lambda to be called required times + utils::event::IntWaitHandlerType target_value = + utils::arithmetic_progression_sum(1, 1, test::DEFAULT_TIME_REPETITIONS); + counter.wait_equal(target_value); + // Check that lambda has been called only that amount of times + ASSERT_EQ(counter.get_value(), target_value); + + // Check the result string. It may not be in the order expected as the order of threads is not deterministic + // Thus, check that every char from 'a' to 'a' + N is in the string + for (char c = 'a'; c < static_cast('a' + test::DEFAULT_TIME_REPETITIONS); ++c) + { + bool append_char = static_cast(static_cast(c) % 2); + if (append_char) + { + ASSERT_NE(bucket.find(c), std::string::npos); + } + else + { + ASSERT_EQ(bucket.find(c), std::string::npos); + } + } + + // Erase Manager + delete manager; +} + +TEST(SlotConnectorTest, slot_test_complex_args) +{ + // Waiter to check result + utils::event::IntWaitHandler counter(0); + + // Manager object + utils::threading::IManager* manager = test::create_manager(); + + // Use a function reference and already created values to call in the Pool + utils::threading::SlotConnector move_slot( + manager, + test::test_lambda_increase_waiter); + + for (unsigned int i = 1; i <= test::DEFAULT_TIME_REPETITIONS; ++i) + { + move_slot.execute(counter, static_cast(i)); + } + + // Wait for lambda to be called required times + utils::event::IntWaitHandlerType target_value = + utils::arithmetic_progression_sum(1, 1, test::DEFAULT_TIME_REPETITIONS); + counter.wait_equal(target_value); + + // Check that lambda has been called only that amount of times and the string result is the correct + ASSERT_EQ(counter.get_value(), target_value); + + delete manager; +} + +int main( + int argc, + char** argv) +{ + // utils::Log::SetVerbosity(utils::Log::Kind::Info); + // utils::Log::Flush(); + + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/cpp_utils/test/unittest/threading/manager/CMakeLists.txt b/cpp_utils/test/unittest/threading/manager/CMakeLists.txt new file mode 100644 index 00000000..fa5673f2 --- /dev/null +++ b/cpp_utils/test/unittest/threading/manager/CMakeLists.txt @@ -0,0 +1,90 @@ +# Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +################################### +# IManager Specializations Test +################################### + +set(TEST_NAME + ParametrizedThreadManagerTest) + +set(TEST_SOURCES + manager_interface_test.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/exception/Exception.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/Formatter.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/math/math.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/threading/task/OwnedTask.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/threading/manager/SyncManager.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/threading/manager/AsyncManager.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/threading/manager/StdThreadPool.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/time/time_utils.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/time/Timer.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/utils.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/wait/CounterWaitHandler.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/wait/IntWaitHandler.cpp + ) + +set(TEST_LIST + manager_execute + ) + +set(TEST_EXTRA_LIBRARIES + ${MODULE_DEPENDENCIES} + ) + +add_unittest_executable( + "${TEST_NAME}" + "${TEST_SOURCES}" + "${TEST_LIST}" + "${TEST_EXTRA_LIBRARIES}" + ) + +################################### +# StdThreadPool Test +################################### + +set(TEST_NAME + StdThreadPoolTest) + +set(TEST_SOURCES + std_thread_pool_test.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/exception/Exception.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/Formatter.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/math/math.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/threading/task/OwnedTask.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/threading/manager/StdThreadPool.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/time/time_utils.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/time/Timer.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/utils.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/wait/CounterWaitHandler.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/wait/IntWaitHandler.cpp + ) + +set(TEST_LIST + pool_1_threads_1_tasks + pool_1_threads_M_tasks + pool_N_threads_N_tasks + pool_N_threads_NM_tasks + ) + +set(TEST_EXTRA_LIBRARIES + ${MODULE_DEPENDENCIES} + ) + +add_unittest_executable( + "${TEST_NAME}" + "${TEST_SOURCES}" + "${TEST_LIST}" + "${TEST_EXTRA_LIBRARIES}" + ) diff --git a/cpp_utils/test/unittest/threading/manager/manager_interface_test.cpp b/cpp_utils/test/unittest/threading/manager/manager_interface_test.cpp new file mode 100644 index 00000000..7e8e8949 --- /dev/null +++ b/cpp_utils/test/unittest/threading/manager/manager_interface_test.cpp @@ -0,0 +1,153 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace eprosima; + +namespace test { + +utils::Duration_ms DEFAULT_TIME_TEST = 20u; +unsigned int DEFAULT_TIME_REPETITIONS = 20u; +unsigned int DEFAULT_THREADS = 3u; + +/** + * @brief Function that increases \c counter in \c increase so can be checked that has been successfully done. + * + * @param counter Wait Handler that holds the number of increases done to the same variable + * @param increase value to increase \c counter + */ +void test_lambda_increase_waiter( + utils::event::IntWaitHandler& counter, + unsigned int increase = 1) +{ + std::this_thread::sleep_for(std::chrono::milliseconds(DEFAULT_TIME_TEST)); + for (unsigned int i = 0; i < increase; ++i) + { + ++counter; + } +} + +/** + * Task type to use. + * Using \c OwnedTask because it is the simpler and easier one. + */ +using TaskType = utils::threading::OwnedTask; + +template +utils::threading::IManager* create_manager_interface() +{ + return new Manager(); +} + +template <> +utils::threading::IManager* create_manager_interface() +{ + utils::threading::StdThreadPool* pool = new utils::threading::StdThreadPool(DEFAULT_THREADS, false); + pool->start(); + return pool; +} + +} /* namespace eprosima */ + +using namespace eprosima::utils; + +template +struct ThreadManagerTest : public ::testing::Test +{}; + +TYPED_TEST_SUITE_P(ThreadManagerTest); + +TYPED_TEST_P(ThreadManagerTest, manager_execute) +{ + // Waiter to check result + utils::event::IntWaitHandler counter(0); + + // Create manager + utils::threading::IManager* manager( + test::create_manager_interface()); + + // Execute lambda increasing in 1 + { + auto lambda = [&counter](){ test::test_lambda_increase_waiter(counter, 1); }; + test::TaskType task(lambda); + manager->execute(std::make_unique(lambda)); + } + // Wait for lambda to be called required times + counter.wait_equal(1); + // Check that lambda has been called only that amount of times + ASSERT_EQ(counter.get_value(), 1); + // Reset counter + counter.set_value(0); + + // Execute lambda N times by copy increasing in 1 + for (unsigned int i = 1; i <= test::DEFAULT_TIME_REPETITIONS; ++i) + { + manager->execute( + std::make_unique( + [&counter, i](){ test::test_lambda_increase_waiter(counter, i); })); + } + + // Wait for lambda to be called required times + utils::event::IntWaitHandlerType target_value = + utils::arithmetic_progression_sum(1, 1, test::DEFAULT_TIME_REPETITIONS); + counter.wait_equal(target_value); + + // Erase Manager + delete manager; + + // Check that lambda has been called only that amount of times + ASSERT_EQ(counter.get_value(), target_value); +} + +// Register test class and test cases +REGISTER_TYPED_TEST_SUITE_P( + ThreadManagerTest, + manager_execute +); + +// Set types used in parametrization +typedef ::testing::Types< + utils::threading::SyncManager, + utils::threading::AsyncManager, + utils::threading::StdThreadPool + > CaseTypes; + +// Generate each test case for each type case +INSTANTIATE_TYPED_TEST_SUITE_P( + ParametrizedThreadManagerTest, + ThreadManagerTest, + CaseTypes); + + +int main( + int argc, + char** argv) +{ + // utils::Log::SetVerbosity(utils::Log::Kind::Info); + + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/cpp_utils/test/unittest/threading/manager/std_thread_pool_test.cpp b/cpp_utils/test/unittest/threading/manager/std_thread_pool_test.cpp new file mode 100644 index 00000000..4936b3a0 --- /dev/null +++ b/cpp_utils/test/unittest/threading/manager/std_thread_pool_test.cpp @@ -0,0 +1,168 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include + +#include +#include +#include +#include +#include +#include + +using namespace eprosima; + +namespace eprosima { +namespace utils { +namespace test { + +constexpr const utils::Duration_ms DEFAULT_TIME_TEST = 200u; // T +constexpr const utils::Duration_ms RESIDUAL_TIME_TEST = DEFAULT_TIME_TEST / 2u; // dT + +constexpr const uint32_t N_THREADS_IN_TEST = 10; // N +constexpr const uint32_t N_EXECUTIONS_IN_TEST = 20; // M + +void test_lambda_increase_waiter( + utils::event::IntWaitHandler& counter, + unsigned int increase = 1) +{ + std::this_thread::sleep_for(std::chrono::milliseconds(DEFAULT_TIME_TEST)); + + for (unsigned int i = 0; i < increase; ++i) + { + ++counter; + } +} + +/** + * Task type to use. + * Using \c OwnedTask because it is the simpler and easier one. + */ +using TaskType = utils::threading::OwnedTask; + +/** + * TESTS EXPLANATION + * These tests create a StdThreadPool and execute tasks in it. + * + * Tasks: + * Tasks objects used are OwnedTask and are created in the moment to send it to execute, so they will be destroyed + * automatically when finishing the task. + * + * Task function: + * The function used waits for a time T and increases a WaitHandler value the amount of time given by parameter. + * The WaitHandler is used so the test can wait in main thread to the expected value. + * + * Parameteres: + * Two parameters are used within the tests: + * @param n_threads Number of threads + * @param m_tasks Number of repetitions (#tasks added to pool) + * + * @warning if \c m_tasks is not dividible by \c n_threads the test may not work as expected because of + * non exact division solution. + */ +void test_thread_pool_with_parameters( + unsigned int n_threads, + unsigned int m_tasks) +{ + // Create thread_pool + threading::StdThreadPool thread_pool(n_threads, false); + thread_pool.start(); + + // Create timer to know the task has been executed in the time expected + utils::Timer timer; + + // Counter Wait Handler to wait for the task to be executed and check the final value + utils::event::IntWaitHandler waiter(0); + + // Emit N tasks n times + for (uint32_t i = 1; i <= m_tasks; ++i) + { + thread_pool.execute( + std::make_unique( + [&waiter, i] () { test::test_lambda_increase_waiter(waiter, i); } + ) + ); + } + + // Wait for counter value to be greater than 0 (so 1 task is being executed) + utils::event::IntWaitHandlerType target_value = utils::arithmetic_progression_sum(1, 1, m_tasks); + waiter.wait_greater_equal_than(target_value); + + auto time_elapsed = timer.elapsed(); + + // Check that the task has been executed in more than the time expected for lambda and less than expected + // time and residual; and that function has been called exactly once + double lower_time_expected = test::DEFAULT_TIME_TEST * std::floor(m_tasks / n_threads); + double higher_time_expected = test::DEFAULT_TIME_TEST * std::ceil(m_tasks / n_threads) + test::RESIDUAL_TIME_TEST; + + ASSERT_GE(time_elapsed, lower_time_expected); + ASSERT_LE(time_elapsed, higher_time_expected); + ASSERT_EQ(waiter.get_value(), target_value); + + // Thread Pool is destroyed automatically and without errors +} + +} /* namespace test */ +} /* namespace utils */ +} /* namespace eprosima */ + +using namespace utils; + +/** + * Emit 1 tasks to a ThreadPool with 1 thread. + * Check that time elapsed is > T + */ +TEST(StdThreadPoolTest, pool_1_threads_1_tasks) +{ + test::test_thread_pool_with_parameters(1, 1); +} + +/** + * Emit M tasks to a ThreadPool with 1 thread. + */ +TEST(StdThreadPoolTest, pool_1_threads_M_tasks) +{ + test::test_thread_pool_with_parameters(1, test::N_EXECUTIONS_IN_TEST); +} + +/** + * Emit N tasks to a ThreadPool with N threads. + */ +TEST(StdThreadPoolTest, pool_N_threads_N_tasks) +{ + test::test_thread_pool_with_parameters(test::N_THREADS_IN_TEST, test::N_THREADS_IN_TEST); +} + +/** + * Emit M*N tasks to a ThreadPool with N threads. + */ +TEST(StdThreadPoolTest, pool_N_threads_NM_tasks) +{ + test::test_thread_pool_with_parameters( + test::N_THREADS_IN_TEST, + test::N_THREADS_IN_TEST * test::N_EXECUTIONS_IN_TEST); +} + +int main( + int argc, + char** argv) +{ + // eprosima::ddsxrouter::utils::Log::SetVerbosity(utils::Log::Kind::Info); + + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/cpp_utils/test/unittest/threading/task/CMakeLists.txt b/cpp_utils/test/unittest/threading/task/CMakeLists.txt new file mode 100644 index 00000000..4f417796 --- /dev/null +++ b/cpp_utils/test/unittest/threading/task/CMakeLists.txt @@ -0,0 +1,48 @@ +# Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +################################### +# ITask Specializations Test +################################### + +set(TEST_NAME + ParametrizedThreadTaskTest) + +set(TEST_SOURCES + task_interface_test.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/exception/Exception.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/Formatter.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/threading/task/OwnedTask.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/threading/task/ReferenceTask.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/time/time_utils.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/time/Timer.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/utils.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/wait/CounterWaitHandler.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/wait/IntWaitHandler.cpp + ) + +set(TEST_LIST + task_operator + ) + +set(TEST_EXTRA_LIBRARIES + ${MODULE_DEPENDENCIES} + ) + +add_unittest_executable( + "${TEST_NAME}" + "${TEST_SOURCES}" + "${TEST_LIST}" + "${TEST_EXTRA_LIBRARIES}" + ) diff --git a/cpp_utils/test/unittest/threading/task/task_interface_test.cpp b/cpp_utils/test/unittest/threading/task/task_interface_test.cpp new file mode 100644 index 00000000..7b76bff7 --- /dev/null +++ b/cpp_utils/test/unittest/threading/task/task_interface_test.cpp @@ -0,0 +1,159 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include +#include +#include + +#include +#include +#include + +using namespace eprosima::utils; + +namespace test { + +Duration_ms DEFAULT_TIME_TEST = 20u; +unsigned int DEFAULT_TIME_REPETITIONS = 20u; + +/** + * @brief Function that increases \c counter in \c increase so can be checked that has been successfully done. + * + * @param counter Wait Handler that holds the number of increases done to the same variable + * @param increase value to increase \c counter + */ +void test_lambda_increase_waiter( + event::IntWaitHandler& counter, + unsigned int increase = 1) +{ + sleep_for(DEFAULT_TIME_TEST); + for (unsigned int i = 0; i < increase; ++i) + { + ++counter; + } +} + +template +threading::ITask* create_task_specialization(std::function* callback); + +template <> +threading::ITask* create_task_specialization( + std::function* callback) +{ + // Copy callback value inside new object + return new threading::ReferenceTask(callback); +} + +template <> +threading::ITask* create_task_specialization( + std::function* callback) +{ + // Copy callback value inside new object + return new threading::OwnedTask(*callback); +} + +template <> +threading::ITask* create_task_specialization>( + std::function* callback) +{ + // Copy callback value inside new object + return new threading::ArgsOwnedTask<>(*callback); +} + +template <> +threading::ITask* create_task_specialization>( + std::function* callback) +{ + // Copy callback value inside new object + return new threading::ArgsOwnedTask( + [callback](int x){ callback->operator()(); }, + 1); +} + +} /* namespace test */ + +// Empty class to parametrized tests +template +struct ThreadTaskTest : public ::testing::Test +{}; +// Needed gtest macro +TYPED_TEST_SUITE_P(ThreadTaskTest); + +/** + * This tests operator() of every specialization if ITask + * + * Uses a IntWaitHandler to increase a value and at the same time wait for it to be updated to a exact value. + * It is increased from a function executed inside the ITask, and the wait for it to be updated and check the value. + * + */ +TYPED_TEST_P(ThreadTaskTest, task_operator) +{ + // Waiter to check result + event::IntWaitHandler counter(0); + + // Function object to create tasks + std::function task_function( + [&counter](){ test::test_lambda_increase_waiter(counter, 1); }); + + // Create task + threading::ITask* task( + test::create_task_specialization( + &task_function)); + + // Execute lambda 1 time + task->operator()(); + counter.wait_equal(1); + counter.set_value(0); + + // Execute lambda N times + for (unsigned int i = 0; i < test::DEFAULT_TIME_REPETITIONS; ++i) + { + task->operator()(); + } + counter.wait_equal(test::DEFAULT_TIME_REPETITIONS); + + // Erase Task + delete task; +} + +// Register test class and test cases +REGISTER_TYPED_TEST_SUITE_P( + ThreadTaskTest, + task_operator +); + +// Set types used in parametrization +typedef ::testing::Types< + threading::ReferenceTask, + threading::OwnedTask, + threading::ArgsOwnedTask<>, + threading::ArgsOwnedTask + > CaseTypes; + +// Generate each test case for each type case +INSTANTIATE_TYPED_TEST_SUITE_P( + ParametrizedThreadTaskTest, + ThreadTaskTest, + CaseTypes); + +int main( + int argc, + char** argv) +{ + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/cpp_utils/test/unittest/wait/CMakeLists.txt b/cpp_utils/test/unittest/wait/CMakeLists.txt index 22cc85e0..69d854a5 100644 --- a/cpp_utils/test/unittest/wait/CMakeLists.txt +++ b/cpp_utils/test/unittest/wait/CMakeLists.txt @@ -90,8 +90,7 @@ all_library_sources("${TEST_SOURCES}") set(TEST_LIST push_pop_one_thread_int - push_pop_one_thread_string_move # not working - push_pop_one_thread_string_copy + push_pop_one_thread_string_move push_one_thread_pop_many_int ) diff --git a/cpp_utils/test/unittest/wait/DBQueueWaitHandlerTest.cpp b/cpp_utils/test/unittest/wait/DBQueueWaitHandlerTest.cpp index ed9620f7..4aa1dd99 100644 --- a/cpp_utils/test/unittest/wait/DBQueueWaitHandlerTest.cpp +++ b/cpp_utils/test/unittest/wait/DBQueueWaitHandlerTest.cpp @@ -89,37 +89,13 @@ TEST(DBQueueWaitHandlerTest, push_pop_one_thread_string_move) // This lvalue is moved as rvalue, so after moving it will be empty handler.produce(std::move(lvalue)); // TODO uncomment it once DBQueue supports moving values - // ASSERT_EQ(lvalue.size(), 0); + ASSERT_EQ(lvalue.size(), 0u); // Getting first value std::string pop_value = handler.consume(); EXPECT_EQ(source_value, pop_value); } -/** - * Check that pushing and popping values works as expected from the same thread - * Using std::string as object to use inside collection. - * - * CASES: - * - Push and pop one value by reference - */ -TEST(DBQueueWaitHandlerTest, push_pop_one_thread_string_copy) -{ - DBQueueWaitHandler handler; - - std::string lvalue("test_data"); - - handler.produce(lvalue); - - // Getting first value - std::string pop_value = handler.consume(); - EXPECT_EQ(lvalue, pop_value); - - // They should be different objects, check that modifying one does not modify the other - pop_value[0] = 'a'; - EXPECT_NE(lvalue, pop_value); -} - /** * STEPS: * - 1