From 0e35744a8b65b74bf4e57c3e49e8c02fc5d57bf4 Mon Sep 17 00:00:00 2001
From: Jakub Szuppe
Date: Fri, 26 Aug 2016 23:53:40 +0200
Subject: [PATCH] Add distributed::reduce() algorithm

---
 include/boost/compute/distributed/reduce.hpp | 359 +++++++++++++++++++
 test/CMakeLists.txt                          |   1 +
 test/test_distributed_reduce.cpp             | 143 ++++++++
 3 files changed, 503 insertions(+)
 create mode 100644 include/boost/compute/distributed/reduce.hpp
 create mode 100644 test/test_distributed_reduce.cpp

diff --git a/include/boost/compute/distributed/reduce.hpp b/include/boost/compute/distributed/reduce.hpp
new file mode 100644
index 000000000..66475fb08
--- /dev/null
+++ b/include/boost/compute/distributed/reduce.hpp
@@ -0,0 +1,359 @@
+//---------------------------------------------------------------------------//
+// Copyright (c) 2016 Jakub Szuppe
+//
+// Distributed under the Boost Software License, Version 1.0
+// See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt
+//
+// See http://boostorg.github.com/compute for more information.
+//---------------------------------------------------------------------------//
+
+#ifndef BOOST_COMPUTE_DISTRIBUTED_REDUCE_HPP
+#define BOOST_COMPUTE_DISTRIBUTED_REDUCE_HPP
+
+#include <vector>
+
+#include <boost/assert.hpp>
+#include <boost/utility/enable_if.hpp>
+
+#include <boost/compute/buffer.hpp>
+#include <boost/compute/command_queue.hpp>
+#include <boost/compute/functional.hpp>
+
+#include <boost/compute/algorithm/copy_n.hpp>
+#include <boost/compute/algorithm/reduce.hpp>
+#include <boost/compute/iterator/buffer_iterator.hpp>
+#include <boost/compute/type_traits/is_device_iterator.hpp>
+#include <boost/compute/type_traits/result_of.hpp>
+
+#include <boost/compute/distributed/command_queue.hpp>
+#include <boost/compute/distributed/vector.hpp>
+
+namespace boost {
+namespace compute {
+namespace distributed {
+namespace detail {
+
+template<class OutputIterator>
+inline ::boost::compute::command_queue&
+final_reduce_queue(OutputIterator result,
+                   command_queue &queue,
+                   typename boost::enable_if_c<
+                       !is_device_iterator<OutputIterator>::value
+                   >::type* = 0)
+{
+    (void) result;
+
+    // a CPU device queue is preferred; if there is none, the first
+    // device queue is used
+    size_t final_queue = 0;
+    for(size_t i = 0; i < queue.size(); i++)
+    {
+        if(queue.get(i).get_device().type() & ::boost::compute::device::cpu)
+        {
+            final_queue = i;
+            break;
+        }
+    }
+    return queue.get(final_queue);
+}
+
+template<class OutputIterator>
+inline ::boost::compute::command_queue&
+final_reduce_queue(OutputIterator result,
+                   command_queue &queue,
+                   typename boost::enable_if_c<
+                       is_device_iterator<OutputIterator>::value
+                   >::type* = 0)
+{
+    // first, find all queues that can be used with the result iterator
+    const ::boost::compute::context& result_context =
+        result.get_buffer().get_context();
+    std::vector<size_t> compatible_queues;
+    for(size_t i = 0; i < queue.size(); i++)
+    {
+        if(queue.get(i).get_context() == result_context)
+        {
+            compatible_queues.push_back(i);
+        }
+    }
+    BOOST_ASSERT_MSG(
+        !compatible_queues.empty(),
+        "There is no device command queue that can be used to copy to result"
+    );
+
+    // then choose a device queue from the compatible device queues;
+    // a CPU device queue is preferred, however if there is none, the first
+    // compatible device queue is used
+    size_t final_queue = compatible_queues[0];
+    for(size_t i = 0; i < compatible_queues.size(); i++)
+    {
+        size_t n = compatible_queues[i];
+        if(queue.get(n).get_device().type() & ::boost::compute::device::cpu)
+        {
+            final_queue = n;
+            break;
+        }
+    }
+    return queue.get(final_queue);
+}
+
+template<
+    class InputType, weight_func weight, class Alloc,
+    class OutputIterator,
+    class BinaryFunction
+>
+inline void
+dispatch_reduce(const vector<InputType, weight, Alloc> &input,
+                OutputIterator result,
+                BinaryFunction function,
+                command_queue &queue)
+{
+    typedef typename
+        boost::compute::result_of<BinaryFunction(InputType, InputType)>::type
+        result_type;
+
+    // find device queue for the final reduction
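+    // (the final reduction combines only input.parts() partial results,
+    // so it is a small operation; final_reduce_queue() prefers a CPU
+    // device queue for it and, when result is a device iterator, only
+    // considers queues whose context matches the result buffer's context)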
+    ::boost::compute::command_queue& device_queue =
+        final_reduce_queue(result, queue);
+
+    ::boost::compute::buffer parts_results_device(
+        device_queue.get_context(), input.parts() * sizeof(result_type)
+    );
+
+    // if all device queues are in the same OpenCL context we can
+    // save the partial reduction results directly into the
+    // parts_results_device buffer
+    size_t reduced = 0;
+    if(queue.one_context())
+    {
+        // reduce each non-empty part of the input vector
+        for(size_t i = 0; i < input.parts(); i++)
+        {
+            if(input.begin(i) != input.end(i)) {
+                ::boost::compute::reduce(
+                    input.begin(i),
+                    input.end(i),
+                    ::boost::compute::make_buffer_iterator<result_type>(
+                        parts_results_device, reduced
+                    ),
+                    function,
+                    queue.get(i)
+                );
+                reduced++;
+            }
+        }
+        // make sure all partial results are ready before the final
+        // reduction reads them on device_queue
+        queue.finish();
+    }
+    else
+    {
+        // reduce each non-empty part of the input vector into host memory
+        std::vector<result_type> parts_results_host(input.parts());
+        for(size_t i = 0; i < input.parts(); i++)
+        {
+            if(input.begin(i) != input.end(i)) {
+                ::boost::compute::reduce(
+                    input.begin(i),
+                    input.end(i),
+                    &parts_results_host[reduced],
+                    function,
+                    queue.get(i)
+                );
+                reduced++;
+            }
+        }
+        ::boost::compute::copy_n(
+            parts_results_host.begin(),
+            reduced,
+            ::boost::compute::make_buffer_iterator<result_type>(
+                parts_results_device
+            ),
+            device_queue
+        );
+    }
+    // final reduction
+    ::boost::compute::reduce(
+        ::boost::compute::make_buffer_iterator<result_type>(
+            parts_results_device
+        ),
+        ::boost::compute::make_buffer_iterator<result_type>(
+            parts_results_device, reduced
+        ),
+        result,
+        function,
+        device_queue
+    );
+}
+
+// special case for when OutputIterator is a host iterator
+// and the binary operator is plus<T>
+template<
+    class InputType, weight_func weight, class Alloc,
+    class OutputIterator,
+    class T
+>
+inline void
+dispatch_reduce(const vector<InputType, weight, Alloc> &input,
+                OutputIterator result,
+                ::boost::compute::plus<T> function,
+                command_queue &queue,
+                typename boost::enable_if_c<
+                    !is_device_iterator<OutputIterator>::value
+                >::type* = 0)
+{
+    // reduce each non-empty part of the input vector
+    std::vector<T> parts_results_host(input.parts());
+    size_t reduced = 0;
+    for(size_t i = 0; i < input.parts(); i++)
+    {
+        if(input.begin(i) != input.end(i)) {
+            ::boost::compute::reduce(
+                input.begin(i),
+                input.end(i),
+                &parts_results_host[reduced],
+                function,
+                queue.get(i)
+            );
+            reduced++;
+        }
+    }
+
+    // final reduction
+    *result = parts_results_host[0];
+    for(size_t i = 1; i < reduced; i++)
+    {
+        *result += static_cast<T>(parts_results_host[i]);
+    }
+}
+
+// special case for when OutputIterator is a host iterator
+// and the binary operator is min<T>
+template<
+    class InputType, weight_func weight, class Alloc,
+    class OutputIterator,
+    class T
+>
+inline void
+dispatch_reduce(const vector<InputType, weight, Alloc> &input,
+                OutputIterator result,
+                ::boost::compute::min<T> function,
+                command_queue &queue,
+                typename boost::enable_if_c<
+                    !is_device_iterator<OutputIterator>::value
+                >::type* = 0)
+{
+    // reduce each non-empty part of the input vector
+    std::vector<T> parts_results_host(input.parts());
+    size_t reduced = 0;
+    for(size_t i = 0; i < input.parts(); i++)
+    {
+        if(input.begin(i) != input.end(i)) {
+            ::boost::compute::reduce(
+                input.begin(i),
+                input.end(i),
+                &parts_results_host[reduced],
+                function,
+                queue.get(i)
+            );
+            reduced++;
+        }
+    }
+
+    // final reduction
+    *result = parts_results_host[0];
+    for(size_t i = 1; i < reduced; i++)
+    {
+        *result = (std::min)(static_cast<T>(*result), parts_results_host[i]);
+    }
+}
+
+// special case for when OutputIterator is a host iterator
+// and the binary operator is max<T>
+template<
+    class InputType, weight_func weight, class Alloc,
+    class OutputIterator,
+    class T
+>
+inline void
+dispatch_reduce(const vector<InputType, weight, Alloc> &input,
+                OutputIterator result,
+                ::boost::compute::max<T> function,
+                command_queue &queue,
+                typename boost::enable_if_c<
+                    !is_device_iterator<OutputIterator>::value
+                >::type* = 0)
+{
+    // reduce each non-empty part of the input vector
+    std::vector<T> parts_results_host(input.parts());
+    size_t reduced = 0;
+    for(size_t i = 0; i < input.parts(); i++)
+    {
+        if(input.begin(i) != input.end(i)) {
+            ::boost::compute::reduce(
+                input.begin(i),
+                input.end(i),
+                &parts_results_host[reduced],
+                function,
+                queue.get(i)
+            );
+            reduced++;
+        }
+    }
+
+    // final reduction
+    *result = parts_results_host[0];
+    for(size_t i = 1; i < reduced; i++)
+    {
+        *result = (std::max)(static_cast<T>(*result), parts_results_host[i]);
+    }
+}
+
+} // end detail namespace
+
+/// Reduces the elements in the \p input vector with \p function and stores
+/// the result in \p result.
+///
+/// If no function is specified, \c plus<InputType>() will be used.
+///
+/// \param input input vector
+/// \param result iterator pointing to the output
+/// \param function binary reduction function
+/// \param queue distributed command queue to perform the operation
+///
+/// The distributed command queue \p queue must span the same set of compute
+/// devices as the distributed command queue used to create the \p input
+/// vector.
+///
+/// If \p result is a device iterator, its underlying buffer must be
+/// allocated in the context of at least one device command queue from
+/// \p queue.
+///
+/// The \c reduce() algorithm assumes that the binary reduction function is
+/// associative. When used with non-associative functions the result may
+/// be non-deterministic and vary in precision. Notably this affects the
+/// \c plus<float>() function, as floating-point addition is not associative
+/// and may produce slightly different results than a serial algorithm.
+///
+/// This algorithm supports both host and device iterators for the result
+/// argument, so values can be reduced and copied to the host with a single
+/// function call.
+template<
+    class InputType, weight_func weight, class Alloc,
+    class OutputIterator,
+    class BinaryFunction
+>
+inline void
+reduce(const vector<InputType, weight, Alloc> &input,
+       OutputIterator result,
+       BinaryFunction function,
+       command_queue &queue)
+{
+    if(input.empty()) {
+        return;
+    }
+
+    detail::dispatch_reduce(input, result, function, queue);
+}
+
+/// \overload
+template<
+    class InputType, weight_func weight, class Alloc,
+    class OutputIterator
+>
+inline void
+reduce(const vector<InputType, weight, Alloc> &input,
+       OutputIterator result,
+       command_queue &queue)
+{
+    return reduce(input, result, ::boost::compute::plus<InputType>(), queue);
+}
+
+} // end distributed namespace
+} // end compute namespace
+} // end boost namespace
+
+#endif /* BOOST_COMPUTE_DISTRIBUTED_REDUCE_HPP */
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 655f95f1a..3cd2d2fb7 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -85,6 +85,7 @@ add_compute_test("distributed.context" test_distributed_context.cpp)
 add_compute_test("distributed.command_queue" test_distributed_command_queue.cpp)
 add_compute_test("distributed.vector" test_distributed_vector.cpp)
 add_compute_test("distributed.copy" test_distributed_copy.cpp)
+add_compute_test("distributed.reduce" test_distributed_reduce.cpp)
 add_compute_test("distributed.transform" test_distributed_transform.cpp)
 
 add_compute_test("utility.extents" test_extents.cpp)
diff --git a/test/test_distributed_reduce.cpp b/test/test_distributed_reduce.cpp
new file mode 100644
index 000000000..bfb20b1b9
--- /dev/null
+++ b/test/test_distributed_reduce.cpp
@@ -0,0 +1,143 @@
+//---------------------------------------------------------------------------//
+// Copyright (c) 2016 Jakub Szuppe
+//
+// Distributed under the Boost Software License, Version 1.0
+// See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt
+//
+// See http://boostorg.github.com/compute for more information.
+//---------------------------------------------------------------------------//
+
+#define BOOST_TEST_MODULE TestDistributedReduce
+#include <boost/test/unit_test.hpp>
+
+#include <vector>
+
+#include <boost/compute/system.hpp>
+#include <boost/compute/command_queue.hpp>
+#include <boost/compute/container/vector.hpp>
+#include <boost/compute/functional.hpp>
+#include <boost/compute/algorithm/copy.hpp>
+
+#include <boost/compute/distributed/command_queue.hpp>
+#include <boost/compute/distributed/context.hpp>
+#include <boost/compute/distributed/copy.hpp>
+#include <boost/compute/distributed/reduce.hpp>
+#include <boost/compute/distributed/vector.hpp>
+
+#include "check_macros.hpp"
+#include "context_setup.hpp"
+
+#include "distributed_check_functions.hpp"
+#include "distributed_queue_setup.hpp"
+
+namespace bc = boost::compute;
+
+BOOST_AUTO_TEST_CASE(reduce_int_to_host)
+{
+    // construct distributed::command_queue
+    bc::distributed::command_queue distributed_queue =
+        get_distributed_queue(queue);
+
+    bc::int_ data[] = { 5, 1, 9, 17, 13 };
+    bc::distributed::vector<bc::int_> distributed_vector(
+        data, data + 5, distributed_queue
+    );
+    distributed_queue.finish();
+
+    bc::int_ sum;
+    bc::distributed::reduce(
+        distributed_vector,
+        &sum,
+        bc::plus<bc::int_>(),
+        distributed_queue
+    );
+    BOOST_CHECK_EQUAL(sum, 45);
+
+    bc::int_ product;
+    bc::distributed::reduce(
+        distributed_vector,
+        &product,
+        bc::multiplies<bc::int_>(),
+        distributed_queue
+    );
+    BOOST_CHECK_EQUAL(product, 9945);
+
+    bc::int_ min_value;
+    bc::distributed::reduce(
+        distributed_vector,
+        &min_value,
+        bc::min<bc::int_>(),
+        distributed_queue
+    );
+    BOOST_CHECK_EQUAL(min_value, 1);
+
+    bc::int_ max_value;
+    bc::distributed::reduce(
+        distributed_vector,
+        &max_value,
+        bc::max<bc::int_>(),
+        distributed_queue
+    );
+    BOOST_CHECK_EQUAL(max_value, 17);
+}
+
+BOOST_AUTO_TEST_CASE(reduce_int_to_device)
+{
+    // construct distributed::command_queue
+    bc::distributed::command_queue distributed_queue =
+        get_distributed_queue(queue);
+
+    bc::int_ data[] = { 1, 5, 9, 13, 17 };
+    bc::distributed::vector<bc::int_> distributed_vector(
+        data, data + 5, distributed_queue
+    );
+    distributed_queue.finish();
+
+    bc::vector<bc::int_> result1(1, distributed_queue.get_context(0));
+    bc::distributed::reduce(
+        distributed_vector,
+        result1.begin(),
+        bc::plus<bc::int_>(),
+        distributed_queue
+    );
+    distributed_queue.finish();
+    BOOST_CHECK_EQUAL(result1.begin().read(queue), 45);
+
+    bc::vector<bc::int_> result2(1, distributed_queue.get_context(1));
+    bc::distributed::reduce(
+        distributed_vector,
+        result2.begin(),
+        bc::multiplies<bc::int_>(),
+        distributed_queue
+    );
+    distributed_queue.finish();
+    BOOST_CHECK_EQUAL(result2.begin().read(distributed_queue.get(1)), 9945);
+}
+
+BOOST_AUTO_TEST_CASE(reduce_int_custom_function)
+{
+    // construct distributed::command_queue
+    bc::distributed::command_queue distributed_queue =
+        get_distributed_queue(queue);
+
+    bc::distributed::vector<bc::int_> distributed_vector(
+        size_t(34), bc::int_(2), distributed_queue
+    );
+    distributed_queue.finish();
+
+    BOOST_COMPUTE_FUNCTION(bc::float_, custom_sum, (bc::int_ x, bc::int_ y),
+    {
+        return x + y;
+    });
+
+    bc::float_ sum;
+    bc::distributed::reduce(
+        distributed_vector,
+        &sum,
+        custom_sum,
+        distributed_queue
+    );
+    BOOST_CHECK_EQUAL(sum, bc::float_(68));
+}
+
+BOOST_AUTO_TEST_SUITE_END()
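
For completeness, a minimal usage sketch of the new algorithm (not part of the patch), mirroring the host-result path exercised by reduce_int_to_host above. The only assumption is how the distributed command queue is obtained; its construction is application-specific and elided here (the tests use the get_distributed_queue() helper from distributed_queue_setup.hpp):

#include <boost/compute/types/fundamental.hpp>
#include <boost/compute/functional.hpp>
#include <boost/compute/distributed/command_queue.hpp>
#include <boost/compute/distributed/vector.hpp>
#include <boost/compute/distributed/reduce.hpp>

namespace bc = boost::compute;

// `queue` is a distributed command queue built by the caller; every device
// it spans participates in the reduction
bc::int_ sum_across_devices(bc::distributed::command_queue &queue)
{
    bc::int_ data[] = { 5, 1, 9, 17, 13 };

    // the vector is partitioned across all devices spanned by `queue`
    bc::distributed::vector<bc::int_> vec(data, data + 5, queue);
    queue.finish();

    // each device reduces its own part; the partial sums are then combined
    // on a single queue chosen by the algorithm (a CPU queue if available)
    bc::int_ sum = 0;
    bc::distributed::reduce(vec, &sum, bc::plus<bc::int_>(), queue);
    return sum; // 45
}

Passing a device iterator (e.g. result1.begin() in the second test case) instead of &sum keeps the result on the device; in that case the result buffer must live in the context of at least one of the queues spanned by the distributed command queue.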