diff --git a/include/kamping/collectives/bcast.hpp b/include/kamping/collectives/bcast.hpp new file mode 100644 index 000000000..485743add --- /dev/null +++ b/include/kamping/collectives/bcast.hpp @@ -0,0 +1,129 @@ +// This file is part of KaMPI.ng. +// +// Copyright 2022 The KaMPI.ng Authors +// +// KaMPI.ng is free software : you can redistribute it and/or modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later +// version. KaMPI.ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the +// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License +// for more details. +// +// You should have received a copy of the GNU Lesser General Public License along with KaMPI.ng. If not, see +// . + +#pragma once + +#include +#include + +#include "kamping/assertion_levels.hpp" +#include "kamping/checking_casts.hpp" +#include "kamping/comm_helper/is_same_on_all_ranks.hpp" +#include "kamping/communicator.hpp" +#include "kamping/mpi_datatype.hpp" +#include "kamping/mpi_function_wrapper_helpers.hpp" +#include "kamping/named_parameter_selection.hpp" +#include "kamping/parameter_check.hpp" +#include "kamping/parameter_factories.hpp" +#include "kamping/parameter_objects.hpp" +#include "kamping/parameter_type_definitions.hpp" + +/// @brief Wrapper for \c MPI_Bcast +/// +/// This wrapper for \c MPI_Bcast sends data from the root to all other ranks. +/// The following buffer is required: +/// - \ref kamping::send_recv_buf() containing the data that is sent to the other ranks. Non-root ranks must allocate +/// and provide this buffer as it's needed for deducing the value type. The container will be resized on non-root ranks +/// to fit exactly the received data. +/// The following parameter is optional but causes additional communication if not present. +/// - \ref kamping::recv_count() specifying how many elements are broadcasted. If not specified, will be +/// communicated through an additional bcast. If not specified, we broadcast the whole send_recv_buf. If specified, +/// has to be the same on all ranks (including the root). Has to either be specified or not specified on all ranks. The +/// following parameter is optional: +/// - \ref kamping::root() specifying an alternative root. If not present, the default root of the \c +/// Communicator is used, see root(). +/// @todo Add support for `bcast(..)` style deduction of send_recv_buf's type on non-root ranks. +/// @todo Add support for unnamed first parameter send_recv_buf. +/// @tparam Args Automatically deducted template parameters. +/// @param args All required and any number of the optional buffers described above. +/// @return Result type wrapping the output buffer if not specified as input parameter. +template +auto kamping::Communicator::bcast(Args... args) const { + using namespace ::kamping::internal; + KAMPING_CHECK_PARAMETERS( + Args, KAMPING_REQUIRED_PARAMETERS(send_recv_buf), KAMPING_OPTIONAL_PARAMETERS(root, recv_count)); + + // Get the root PE + auto&& root = select_parameter_type_or_default(std::tuple(this->root()), args...); + KASSERT(this->is_valid_rank(root.rank()), "Invalid rank as root.", assert::light); + + // Get the send_recv_buf; for now, the user *has* to provide a send-receive buffer. + auto&& send_recv_buf = internal::select_parameter_type(args...); + using value_type = typename std::remove_reference_t::value_type; + static_assert(!std::is_const_v, "Const send_recv_buf'fers are not allowed."); + auto mpi_value_type = mpi_datatype(); + + /// @todo Uncomment, once the send_recv_buf is optional. + // if (this->is_root(root.rank())) { + // KASSERT(has_user_provided_send_recv_buf, "The send_recv_buf is mandatory at the root.", assert::light); + // } + + // Get the optional recv_count parameter. If the parameter is not given, allocate a new container. + auto&& recv_count_param = internal::select_parameter_type_or_default< + ParameterType::recv_count, LibAllocatedSingleElementBuffer>( + std::tuple(), args...); + + constexpr bool recv_count_is_output_parameter = has_to_be_computed; + KASSERT( + is_same_on_all_ranks(recv_count_is_output_parameter), + "recv_count() parameter is an output parameter on some PEs, but not on alle PEs.", assert::light_communication); + + // If it is not user provided, broadcast the size of send_recv_buf from the root to all ranks. + int recv_count = recv_count_param.get_single_element(); + if constexpr (recv_count_is_output_parameter) { + if (this->is_root(root.rank())) { + recv_count = asserting_cast(send_recv_buf.size()); + } + // Transfer the recv_count + // This error code is unused if KTHROW is removed at compile time. + /// @todo Use bcast_single for this. + [[maybe_unused]] int err = MPI_Bcast( + &recv_count, // buffer + 1, // count + mpi_datatype(), // datatype + root.rank_signed(), // root + this->mpi_communicator() // comm + ); + THROW_IF_MPI_ERROR(err, MPI_Bcast); + + // Output the recv count via the output_parameter + *recv_count_param.data() = recv_count; + } + if (this->is_root(root.rank())) { + KASSERT( + asserting_cast(recv_count) == send_recv_buf.size(), + "If a recv_count() is provided on the root rank, it has to be equal to the number of elements in the " + "send_recv_buf. For partial transfers, use a kamping::Span."); + } + KASSERT( + this->is_same_on_all_ranks(recv_count), "The recv_count must be equal on all ranks.", + assert::light_communication); + + // Resize my send_recv_buf to be able to hold all received data. + // Trying to resize a single element buffer to something other than 1 will throw an error. + send_recv_buf.resize(asserting_cast(recv_count)); + + // Perform the broadcast. The error code is unused if KTHROW is removed at compile time. + [[maybe_unused]] int err = MPI_Bcast( + send_recv_buf.data(), // buffer + asserting_cast(send_recv_buf.size()), // count + mpi_value_type, // datatype + root.rank_signed(), // root + this->mpi_communicator() // comm + ); + THROW_IF_MPI_ERROR(err, MPI_Bcast); + + return MPIResult( + std::move(send_recv_buf), BufferCategoryNotUsed{}, std::move(recv_count_param), BufferCategoryNotUsed{}, + BufferCategoryNotUsed{}); +} // namespace kamping::internal diff --git a/include/kamping/collectives/reduce.hpp b/include/kamping/collectives/reduce.hpp index 39cc87da8..86b8efc88 100644 --- a/include/kamping/collectives/reduce.hpp +++ b/include/kamping/collectives/reduce.hpp @@ -67,7 +67,8 @@ auto kamping::Communicator::reduce(Args... args) const { using recv_value_type = typename std::remove_reference_t::value_type; auto& operation_param = internal::select_parameter_type(args...); - auto operation = operation_param.template build_operation(); + // If you want to understand the syntax of the following line, ignore the "template " ;-) + auto operation = operation_param.template build_operation(); // Check parameters static_assert( diff --git a/include/kamping/communicator.hpp b/include/kamping/communicator.hpp index 595228764..d1d7c614d 100644 --- a/include/kamping/communicator.hpp +++ b/include/kamping/communicator.hpp @@ -209,6 +209,9 @@ class Communicator { template auto gather(Args... args) const; + template + auto bcast(Args... args) const; + template void barrier(Args... args) const; diff --git a/include/kamping/parameter_objects.hpp b/include/kamping/parameter_objects.hpp index b1312e33f..205737d24 100644 --- a/include/kamping/parameter_objects.hpp +++ b/include/kamping/parameter_objects.hpp @@ -190,7 +190,9 @@ class DataBuffer { KASSERT(!is_extracted, "Cannot resize a buffer that has already been extracted.", assert::normal); #endif if constexpr (is_single_element) { - KASSERT(size == 1u, "Single element buffers must hold exactly one element."); + KASSERT( + size == 1u, "Cannot resize a single element buffer to hold zero or more than one element. Single " + "element buffers always hold exactly one element."); } else if constexpr (std::is_same_v>) { KASSERT(this->size() >= size, "Span cannot be resized and is smaller than the requested size."); } else { diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 4f2944457..885fade9f 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -30,6 +30,7 @@ kamping_register_mpi_test(test_mpi_communicator FILES mpi_communicator_test.cpp kamping_register_mpi_test(test_mpi_datatype FILES mpi_datatype_test.cpp CORES 1) kamping_register_mpi_test(test_mpi_gather FILES collectives/mpi_gather_test.cpp CORES 1 4) kamping_register_mpi_test(test_mpi_reduce FILES collectives/mpi_reduce_test.cpp CORES 1 4) +kamping_register_mpi_test(test_mpi_bcast FILES collectives/mpi_bcast_test.cpp CORES 1 4) kamping_register_mpi_test(test_mpi_allreduce FILES collectives/mpi_allreduce_test.cpp CORES 1 4) kamping_register_mpi_test(test_operation_wrapper FILES mpi_operation_wrapper_test.cpp CORES 1) kamping_register_mpi_test(test_parameter_factories_mpi FILES parameter_factories_mpi_test.cpp CORES 1) @@ -94,3 +95,4 @@ kamping_register_compilation_failure_test( "GET_SINGLE_ELEMENT_ON_VECTOR" LIBRARIES kamping_base ) + diff --git a/tests/collectives/mpi_bcast_test.cpp b/tests/collectives/mpi_bcast_test.cpp new file mode 100644 index 000000000..e33c66247 --- /dev/null +++ b/tests/collectives/mpi_bcast_test.cpp @@ -0,0 +1,317 @@ +// This file is part of KaMPI.ng. +// +// Copyright 2022 The KaMPI.ng Authors +// +// KaMPI.ng is free software : you can redistribute it and/or modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later +// version. KaMPI.ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the +// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License +// for more details. +// +// You should have received a copy of the GNU Lesser General Public License along with KaMPI.ng. If not, see +// . + +#include "../test_assertions.hpp" + +#include + +#include +#include + +#include "../helpers_for_testing.hpp" +#include "kamping/collectives/bcast.hpp" +#include "kamping/communicator.hpp" +#include "kamping/parameter_factories.hpp" + +using namespace ::kamping; +using namespace ::testing; + +TEST(BcastTest, single_element) { + Communicator comm; + + // Basic use case, broadcast a single POD. + size_t value = comm.rank(); + comm.bcast(send_recv_buf(value)); + EXPECT_EQ(value, comm.root()); + + // TODO Using the unnamed first parameter. + // value++; + // comm.bcast(value); + // EXPECT_EQ(value, comm.root() + 1); + + // Broadcast a single POD to all processes, manually specify the root process. + assert(comm.size() > 0); + const size_t root = comm.size() - 1; + value = comm.rank(); + comm.bcast(send_recv_buf(value), kamping::root(root)); + EXPECT_EQ(value, root); + + // Broadcast a single POD to all processes, use a non-default communicator's root. + value = comm.rank(); + comm.root(root); + ASSERT_EQ(root, comm.root()); + comm.bcast(send_recv_buf(value)); + EXPECT_EQ(value, root); + + // Broadcast a single POD to all processes, manually specify the recv_count. + value = comm.rank(); + /// @todo Uncomment, once EXPECT_KASSERT_FAILS supports KASSERTs which fail only on some ranks. + // EXPECT_KASSERT_FAILS(comm.bcast(send_recv_buf(value), recv_count(0)), ""); + comm.bcast(send_recv_buf(value), recv_count(1)); + EXPECT_EQ(value, root); + /// @todo Uncomment, once EXPECT_KASSERT_FAILS supports KASSERTs which fail only on some ranks. + // EXPECT_KASSERT_FAILS(comm.bcast(send_recv_buf(value), recv_count(2)), ""); +} + +TEST(Bcasttest, vector_partial_transfer) { + Communicator comm; + + std::vector values(5); + int num_transferred_values = 3; + std::iota(values.begin(), values.end(), comm.rank() * 10); + kamping::Span transfer_view(values.data(), asserting_cast(num_transferred_values)); + + comm.bcast(send_recv_buf(transfer_view)); + EXPECT_EQ(values.size(), 5); + EXPECT_THAT(values, ElementsAre(0, 1, 2, comm.rank() * 10 + 3, comm.rank() * 10 + 4)); + + std::iota(values.begin(), values.end(), comm.rank() * 10); + comm.bcast(send_recv_buf(transfer_view), recv_count(num_transferred_values)); + EXPECT_EQ(values.size(), 5); + EXPECT_THAT(values, ElementsAre(0, 1, 2, comm.rank() * 10 + 3, comm.rank() * 10 + 4)); + + std::iota(values.begin(), values.end(), comm.rank() * 10); + num_transferred_values = -1; + comm.bcast(send_recv_buf(transfer_view), recv_count_out(num_transferred_values)); + EXPECT_EQ(values.size(), 5); + EXPECT_EQ(num_transferred_values, 3); + EXPECT_THAT(values, ElementsAre(0, 1, 2, comm.rank() * 10 + 3, comm.rank() * 10 + 4)); +} + +TEST(BcastTest, vector_recv_count) { + Communicator comm; + + { // All ranks provide the same recv_count. + const size_t num_values = 4; + + std::vector values(num_values); + if (comm.is_root()) { + std::fill(values.begin(), values.end(), comm.rank()); + } + + comm.bcast(send_recv_buf(values), recv_count(num_values)); + EXPECT_EQ(values.size(), num_values); + EXPECT_THAT(values, Each(Eq(comm.root()))); + } + + if (comm.size() > 1) { + { // Some ranks provide a recv_count, some don't. + const size_t num_values = 4; + + std::vector values(num_values); + if (comm.is_root()) { + EXPECT_KASSERT_FAILS(comm.bcast(send_recv_buf(values), recv_count(num_values)), ""); + } else { + EXPECT_KASSERT_FAILS(comm.bcast(send_recv_buf(values)), ""); + } + } + + { // All ranks provide a recv_count, but they differ. + const size_t num_values = 4; + [[maybe_unused]] const size_t alternative_num_values = 3; + + std::vector values(num_values); + if (comm.is_root()) { + EXPECT_KASSERT_FAILS(comm.bcast(send_recv_buf(values), recv_count(num_values)), ""); + } else { + EXPECT_KASSERT_FAILS(comm.bcast(send_recv_buf(values), recv_count(alternative_num_values)), ""); + } + } + } +} + +TEST(BcastTest, vector_recv_count_not_equal_to_vector_size) { + Communicator comm; + + /// @todo Uncomment, once EXPECT_KASSERT_FAILS supports KASSERTs which fail only on some ranks. + // { // recv count < vector size + // const size_t num_values = 4; + // const int num_transferred_values = num_values - 1; + + // std::vector values(num_values); + // EXPECT_KASSERT_FAILS(comm.bcast(send_recv_buf(values), recv_count(num_transferred_values)), ""); + // } + + /// @todo Uncomment, once EXPECT_KASSERT_FAILS supports KASSERTs which fail only on some ranks. + // { // recv count > vector size + // const size_t num_values = 4; + // const int num_transferred_values = num_values + 1; + + // std::vector values(num_values); + // EXPECT_KASSERT_FAILS(comm.bcast(send_recv_buf(values), recv_count(num_transferred_values)), ""); + // } +} + +TEST(BcastTest, vector_no_recv_count) { + Communicator comm; + + { // All send_recv_bufs are already large enough. + std::vector values(4); + if (comm.is_root()) { + std::fill(values.begin(), values.end(), comm.rank()); + } + + comm.bcast(send_recv_buf(values)); + EXPECT_EQ(values.size(), 4); + EXPECT_THAT(values, Each(Eq(comm.root()))); + } + + { // Some send_recv_bufs need to be resized. + std::vector values; + if (comm.is_root()) { + values.resize(100); + std::fill(values.begin(), values.end(), comm.rank()); + } else { + values.resize(0); + } + + comm.bcast(send_recv_buf(values)); + EXPECT_EQ(values.size(), 100); + EXPECT_THAT(values, Each(Eq(comm.root()))); + } + + { // All send_recv_bufs are of different size + comm.root(0); + std::vector values; + + if (comm.is_root()) { + values.resize(43); + std::fill(values.begin(), values.end(), comm.rank()); + } else { + values.resize(comm.rank()); + std::fill(values.begin(), values.end(), comm.rank()); + } + + comm.bcast(send_recv_buf(values)); + EXPECT_EQ(values.size(), 43); + EXPECT_THAT(values, Each(Eq(comm.root()))); + } +} + +TEST(BcastTest, vector_recv_count_as_out_parameter) { + Communicator comm; + + { // All send_recv_bufs are already large enough. + std::vector values(4); + if (comm.is_root()) { + std::fill(values.begin(), values.end(), comm.rank()); + } + + int num_elements_received = -1; + comm.bcast(send_recv_buf(values), recv_count_out(num_elements_received)); + EXPECT_EQ(values.size(), 4); + EXPECT_EQ(num_elements_received, values.size()); + EXPECT_THAT(values, Each(Eq(comm.root()))); + } + + { // Some send_recv_bufs need to be resized. + std::vector values; + if (comm.is_root()) { + values.resize(100); + std::fill(values.begin(), values.end(), comm.rank()); + } else { + values.resize(0); + } + + int num_elements_received = -1; + comm.bcast(send_recv_buf(values), recv_count_out(num_elements_received)); + EXPECT_EQ(values.size(), 100); + EXPECT_EQ(num_elements_received, values.size()); + EXPECT_THAT(values, Each(Eq(comm.root()))); + } + + { // All send_recv_bufs are of different size. + comm.root(0); + std::vector values; + + if (comm.is_root()) { + values.resize(43); + std::fill(values.begin(), values.end(), comm.rank()); + } else { + values.resize(comm.rank()); + std::fill(values.begin(), values.end(), comm.rank()); + } + + int num_elements_received = -1; + comm.bcast(send_recv_buf(values), recv_count_out(num_elements_received)); + EXPECT_EQ(values.size(), 43); + EXPECT_EQ(num_elements_received, values.size()); + EXPECT_THAT(values, Each(Eq(comm.root()))); + } + + if (comm.size() > 1) { + { // Root rank provides recv_count, the other ranks need request as an out parameter. + comm.root(0); + std::vector values(0); + int num_elements = 43; + + if (comm.is_root()) { + values.resize(asserting_cast(num_elements)); + EXPECT_KASSERT_FAILS(comm.bcast(send_recv_buf(values), recv_count(num_elements)), ""); + } else { + values.resize(comm.rank()); + [[maybe_unused]] int num_elements_received = -1; + EXPECT_KASSERT_FAILS(comm.bcast(send_recv_buf(values), recv_count_out(num_elements_received)), ""); + } + } + } + + { // + comm.root(0); + std::vector values(0); + int num_elements = 43; + + if (comm.is_root()) { + values.resize(asserting_cast(num_elements)); + std::fill(values.begin(), values.end(), comm.rank()); + auto result = comm.bcast(send_recv_buf(values)); + EXPECT_EQ(result.extract_recv_count(), num_elements); + } else { + values.resize(comm.rank()); + int num_elements_received = -1; + comm.bcast(send_recv_buf(values), recv_count_out(num_elements_received)); + EXPECT_EQ(num_elements, num_elements_received); + EXPECT_EQ(num_elements_received, values.size()); + } + + EXPECT_EQ(values.size(), num_elements); + EXPECT_THAT(values, Each(Eq(comm.root()))); + } +} + +TEST(BcastTest, vector_needs_resizing_and_counts_are_given) { + Communicator comm; + + size_t num_values = 10; + + std::vector values; + if (comm.is_root()) { + values.resize(num_values); + std::fill(values.begin(), values.end(), comm.rank()); + } + comm.bcast(send_recv_buf(values), recv_count(asserting_cast(num_values))); + EXPECT_EQ(values.size(), num_values); + EXPECT_THAT(values, Each(Eq(comm.root()))); +} + +TEST(BcastTest, message_of_size_0) { + Communicator comm; + + std::vector values(0); + EXPECT_NO_THROW(comm.bcast(send_recv_buf(values))); + EXPECT_EQ(values.size(), 0); + + values.resize(1); + /// @todo Uncomment, once EXPECT_KASSERT_FAILS supports KASSERTs which fail only on some ranks. + // EXPECT_KASSERT_FAILS(comm.bcast(send_recv_buf(values), recv_count(0)), ""); +} diff --git a/tests/parameter_objects_test.cpp b/tests/parameter_objects_test.cpp index a61e13848..063e38429 100644 --- a/tests/parameter_objects_test.cpp +++ b/tests/parameter_objects_test.cpp @@ -369,9 +369,14 @@ TEST(SingleElementModifiableBufferTest, get_basics) { int_buffer.resize(1); EXPECT_EQ(int_buffer.size(), 1); #if KASSERT_ASSERTION_LEVEL >= KAMPING_ASSERTION_LEVEL_NORMAL - EXPECT_DEATH(int_buffer.resize(0), "Single element buffers must hold exactly one element."); - EXPECT_DEATH(int_buffer.resize(2), "Single element buffers must hold exactly one element."); + EXPECT_DEATH( + int_buffer.resize(0), "Cannot resize a single element buffer to hold zero or more than one element. Single " + "element buffers always hold exactly one element."); + EXPECT_DEATH( + int_buffer.resize(2), "Cannot resize a single element buffer to hold zero or more than one element. Single " + "element buffers always hold exactly one element."); #endif + EXPECT_EQ(int_buffer.get().size(), 1); EXPECT_EQ(*(int_buffer.get().data()), 5); EXPECT_EQ(*(int_buffer.data()), 5); @@ -406,8 +411,12 @@ TEST(LibAllocatedSingleElementBufferTest, get_basics) { int_buffer.resize(1); EXPECT_EQ(int_buffer.size(), 1); #if KASSERT_ASSERTION_LEVEL >= KAMPING_ASSERTION_LEVEL_NORMAL - EXPECT_DEATH(int_buffer.resize(0), "Single element buffers must hold exactly one element."); - EXPECT_DEATH(int_buffer.resize(2), "Single element buffers must hold exactly one element."); + EXPECT_DEATH( + int_buffer.resize(0), "Cannot resize a single element buffer to hold zero or more than one element. Single " + "element buffers always hold exactly one element."); + EXPECT_DEATH( + int_buffer.resize(2), "Cannot resize a single element buffer to hold zero or more than one element. Single " + "element buffers always hold exactly one element."); #endif EXPECT_EQ(int_buffer.get().size(), 1); EXPECT_EQ(*(int_buffer.get().data()), 5);