diff --git a/include/oneapi/tbb/detail/_flow_graph_body_impl.h b/include/oneapi/tbb/detail/_flow_graph_body_impl.h index 21da06ce03..df5160c84e 100644 --- a/include/oneapi/tbb/detail/_flow_graph_body_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_body_impl.h @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2024 Intel Corporation + Copyright (c) 2005-2025 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -168,23 +168,64 @@ class function_body_leaf< continue_msg, Output, B > : public function_body< cont B body; }; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +class metainfo_tag_type; +#endif + +// TODO: add description +struct invoke_body_with_tag_helper { + using first_priority = int; + using second_priority = double; + +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + template + static auto invoke(first_priority, Body&& body, metainfo_tag_type&& tag, Args&&... args) + noexcept(noexcept(tbb::detail::invoke(std::forward(body), std::forward(args)..., std::move(tag)))) + -> decltype(tbb::detail::invoke(std::forward(body), std::forward(args)..., std::move(tag)), void()) + { + tbb::detail::invoke(std::forward(body), std::forward(args)..., std::move(tag)); + } +#endif + template + static void invoke(second_priority, Body&& body __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo_tag_type&&), + Args&&... args) + noexcept(noexcept(tbb::detail::invoke(std::forward(body), std::forward(args)...))) + { + tbb::detail::invoke(std::forward(body), std::forward(args)...); + } +}; + +// TODO: add comment +template +void invoke_body_with_tag(Body&& body __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo_tag_type&& tag), Args&&... args) + noexcept(noexcept(invoke_body_with_tag_helper::invoke(1, std::forward(body) __TBB_FLOW_GRAPH_METAINFO_ARG(std::move(tag)), + std::forward(args)...))) +{ + invoke_body_with_tag_helper::invoke(/*overload priority helper*/1, + std::forward(body) __TBB_FLOW_GRAPH_METAINFO_ARG(std::move(tag)), + std::forward(args)...); +} + + //! function_body that takes an Input and a set of output ports template class multifunction_body : no_assign { public: virtual ~multifunction_body () {} - virtual void operator()(const Input &/* input*/, OutputSet &/*oset*/) = 0; + virtual void operator()(const Input &/* input*/, OutputSet &/*oset*/ + __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo_tag_type&& /*tag*/)) = 0; virtual multifunction_body* clone() = 0; virtual void* get_body_ptr() = 0; }; //! leaf for multifunction. OutputSet can be a std::tuple or a vector. -template +template class multifunction_body_leaf : public multifunction_body { public: multifunction_body_leaf(const B &_body) : body(_body) { } - void operator()(const Input &input, OutputSet &oset) override { - tbb::detail::invoke(body, input, oset); // body may explicitly put() to one or more of oset. + // body may explicitly put() to one or more of oset. + void operator()(const Input &input, OutputSet &oset __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo_tag_type&& tag)) override { + invoke_body_with_tag(body __TBB_FLOW_GRAPH_METAINFO_ARG(std::move(tag)), input, oset); } void* get_body_ptr() override { return &body; } multifunction_body_leaf* clone() override { diff --git a/include/oneapi/tbb/detail/_flow_graph_cache_impl.h b/include/oneapi/tbb/detail/_flow_graph_cache_impl.h index 647f3dc1b6..64d8468107 100644 --- a/include/oneapi/tbb/detail/_flow_graph_cache_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_cache_impl.h @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2024 Intel Corporation + Copyright (c) 2005-2025 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -419,12 +419,13 @@ class broadcast_cache : public successor_cache { #endif // call try_put_task and return list of received tasks - bool gather_successful_try_puts( const T &t, graph_task_list& tasks ) { + bool gather_successful_try_puts( const T &t, graph_task_list& tasks + __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) ) { bool is_at_least_one_put_successful = false; typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true); typename successors_type::iterator i = this->my_successors.begin(); while ( i != this->my_successors.end() ) { - graph_task * new_task = (*i)->try_put_task(t); + graph_task * new_task = (*i)->try_put_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)); if(new_task) { ++i; if(new_task != SUCCESSFULLY_ENQUEUED) { diff --git a/include/oneapi/tbb/detail/_flow_graph_node_impl.h b/include/oneapi/tbb/detail/_flow_graph_node_impl.h index 336cb069c6..bc22f81e8a 100644 --- a/include/oneapi/tbb/detail/_flow_graph_node_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_node_impl.h @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2024 Intel Corporation + Copyright (c) 2005-2025 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -554,6 +554,70 @@ struct init_output_ports { } }; // struct init_output_ports +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + +class metainfo_tag_type { +public: + metainfo_tag_type() = default; + + metainfo_tag_type(const metainfo_tag_type&) = delete; + + metainfo_tag_type(metainfo_tag_type&& other) + : my_metainfo(std::move(other.my_metainfo)) {} + + metainfo_tag_type(const message_metainfo& metainfo) : my_metainfo(metainfo) { + for (auto waiter : my_metainfo.waiters()) { + waiter->reserve(); + } + } + + metainfo_tag_type& operator=(const metainfo_tag_type&) = delete; + metainfo_tag_type& operator=(metainfo_tag_type&& other) { + // TODO: should this method be thread-safe? + if (this != &other) { + reset(); + my_metainfo = std::move(other.my_metainfo); + } + return *this; + } + + ~metainfo_tag_type() { + reset(); + } + + void merge(const metainfo_tag_type& other_tag) { + tbb::spin_mutex::scoped_lock lock(my_mutex); + + // TODO: add comment + for (auto waiter : other_tag.my_metainfo.waiters()) { + waiter->reserve(); + } + my_metainfo.merge(other_tag.my_metainfo); + } + + void reset() { + tbb::spin_mutex::scoped_lock lock(my_mutex); + + for (auto waiter : my_metainfo.waiters()) { + waiter->release(); + } + my_metainfo = message_metainfo{}; + } +private: + friend struct metainfo_tag_accessor; + + message_metainfo my_metainfo; + tbb::spin_mutex my_mutex; +}; + +struct metainfo_tag_accessor { + static const message_metainfo& get_metainfo(const metainfo_tag_type& tag) { + return tag.my_metainfo; + } +}; + +#endif + //! Implements methods for a function node that takes a type Input as input // and has a tuple of output ports specified. template< typename Input, typename OutputPortSet, typename Policy, typename A> @@ -562,6 +626,9 @@ class multifunction_input : public function_input_base::value; typedef Input input_type; typedef OutputPortSet output_ports_type; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + typedef metainfo_tag_type tag_type; +#endif typedef multifunction_body multifunction_body_type; typedef multifunction_input my_class; typedef function_input_base base_type; @@ -570,7 +637,9 @@ class multifunction_input : public function_input_base multifunction_input(graph &g, size_t max_concurrency,Body& body, node_priority_t a_priority ) - : base_type(g, max_concurrency, a_priority, noexcept(tbb::detail::invoke(body, input_type(), my_output_ports))) + : base_type(g, max_concurrency, a_priority, + noexcept(invoke_body_with_tag(body __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo_tag_type{}), + input_type(), my_output_ports))) , my_body( new multifunction_body_leaf(body) ) , my_init_body( new multifunction_body_leaf(body) ) , my_output_ports(init_output_ports::call(g, my_output_ports)){ @@ -599,10 +668,13 @@ class multifunction_input : public function_input_base { multifunction_output(const multifunction_output& other) : base_type(other.my_graph_ref) {} bool try_put(const output_type &i) { - graph_task *res = try_put_task(i); + return try_put_impl(i __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{})); + } + +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + bool try_put(const output_type& i, const metainfo_tag_type& tag) { + return try_put_impl(i, metainfo_tag_accessor::get_metainfo(tag)); + } + + bool try_put(const output_type& i, metainfo_tag_type&& tag) { + metainfo_tag_type local_tag = std::move(tag); + return try_put_impl(i, metainfo_tag_accessor::get_metainfo(local_tag)); + } +#endif + + using base_type::graph_reference; + +protected: + bool try_put_impl(const output_type& i __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) { + graph_task *res = try_put_task(i __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)); if( !res ) return false; if( res != SUCCESSFULLY_ENQUEUED ) { // wrapping in task_arena::execute() is not needed since the method is called from @@ -861,10 +951,6 @@ class multifunction_output : public function_output { return true; } - using base_type::graph_reference; - -protected: - graph_task* try_put_task(const output_type &i) { return my_successors.try_put_task(i); } diff --git a/include/oneapi/tbb/flow_graph.h b/include/oneapi/tbb/flow_graph.h index 5b438faabf..5ed95fa46a 100644 --- a/include/oneapi/tbb/flow_graph.h +++ b/include/oneapi/tbb/flow_graph.h @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2024 Intel Corporation + Copyright (c) 2005-2025 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -84,6 +84,13 @@ class continue_msg {}; } // namespace d2 +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +#define __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo) , metainfo + +#else +#define __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo) +#endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + #if __TBB_CPP20_CONCEPTS_PRESENT namespace d0 { @@ -114,18 +121,26 @@ concept input_node_body = std::copy_constructible && { body(fc) } -> adaptive_same_as; }; -template +template concept multifunction_node_body = std::copy_constructible && - std::invocable; + std::invocable +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + || std::invocable&&> +#endif + ; template concept sequencer = std::copy_constructible && std::invocable && std::convertible_to, std::size_t>; -template +template concept async_node_body = std::copy_constructible && - std::invocable; + std::invocable +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + || std::invocable&&> +#endif + ; } // namespace d0 #endif // __TBB_CPP20_CONCEPTS_PRESENT @@ -212,11 +227,6 @@ class message_metainfo { private: waiters_type my_waiters; }; // class message_metainfo - -#define __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo) , metainfo - -#else -#define __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo) #endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT //! Pure virtual template class that defines a sender of messages of type T @@ -989,8 +999,11 @@ class multifunction_node : private: using input_impl_type::my_predecessors; public: +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + typedef typename input_impl_type::tag_type tag_type; +#endif template - __TBB_requires(multifunction_node_body) + __TBB_requires(multifunction_node_body) __TBB_NOINLINE_SYM multifunction_node( graph &g, size_t concurrency, Body body, Policy = Policy(), node_priority_t a_priority = no_priority @@ -1003,13 +1016,13 @@ class multifunction_node : } template - __TBB_requires(multifunction_node_body) + __TBB_requires(multifunction_node_body) __TBB_NOINLINE_SYM multifunction_node(graph& g, size_t concurrency, Body body, node_priority_t a_priority) : multifunction_node(g, concurrency, body, Policy(), a_priority) {} #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET template - __TBB_requires(multifunction_node_body) + __TBB_requires(multifunction_node_body) __TBB_NOINLINE_SYM multifunction_node(const node_set& nodes, size_t concurrency, Body body, Policy p = Policy(), node_priority_t a_priority = no_priority) : multifunction_node(nodes.graph_reference(), concurrency, body, p, a_priority) { @@ -1017,7 +1030,7 @@ class multifunction_node : } template - __TBB_requires(multifunction_node_body) + __TBB_requires(multifunction_node_body) __TBB_NOINLINE_SYM multifunction_node(const node_set& nodes, size_t concurrency, Body body, node_priority_t a_priority) : multifunction_node(nodes, concurrency, body, Policy(), a_priority) {} #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET @@ -3130,8 +3143,11 @@ class async_body: public async_body_base { async_body(const Body &body, gateway_type *gateway) : base_type(gateway), my_body(body) { } - void operator()( const Input &v, Ports & ) noexcept(noexcept(tbb::detail::invoke(my_body, v, std::declval()))) { - tbb::detail::invoke(my_body, v, *this->my_gateway); + void operator()( const Input &v, Ports & __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo_tag_type&& tag) ) + noexcept(noexcept(invoke_body_with_tag(my_body __TBB_FLOW_GRAPH_METAINFO_ARG(std::move(tag)), + v, std::declval()))) + { + invoke_body_with_tag(my_body __TBB_FLOW_GRAPH_METAINFO_ARG(std::move(tag)), v, *this->my_gateway); } Body get_body() { return my_body; } @@ -3157,6 +3173,9 @@ class async_node typedef receiver_gateway gateway_type; typedef async_body_base async_body_base_type; typedef typename base_type::output_ports_type output_ports_type; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + typedef typename mfn_input_type::tag_type tag_type; +#endif private: class receiver_gateway_impl: public receiver_gateway { @@ -3176,9 +3195,20 @@ class async_node //! Implements gateway_type::try_put for an external activity to submit a message to FG bool try_put(const Output &i) override { - return my_node->try_put_impl(i); + return my_node->try_put_impl(i __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{})); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + bool try_put(const Output &i, const metainfo_tag_type& tag) override { + return my_node->try_put_impl(i, metainfo_tag_accessor::get_metainfo(tag)); + } + + bool try_put(const Output &i, metainfo_tag_type&& tag) override { + metainfo_tag_type local_tag = std::move(tag); + return my_node->try_put_impl(i, metainfo_tag_accessor::get_metainfo(local_tag)); + } +#endif + private: async_node* my_node; } my_gateway; @@ -3187,13 +3217,14 @@ class async_node async_node* self() { return this; } //! Implements gateway_type::try_put for an external activity to submit a message to FG - bool try_put_impl(const Output &i) { + bool try_put_impl(const Output &i __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) { multifunction_output &port_0 = output_port<0>(*this); broadcast_cache& port_successors = port_0.successors(); fgt_async_try_put_begin(this, &port_0); // TODO revamp: change to std::list graph_task_list tasks; - bool is_at_least_one_put_successful = port_successors.gather_successful_try_puts(i, tasks); + bool is_at_least_one_put_successful = + port_successors.gather_successful_try_puts(i, tasks __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)); __TBB_ASSERT( is_at_least_one_put_successful || tasks.empty(), "Return status is inconsistent with the method operation." ); @@ -3206,7 +3237,7 @@ class async_node public: template - __TBB_requires(async_node_body) + __TBB_requires(async_node_body) __TBB_NOINLINE_SYM async_node( graph &g, size_t concurrency, Body body, Policy = Policy(), node_priority_t a_priority = no_priority @@ -3222,13 +3253,13 @@ class async_node } template - __TBB_requires(async_node_body) + __TBB_requires(async_node_body) __TBB_NOINLINE_SYM async_node(graph& g, size_t concurrency, Body body, node_priority_t a_priority) : async_node(g, concurrency, body, Policy(), a_priority) {} #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET template - __TBB_requires(async_node_body) + __TBB_requires(async_node_body) __TBB_NOINLINE_SYM async_node( const node_set& nodes, size_t concurrency, Body body, Policy = Policy(), node_priority_t a_priority = no_priority ) @@ -3237,7 +3268,7 @@ class async_node } template - __TBB_requires(async_node_body) + __TBB_requires(async_node_body) __TBB_NOINLINE_SYM async_node(const node_set& nodes, size_t concurrency, Body body, node_priority_t a_priority) : async_node(nodes, concurrency, body, Policy(), a_priority) {} #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET diff --git a/include/oneapi/tbb/flow_graph_abstractions.h b/include/oneapi/tbb/flow_graph_abstractions.h index 329e75c43e..6ae4777639 100644 --- a/include/oneapi/tbb/flow_graph_abstractions.h +++ b/include/oneapi/tbb/flow_graph_abstractions.h @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2024 Intel Corporation + Copyright (c) 2005-2025 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -33,6 +33,9 @@ class graph_proxy { virtual ~graph_proxy() {} }; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +class metainfo_tag_type; +#endif template class receiver_gateway : public graph_proxy { public: @@ -41,6 +44,10 @@ class receiver_gateway : public graph_proxy { //! Submit signal from an asynchronous activity to FG. virtual bool try_put(const input_type&) = 0; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + virtual bool try_put(const input_type&, const metainfo_tag_type&) = 0; + virtual bool try_put(const input_type&, metainfo_tag_type&&) = 0; +#endif }; } // d2 diff --git a/test/tbb/test_async_node.cpp b/test/tbb/test_async_node.cpp index edab0c3857..bba27ededf 100644 --- a/test/tbb/test_async_node.cpp +++ b/test/tbb/test_async_node.cpp @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2023 Intel Corporation + Copyright (c) 2005-2025 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -32,6 +32,7 @@ #include "common/spin_barrier.h" #include "common/test_follows_and_precedes_api.h" #include "common/concepts_common.h" +#include "test_try_put_and_wait.h" #include #include @@ -40,8 +41,6 @@ //! \file test_async_node.cpp //! \brief Test for [flow_graph.async_node] specification - - class minimal_type { template friend struct place_wrapper; @@ -878,3 +877,11 @@ TEST_CASE("constraints for async_node body") { } #endif // __TBB_CPP20_CONCEPTS_PRESENT + +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +//! \brief \ref error_guessing +TEST_CASE("test async_node try_put_and_wait") { + using node_type = oneapi::tbb::flow::async_node; + test_try_put_and_wait::test_multioutput(); +} +#endif diff --git a/test/tbb/test_buffer_node.cpp b/test/tbb/test_buffer_node.cpp index 527005aecb..80fd77790a 100644 --- a/test/tbb/test_buffer_node.cpp +++ b/test/tbb/test_buffer_node.cpp @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2024 Intel Corporation + Copyright (c) 2005-2025 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -24,7 +24,7 @@ #include "common/graph_utils.h" #include "common/test_follows_and_precedes_api.h" -#include "test_buffering_try_put_and_wait.h" +#include "test_try_put_and_wait.h" //! \file test_buffer_node.cpp //! \brief Test for [flow_graph.buffer_node] specification diff --git a/test/tbb/test_buffering_try_put_and_wait.h b/test/tbb/test_buffering_try_put_and_wait.h deleted file mode 100644 index 300521233f..0000000000 --- a/test/tbb/test_buffering_try_put_and_wait.h +++ /dev/null @@ -1,189 +0,0 @@ -/* - Copyright (c) 2024 Intel Corporation - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -#ifndef __TBB_test_tbb_buffering_try_put_and_wait_H -#define __TBB_test_tbb_buffering_try_put_and_wait_H - -#include -#include - -#include - -#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT - -namespace test_try_put_and_wait { - -template -std::size_t test_buffer_push(const std::vector& start_work_items, - int wait_message, - const std::vector& new_work_items, - std::vector& processed_items, - Args... args) -{ - std::size_t after_try_put_and_wait_start_index = 0; - tbb::task_arena arena(1); - - arena.execute([&] { - tbb::flow::graph g; - - using function_node_type = tbb::flow::function_node; - - BufferingNode buffer1(g, args...); - - function_node_type function(g, tbb::flow::serial, - [&](int input) noexcept { - if (input == wait_message) { - for (auto item : new_work_items) { - buffer1.try_put(item); - } - } - return input; - }); - - BufferingNode buffer2(g, args...); - - function_node_type writer(g, tbb::flow::unlimited, - [&](int input) noexcept { - processed_items.emplace_back(input); - return 0; - }); - - tbb::flow::make_edge(buffer1, function); - tbb::flow::make_edge(function, buffer2); - tbb::flow::make_edge(buffer2, writer); - - for (auto item : start_work_items) { - buffer1.try_put(item); - } - - buffer1.try_put_and_wait(wait_message); - - after_try_put_and_wait_start_index = processed_items.size(); - - g.wait_for_all(); - }); - - return after_try_put_and_wait_start_index; -} - -template -std::size_t test_buffer_pull(const std::vector& start_work_items, - int wait_message, - int occupier, - const std::vector& new_work_items, - std::vector& processed_items, - Args... args) -{ - tbb::task_arena arena(1); - std::size_t after_try_put_and_wait_start_index = 0; - - arena.execute([&] { - tbb::flow::graph g; - - using function_node_type = tbb::flow::function_node; - - BufferingNode buffer(g, args...); - - function_node_type function(g, tbb::flow::serial, - [&](int input) noexcept { - if (input == wait_message) { - for (auto item : new_work_items) { - buffer.try_put(item); - } - } - - processed_items.emplace_back(input); - return 0; - }); - - // Occupy the concurrency of function_node - // This call spawns the task to process the occupier - function.try_put(occupier); - - // Make edge between buffer and function after occupying the concurrency - // To ensure that forward task of the buffer would be spawned after the occupier task - // And the function_node would reject the items from the buffer - // and process them later by calling try_get on the buffer - tbb::flow::make_edge(buffer, function); - - for (auto item : start_work_items) { - buffer.try_put(item); - } - - buffer.try_put_and_wait(wait_message); - - after_try_put_and_wait_start_index = processed_items.size(); - - g.wait_for_all(); - }); - - return after_try_put_and_wait_start_index; -} - -template -std::size_t test_buffer_reserve(std::size_t limiter_threshold, - const std::vector& start_work_items, - int wait_message, - const std::vector& new_work_items, - std::vector& processed_items, - Args... args) -{ - tbb::task_arena arena(1); - std::size_t after_try_put_and_wait_start_index = 0; - - arena.execute([&] { - tbb::flow::graph g; - - BufferingNode buffer(g, args...); - - tbb::flow::limiter_node limiter(g, limiter_threshold); - tbb::flow::function_node function(g, tbb::flow::serial, - [&](int input) { - if (input == wait_message) { - for (auto item : new_work_items) { - buffer.try_put(item); - } - } - // Explicitly put to the decrementer instead of making edge - // to guarantee that the next task would be spawned and not returned - // to the current thread as the next task - // Otherwise, all elements would be processed during the try_put_and_wait - limiter.decrementer().try_put(1); - processed_items.emplace_back(input); - return 0; - }); - - tbb::flow::make_edge(buffer, limiter); - tbb::flow::make_edge(limiter, function); - - for (auto item : start_work_items) { - buffer.try_put(item); - } - - buffer.try_put_and_wait(wait_message); - - after_try_put_and_wait_start_index = processed_items.size(); - - g.wait_for_all(); - }); - - return after_try_put_and_wait_start_index; -} - -} // test_try_put_and_wait - -#endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT -#endif // __TBB_test_tbb_buffering_try_put_and_wait_H diff --git a/test/tbb/test_function_node.cpp b/test/tbb/test_function_node.cpp index 999adac189..f11324f2e7 100644 --- a/test/tbb/test_function_node.cpp +++ b/test/tbb/test_function_node.cpp @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2024 Intel Corporation + Copyright (c) 2005-2025 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -657,7 +657,7 @@ void test_try_put_and_wait_rejecting(size_t concurrency_limit) { std::vector processed_items; std::vector new_work_items; - int wait_message = 0; + int wait_message = 10; for (int i = 1; i < wait_message; ++i) { new_work_items.emplace_back(i); diff --git a/test/tbb/test_multifunction_node.cpp b/test/tbb/test_multifunction_node.cpp index bfebbbe94f..f5696afb95 100644 --- a/test/tbb/test_multifunction_node.cpp +++ b/test/tbb/test_multifunction_node.cpp @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2021 Intel Corporation + Copyright (c) 2005-2025 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -28,6 +28,7 @@ #include "common/graph_utils.h" #include "common/test_follows_and_precedes_api.h" #include "common/concepts_common.h" +#include "test_try_put_and_wait.h" //! \file test_multifunction_node.cpp @@ -623,3 +624,12 @@ TEST_CASE("constraints for multifunction_node body") { static_assert(!can_call_multifunction_node_ctor>); } #endif // __TBB_CPP20_CONCEPTS_PRESENT + +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +//! \brief \ref error_guessing +TEST_CASE("multifunction_node try_put_and_wait") { + using node_type = oneapi::tbb::flow::multifunction_node>; + test_try_put_and_wait::test_multioutput(); +} + +#endif diff --git a/test/tbb/test_overwrite_node.cpp b/test/tbb/test_overwrite_node.cpp index 3f5ed8fec0..270624abf7 100644 --- a/test/tbb/test_overwrite_node.cpp +++ b/test/tbb/test_overwrite_node.cpp @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2024 Intel Corporation + Copyright (c) 2005-2025 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -24,7 +24,7 @@ #include "common/graph_utils.h" #include "common/test_follows_and_precedes_api.h" -#include "test_buffering_try_put_and_wait.h" +#include "test_try_put_and_wait.h" //! \file test_overwrite_node.cpp //! \brief Test for [flow_graph.overwrite_node] specification diff --git a/test/tbb/test_priority_queue_node.cpp b/test/tbb/test_priority_queue_node.cpp index 18a60eb935..5971f2d62c 100644 --- a/test/tbb/test_priority_queue_node.cpp +++ b/test/tbb/test_priority_queue_node.cpp @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2024 Intel Corporation + Copyright (c) 2005-2025 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -30,7 +30,7 @@ #include -#include "test_buffering_try_put_and_wait.h" +#include "test_try_put_and_wait.h" //! \file test_priority_queue_node.cpp //! \brief Test for [flow_graph.priority_queue_node] specification diff --git a/test/tbb/test_queue_node.cpp b/test/tbb/test_queue_node.cpp index 546b47edae..25dd3cbc29 100644 --- a/test/tbb/test_queue_node.cpp +++ b/test/tbb/test_queue_node.cpp @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2024 Intel Corporation + Copyright (c) 2005-2025 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -30,7 +30,7 @@ #include -#include "test_buffering_try_put_and_wait.h" +#include "test_try_put_and_wait.h" //! \file test_queue_node.cpp //! \brief Test for [flow_graph.queue_node] specification diff --git a/test/tbb/test_sequencer_node.cpp b/test/tbb/test_sequencer_node.cpp index 1e6494d69b..bc75d7d891 100644 --- a/test/tbb/test_sequencer_node.cpp +++ b/test/tbb/test_sequencer_node.cpp @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2024 Intel Corporation + Copyright (c) 2005-2025 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -28,7 +28,7 @@ #include #include -#include "test_buffering_try_put_and_wait.h" +#include "test_try_put_and_wait.h" //! \file test_sequencer_node.cpp //! \brief Test for [flow_graph.sequencer_node] specification diff --git a/test/tbb/test_try_put_and_wait.h b/test/tbb/test_try_put_and_wait.h new file mode 100644 index 0000000000..3e9c3d94f4 --- /dev/null +++ b/test/tbb/test_try_put_and_wait.h @@ -0,0 +1,468 @@ +/* + Copyright (c) 2024-2025 Intel Corporation + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#ifndef __TBB_test_tbb_buffering_try_put_and_wait_H +#define __TBB_test_tbb_buffering_try_put_and_wait_H + +#include +#include + +#include + +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + +namespace test_try_put_and_wait { + +template +std::size_t test_buffer_push(const std::vector& start_work_items, + int wait_message, + const std::vector& new_work_items, + std::vector& processed_items, + Args... args) +{ + std::size_t after_try_put_and_wait_start_index = 0; + tbb::task_arena arena(1); + + arena.execute([&] { + tbb::flow::graph g; + + using function_node_type = tbb::flow::function_node; + + BufferingNode buffer1(g, args...); + + function_node_type function(g, tbb::flow::serial, + [&](int input) noexcept { + if (input == wait_message) { + for (auto item : new_work_items) { + buffer1.try_put(item); + } + } + return input; + }); + + BufferingNode buffer2(g, args...); + + function_node_type writer(g, tbb::flow::unlimited, + [&](int input) noexcept { + processed_items.emplace_back(input); + return 0; + }); + + tbb::flow::make_edge(buffer1, function); + tbb::flow::make_edge(function, buffer2); + tbb::flow::make_edge(buffer2, writer); + + for (auto item : start_work_items) { + buffer1.try_put(item); + } + + buffer1.try_put_and_wait(wait_message); + + after_try_put_and_wait_start_index = processed_items.size(); + + g.wait_for_all(); + }); + + return after_try_put_and_wait_start_index; +} + +template +std::size_t test_buffer_pull(const std::vector& start_work_items, + int wait_message, + int occupier, + const std::vector& new_work_items, + std::vector& processed_items, + Args... args) +{ + tbb::task_arena arena(1); + std::size_t after_try_put_and_wait_start_index = 0; + + arena.execute([&] { + tbb::flow::graph g; + + using function_node_type = tbb::flow::function_node; + + BufferingNode buffer(g, args...); + + function_node_type function(g, tbb::flow::serial, + [&](int input) noexcept { + if (input == wait_message) { + for (auto item : new_work_items) { + buffer.try_put(item); + } + } + + processed_items.emplace_back(input); + return 0; + }); + + // Occupy the concurrency of function_node + // This call spawns the task to process the occupier + function.try_put(occupier); + + // Make edge between buffer and function after occupying the concurrency + // To ensure that forward task of the buffer would be spawned after the occupier task + // And the function_node would reject the items from the buffer + // and process them later by calling try_get on the buffer + tbb::flow::make_edge(buffer, function); + + for (auto item : start_work_items) { + buffer.try_put(item); + } + + buffer.try_put_and_wait(wait_message); + + after_try_put_and_wait_start_index = processed_items.size(); + + g.wait_for_all(); + }); + + return after_try_put_and_wait_start_index; +} + +template +std::size_t test_buffer_reserve(std::size_t limiter_threshold, + const std::vector& start_work_items, + int wait_message, + const std::vector& new_work_items, + std::vector& processed_items, + Args... args) +{ + tbb::task_arena arena(1); + std::size_t after_try_put_and_wait_start_index = 0; + + arena.execute([&] { + tbb::flow::graph g; + + BufferingNode buffer(g, args...); + + tbb::flow::limiter_node limiter(g, limiter_threshold); + tbb::flow::function_node function(g, tbb::flow::serial, + [&](int input) { + if (input == wait_message) { + for (auto item : new_work_items) { + buffer.try_put(item); + } + } + // Explicitly put to the decrementer instead of making edge + // to guarantee that the next task would be spawned and not returned + // to the current thread as the next task + // Otherwise, all elements would be processed during the try_put_and_wait + limiter.decrementer().try_put(1); + processed_items.emplace_back(input); + return 0; + }); + + tbb::flow::make_edge(buffer, limiter); + tbb::flow::make_edge(limiter, function); + + for (auto item : start_work_items) { + buffer.try_put(item); + } + + buffer.try_put_and_wait(wait_message); + + after_try_put_and_wait_start_index = processed_items.size(); + + g.wait_for_all(); + }); + + return after_try_put_and_wait_start_index; +} + +template +struct ports_or_gateway; + +template +struct ports_or_gateway> { + using type = typename tbb::flow::multifunction_node::output_ports_type; +}; + +template +struct ports_or_gateway> { + using type = typename tbb::flow::async_node::gateway_type; +}; + +template +using ports_or_gateway_t = typename ports_or_gateway::type; + +template +void put_to_ports_or_gateway(Gateway& gateway, const T& item, Tag&&... tag) { + gateway.try_put(item, std::forward(tag)...); +} + +template +void put_to_ports_or_gateway(std::tuple& ports, const T& item, Tag&&... tag) { + std::get<0>(ports).try_put(item, std::forward(tag)...); +} + +template +void test_multioutput_tag_type() { + static_assert(std::is_same::value, "Unexpected input type"); + using second_arg_type = ports_or_gateway_t; + using tag_type = typename NodeType::tag_type; + + int processed = 0; + + tbb::flow::graph g; + NodeType node(g, tbb::flow::unlimited, + [&](int input, second_arg_type&, tag_type&& tag) { + processed = input; + tag_type tag1; + tag_type tag2(std::move(tag)); + + tag1 = std::move(tag2); + tag = std::move(tag1); + }); + + node.try_put_and_wait(1); + CHECK_MESSAGE(processed == 1, "Body wait not called in try_put_and_wait call"); + g.wait_for_all(); +} + +// TODO: add description +template +void test_multioutput_simple_broadcast() { + static_assert(std::is_same::value, "Unexpected input type"); + tbb::task_arena arena(1); + + using funcnode_type = tbb::flow::function_node; + using second_argument_type = ports_or_gateway_t; + using tag_type = typename NodeType::tag_type; + + arena.execute([&] { + tbb::flow::graph g; + + std::vector processed_items; + std::vector new_work_items; + + int wait_message = 10; + + for (int i = 0; i < wait_message; ++i) { + new_work_items.emplace_back(i); + } + + NodeType* start_node = nullptr; + + NodeType node(g, tbb::flow::unlimited, + [&](int input, second_argument_type& port, tag_type&& tag) { + if (input == wait_message) { + for (int item : new_work_items) { + start_node->try_put(item); + } + } + + // Each even body execution copy-consumes the tag + // each odd execution - move-consumes + static bool copy_consume = true; + + if (copy_consume) { + put_to_ports_or_gateway(port, input, tag); + } else { + put_to_ports_or_gateway(port, input, std::move(tag)); + } + + copy_consume = !copy_consume; + }); + + start_node = &node; + + funcnode_type next_func(g, tbb::flow::unlimited, + [&](int input) noexcept { + processed_items.emplace_back(input); + return 0; + }); + + tbb::flow::make_edge(node, next_func); + + bool result = node.try_put_and_wait(wait_message); + CHECK_MESSAGE(result, "unexpected try_put_and_wait result"); + + CHECK(processed_items.size() == 1); + CHECK_MESSAGE(processed_items[0] == wait_message, "Only the wait message should be processed by try_put_and_wait"); + + g.wait_for_all(); + + CHECK(processed_items.size() == new_work_items.size() + 1); + + std::size_t check_index = 1; + for (std::size_t i = new_work_items.size(); i != 0; --i) { + CHECK_MESSAGE(processed_items[check_index++] == new_work_items[i - 1], "Unexpected items processing order"); + } + CHECK(check_index == processed_items.size()); + }); +} + +// TODO: add description +template +void test_multioutput_no_broadcast() { + using second_argument_type = ports_or_gateway_t; + using tag_type = typename NodeType::tag_type; + + std::size_t num_items = 10; + std::size_t num_additional_items = 10; + + std::atomic num_processed_items{0}; + std::atomic num_processed_accumulators{0}; + + int accumulator_message = 1; + int add_message = 2; + + tag_type global_tag; + + NodeType* this_node = nullptr; + + std::vector postprocessed_items; + + tbb::flow::graph g; + NodeType node(g, tbb::flow::unlimited, + [&](int input, second_argument_type& port, tag_type&& local_tag) { + if (num_processed_items++ == 0) { + CHECK(input == accumulator_message); + ++num_processed_accumulators; + + global_tag = std::move(local_tag); + for (std::size_t i = 1; i < num_items; ++i) { + this_node->try_put(accumulator_message); + } + for (std::size_t i = 0; i < num_additional_items; ++i) { + this_node->try_put(add_message); + } + } else { + if (input == accumulator_message) { + global_tag.merge(std::move(local_tag)); + if (num_processed_accumulators++ == num_items - 1) { + // The last accumulator was received - "cancel" the operation + global_tag.reset(); + } + } else { + put_to_ports_or_gateway(port, input); + } + } + }); + + this_node = &node; + + tbb::flow::function_node write_node(g, tbb::flow::serial, + [&](int value) noexcept { postprocessed_items.emplace_back(value); return 0; }); + + tbb::flow::make_edge(tbb::flow::output_port<0>(node), write_node); + + node.try_put_and_wait(accumulator_message); + + CHECK_MESSAGE(num_processed_accumulators == num_items, "Unexpected number of accumulators processed"); + + g.wait_for_all(); + + CHECK_MESSAGE(num_processed_items == num_items + num_additional_items, "Unexpected number of items processed"); + CHECK_MESSAGE(postprocessed_items.size() == num_additional_items, "Unexpected number of items written"); + for (auto item : postprocessed_items) { + CHECK_MESSAGE(item == add_message, "Unexpected item written"); + } +} + +// TODO: add test description +template +void test_multioutput_reduction() { + tbb::task_arena arena(1); + + arena.execute([]{ + int num_items = 5; + tbb::flow::graph g; + + using func_node_type = tbb::flow::function_node; + using second_argument_type = ports_or_gateway_t; + using tag_type = typename NodeType::tag_type; + + func_node_type* start_node = nullptr; + + func_node_type start(g, tbb::flow::unlimited, + [&](int i) { + static bool extra_work_added = false; + if (!extra_work_added) { + extra_work_added = true; + for (int j = i + 1; j < i + num_items; ++j) { + start_node->try_put(j); + } + } + return i; + }); + + start_node = &start; + + int num_accumulated = 0; + int accumulated_result = 0; + tag_type accumulated_hint; + + std::vector processed_items; + + NodeType node(g, tbb::flow::unlimited, + [&](int i, second_argument_type& ports, const tag_type& tag) { + ++num_accumulated; + accumulated_result += i; + accumulated_hint.merge(tag); + + if (num_accumulated == num_items) { + put_to_ports_or_gateway(ports, accumulated_result, std::move(accumulated_hint)); + num_accumulated = 0; + } + }); + + tbb::flow::function_node writer(g, tbb::flow::unlimited, + [&](int res) { + // Start extra reduction that should not be handled by try_put_and_wait + static bool extra_loop_added = false; + + if (!extra_loop_added) { + extra_loop_added = true; + for (int i = 100; i < 100 + num_items; ++i) { + node.try_put(i); + } + } + + processed_items.emplace_back(res); + return 0; + }); + + tbb::flow::make_edge(start, node); + tbb::flow::make_edge(node, writer); + + start.try_put_and_wait(1); + + auto first_reduction_result = accumulated_result; + CHECK_MESSAGE(processed_items.size() == 1, "More than one reduction was processed"); + CHECK_MESSAGE(processed_items[0] == first_reduction_result, "Unexpected reduction result"); + + g.wait_for_all(); + + CHECK_MESSAGE(processed_items.size() == 2, "More than one reduction was processed"); + CHECK_MESSAGE(accumulated_result != first_reduction_result, "Unexpected reduction result"); + CHECK_MESSAGE(processed_items[1] == accumulated_result, "Unexpected reduction result"); + }); +} + +template +void test_multioutput() { + test_multioutput_tag_type(); + test_multioutput_simple_broadcast(); + test_multioutput_no_broadcast(); + test_multioutput_reduction(); +} + +} // test_try_put_and_wait + +#endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +#endif // __TBB_test_tbb_buffering_try_put_and_wait_H