diff --git a/examples/collection/CMakeLists.txt b/examples/collection/CMakeLists.txt index edbd11d64e..f9f776b46d 100644 --- a/examples/collection/CMakeLists.txt +++ b/examples/collection/CMakeLists.txt @@ -3,7 +3,9 @@ set( COLLECTION_EXAMPLES lb_iter jacobi1d_vt + jacobi1d_vt_sync jacobi2d_vt + jacobi2d_vt_sync migrate_collection polymorphic_collection insertable_collection diff --git a/examples/collection/jacobi1d_vt.cc b/examples/collection/jacobi1d_vt.cc index 3a10699029..14f106f150 100644 --- a/examples/collection/jacobi1d_vt.cc +++ b/examples/collection/jacobi1d_vt.cc @@ -62,6 +62,7 @@ #include #include +#include #include #include @@ -69,23 +70,18 @@ static constexpr std::size_t const default_nrow_object = 8; static constexpr std::size_t const default_num_objs = 4; +static constexpr std::size_t const check_conv_freq = 10; static constexpr double const default_tol = 1.0e-02; -struct NodeObj { - bool is_finished_ = false; - void workFinishedHandler() { is_finished_ = true; } - bool isWorkFinished() { return is_finished_; } -}; +struct NodeObj; + using NodeObjProxy = vt::objgroup::proxy::Proxy; struct LinearPb1DJacobi : vt::Collection { - private: - std::vector tcur_, told_; std::vector rhs_; size_t iter_ = 0; - size_t msgReceived_ = 0, totalReceive_ = 0; size_t numObjs_ = 1; size_t numRowsPerObject_ = 1; size_t maxIter_ = 8; @@ -95,177 +91,74 @@ struct LinearPb1DJacobi : vt::Collection { explicit LinearPb1DJacobi() : tcur_(), told_(), rhs_(), iter_(0), - msgReceived_(0), totalReceive_(0), numObjs_(1), numRowsPerObject_(1), maxIter_(8) { } - - using BlankMsg = vt::CollectionMessage; - - struct LPMsg : vt::CollectionMessage { - - size_t numObjects = 0; - size_t nRowPerObject = 0; - size_t iterMax = 0; - NodeObjProxy objProxy; - - LPMsg() = default; - - LPMsg(const size_t nobjs, const size_t nrow, const size_t itMax, NodeObjProxy proxy) : - numObjects(nobjs), nRowPerObject(nrow), iterMax(itMax), objProxy(proxy) - { } - - }; - - void checkCompleteCB(double normRes) 
{ - // - // Only one object for the reduction will visit - // this function - // - - auto const iter_max_reached = iter_ > maxIter_; - auto const norm_res_done = normRes < default_tol; - - if (iter_max_reached or norm_res_done) { - auto const to_print = iter_max_reached ? - "\n Maximum Number of Iterations Reached. \n\n" : - fmt::format("\n Max-Norm Residual Reduced by {} \n\n", default_tol); - - fmt::print(to_print); - - // Notify all nodes that computation is finished - objProxy_.broadcast<&NodeObj::workFinishedHandler>(); - } else { - fmt::print(" ## ITER {} >> Residual Norm = {} \n", iter_, normRes); - } - } - - void doIteration() { - + void kernel() { iter_ += 1; - // - //--- Copy extremal values - // + // Copy extremal values tcur_[0] = told_[0]; tcur_[numRowsPerObject_+1] = told_[numRowsPerObject_+1]; - // - //---- Jacobi iteration step - //---- A tridiagonal matrix = "tridiag" ( [-1.0 2.0 -1.0] ) - //---- rhs_ right hand side vector + // Jacobi iteration step + // A tridiagonal matrix = "tridiag" ( [-1.0 2.0 -1.0] ) + // rhs_ right hand side vector // for (size_t ii = 1; ii <= numRowsPerObject_; ++ii) { tcur_[ii] = 0.5*(rhs_[ii] + told_[ii-1] + told_[ii+1]); } std::copy(tcur_.begin(), tcur_.end(), told_.begin()); + } - // - // Compute the maximum entries among the rows on this object - // We do not take into account the "ghost" entries - // as they may be "out of date". - // - + double computeMaxNorm() { + // Compute the maximum entries among the rows on this object We do not take + // into account the "ghost" entries as they may be "out of date". double maxNorm = 0.0; - for (size_t ii = 1; ii < tcur_.size()-1; ++ii) { double val = tcur_[ii]; maxNorm = (maxNorm > std::fabs(val)) ? 
maxNorm : std::fabs(val); } + return maxNorm; + } + void reduceMaxNorm(vt::Callback cb) { auto proxy = this->getCollectionProxy(); - proxy.reduce<&LinearPb1DJacobi::checkCompleteCB, vt::collective::MaxOp>( - proxy[0], maxNorm - ); - + proxy.reduce(cb, computeMaxNorm()); } - - struct VecMsg : vt::CollectionMessage { - using MessageParentType = vt::CollectionMessage; - vt_msg_serialize_if_needed_by_parent_or_type1(vt::IdxBase); - - VecMsg() = default; - - VecMsg(vt::IdxBase const& in_index, double const& ref) : - vt::CollectionMessage(), - from_index(in_index), val(ref) - { } - - template - void serialize(Serializer& s) { - MessageParentType::serialize(s); - s | from_index; - s | val; - } - - vt::IdxBase from_index = 0; - double val = 0.0; - }; - - void exchange(VecMsg *msg) { - // Receive and treat the message from a neighboring object. - - const vt::IdxBase myIdx = getIndex().x(); - - if (myIdx > msg->from_index) { - this->told_[0] = msg->val; - msgReceived_ += 1; - } - - if (myIdx < msg->from_index) { - this->told_[numRowsPerObject_ + 1] = msg->val; - msgReceived_ += 1; - } - - // Check whether this 'object' has received all the expected messages. - if (msgReceived_ == totalReceive_) { - msgReceived_ = 0; - doIteration(); + void sendLeft() { + // Send the values to the left + auto proxy = this->getCollectionProxy(); + auto idx = this->getIndex(); + if (idx.x() > 0) { + proxy[idx.x() - 1].send<&LinearPb1DJacobi::exchange>(idx.x(), told_[1], iter_); } - } - void doIter(BlankMsg *msg) { - - // - // Treat the particular case of 1 object - // where no communication is needed. - // Without this treatment, the code would not iterate. 
- // - - if (numObjs_ == 1) { - doIteration(); - return; - } - //--------------------------------------- - - // - // Routine to send information to a different object - // - - vt::IdxBase const myIdx = getIndex().x(); - - //--- Send the values to the left + void sendRight() { + // Send the values to the right auto proxy = this->getCollectionProxy(); - if (myIdx > 0) { - proxy[myIdx - 1].send( - myIdx, told_[1] - ); + auto idx = this->getIndex(); + if (size_t(idx.x()) < numObjs_ - 1) { + proxy[idx.x() + 1].send<&LinearPb1DJacobi::exchange>(idx.x(), told_[numRowsPerObject_], iter_); } + } - //--- Send values to the right - if (size_t(myIdx) < numObjs_ - 1) { - proxy[myIdx + 1].send( - myIdx, told_[numRowsPerObject_] - ); + void exchange(vt::IdxBase from_index, double val, std::size_t in_iter) { + // Receive and treat the message from a neighboring object. + auto idx = this->getIndex(); + vtAssert(iter_ == in_iter, "iters should match"); + if (idx.x() > from_index) { + told_[0] = val; + } + if (idx.x() < from_index) { + told_[numRowsPerObject_ + 1] = val; } } - - void init() { - + void initializeData() { tcur_.assign(numRowsPerObject_ + 2, 0.0); told_.assign(numRowsPerObject_ + 2, 0.0); rhs_.assign(numRowsPerObject_ + 2, 0.0); @@ -280,39 +173,135 @@ struct LinearPb1DJacobi : vt::Collection { tcur_[ii] = sin(nf * M_PI * x0 * x0); } - totalReceive_ = 2; - if (myIdx == 0) { tcur_[0] = 0.0; - totalReceive_ -= 1; } if (myIdx == numObjs_ - 1) { tcur_[numRowsPerObject_+1] = 0.0; - totalReceive_ -= 1; } std::copy(tcur_.begin(), tcur_.end(), told_.begin()); - } - void init(LPMsg* msg) { - numObjs_ = msg->numObjects; - numRowsPerObject_ = msg->nRowPerObject; - maxIter_ = msg->iterMax; - objProxy_ = msg->objProxy; + void init(size_t numObjects, size_t nRowPerObject, size_t iterMax, NodeObjProxy objProxy) { + numObjs_ = numObjects; + numRowsPerObject_ = nRowPerObject; + maxIter_ = iterMax; + objProxy_ = objProxy; // Initialize the starting vector - init(); + initializeData(); } 
}; -bool isWorkDone( vt::objgroup::proxy::Proxy const& proxy){ - auto const this_node = vt::theContext()->getNode(); - return proxy[this_node].invoke<&NodeObj::isWorkFinished>(); -} +struct NodeObj { + using ChainSetType = vt::messaging::CollectionChainSet; + + NodeObj( + vt::CollectionProxy in_proxy, std::size_t in_num_objs, + std::size_t in_max_iter + ) : proxy_(in_proxy), + num_objs_(in_num_objs), + max_iter_(in_max_iter) + { } + + void reducedNorm(double normRes) { + converged_ = normRes < default_tol; + if (vt::theContext()->getNode() == 0) { + fmt::print( + "## ITER {} >> Residual Norm = {}, conv={} \n", + cur_iter_, normRes, converged_ + ); + } + } + + void setup(vt::objgroup::proxy::Proxy in_proxy) { + chains_ = std::make_unique(proxy_); + this_proxy_ = in_proxy; + } + + void runToConvergence() { + vt::task::TaskCollective* prev_kernel = nullptr; + + auto iteration = chains_->createTaskRegion([&]{ + auto xl = chains_->taskCollective("exchange left", [&](auto idx, auto t) { + if (prev_kernel) { + t->dependsOn(idx, prev_kernel); + + if (idx.x() != 0) { + auto left = vt::Index1D(idx.x() - 1); + t->dependsOn(left, prev_kernel); + } + } + return proxy_[idx].template send<&LinearPb1DJacobi::sendLeft>(); + }); + + auto xr = chains_->taskCollective("exchange right", [&](auto idx, auto t) { + if (prev_kernel) { + t->dependsOn(idx, prev_kernel); + + if (static_cast(idx.x()) != num_objs_ - 1) { + auto right = vt::Index1D(idx.x() + 1); + t->dependsOn(right, prev_kernel); + } + } + return proxy_[idx].template send<&LinearPb1DJacobi::sendRight>(); + }); + + prev_kernel = chains_->taskCollective("kernel", [&](auto idx, auto t) { + if (idx.x() != 0) { + auto left = vt::Index1D(idx.x() - 1); + t->dependsOn(left, xr); + } + if (static_cast(idx.x()) != num_objs_ - 1) { + auto right = vt::Index1D(idx.x() + 1); + t->dependsOn(right, xl); + } + return proxy_[idx].template send<&LinearPb1DJacobi::kernel>(); + }); + }); + + while (not converged_ and cur_iter_ < max_iter_) { + 
iteration->enqueueTasks(); + + if (cur_iter_++ % check_conv_freq == 0) { + chains_->taskCollective("checkConv", [&](auto idx, auto t) { + t->dependsOn(idx, prev_kernel); + auto cb = vt::theCB()->makeBcast<&NodeObj::reducedNorm>(this_proxy_); + return proxy_[idx].template send<&LinearPb1DJacobi::reduceMaxNorm>(cb); + }); + + iteration->waitCollective(); + vt::thePhase()->nextPhaseCollective(); + } + } + + iteration->waitCollective(); + + if (vt::theContext()->getNode() == 0) { + if (not converged_) { + fmt::print("Maximum Number of Iterations Reached without convergence.\n"); + } else { + fmt::print("Convergence is reached at iteration {}.\n", cur_iter_); + } + } + + chains_->phaseDone(); + chains_ = nullptr; + } + +private: + vt::CollectionProxy proxy_; + std::size_t num_objs_ = 0; + std::size_t max_iter_ = 0; + std::size_t cur_iter_ = 0; + std::unique_ptr chains_; + bool converged_ = false; + vt::objgroup::proxy::Proxy this_proxy_; +}; int main(int argc, char** argv) { @@ -354,34 +343,29 @@ int main(int argc, char** argv) { return 1; } - // Object group of all nodes that take part in computation - // Used to determine whether the computation is finished - auto grp_proxy = vt::theObjGroup()->makeCollective("examples_jacobi1d"); - // Create the decomposition into objects using BaseIndexType = typename vt::Index1D::DenseIndexType; auto range = vt::Index1D(static_cast(num_objs)); - auto col_proxy = vt::makeCollection("examples_jacobi1d") + auto col_proxy = vt::makeCollection("jacobi1d") .bounds(range) .bulkInsert() .wait(); - vt::runInEpochCollective([col_proxy, grp_proxy, num_objs, numRowsPerObject, maxIter]{ - col_proxy.broadcastCollective( + // Object group of all nodes that take part in computation + // Used to determine whether the computation is finished + auto grp_proxy = vt::theObjGroup()->makeCollective( + "jacobi1d", col_proxy, num_objs, maxIter + ); + + vt::runInEpochCollective([=]{ + col_proxy.broadcastCollective<&LinearPb1DJacobi::init>( num_objs, 
numRowsPerObject, maxIter, grp_proxy ); }); - while(!isWorkDone(grp_proxy)){ - vt::runInEpochCollective([col_proxy]{ - col_proxy.broadcastCollective< - LinearPb1DJacobi::BlankMsg, &LinearPb1DJacobi::doIter - >(); - }); - - vt::thePhase()->nextPhaseCollective(); - } + grp_proxy.get()->setup(grp_proxy); + grp_proxy.get()->runToConvergence(); vt::finalize(); diff --git a/examples/collection/jacobi1d_vt_sync.cc b/examples/collection/jacobi1d_vt_sync.cc new file mode 100644 index 0000000000..3a10699029 --- /dev/null +++ b/examples/collection/jacobi1d_vt_sync.cc @@ -0,0 +1,391 @@ +/* +//@HEADER +// ***************************************************************************** +// +// jacobi1d_vt.cc +// DARMA/vt => Virtual Transport +// +// Copyright 2019-2021 National Technology & Engineering Solutions of Sandia, LLC +// (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. +// Government retains certain rights in this software. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. +// +// * Neither the name of the copyright holder nor the names of its +// contributors may be used to endorse or promote products derived from this +// software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. +// +// Questions? Contact darma@sandia.gov +// +// ***************************************************************************** +//@HEADER +*/ + +/// [Jacobi1D example] + +// +// This code applies a few steps of the Jacobi iteration to +// the linear system A x = 0 +// where A is the tridiagonal matrix with pattern [-1 2 -1] +// The initial guess for x is a made-up non-zero vector. +// The exact solution is the vector 0. +// +// The matrix A is square and invertible. 
+// The number of rows is ((number of objects) * (number of rows per object)) +// +// Such a matrix A is obtained when using 2nd-order finite difference +// for discretizing (-d^2 u /dx^2 = f) on [0, 1] with homogeneous +// Dirichlet condition (u(0) = u(1) = 0) using a uniform grid +// with grid size 1 / ((number of objects) * (number of rows per object) + 1) +// + + +#include +#include + +#include +#include +#include + +static constexpr std::size_t const default_nrow_object = 8; +static constexpr std::size_t const default_num_objs = 4; +static constexpr double const default_tol = 1.0e-02; + +struct NodeObj { + bool is_finished_ = false; + void workFinishedHandler() { is_finished_ = true; } + bool isWorkFinished() { return is_finished_; } +}; +using NodeObjProxy = vt::objgroup::proxy::Proxy; + +struct LinearPb1DJacobi : vt::Collection { + +private: + + std::vector tcur_, told_; + std::vector rhs_; + size_t iter_ = 0; + size_t msgReceived_ = 0, totalReceive_ = 0; + size_t numObjs_ = 1; + size_t numRowsPerObject_ = 1; + size_t maxIter_ = 8; + NodeObjProxy objProxy_; + +public: + + explicit LinearPb1DJacobi() : + tcur_(), told_(), rhs_(), iter_(0), + msgReceived_(0), totalReceive_(0), + numObjs_(1), numRowsPerObject_(1), maxIter_(8) + { } + + + using BlankMsg = vt::CollectionMessage; + + struct LPMsg : vt::CollectionMessage { + + size_t numObjects = 0; + size_t nRowPerObject = 0; + size_t iterMax = 0; + NodeObjProxy objProxy; + + LPMsg() = default; + + LPMsg(const size_t nobjs, const size_t nrow, const size_t itMax, NodeObjProxy proxy) : + numObjects(nobjs), nRowPerObject(nrow), iterMax(itMax), objProxy(proxy) + { } + + }; + + void checkCompleteCB(double normRes) { + // + // Only one object for the reduction will visit + // this function + // + + auto const iter_max_reached = iter_ > maxIter_; + auto const norm_res_done = normRes < default_tol; + + if (iter_max_reached or norm_res_done) { + auto const to_print = iter_max_reached ? 
+ "\n Maximum Number of Iterations Reached. \n\n" : + fmt::format("\n Max-Norm Residual Reduced by {} \n\n", default_tol); + + fmt::print(to_print); + + // Notify all nodes that computation is finished + objProxy_.broadcast<&NodeObj::workFinishedHandler>(); + } else { + fmt::print(" ## ITER {} >> Residual Norm = {} \n", iter_, normRes); + } + } + + void doIteration() { + + iter_ += 1; + + // + //--- Copy extremal values + // + tcur_[0] = told_[0]; + tcur_[numRowsPerObject_+1] = told_[numRowsPerObject_+1]; + + // + //---- Jacobi iteration step + //---- A tridiagonal matrix = "tridiag" ( [-1.0 2.0 -1.0] ) + //---- rhs_ right hand side vector + // + for (size_t ii = 1; ii <= numRowsPerObject_; ++ii) { + tcur_[ii] = 0.5*(rhs_[ii] + told_[ii-1] + told_[ii+1]); + } + + std::copy(tcur_.begin(), tcur_.end(), told_.begin()); + + // + // Compute the maximum entries among the rows on this object + // We do not take into account the "ghost" entries + // as they may be "out of date". + // + + double maxNorm = 0.0; + + for (size_t ii = 1; ii < tcur_.size()-1; ++ii) { + double val = tcur_[ii]; + maxNorm = (maxNorm > std::fabs(val)) ? maxNorm : std::fabs(val); + } + + auto proxy = this->getCollectionProxy(); + proxy.reduce<&LinearPb1DJacobi::checkCompleteCB, vt::collective::MaxOp>( + proxy[0], maxNorm + ); + + } + + + struct VecMsg : vt::CollectionMessage { + using MessageParentType = vt::CollectionMessage; + vt_msg_serialize_if_needed_by_parent_or_type1(vt::IdxBase); + + VecMsg() = default; + + VecMsg(vt::IdxBase const& in_index, double const& ref) : + vt::CollectionMessage(), + from_index(in_index), val(ref) + { } + + template + void serialize(Serializer& s) { + MessageParentType::serialize(s); + s | from_index; + s | val; + } + + vt::IdxBase from_index = 0; + double val = 0.0; + }; + + void exchange(VecMsg *msg) { + // Receive and treat the message from a neighboring object. 
+ + const vt::IdxBase myIdx = getIndex().x(); + + if (myIdx > msg->from_index) { + this->told_[0] = msg->val; + msgReceived_ += 1; + } + + if (myIdx < msg->from_index) { + this->told_[numRowsPerObject_ + 1] = msg->val; + msgReceived_ += 1; + } + + // Check whether this 'object' has received all the expected messages. + if (msgReceived_ == totalReceive_) { + msgReceived_ = 0; + doIteration(); + } + + } + + void doIter(BlankMsg *msg) { + + // + // Treat the particular case of 1 object + // where no communication is needed. + // Without this treatment, the code would not iterate. + // + + if (numObjs_ == 1) { + doIteration(); + return; + } + //--------------------------------------- + + // + // Routine to send information to a different object + // + + vt::IdxBase const myIdx = getIndex().x(); + + //--- Send the values to the left + auto proxy = this->getCollectionProxy(); + if (myIdx > 0) { + proxy[myIdx - 1].send( + myIdx, told_[1] + ); + } + + //--- Send values to the right + if (size_t(myIdx) < numObjs_ - 1) { + proxy[myIdx + 1].send( + myIdx, told_[numRowsPerObject_] + ); + } + } + + + void init() { + + tcur_.assign(numRowsPerObject_ + 2, 0.0); + told_.assign(numRowsPerObject_ + 2, 0.0); + rhs_.assign(numRowsPerObject_ + 2, 0.0); + + double h = 1.0 / (numRowsPerObject_ * numObjs_ + 1.0); + int nf = 3 * int(numRowsPerObject_ * numObjs_ + 1) / 4; + + size_t const myIdx = getIndex().x(); + + for (size_t ii = 0; ii < tcur_.size(); ++ii) { + double x0 = ( numRowsPerObject_ * myIdx + ii) * h; + tcur_[ii] = sin(nf * M_PI * x0 * x0); + } + + totalReceive_ = 2; + + if (myIdx == 0) { + tcur_[0] = 0.0; + totalReceive_ -= 1; + } + + if (myIdx == numObjs_ - 1) { + tcur_[numRowsPerObject_+1] = 0.0; + totalReceive_ -= 1; + } + + std::copy(tcur_.begin(), tcur_.end(), told_.begin()); + + } + + + void init(LPMsg* msg) { + numObjs_ = msg->numObjects; + numRowsPerObject_ = msg->nRowPerObject; + maxIter_ = msg->iterMax; + objProxy_ = msg->objProxy; + + // Initialize the starting 
vector + init(); + } + +}; + +bool isWorkDone( vt::objgroup::proxy::Proxy const& proxy){ + auto const this_node = vt::theContext()->getNode(); + return proxy[this_node].invoke<&NodeObj::isWorkFinished>(); +} + +int main(int argc, char** argv) { + + size_t num_objs = default_num_objs; + size_t numRowsPerObject = default_nrow_object; + size_t maxIter = 8; + + std::string name(argv[0]); + + vt::initialize(argc, argv); + + vt::NodeType this_node = vt::theContext()->getNode(); + vt::NodeType num_nodes = vt::theContext()->getNumNodes(); + + if (argc == 1) { + if (this_node == 0) { + fmt::print( + stderr, "{}: using default arguments since none provided\n", name + ); + } + num_objs = default_num_objs * num_nodes; + } else if (argc == 2) { + num_objs = static_cast(strtol(argv[1], nullptr, 10)); + } + else if (argc == 3) { + num_objs = static_cast(strtol(argv[1], nullptr, 10)); + numRowsPerObject = static_cast(strtol(argv[2], nullptr, 10)); + } + else if (argc == 4) { + num_objs = static_cast(strtol(argv[1], nullptr, 10)); + numRowsPerObject = static_cast(strtol(argv[2], nullptr, 10)); + maxIter = static_cast(strtol(argv[3], nullptr, 10)); + } + else { + fmt::print( + stderr, "usage: {} \n", + name + ); + return 1; + } + + // Object group of all nodes that take part in computation + // Used to determine whether the computation is finished + auto grp_proxy = vt::theObjGroup()->makeCollective("examples_jacobi1d"); + + // Create the decomposition into objects + using BaseIndexType = typename vt::Index1D::DenseIndexType; + auto range = vt::Index1D(static_cast(num_objs)); + + auto col_proxy = vt::makeCollection("examples_jacobi1d") + .bounds(range) + .bulkInsert() + .wait(); + + vt::runInEpochCollective([col_proxy, grp_proxy, num_objs, numRowsPerObject, maxIter]{ + col_proxy.broadcastCollective( + num_objs, numRowsPerObject, maxIter, grp_proxy + ); + }); + + while(!isWorkDone(grp_proxy)){ + vt::runInEpochCollective([col_proxy]{ + col_proxy.broadcastCollective< + 
LinearPb1DJacobi::BlankMsg, &LinearPb1DJacobi::doIter + >(); + }); + + vt::thePhase()->nextPhaseCollective(); + } + + vt::finalize(); + + return 0; +} + +/// [Jacobi1D example] diff --git a/examples/collection/jacobi2d_vt.cc b/examples/collection/jacobi2d_vt.cc index eba9608c43..e80b8c3b6d 100644 --- a/examples/collection/jacobi2d_vt.cc +++ b/examples/collection/jacobi2d_vt.cc @@ -42,6 +42,7 @@ */ #include +#include #include #include @@ -76,23 +77,17 @@ static constexpr std::size_t const default_nrow_object = 8; static constexpr std::size_t const default_num_objs = 4; +static std::size_t check_conv_freq = 1; static constexpr double const default_tol = 1.0e-02; -struct NodeObj { - bool is_finished_ = false; - void workFinishedHandler() { is_finished_ = true; } - bool isWorkFinished() { return is_finished_; } -}; +struct NodeObj; using NodeObjProxy = vt::objgroup::proxy::Proxy; struct LinearPb2DJacobi : vt::Collection { - private: - std::vector tcur_, told_; std::vector rhs_; size_t iter_ = 0; - size_t msgReceived_ = 0, totalReceive_ = 0; size_t numObjsX_ = 1, numObjsY_ = 1; size_t numRowsPerObject_ = default_nrow_object; size_t maxIter_ = 5; @@ -102,55 +97,13 @@ struct LinearPb2DJacobi : vt::Collection { LinearPb2DJacobi() : tcur_(), told_(), rhs_(), iter_(0), - msgReceived_(0), totalReceive_(0), numObjsX_(1), numObjsY_(1), numRowsPerObject_(default_nrow_object), maxIter_(5) { } - struct BlankMsg : vt::CollectionMessage { }; - - struct LPMsg : vt::CollectionMessage { - - size_t numXObjs = 0; - size_t numYObjs = 0; - size_t numIter = 0; - NodeObjProxy objProxy; - - LPMsg() = default; - - LPMsg(const size_t nx, const size_t ny, const size_t nref, NodeObjProxy proxy) - : numXObjs(nx), numYObjs(ny), numIter(nref), objProxy(proxy) - { } - - }; - - - void checkCompleteCB(double const normRes) { - // - // Only one object for the reduction will visit - // this function - // - - auto const iter_max_reached = iter_ > maxIter_; - auto const norm_res_done = normRes < 
default_tol; - - if (iter_max_reached or norm_res_done) { - auto const to_print = iter_max_reached ? - "\n Maximum Number of Iterations Reached. \n\n" : - fmt::format("\n Max-Norm Residual Reduced by {} \n\n", default_tol); - - fmt::print(to_print); - - // Notify all nodes that computation is finished - objProxy_.broadcast<&NodeObj::workFinishedHandler>(); - } else { - fmt::print(" ## ITER {} >> Residual Norm = {} \n", iter_, normRes); - } - } - - void doIteration() { + void kernel() { // //--- Copy ghost values @@ -195,15 +148,13 @@ struct LinearPb2DJacobi : vt::Collection { iter_ += 1; std::copy(tcur_.begin(), tcur_.end(), told_.begin()); + } - // - // Compute the maximum entries among the rows on this object - // We do not take into account the "ghost" entries - // as they may be "out of date". - // - + double computeMaxNorm() { + // Compute the maximum entries among the rows on this object We do not take + // into account the "ghost" entries as they may be "out of date". double maxNorm = 0.0; - + size_t ldx = numRowsPerObject_ + 2; for (size_t iy = 1; iy <= numRowsPerObject_; ++iy) { for (size_t ix = 1; ix <= numRowsPerObject_; ++ix) { size_t node = ix + iy * ldx; @@ -211,81 +162,40 @@ struct LinearPb2DJacobi : vt::Collection { maxNorm = (maxNorm > std::fabs(val)) ? 
maxNorm : std::fabs(val); } } + return maxNorm; + } + void reduceMaxNorm(vt::Callback cb) { auto proxy = this->getCollectionProxy(); - proxy.reduce<&LinearPb2DJacobi::checkCompleteCB, vt::collective::MaxOp>( - proxy(0,0), maxNorm - ); + proxy.reduce(cb, computeMaxNorm()); } - struct VecMsg : vt::CollectionMessage { - using MessageParentType = vt::CollectionMessage; - vt_msg_serialize_required(); // stl vector - - VecMsg() = default; - VecMsg(IndexType const& in_index, const std::vector &ref) : - vt::CollectionMessage(), - from_index(in_index), val(ref) - { } - - template - void serialize(Serializer& s) { - MessageParentType::serialize(s); - s | from_index; - s | val; - } - - IndexType from_index; - std::vector val; - }; - - void exchange(VecMsg *msg) { - + void exchange(vt::Index2D from_index, std::vector val, std::size_t in_iter) { // Receive and treat the message from a neighboring object. - - if (this->getIndex().x() > msg->from_index.x()) { + vtAssert(iter_ == in_iter, "iters should match"); + if (this->getIndex().x() > from_index.x()) { const size_t ldx = numRowsPerObject_ + 2; - for (size_t jy = 0; jy < msg->val.size(); ++jy) { - this->told_[jy*ldx] = msg->val[jy]; + for (size_t jy = 0; jy < val.size(); ++jy) { + this->told_[jy*ldx] = val[jy]; } - msgReceived_ += 1; } - else if (this->getIndex().x() < msg->from_index.x()) { + else if (this->getIndex().x() < from_index.x()) { const size_t ldx = numRowsPerObject_ + 2; - for (size_t jy = 0; jy < msg->val.size(); ++jy) { - this->told_[numRowsPerObject_ + 1 + jy*ldx] = msg->val[jy]; + for (size_t jy = 0; jy < val.size(); ++jy) { + this->told_[numRowsPerObject_ + 1 + jy*ldx] = val[jy]; } - msgReceived_ += 1; } - else if (this->getIndex().y() > msg->from_index.y()) { - std::copy(msg->val.begin(), msg->val.end(), this->told_.begin()); - msgReceived_ += 1; + else if (this->getIndex().y() > from_index.y()) { + std::copy(val.begin(), val.end(), this->told_.begin()); } - else if (this->getIndex().y() < 
msg->from_index.y()) { - std::copy(msg->val.begin(), msg->val.end(), + else if (this->getIndex().y() < from_index.y()) { + std::copy(val.begin(), val.end(), &this->told_[(numRowsPerObject_ + 1)*(numRowsPerObject_ + 2)]); - msgReceived_ += 1; - } - - if (msgReceived_ == totalReceive_) { - msgReceived_ = 0; - doIteration(); } - } - void doIter(BlankMsg *msg) { - - // - // Treat the particular case of 1 object - // where no communication is needed. - // Without this treatment, the code would not iterate. - // - if (numObjsX_*numObjsY_ <= 1) { - doIteration(); - return; - } - //--------------------------------------- + void sendNeighbor(std::tuple direction) { + using Tuple = std::tuple; // // Routine to send information to a neighboring object @@ -296,41 +206,46 @@ struct LinearPb2DJacobi : vt::Collection { auto const x = idx.x(); auto const y = idx.y(); - if (x > 0) { - std::vector tcopy(numRowsPerObject_ + 2, 0.0); - for (size_t jy = 1; jy <= numRowsPerObject_; ++jy) - tcopy[jy] = told_[1 + jy * (numRowsPerObject_ + 2)]; - proxy(x-1, y).send(idx, tcopy); + if (direction == Tuple{-1, 0}) { + if (x > 0) { + std::vector tcopy(numRowsPerObject_ + 2, 0.0); + for (size_t jy = 1; jy <= numRowsPerObject_; ++jy) + tcopy[jy] = told_[1 + jy * (numRowsPerObject_ + 2)]; + proxy(x-1, y).send<&LinearPb2DJacobi::exchange>(idx, tcopy, iter_); + } } - if (y > 0) { - std::vector tcopy(numRowsPerObject_ + 2, 0.0); - for (size_t jx = 1; jx <= numRowsPerObject_; ++jx) - tcopy[jx] = told_[jx + (numRowsPerObject_ + 2)]; - proxy(x, y-1).send(idx, tcopy); + if (direction == Tuple{0, -1}) { + if (y > 0) { + std::vector tcopy(numRowsPerObject_ + 2, 0.0); + for (size_t jx = 1; jx <= numRowsPerObject_; ++jx) + tcopy[jx] = told_[jx + (numRowsPerObject_ + 2)]; + proxy(x, y-1).send<&LinearPb2DJacobi::exchange>(idx, tcopy, iter_); + } } - if (size_t(x) < numObjsX_ - 1) { - std::vector tcopy(numRowsPerObject_ + 2, 0.0); - for (size_t jy = 1; jy <= numRowsPerObject_; ++jy) { - tcopy[jy] = 
told_[numRowsPerObject_ + - jy * (numRowsPerObject_ + 2)]; + if (direction == Tuple{1, 0}) { + if (size_t(x) < numObjsX_ - 1) { + std::vector tcopy(numRowsPerObject_ + 2, 0.0); + for (size_t jy = 1; jy <= numRowsPerObject_; ++jy) { + tcopy[jy] = told_[numRowsPerObject_ + + jy * (numRowsPerObject_ + 2)]; + } + proxy(x+1, y).send<&LinearPb2DJacobi::exchange>(idx, tcopy, iter_); } - proxy(x+1, y).send(idx, tcopy); } - if (size_t(y) < numObjsY_ - 1) { - std::vector tcopy(numRowsPerObject_ + 2, 0.0); - for (size_t jx = 1; jx <= numRowsPerObject_; ++jx) - tcopy[jx] = told_[jx + numRowsPerObject_ * (numRowsPerObject_ + 2)]; - proxy(x, y+1).send(idx, tcopy); + if (direction == Tuple{0, 1}) { + if (size_t(y) < numObjsY_ - 1) { + std::vector tcopy(numRowsPerObject_ + 2, 0.0); + for (size_t jx = 1; jx <= numRowsPerObject_; ++jx) + tcopy[jx] = told_[jx + numRowsPerObject_ * (numRowsPerObject_ + 2)]; + proxy(x, y+1).send<&LinearPb2DJacobi::exchange>(idx, tcopy, iter_); + } } - } - - void init() { - + void initializeData() { //--- Each object will work with (numRowsPerObject_ + 2) unknowns //--- or (numRowsPerObject_ + 2) rows of the matrix size_t ldx = numRowsPerObject_ + 2, ldy = ldx; @@ -362,8 +277,6 @@ struct LinearPb2DJacobi : vt::Collection { } } - totalReceive_ = 4; - // //--- The unknowns correspond to the interior nodes //--- of a regular orthogonal grid on [0, 1] x [0, 1] @@ -375,55 +288,160 @@ struct LinearPb2DJacobi : vt::Collection { if (idx.x() == 0) { for (size_t jy = 0; jy < ldy; ++jy) tcur_[jy * ldy] = 0.0; - totalReceive_ -= 1; } if (idx.y() == 0) { for (size_t jx = 0; jx < ldx; ++jx) tcur_[jx] = 0.0; - totalReceive_ -= 1; } if (numObjsX_ == size_t(idx.x()) + 1) { for (size_t jy = 0; jy < ldy; ++jy) tcur_[jy * ldy + (ldx - 1)] = 0.0; - totalReceive_ -= 1; } if (numObjsY_ == size_t(idx.y()) + 1) { for (size_t jx = 0; jx < ldx; ++jx) tcur_[jx + (ldx - 1)*ldy] = 0.0; - totalReceive_ -= 1; } - std::copy(tcur_.begin(), tcur_.end(), told_.begin()); - } - void 
init(LPMsg* msg) { - - numObjsX_ = msg->numXObjs; - numObjsY_ = msg->numYObjs; - maxIter_ = msg->numIter; - objProxy_ = msg->objProxy; + void init( + size_t numXObjs, size_t numYObjs, size_t nRowPerObject, size_t iterMax, + NodeObjProxy objProxy + ) { + numRowsPerObject_ = nRowPerObject; + numObjsX_ = numXObjs; + numObjsY_ = numYObjs; + maxIter_ = iterMax; + objProxy_ = objProxy; // Initialize the starting vector - init(); + initializeData(); } }; -bool isWorkDone( vt::objgroup::proxy::Proxy const& proxy){ - auto const this_node = vt::theContext()->getNode(); - return proxy[this_node].invoke<&NodeObj::isWorkFinished>(); -} +struct NodeObj { + using ChainSetType = vt::messaging::CollectionChainSet; + + NodeObj( + vt::CollectionProxy in_proxy, std::size_t in_num_objs_x, + std::size_t in_num_objs_y, std::size_t in_max_iter + ) : proxy_(in_proxy), + num_objs_x_(in_num_objs_x), + num_objs_y_(in_num_objs_y), + max_iter_(in_max_iter) + { } + + void reducedNorm(double normRes) { + converged_ = normRes < default_tol; + if (vt::theContext()->getNode() == 0) { + fmt::print( + "## ITER {} >> Residual Norm = {}, conv={} \n", + cur_iter_, normRes, converged_ + ); + } + } + + void setup(vt::objgroup::proxy::Proxy in_proxy) { + chains_ = std::make_unique(proxy_); + this_proxy_ = in_proxy; + } + + void runToConvergence() { + using Tuple = std::tuple; + + vt::task::TaskCollective* prev_kernel = nullptr; + + auto iteration = chains_->createTaskRegion([&]{ + std::array dirs = { + Tuple{-1, 0}, Tuple{0,-1}, Tuple{1,0}, Tuple{0,1} + }; + std::map, vt::task::TaskCollective*> dep_dir; + + for (auto const& [x, y] : dirs) { + auto dep = chains_->taskCollective("exchange", [&](auto idx, auto t) { + if (prev_kernel) { + t->dependsOn(idx, prev_kernel); + + vt::Index2D idx_dir{idx.x() + x, idx.y() + y}; + + if (idx_dir.x() >= 0 and idx_dir.x() < int(num_objs_x_) and + idx_dir.y() >= 0 and idx_dir.y() < int(num_objs_y_)) { + t->dependsOn(idx_dir, prev_kernel); + } + } + return 
proxy_[idx].template send<&LinearPb2DJacobi::sendNeighbor>(Tuple{x,y}); + }); + dep_dir[Tuple{x,y}] = dep; + } + + prev_kernel = chains_->taskCollective("kernel", [&](auto idx, auto t) { + if (idx.x() != 0) { + t->dependsOn(vt::Index2D(idx.x() - 1, idx.y()), dep_dir[Tuple{1,0}]); + } + if (idx.y() != 0) { + t->dependsOn(vt::Index2D(idx.x(), idx.y() - 1), dep_dir[Tuple{0,1}]); + } + if (static_cast(idx.x()) != num_objs_x_ - 1) { + t->dependsOn(vt::Index2D(idx.x() + 1, idx.y()), dep_dir[Tuple{-1,0}]); + } + if (static_cast(idx.y()) != num_objs_y_ - 1) { + t->dependsOn(vt::Index2D(idx.x(), idx.y() + 1), dep_dir[Tuple{0,-1}]); + } + return proxy_[idx].template send<&LinearPb2DJacobi::kernel>(); + }); + }); + + while (not converged_ and cur_iter_ < max_iter_) { + iteration->enqueueTasks(); + + if (cur_iter_++ % check_conv_freq == 0) { + chains_->taskCollective("checkConv", [&](auto idx, auto t) { + t->dependsOn(idx, prev_kernel); + auto cb = vt::theCB()->makeBcast<&NodeObj::reducedNorm>(this_proxy_); + return proxy_[idx].template send<&LinearPb2DJacobi::reduceMaxNorm>(cb); + }); + + iteration->waitCollective(); + vt::thePhase()->nextPhaseCollective(); + } + } + + iteration->waitCollective(); + + if (vt::theContext()->getNode() == 0) { + if (not converged_) { + fmt::print("Maximum Number of Iterations Reached without convergence.\n"); + } else { + fmt::print("Convergence is reached at iteration {}.\n", cur_iter_); + } + } + + chains_->phaseDone(); + chains_ = nullptr; + } + +private: + vt::CollectionProxy proxy_; + std::size_t num_objs_x_ = 0; + std::size_t num_objs_y_ = 0; + std::size_t max_iter_ = 0; + std::size_t cur_iter_ = 0; + std::unique_ptr chains_; + bool converged_ = false; + vt::objgroup::proxy::Proxy this_proxy_; +}; int main(int argc, char** argv) { size_t numX_objs = default_num_objs; size_t numY_objs = default_num_objs; size_t maxIter = 10; + size_t numRows = default_nrow_object; std::string name(argv[0]); @@ -447,9 +465,20 @@ int main(int argc, char** 
argv) { numY_objs = (size_t) strtol(argv[2], nullptr, 10); maxIter = (size_t) strtol(argv[3], nullptr, 10); } + else if (argc == 5 or argc == 6) { + numX_objs = (size_t) strtol(argv[1], nullptr, 10); + numY_objs = (size_t) strtol(argv[2], nullptr, 10); + maxIter = (size_t) strtol(argv[3], nullptr, 10); + numRows = (size_t) strtol(argv[4], nullptr, 10); + if (argc == 6) { + check_conv_freq = (size_t) strtol(argv[5], nullptr, 10); + } + } else { fmt::print( - stderr, "usage: {} \n", + stderr, + "usage: {} " + " \n", name ); return 1; @@ -457,7 +486,6 @@ int main(int argc, char** argv) { } /* --- Print information about the simulation */ - if (this_node == 0) { fmt::print( stdout, "\n - Solve the linear system for the Laplacian with homogeneous Dirichlet" @@ -475,10 +503,6 @@ int main(int argc, char** argv) { fmt::print(stdout, "\n"); } - // Object group of all nodes that take part in computation - // Used to determine whether the computation is finished - auto grp_proxy = vt::theObjGroup()->makeCollective("examples_jacobi2d"); - // Create the decomposition into objects using BaseIndexType = typename vt::Index2D::DenseIndexType; auto range = vt::Index2D( @@ -486,25 +510,25 @@ int main(int argc, char** argv) { static_cast(numY_objs) ); - auto col_proxy = vt::makeCollection("examples_jacobi2d") + auto col_proxy = vt::makeCollection("jacobi2d") .bounds(range) .bulkInsert() .wait(); - vt::runInEpochCollective([col_proxy, grp_proxy, numX_objs, numY_objs, maxIter] { - col_proxy.broadcastCollective( - numX_objs, numY_objs, maxIter, grp_proxy + // Object group of all nodes that take part in computation + // Used to determine whether the computation is finished + auto grp_proxy = vt::theObjGroup()->makeCollective( + "jacobi2d", col_proxy, numX_objs, numY_objs, maxIter + ); + + vt::runInEpochCollective([=] { + col_proxy.broadcastCollective<&LinearPb2DJacobi::init>( + numX_objs, numY_objs, numRows, maxIter, grp_proxy ); }); - while (!isWorkDone(grp_proxy)) { - 
vt::runInEpochCollective([col_proxy] { - col_proxy.broadcastCollective< - LinearPb2DJacobi::BlankMsg, &LinearPb2DJacobi::doIter>(); - }); - - vt::thePhase()->nextPhaseCollective(); - } + grp_proxy.get()->setup(grp_proxy); + grp_proxy.get()->runToConvergence(); vt::finalize(); diff --git a/examples/collection/jacobi2d_vt_sync.cc b/examples/collection/jacobi2d_vt_sync.cc new file mode 100644 index 0000000000..74ed717195 --- /dev/null +++ b/examples/collection/jacobi2d_vt_sync.cc @@ -0,0 +1,521 @@ +/* +//@HEADER +// ***************************************************************************** +// +// jacobi2d_vt.cc +// DARMA/vt => Virtual Transport +// +// Copyright 2019-2021 National Technology & Engineering Solutions of Sandia, LLC +// (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. +// Government retains certain rights in this software. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. +// +// * Neither the name of the copyright holder nor the names of its +// contributors may be used to endorse or promote products derived from this +// software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. +// +// Questions? Contact darma@sandia.gov +// +// ***************************************************************************** +//@HEADER +*/ + +#include + +#include +#include +#include + +/// [Jacobi2D example] + +// +// This code applies a few steps of the Jacobi iteration to +// the linear system A x = 0 +// where A is a banded symmetric positive definite matrix. +// The initial guess for x is a made-up non-zero vector. +// The exact solution is the vector 0. +// +// The matrix A is square and invertible.
+// The number of rows is ((number of objects) * (number of rows per object)) +// +// Such a matrix A is obtained when using 2nd-order finite difference +// for discretizing +// +// -d^2 u / dx^2 -d^2 u / dy^2 = f on [0, 1] x [0, 1] +// +// with homogeneous Dirichlet condition +// +// u = 0 on the boundary of [0, 1] x [0, 1] +// +// using a uniform grid with grid size +// +// 1 / ((number of objects) * (number of rows per object) + 1) +// + + +static constexpr std::size_t const default_nrow_object = 8; +static constexpr std::size_t const default_num_objs = 4; +static constexpr double const default_tol = 1.0e-02; + +struct NodeObj { + bool is_finished_ = false; + void workFinishedHandler() { is_finished_ = true; } + bool isWorkFinished() { return is_finished_; } +}; +using NodeObjProxy = vt::objgroup::proxy::Proxy; + +struct LinearPb2DJacobi : vt::Collection { + +private: + + std::vector tcur_, told_; + std::vector rhs_; + size_t iter_ = 0; + size_t msgReceived_ = 0, totalReceive_ = 0; + size_t numObjsX_ = 1, numObjsY_ = 1; + size_t numRowsPerObject_ = default_nrow_object; + size_t maxIter_ = 5; + NodeObjProxy objProxy_; + +public: + + LinearPb2DJacobi() + : tcur_(), told_(), rhs_(), iter_(0), + msgReceived_(0), totalReceive_(0), + numObjsX_(1), numObjsY_(1), + numRowsPerObject_(default_nrow_object), + maxIter_(5) + { } + + struct LPMsg : vt::CollectionMessage { + + size_t numXObjs = 0; + size_t numYObjs = 0; + size_t numRows = 0; + size_t numIter = 0; + NodeObjProxy objProxy; + + LPMsg() = default; + + LPMsg(const size_t nx, const size_t ny, const size_t rows, const size_t nref, NodeObjProxy proxy) + : numXObjs(nx), numYObjs(ny), numRows(rows), numIter(nref), objProxy(proxy) + { } + + }; + + + void checkCompleteCB(double const normRes) { + // + // Only one object for the reduction will visit + // this function + // + + auto const iter_max_reached = iter_ > maxIter_; + auto const norm_res_done = normRes < default_tol; + + if (iter_max_reached or norm_res_done) { + 
auto const to_print = iter_max_reached ? + "\n Maximum Number of Iterations Reached. \n\n" : + fmt::format("\n Max-Norm Residual Reduced by {} \n\n", default_tol); + + fmt::print(to_print); + + // Notify all nodes that computation is finished + objProxy_.broadcast<&NodeObj::workFinishedHandler>(); + } else { + fmt::print(" ## ITER {} >> Residual Norm = {} \n", iter_, normRes); + } + } + + void doIteration() { + + // + //--- Copy ghost values + // + + size_t ldx = numRowsPerObject_ + 2; + size_t ldy = numRowsPerObject_ + 2; + + for (size_t jx = 0; jx < ldx; ++jx) + tcur_[jx] = told_[jx]; + + for (size_t jx = 0; jx < ldx; ++jx) + tcur_[jx + (ldy-1) * ldx] = told_[jx + (ldy-1) * ldx]; + + for (size_t jy = 0; jy < ldy; ++jy) + tcur_[jy * ldx] = told_[jy * ldx]; + + for (size_t jy = 0; jy < ldy; ++jy) + tcur_[ldx-1 + jy * ldx] = told_[ldx-1 + jy * ldx]; + + // + //--- Update my row values + // + + for (size_t iy = 1; iy <= numRowsPerObject_; ++iy) { + for (size_t ix = 1; ix <= numRowsPerObject_; ++ix) { + // + //---- Jacobi iteration step for + //---- A banded matrix for the 5-point stencil + //---- [ 0.0 -1.0 0.0] + //---- [-1.0 4.0 -1.0] + //---- [ 0.0 -1.0 0.0] + //---- rhs_ right hand side vector + // + size_t node = ix + iy * ldx; + tcur_[node] = 0.25 * (rhs_[node] + + told_[node - 1] + told_[node + 1] + + told_[node - ldx] + told_[node + ldx]); + } + } + + iter_ += 1; + + std::copy(tcur_.begin(), tcur_.end(), told_.begin()); + + // + // Compute the maximum entries among the rows on this object + // We do not take into account the "ghost" entries + // as they may be "out of date". + // + + double maxNorm = 0.0; + + for (size_t iy = 1; iy <= numRowsPerObject_; ++iy) { + for (size_t ix = 1; ix <= numRowsPerObject_; ++ix) { + size_t node = ix + iy * ldx; + double val = tcur_[node]; + maxNorm = (maxNorm > std::fabs(val)) ? 
maxNorm : std::fabs(val); + } + } + + auto proxy = this->getCollectionProxy(); + proxy.reduce<&LinearPb2DJacobi::checkCompleteCB, vt::collective::MaxOp>( + proxy(0,0), maxNorm + ); + } + + struct VecMsg : vt::CollectionMessage { + using MessageParentType = vt::CollectionMessage; + vt_msg_serialize_required(); // stl vector + + VecMsg() = default; + VecMsg(IndexType const& in_index, const std::vector &ref) : + vt::CollectionMessage(), + from_index(in_index), val(ref) + { } + + template + void serialize(Serializer& s) { + MessageParentType::serialize(s); + s | from_index; + s | val; + } + + IndexType from_index; + std::vector val; + }; + + void exchange(VecMsg *msg) { + + // Receive and treat the message from a neighboring object. + + if (this->getIndex().x() > msg->from_index.x()) { + const size_t ldx = numRowsPerObject_ + 2; + for (size_t jy = 0; jy < msg->val.size(); ++jy) { + this->told_[jy*ldx] = msg->val[jy]; + } + msgReceived_ += 1; + } + else if (this->getIndex().x() < msg->from_index.x()) { + const size_t ldx = numRowsPerObject_ + 2; + for (size_t jy = 0; jy < msg->val.size(); ++jy) { + this->told_[numRowsPerObject_ + 1 + jy*ldx] = msg->val[jy]; + } + msgReceived_ += 1; + } + else if (this->getIndex().y() > msg->from_index.y()) { + std::copy(msg->val.begin(), msg->val.end(), this->told_.begin()); + msgReceived_ += 1; + } + else if (this->getIndex().y() < msg->from_index.y()) { + std::copy(msg->val.begin(), msg->val.end(), + &this->told_[(numRowsPerObject_ + 1)*(numRowsPerObject_ + 2)]); + msgReceived_ += 1; + } + + if (msgReceived_ == totalReceive_) { + msgReceived_ = 0; + doIteration(); + } + + } + + void doIter() { + + // + // Treat the particular case of 1 object + // where no communication is needed. + // Without this treatment, the code would not iterate. 
+ // + if (numObjsX_*numObjsY_ <= 1) { + doIteration(); + return; + } + //--------------------------------------- + + // + // Routine to send information to a neighboring object + // + + auto proxy = this->getCollectionProxy(); + auto idx = this->getIndex(); + auto const x = idx.x(); + auto const y = idx.y(); + + if (x > 0) { + std::vector tcopy(numRowsPerObject_ + 2, 0.0); + for (size_t jy = 1; jy <= numRowsPerObject_; ++jy) + tcopy[jy] = told_[1 + jy * (numRowsPerObject_ + 2)]; + proxy(x-1, y).send(idx, tcopy); + } + + if (y > 0) { + std::vector tcopy(numRowsPerObject_ + 2, 0.0); + for (size_t jx = 1; jx <= numRowsPerObject_; ++jx) + tcopy[jx] = told_[jx + (numRowsPerObject_ + 2)]; + proxy(x, y-1).send(idx, tcopy); + } + + if (size_t(x) < numObjsX_ - 1) { + std::vector tcopy(numRowsPerObject_ + 2, 0.0); + for (size_t jy = 1; jy <= numRowsPerObject_; ++jy) { + tcopy[jy] = told_[numRowsPerObject_ + + jy * (numRowsPerObject_ + 2)]; + } + proxy(x+1, y).send(idx, tcopy); + } + + if (size_t(y) < numObjsY_ - 1) { + std::vector tcopy(numRowsPerObject_ + 2, 0.0); + for (size_t jx = 1; jx <= numRowsPerObject_; ++jx) + tcopy[jx] = told_[jx + numRowsPerObject_ * (numRowsPerObject_ + 2)]; + proxy(x, y+1).send(idx, tcopy); + } + + } + + + void init() { + + //--- Each object will work with (numRowsPerObject_ + 2) unknowns + //--- or (numRowsPerObject_ + 2) rows of the matrix + size_t ldx = numRowsPerObject_ + 2, ldy = ldx; + + size_t vecSize = ldx * ldy; + tcur_.assign(vecSize, 0.0); + told_.assign(vecSize, 0.0); + rhs_.assign(vecSize, 0.0); + + // + // Set the initial vector to the values of + // a "high-frequency" function + // + + double hx = 1.0 / (numRowsPerObject_ * numObjsX_ + 1.0); + double hy = 1.0 / (numRowsPerObject_ * numObjsY_ + 1.0); + + size_t maxNObjs = (size_t) std::max(numObjsX_, numObjsY_); + int nf = 3 * int(numRowsPerObject_ * maxNObjs + 1) / 4; + + auto idx = this->getIndex(); + + for (size_t iy = 0; iy < ldy; ++iy) { + for (size_t ix = 0; ix < ldx; ++ix) 
{ + double x0 = ( numRowsPerObject_ * idx.x() + ix) * hx; + double y0 = ( numRowsPerObject_ * idx.y() + iy) * hy; + size_t node = ix + iy * ldx; + tcur_[node] = sin(nf * M_PI * (x0 * x0 + y0 * y0)); + } + } + + totalReceive_ = 4; + + // + //--- The unknowns correspond to the interior nodes + //--- of a regular orthogonal grid on [0, 1] x [0, 1] + //--- The total number of grid points in X-direction is + //--- (numRowsPerObject_ * numObjsX_) + 2 + //--- The total number of grid points in Y-direction is + //--- (numRowsPerObject_ * numObjsY_) + 2 + // + if (idx.x() == 0) { + for (size_t jy = 0; jy < ldy; ++jy) + tcur_[jy * ldy] = 0.0; + totalReceive_ -= 1; + } + + if (idx.y() == 0) { + for (size_t jx = 0; jx < ldx; ++jx) + tcur_[jx] = 0.0; + totalReceive_ -= 1; + } + + if (numObjsX_ == size_t(idx.x()) + 1) { + for (size_t jy = 0; jy < ldy; ++jy) + tcur_[jy * ldy + (ldx - 1)] = 0.0; + totalReceive_ -= 1; + } + + if (numObjsY_ == size_t(idx.y()) + 1) { + for (size_t jx = 0; jx < ldx; ++jx) + tcur_[jx + (ldx - 1)*ldy] = 0.0; + totalReceive_ -= 1; + } + + std::copy(tcur_.begin(), tcur_.end(), told_.begin()); + + } + + + void init(LPMsg* msg) { + + numObjsX_ = msg->numXObjs; + numObjsY_ = msg->numYObjs; + maxIter_ = msg->numIter; + objProxy_ = msg->objProxy; + numRowsPerObject_ = msg->numRows; + + // Initialize the starting vector + init(); + } + +}; + +bool isWorkDone( vt::objgroup::proxy::Proxy const& proxy){ + auto const this_node = vt::theContext()->getNode(); + return proxy[this_node].invoke<&NodeObj::isWorkFinished>(); +} + +int main(int argc, char** argv) { + + size_t numX_objs = default_num_objs; + size_t numY_objs = default_num_objs; + size_t maxIter = 10; + size_t numRows = default_nrow_object; + + std::string name(argv[0]); + + vt::initialize(argc, argv); + + vt::NodeType this_node = vt::theContext()->getNode(); + + if (argc == 1) { + if (this_node == 0) { + fmt::print( + stderr, "{}: using default arguments since none provided\n", name + ); + } + } else { + if 
(argc == 3) { + numX_objs = (size_t) strtol(argv[1], nullptr, 10); + numY_objs = (size_t) strtol(argv[2], nullptr, 10); + } + else if (argc == 4) { + numX_objs = (size_t) strtol(argv[1], nullptr, 10); + numY_objs = (size_t) strtol(argv[2], nullptr, 10); + maxIter = (size_t) strtol(argv[3], nullptr, 10); + } + else if (argc == 5) { + numX_objs = (size_t) strtol(argv[1], nullptr, 10); + numY_objs = (size_t) strtol(argv[2], nullptr, 10); + maxIter = (size_t) strtol(argv[3], nullptr, 10); + numRows = (size_t) strtol(argv[4], nullptr, 10); + } + else { + fmt::print( + stderr, + "usage: {} " + " \n", + name + ); + return 1; + } + } + + /* --- Print information about the simulation */ + + if (this_node == 0) { + fmt::print( + stdout, "\n - Solve the linear system for the Laplacian with homogeneous Dirichlet" + " on [0, 1] x [0, 1]\n" + ); + fmt::print(stdout, " - Second-order centered finite difference\n"); + fmt::print( + stdout, " - Uniform grid with ({} x {} = {}) points in the x-direction and " + " ({} x {} = {}) points in the y-direction\n", + numX_objs, default_nrow_object, numX_objs * default_nrow_object, + numY_objs, default_nrow_object, numY_objs * default_nrow_object + ); + fmt::print(stdout, " - Maximum number of iterations {}\n", maxIter); + fmt::print(stdout, " - Convergence tolerance {}\n", default_tol); + fmt::print(stdout, "\n"); + } + + // Object group of all nodes that take part in computation + // Used to determine whether the computation is finished + auto grp_proxy = vt::theObjGroup()->makeCollective("examples_jacobi2d"); + + // Create the decomposition into objects + using BaseIndexType = typename vt::Index2D::DenseIndexType; + auto range = vt::Index2D( + static_cast(numX_objs), + static_cast(numY_objs) + ); + + auto col_proxy = vt::makeCollection("examples_jacobi2d") + .bounds(range) + .bulkInsert() + .wait(); + + vt::runInEpochCollective([=] { + col_proxy.broadcastCollective( + numX_objs, numY_objs, numRows, maxIter, grp_proxy + ); + }); + + while 
(!isWorkDone(grp_proxy)) { + vt::runInEpochCollective([col_proxy] { + col_proxy.broadcastCollective<&LinearPb2DJacobi::doIter>(); + }); + + vt::thePhase()->nextPhaseCollective(); + } + + vt::finalize(); + + return 0; + +} +/// [Jacobi2D example] diff --git a/src/vt/context/runnable_context/td.h b/src/vt/context/runnable_context/td.h index 3ab038e256..69ff403b3f 100644 --- a/src/vt/context/runnable_context/td.h +++ b/src/vt/context/runnable_context/td.h @@ -48,6 +48,7 @@ #include "vt/configs/types/types_type.h" #include "vt/configs/types/types_sentinels.h" #include "vt/epoch/epoch_type.h" +#include "vt/epoch/epoch_manip.h" #include @@ -111,6 +112,13 @@ struct TD { */ EpochType getEpoch() const { return ep_; } + /** + * \brief Set the epoch released bit + */ + void setEpochReleasedBit() { + epoch::EpochManip::setDepReleasedBit(ep_); + } + private: EpochType ep_ = no_epoch; /**< The epoch for the task */ #if vt_check_enabled(fcontext) diff --git a/src/vt/epoch/epoch.h b/src/vt/epoch/epoch.h index 7745b1f95f..4845626aae 100644 --- a/src/vt/epoch/epoch.h +++ b/src/vt/epoch/epoch.h @@ -105,7 +105,7 @@ static constexpr BitCountType const epoch_root_num_bits = 1; * ensure the \c epoch_category_num_bits is sufficiently large. * */ -static constexpr BitCountType const epoch_category_num_bits = 2; +static constexpr BitCountType const epoch_category_num_bits = 3; /** * \brief These are different categories of epochs that are allowed. @@ -114,9 +114,10 @@ static constexpr BitCountType const epoch_category_num_bits = 2; * be used to dispatch control logic. 
*/ enum struct eEpochCategory : int8_t { - NoCategoryEpoch = 0x0, - DependentEpoch = 0x1, - DijkstraScholtenEpoch = 0x2 + NoCategoryEpoch = 0x0, + DependentEpoch = 0x1, + DijkstraScholtenEpoch = 0x2, + DependentReleasedEpoch = 0x3 }; /// Operator<< for printing the epoch category \c eEpochCategory enum diff --git a/src/vt/epoch/epoch_manip.cc b/src/vt/epoch/epoch_manip.cc index 1cdda37995..a69eb3300c 100644 --- a/src/vt/epoch/epoch_manip.cc +++ b/src/vt/epoch/epoch_manip.cc @@ -162,6 +162,55 @@ EpochWindow* EpochManip::getTerminatedWindow(EpochType epoch) { return BitPackerType::boolGetField(cat); } +/*static*/ bool EpochManip::isDepReleased(EpochType epoch) { + using T = typename std::underlying_type::type; + if (epoch == no_epoch or epoch == term::any_epoch_sentinel) { + return false; + } + BitPackerType::FieldType const dep_bit = + static_cast(eEpochCategory::DependentReleasedEpoch) - 1; + auto cat = static_cast(EpochManip::category(epoch)); + auto ret = BitPackerType::boolGetField(cat); + // vt_print(gen, "isDepReleased ep={:x}, ret={}\n", epoch, ret); + return ret; +} + +/*static*/ void EpochManip::clearDepReleasedBit(EpochType& epoch) { + using ImplType = typename EpochType::ImplType; + using T = typename std::underlying_type::type; + if (epoch == no_epoch or epoch == term::any_epoch_sentinel) { + return; + } + BitPackerType::FieldType const dep_bit = + static_cast(eEpochCategory::DependentReleasedEpoch) - 1; + + if (isRooted(epoch)) { + auto constexpr bit = eEpochRoot::rEpochCategory + dep_bit; + BitPackerType::boolSetField(*epoch, false); + } else { + auto constexpr bit = eEpochColl::cEpochCategory + dep_bit; + BitPackerType::boolSetField(*epoch, false); + } +} + +/*static*/ void EpochManip::setDepReleasedBit(EpochType& epoch) { + using ImplType = typename EpochType::ImplType; + using T = typename std::underlying_type::type; + if (epoch == no_epoch or epoch == term::any_epoch_sentinel) { + return; + } + BitPackerType::FieldType const dep_bit = + 
static_cast(eEpochCategory::DependentReleasedEpoch) - 1; + + if (isRooted(epoch)) { + auto constexpr bit = eEpochRoot::rEpochCategory + dep_bit; + BitPackerType::boolSetField(*epoch, true); + } else { + auto constexpr bit = eEpochColl::cEpochCategory + dep_bit; + BitPackerType::boolSetField(*epoch, true); + } +} + /*static*/ eEpochCategory EpochManip::category(EpochType const& epoch) { return BitPackerType::getField< eEpochRoot::rEpochCategory, epoch_category_num_bits, eEpochCategory diff --git a/src/vt/epoch/epoch_manip.h b/src/vt/epoch/epoch_manip.h index 342f803da1..37acd24145 100644 --- a/src/vt/epoch/epoch_manip.h +++ b/src/vt/epoch/epoch_manip.h @@ -107,6 +107,29 @@ struct EpochManip : runtime::component::Component { */ static bool isDep(EpochType epoch); + /** + * \brief Gets whether a dependent epoch is released + * + * \param[in] epoch the epoch + * + * \return whether it is released + */ + static bool isDepReleased(EpochType epoch); + + /** + * \brief Clears the released bit for a dependent epoch + * + * \param[in,out] epoch the epoch + */ + static void clearDepReleasedBit(EpochType& epoch); + + /** + * \brief Sets the released bit for a dependent epoch + * + * \param[in] epoch the epoch + */ + static void setDepReleasedBit(EpochType& epoch); + /** * \brief Gets the \c eEpochCategory of a given epoch * diff --git a/src/vt/epoch/epoch_window.cc b/src/vt/epoch/epoch_window.cc index a37533564a..cbe2a457bd 100644 --- a/src/vt/epoch/epoch_window.cc +++ b/src/vt/epoch/epoch_window.cc @@ -43,6 +43,7 @@ #include "vt/epoch/epoch_window.h" #include "vt/epoch/epoch_manip.h" +#include "vt/context/context.h" #include @@ -82,8 +83,16 @@ EpochWindow::EpochWindow(EpochType epoch) { interval.lower(), interval.upper() ); - // All epochs in a given window start out terminated (thus, reusable). 
- terminated_epochs_.insertInterval(interval); + if ( + EpochManip::isRooted(arch_epoch) and + EpochManip::node(arch_epoch) != theContext()->getNode() + ) { + // We shouldn't put these as terminated otherwise we might think they are + // terminated before they actually are + } else { + // All epochs in a given window start out terminated (thus, reusable). + terminated_epochs_.insertInterval(interval); + } vt_debug_print( normal, term, diff --git a/src/vt/messaging/collection_chain_set.h b/src/vt/messaging/collection_chain_set.h index 93829e1bbd..cd8dde38a6 100644 --- a/src/vt/messaging/collection_chain_set.h +++ b/src/vt/messaging/collection_chain_set.h @@ -46,6 +46,8 @@ #include "vt/config.h" #include "vt/messaging/dependent_send_chain.h" +#include "vt/messaging/task_collective.h" +#include "vt/messaging/task_region.h" #include #include @@ -79,7 +81,7 @@ enum ChainSetLayout { template class CollectionChainSet final { public: - CollectionChainSet() = default; + CollectionChainSet(); CollectionChainSet(const CollectionChainSet&) = delete; CollectionChainSet(CollectionChainSet&&) = delete; @@ -160,8 +162,10 @@ class CollectionChainSet final { void removeIndex(Index idx) { auto iter = chains_.find(idx); vtAssert(iter != chains_.end(), "Cannot remove a non-present chain"); + iter->second.done(); vtAssert( - iter->second.isTerminated(), "Cannot remove a chain with pending work"); + iter->second.isTerminated(), "Cannot remove a chain with pending work" + ); chains_.erase(iter); } @@ -348,11 +352,85 @@ class CollectionChainSet final { } } + /** + * \brief Start a task collective region (called collectively) + */ + void startTasksCollective() { + tasks_ep_ = theTerm()->makeEpochCollective("startTasksCollective"); + vt::theMsg()->pushEpoch(tasks_ep_); + } + + /** + * \brief Wait for the collective tasks to finish: this is also a collective + * call + */ + void waitForTasksCollective() { + task_manager_.get()->dispatchWork(); + + vt::theMsg()->popEpoch(tasks_ep_); + 
theTerm()->finishedEpoch(tasks_ep_); + runSchedulerThrough(tasks_ep_); + } + + /** + * \brief Create a new task region with a callable that creates the tasks + * + * \param[in] c the callable + * + * \return the task region which can run the callable to enqueue the tasks + */ + template + std::unique_ptr> createTaskRegion(Callable&& c) { + return std::make_unique>( + std::forward(c), task_manager_ + ); + } + + /** + * \brief Create a collective task for each element of a collection + * + * \param[in] label label to task collective + * \param[in] task_action task for each element of the collection + * + * \return the task collective handle + */ + task::TaskCollective* taskCollective( + std::string const& label, + std::function*)> task_action + ) { + auto tc = task_manager_.get()->addTaskCollective(proxy_); + + for (auto& [idx, chain] : chains_) { + // Create a dep epoch + auto ep = theTerm()->makeEpochRooted( + label, term::UseDS{true}, term::ParentEpochCapture{}, true + ); + vt::theMsg()->pushEpoch(ep); + + tc->addTask(idx, ep); + tc->setContext(&idx); + task_action(idx, tc); + tc->setContext(nullptr); + + vt::theMsg()->popEpoch(ep); + theTerm()->finishedEpoch(ep); + + tc->checkDone(idx); + } + return tc; + } + private: /// Set of \c DependentSendChain managed on this node for indices std::unordered_map chains_; /// Deallocator that type erases element listener de-registration std::function deallocator_; + /// The underlying proxy + VirtualProxyType proxy_ = no_vrt_proxy; + /// Task collective manager + objgroup::proxy::Proxy> task_manager_; + /// Task grouping epoch + EpochType tasks_ep_ = no_epoch; }; }} /* end namespace vt::messaging */ diff --git a/src/vt/messaging/collection_chain_set.impl.h b/src/vt/messaging/collection_chain_set.impl.h index 424ea395fc..418e334d32 100644 --- a/src/vt/messaging/collection_chain_set.impl.h +++ b/src/vt/messaging/collection_chain_set.impl.h @@ -49,6 +49,13 @@ namespace vt { namespace messaging { +template 
+CollectionChainSet::CollectionChainSet() { + task_manager_ = task::TaskCollectiveManager::construct( + true, no_vrt_proxy + ); +} + template template CollectionChainSet::CollectionChainSet( @@ -69,6 +76,8 @@ CollectionChainSet::CollectionChainSet( auto const this_node = theContext()->getNode(); auto const proxy_bits = proxy.getProxy(); + proxy_ = proxy_bits; + ListenerType l = [=](ElementEventEnum event, IndexT idx, NodeType home) { switch (event) { case ElementEventEnum::ElementCreated: @@ -94,6 +103,7 @@ CollectionChainSet::CollectionChainSet( vtAssert(layout == Home, "Must be a home layout"); p[home].template send(idx); } + break; case ElementEventEnum::ElementMigratedOut: if (layout == Local) { removeIndex(idx); @@ -124,6 +134,11 @@ CollectionChainSet::CollectionChainSet( proxy_bits, listener ); }; + + task_manager_ = task::TaskCollectiveManager::construct( + layout == ChainSetLayout::Local, + proxy_bits + ); } }} /* end namespace vt::messaging */ diff --git a/src/vt/messaging/task_collective.h b/src/vt/messaging/task_collective.h new file mode 100644 index 0000000000..80851eedd2 --- /dev/null +++ b/src/vt/messaging/task_collective.h @@ -0,0 +1,481 @@ +/* +//@HEADER +// ***************************************************************************** +// +// task_collective.h +// DARMA/vt => Virtual Transport +// +// Copyright 2019-2021 National Technology & Engineering Solutions of Sandia, LLC +// (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. +// Government retains certain rights in this software. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. 
+// +// * Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. +// +// * Neither the name of the copyright holder nor the names of its +// contributors may be used to endorse or promote products derived from this +// software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. +// +// Questions? 
Contact darma@sandia.gov +// +// ***************************************************************************** +//@HEADER +*/ + +#if !defined INCLUDED_VT_MESSAGING_TASK_COLLECTIVE_H +#define INCLUDED_VT_MESSAGING_TASK_COLLECTIVE_H + +#include "vt/vrt/collection/manager.fwd.h" +#include "vt/objgroup/manager.h" +#include "vt/topos/location/manager.h" +#include "vt/pipe/pipe_manager.h" + +#include + +namespace vt::task { + +/// Task ID type to identify a collective task +using TaskIDType = uint64_t; + +/// No task sentinel +constexpr TaskIDType const no_task = 0; + +template +struct TaskCollectiveManager; + +/** + * \struct TaskCollective + * + * \brief A collective task that runs across a set of indices + */ +template +struct TaskCollective { + + /// Tag to create a new collective task + struct NewTaskTag {}; + + TaskCollective() = default; + + /** + * \internal \brief Create a new collective task + * + * \param[in] NewTaskTag tag + * \param[in] in_proxy the collection proxy + * \param[in] in_manager collective task manager + */ + TaskCollective( + NewTaskTag, + VirtualProxyType in_proxy, + TaskCollectiveManager* in_manager + ) : id_(in_manager->getNextID()), + proxy_(in_proxy), + manager_(in_manager) + { } + + /** + * \internal \brief Add a new task for an index + * + * \param[in] idx the index + * \param[in] ep the rooted, dependent epoch + */ + void addTask(Index idx, EpochType ep) { + vt_debug_print( + normal, gen, + "add: idx={}, ep={}, id={}\n", idx, ep, id_ + ); + + vtAssert(epochs_.find(idx) == epochs_.end(), "Must not exist"); + epochs_[idx] = ep; + } + + /** + * \brief Add a set of dependencies for this task + * + * \param[in] deps set of dependencies + */ + void dependsOn(std::initializer_list*>> deps) { + vtAssert(context_ != 0, "Must be in a proper context to add dependency"); + + for (auto const& [idx, tc] : deps) { + deps_[*context_].emplace(idx, tc->getID()); + } + } + + /** + * \brief Add a dependency for a task + * + * \param[in] idx the index it 
depends on + * \param[in] tc the task it depends on + */ + void dependsOn(Index idx, TaskCollective* tc) { + vtAssert(context_ != 0, "Must be in a proper context to add dependency"); + + vt_debug_print( + normal, gen, + "dependsOn: idx={}, dep_idx={}, id={}\n", + *context_, idx, tc->getID() + ); + + deps_[*context_].emplace(idx, tc->getID()); + } + + /** + * \internal \brief Set context for the task collective + * + * \param[in] idx the current index running its task + */ + void setContext(Index const* idx) { + context_ = idx; + } + + /** + * \brief Get the underlying ID for the task collective + * + * \return the ID + */ + TaskIDType getID() const { return id_; } + + /** + * \brief Remove a dependency + * + * \param[in] idx the index + * \param[in] dep_idx the index it depends on + * \param[in] tid the task collective it depends on + */ + void removeDependency(Index idx, Index dep_idx, TaskIDType tid) { + vt_debug_print(terse, gen, "idx={}, dep_idx={}, tid={}\n", idx, dep_idx, tid); + auto it = deps_.find(idx); + vtAssertExpr(it != deps_.end()); + it->second.erase(it->second.find(std::make_tuple(dep_idx, tid))); + if (it->second.size() == 0) { + deps_.erase(it); + checkDone(idx); + } + } + + /** + * \brief Check if a task is ready to run + * + * \param[in] idx the index to check + * + * \return whether it is ready to run + */ + bool checkDone(Index idx) { + auto iter = deps_.find(idx); + + // No dependencies, release immediately + if (iter == deps_.end()) { + vt_debug_print( + normal, gen, + "checkDone: idx={}, epochs_[idx]={:x}\n", + idx, epochs_[idx] + ); + + if constexpr (std::is_same_v) { + theSched()->fullyReleaseEpoch(epochs_[idx]); + } else { + vrt::collection::fullyReleaseEpoch(proxy_, idx, epochs_[idx]); + } + return true; + } else { + auto const& dep_set = iter->second; + for (auto const& [dep_idx, id] : dep_set) { + auto dep_tc = manager_->getTaskCollective(id); + if (auto i = dep_tc->epochs_.find(dep_idx); i != dep_tc->epochs_.end()) { + auto dep_ep = 
i->second; + theTerm()->addAction( + dep_ep, + [this,idx,dep_idx,id]{ removeDependency(idx, dep_idx, id); } + ); + } else { + manager_->getDepInfo(idx, dep_idx, id_, id); + } + } + } + return false; + } + + /** + * \brief Get the epoch for a given index + * + * \param[in] idx the index + * + * \return the epoch + */ + EpochType getEpochForIdx(Index idx) const { + vtAssert(epochs_.find(idx) != epochs_.end(), "Epoch must exist for index"); + return epochs_.find(idx)->second; + } + +private: + /// The ID for the task collective + TaskIDType id_ = no_task; + /// The indices and epochs for each task + std::unordered_map epochs_; + /// The current index context + Index const* context_ = nullptr; + /// The dependencies for each index + std::unordered_map>> deps_; + /// The underlying collection proxy + VirtualProxyType proxy_ = no_vrt_proxy; + /// The task collective manager + TaskCollectiveManager* manager_ = nullptr; +}; + +/** + * \struct TaskCollectiveManager + * + * \brief Manager for collective tasks + * + */ +template +struct TaskCollectiveManager { + using ThisType = TaskCollectiveManager; + using TaskPtrType = std::unique_ptr>; + using CallbackType = Callback< + Index, std::vector> + >; + + /** + * \brief Construct the objgroup for managing collective tasks + * + * \param[in] in_local_tracking whether the collection uses local tracking + * \param[in] in_col_proxy the collection proxy + * + * \return the objgroup proxy + */ + static auto construct(bool in_local_tracking, VirtualProxyType in_col_proxy) { + auto proxy = theObjGroup()->makeCollective("TCManager"); + proxy.get()->setProxy(proxy); + proxy.get()->local_tracking_ = in_local_tracking; + proxy.get()->col_proxy_ = in_col_proxy; + return proxy; + } + + /** + * \brief Get the next ID for a new task collective + * + * \return a new ID + */ + TaskIDType getNextID() { return cur_id_++; } + + /** + * \brief Add a new collective task + * + * \param[in] in_proxy the collection proxy + * + * \return the task 
collective + */ + TaskCollective* addTaskCollective(VirtualProxyType in_proxy) { + auto tc = std::make_unique>( + typename task::TaskCollective::NewTaskTag{}, in_proxy, this + ); + auto tc_raw = tc.get(); + auto tc_id = tc->getID(); + tasks_.emplace(tc_id, std::move(tc)); + return tc_raw; + } + + /** + * \brief Get a task collective from the ID + * + * \param[in] id the ID + * + * \return the task collective + */ + TaskCollective* getTaskCollective(TaskIDType id) { + if (auto it = tasks_.find(id); it != tasks_.end()) { + return it->second.get(); + } + return nullptr; + } + + /** + * \brief Fulfill dependency requests and send out any pending dependency + * requests + */ + void dispatchWork() { + wait_iter_++; + + // Send out pending dependency requests + for (auto const& [dep_idx, vec] : pending_deps_) { + getDepInfoImpl(dep_idx, vec); + } + pending_deps_.clear(); + + // Fulfill incoming requests now that we have all the task information + for (auto const& [dep_idx, vec] : pending_lookups_) { + for (auto const& [lookups, cb] : vec) { + depInfoHan(dep_idx, lookups, cb, wait_iter_); + } + } + pending_lookups_.clear(); + } + + /** + * \internal \brief Request dependency info, buffers until ready to group and + * send out + * + * \param[in] idx The requesting index + * \param[in] dep_idx The dependency index + * \param[in] id The requesting task collective + * \param[in] dep_id The dependency task collective + */ + void getDepInfo(Index idx, Index dep_idx, TaskIDType id, TaskIDType dep_id) { + pending_deps_[dep_idx].emplace_back(DepInfo{idx, id, dep_id}); + } + +protected: + /** + * \struct DepInfo + * + * \brief \c DepInfo holder + */ + struct DepInfo { + using isByteCopyable = std::true_type; + + Index idx; /**< Requesting index */ + TaskIDType id; /**< The task collective for requesting index */ + TaskIDType dep_id; /**< The task collective to the dependency */ + }; + + /** + * \brief Handle a dependency info request + * + * \param[in] dep_idx the dependency index + 
* \param[in] vec a set of dependencies for this index + * \param[in] cb callback to send back epochs + * \param[in] wait_iter the wait collective iteration + */ + void depInfoHan( + Index dep_idx, std::vector const& vec, CallbackType cb, + int64_t wait_iter + ) { + if (wait_iter == wait_iter_) { + std::vector> result; + for (auto const& [idx, id, dep_id] : vec) { + auto tc = getTaskCollective(dep_id); + // getTaskCollective returns nullptr for an unknown ID; fail loudly + // instead of dereferencing a null pointer + vtAssert(tc != nullptr, "Dependency task collective must exist"); + auto ep = tc->getEpochForIdx(dep_idx); + result.emplace_back(idx, id, dep_id, ep); + } + cb.send(dep_idx, std::move(result)); + } else { + // Request is for a different wait iteration; buffer it for dispatchWork + pending_lookups_[dep_idx].emplace_back(vec, cb); + } + } + + /** + * \internal \brief Callback function handler for receiving dependency + * information + * + * \param[in] dep_idx The index dependent on (all from the same target node) + * \param[in] res The result vector + */ + void depRecvInfoHan( + Index dep_idx, + std::vector> const& res + ) { + for (auto [idx, id, dep_id, ep] : res) { + vt_debug_print( + normal, gen, + "depRecvInfoHan: idx={}, dep_idx={}, id={}, dep_id={}, ep={:x}\n", + idx, dep_idx, id, dep_id, ep + ); + + auto tc = getTaskCollective(id); + vtAssert(tc != nullptr, "Requesting task collective must exist"); + // Structured bindings cannot be captured by a lambda in C++17, so copy + // them into the closure with explicit init-captures + theTerm()->addAction( + ep, + [tc, idx = idx, dep_idx, dep_id = dep_id]{ + tc->removeDependency(idx, dep_idx, dep_id); + } + ); + } + } + + void getDepInfoImpl(Index dep_idx, std::vector const& vec) { + NodeType mapped_node = uninitialized_destination; + + if constexpr (std::is_same_v) { + mapped_node = dep_idx; + } else { + mapped_node = vrt::collection::getMappedNodeElm(col_proxy_, dep_idx); + } + + vt_debug_print( + terse, gen, + "getDepInfoImpl: dep_idx={}\n", + dep_idx + ); + + auto send_dep = [=](NodeType node) { + auto const this_node = theContext()->getNode(); + auto cb = theCB()->makeSend<&ThisType::depRecvInfoHan>(proxy_[this_node]); + vt_debug_print(terse, gen, "send_dep: dep_idx={}, node={}\n", dep_idx, node); + proxy_[node].template send<&ThisType::depInfoHan>(dep_idx, vec, cb, wait_iter_); + }; + + if constexpr (std::is_same_v) { + // Node-based chain handling + send_dep(mapped_node); + } else { + // 
Collection handling + if (local_tracking_) { + // Fetch the current location of the element + auto lm = theLocMan()->getCollectionLM(col_proxy_); + lm->getLocation(dep_idx, mapped_node, send_dep); + } else { + // Tracking on home processor, use the mapped node + send_dep(mapped_node); + } + } + } + + /** + * \internal \brief Set the proxy for the objgroup + * + * \param[in] proxy the proxy + */ + void setProxy(objgroup::proxy::Proxy proxy) { + proxy_ = proxy; + } + +private: + /// The objgroup proxy + objgroup::proxy::Proxy proxy_; + /// Collective tasks map + std::unordered_map tasks_; + /// Whether we are using local tracking + bool local_tracking_ = false; + /// The underlying collection proxy + VirtualProxyType col_proxy_ = no_vrt_proxy; + /// Pending dependency requests from other nodes + std::unordered_map< + Index, std::vector, CallbackType>> + > pending_lookups_; + /// Pending dependencies to resolve, waiting for all tasks to be created + std::unordered_map> pending_deps_; + /// The current wait iteration + int64_t wait_iter_ = 0; + /// The next task collective ID + TaskIDType cur_id_ = 1; +}; + +} /* end namespace vt::task */ + +#endif /*INCLUDED_VT_MESSAGING_TASK_COLLECTIVE_H*/ diff --git a/src/vt/messaging/task_region.h b/src/vt/messaging/task_region.h new file mode 100644 index 0000000000..3e6208638b --- /dev/null +++ b/src/vt/messaging/task_region.h @@ -0,0 +1,125 @@ +/* +//@HEADER +// ***************************************************************************** +// +// task_region.h +// DARMA/vt => Virtual Transport +// +// Copyright 2019-2021 National Technology & Engineering Solutions of Sandia, LLC +// (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. +// Government retains certain rights in this software. 
+// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. +// +// * Neither the name of the copyright holder nor the names of its +// contributors may be used to endorse or promote products derived from this +// software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. +// +// Questions? Contact darma@sandia.gov +// +// ***************************************************************************** +//@HEADER +*/ + +#if !defined INCLUDED_VT_MESSAGING_TASK_REGION_H +#define INCLUDED_VT_MESSAGING_TASK_REGION_H + +#include "vt/config.h" + +namespace vt::task { + +/** + * \struct TaskRegion + * + * \brief A region of tasks that can be enqueued, which are grouped in an epoch. 
+ */ +template +struct TaskRegion { + + /** + * \internal \brief Create a task region + * + * \param[in] in_callable the callable that contains the tasks + * \param[in] in_task_manager the task manager + */ + TaskRegion( + ActionType in_callable, + objgroup::proxy::Proxy> in_task_manager + ) : callable_(in_callable), + task_manager_(in_task_manager) + { } + + /** + * \brief Get the associated epoch + * + * \return the epoch + */ + EpochType getEpoch() const { return ep_; } + + /** + * \brief Run the callable and enqueue the tasks + * + * Creates a new collective epoch if the region does not currently hold one; + * the callable runs with that epoch pushed. + */ + void enqueueTasks() { + if (ep_ == no_epoch) { + ep_ = theTerm()->makeEpochCollective("TaskRegion::enqueueTasks"); + finished_ = false; + } + + theMsg()->pushEpoch(ep_); + callable_(); + theMsg()->popEpoch(ep_); + } + + /** + * \brief Tell the region we are done enqueuing tasks and to dispatch the work + * + * Idempotent per epoch: the work is dispatched and the epoch finished only + * on the first call, so a later \c waitCollective does not repeat either. + */ + void finishedEnqueuing() { + if (ep_ != no_epoch and not finished_) { + task_manager_.get()->dispatchWork(); + theTerm()->finishedEpoch(ep_); + finished_ = true; + } + } + + /** + * \brief Block on completion of the enqueued work + * + * Finishes enqueuing if that has not happened yet, runs the scheduler + * through the epoch, then resets the region so it can be reused. + */ + void waitCollective() { + if (ep_ != no_epoch) { + finishedEnqueuing(); + runSchedulerThrough(ep_); + ep_ = no_epoch; + finished_ = false; + } + } + +private: + /// The associated epoch + EpochType ep_ = no_epoch; + /// Whether enqueuing has already been finished for the current epoch + bool finished_ = false; + /// The callable containing the tasks + ActionType callable_ = nullptr; + /// The task manager + objgroup::proxy::Proxy> task_manager_; +}; + +} /* end namespace vt::task */ + +#endif /*INCLUDED_VT_MESSAGING_TASK_REGION_H*/ diff --git a/src/vt/runnable/make_runnable.h b/src/vt/runnable/make_runnable.h index e3b713b83e..7043a49921 100644 --- a/src/vt/runnable/make_runnable.h +++ b/src/vt/runnable/make_runnable.h @@ -326,20 +326,24 @@ struct RunnableMaker { * \param[in] handler the handler bits * \param[in] from the node that caused this runnable to execute * \param[in] han_type the type of handler + * \param[in] trace whether to trace this runnable * * \return the maker for further customization */ template 
RunnableMaker makeRunnable( - MsgSharedPtr const& msg, bool is_threaded, HandlerType handler, NodeType from + MsgSharedPtr const& msg, bool is_threaded, HandlerType handler, + NodeType from, bool trace = true ) { auto r = new RunnableNew(msg, is_threaded); #if vt_check_enabled(trace_enabled) - auto const han_type = HandlerManager::getHandlerRegistryType(handler); - if (han_type == auto_registry::RegistryTypeEnum::RegVrt or - han_type == auto_registry::RegistryTypeEnum::RegGeneral or - han_type == auto_registry::RegistryTypeEnum::RegObjGroup) { - r->addContextTrace(msg, handler, from); + if (trace) { + auto const han_type = HandlerManager::getHandlerRegistryType(handler); + if (han_type == auto_registry::RegistryTypeEnum::RegVrt or + han_type == auto_registry::RegistryTypeEnum::RegGeneral or + han_type == auto_registry::RegistryTypeEnum::RegObjGroup) { + r->addContextTrace(msg, handler, from); + } } #endif r->addContextSetContext(r, from); diff --git a/src/vt/runnable/runnable.cc b/src/vt/runnable/runnable.cc index 2a42b5a314..97634afc2f 100644 --- a/src/vt/runnable/runnable.cc +++ b/src/vt/runnable/runnable.cc @@ -256,6 +256,11 @@ EpochType RunnableNew::getEpoch() const { } } +void RunnableNew::setEpochReleasedBit() { + vtAssertExpr(contexts_.has_td); + contexts_.td.setEpochReleasedBit(); +} + void RunnableNew::send(elm::ElementIDStruct elm, MsgSizeType bytes) { if (contexts_.has_lb) contexts_.lb.send(elm, bytes); } diff --git a/src/vt/runnable/runnable.h b/src/vt/runnable/runnable.h index b2da85d583..6e8d8d1c03 100644 --- a/src/vt/runnable/runnable.h +++ b/src/vt/runnable/runnable.h @@ -360,6 +360,11 @@ struct RunnableNew { */ EpochType getEpoch() const; + /** + * \brief Set the epoch released bit for a runnable + */ + void setEpochReleasedBit(); + /** * \brief Whether this runnable targets an object group * diff --git a/src/vt/scheduler/scheduler.cc b/src/vt/scheduler/scheduler.cc index fc9725125e..b2e8a14ace 100644 --- a/src/vt/scheduler/scheduler.cc +++ 
b/src/vt/scheduler/scheduler.cc @@ -429,6 +429,32 @@ void Scheduler::releaseEpochCollection(EpochType ep, UntypedCollection* untyped) } } +void Scheduler::fullyReleaseEpoch(EpochType ep) { + auto run_through_container = [this](auto& container) { + while (container.size() > 0) { + auto unit = container.pop(); + unit.getRunnable()->setEpochReleasedBit(); + work_queue_.emplace(std::move(unit)); + } + }; + + if (auto result = pending_work_.extract(ep); result) { + run_through_container(result.mapped()); + } + + if (auto result = pending_collection_work_.extract(ep); result) { + for (auto& [obj, queue] : result.mapped()) { + run_through_container(queue); + } + } + + if (auto result = pending_objgroup_work_.extract(ep); result) { + for (auto& [obj, queue] : result.mapped()) { + run_through_container(queue); + } + } +} + bool Scheduler::isReleasedEpochObjgroup( EpochType ep, ObjGroupProxyType proxy ) const { diff --git a/src/vt/scheduler/scheduler.h b/src/vt/scheduler/scheduler.h index 7fb423fe0d..713a4d5c91 100644 --- a/src/vt/scheduler/scheduler.h +++ b/src/vt/scheduler/scheduler.h @@ -393,6 +393,8 @@ struct Scheduler : runtime::component::Component { */ void releaseEpochCollection(EpochType ep, UntypedCollection* untyped); + void fullyReleaseEpoch(EpochType ep); + /** * \brief Check if a epoch is released for an objgroup * diff --git a/src/vt/scheduler/scheduler.impl.h b/src/vt/scheduler/scheduler.impl.h index e2f5199243..7ba6d11f43 100644 --- a/src/vt/scheduler/scheduler.impl.h +++ b/src/vt/scheduler/scheduler.impl.h @@ -126,7 +126,7 @@ void Scheduler::enqueueOrPostpone(UnitT unit) { if (m and not m->env.system_msg) { auto ep = r->getEpoch(); bool const is_dep = epoch::EpochManip::isDep(ep); - if (is_dep) { + if (is_dep and not epoch::EpochManip::isDepReleased(ep)) { auto obj = r->getObj(); if (ep != no_epoch and ep != term::any_epoch_sentinel) { if (obj) { diff --git a/src/vt/termination/term_action.cc b/src/vt/termination/term_action.cc index 45ef08f4a8..ea92e412a0 
100644 --- a/src/vt/termination/term_action.cc +++ b/src/vt/termination/term_action.cc @@ -90,7 +90,7 @@ void TermAction::queueActions(EpochType epoch) { auto msg = makeMessage(epoch, encap_epoch); auto const han = auto_registry::makeAutoHandler(); auto const this_node = theContext()->getNode(); - runnable::makeRunnable(msg, true, han, this_node) + runnable::makeRunnable(msg, true, han, this_node, false) .withTDEpoch(encap_epoch) .enqueue(); }; diff --git a/src/vt/termination/termination.cc b/src/vt/termination/termination.cc index 56fd29851e..c1d0e34c40 100644 --- a/src/vt/termination/termination.cc +++ b/src/vt/termination/termination.cc @@ -120,7 +120,12 @@ void TerminationDetector::setLocalTerminated( } TerminationDetector::TermStateType& -TerminationDetector::findOrCreateState(EpochType const& epoch, bool is_ready) { +TerminationDetector::findOrCreateState(EpochType const& epoch_in, bool is_ready) { + EpochType epoch = epoch_in; + if (isDep(epoch_in)) { + epoch::EpochManip::clearDepReleasedBit(epoch); + } + auto const& num_children_ = getNumChildren(); bool const local_term = is_ready; @@ -145,7 +150,12 @@ TerminationDetector::findOrCreateState(EpochType const& epoch, bool is_ready) { } TerminationDetector::TermStateDSType* -TerminationDetector::getDSTerm(EpochType epoch, bool is_root) { +TerminationDetector::getDSTerm(EpochType epoch_in, bool is_root) { + EpochType epoch = epoch_in; + if (isDep(epoch_in)) { + epoch::EpochManip::clearDepReleasedBit(epoch); + } + vt_debug_print( verbose, termds, "getDSTerm: epoch={:x}, is_rooted={}, is_ds={}\n", @@ -668,10 +678,8 @@ void TerminationDetector::epochTerminated(EpochType const& epoch, CallFromEnum f // Matching consume on global epoch once a nested epoch terminates if (epoch != any_epoch_sentinel) { bool const is_rooted = isRooted(epoch); - bool const is_ds = isDS(epoch); if ( not is_rooted or - is_ds or (is_rooted and epoch::EpochManip::node(epoch) == this_node_) ) { consumeOnGlobal(epoch); @@ -705,6 +713,7 @@ 
void TerminationDetector::inquireTerminated( bool const is_ready = true; auto msg = makeMessage(epoch,is_ready); + theMsg()->markAsTermMessage(msg); theMsg()->sendMsg(from, msg); }); } @@ -773,6 +782,7 @@ TermStatusEnum TerminationDetector::testEpochTerminated(EpochType epoch) { * terminated or not */ auto msg = makeMessage(epoch,this_node_); + theMsg()->markAsTermMessage(msg); theMsg()->sendMsg(root, msg); epoch_wait_status_.insert(epoch); } @@ -1146,6 +1156,10 @@ bool TerminationDetector::isEpochReleased(EpochType epoch) { return true; } + if (epoch::EpochManip::isDepReleased(epoch)) { + return true; + } + // Terminated epochs are always released bool const is_term = theEpoch()->getTerminatedWindow(epoch)->isTerminated( epoch diff --git a/src/vt/vrt/collection/manager.fwd.h b/src/vt/vrt/collection/manager.fwd.h index ea44b370a7..f3c0115af9 100644 --- a/src/vt/vrt/collection/manager.fwd.h +++ b/src/vt/vrt/collection/manager.fwd.h @@ -54,6 +54,12 @@ struct CollectionManager; DispatchBasePtrType getDispatcher(auto_registry::AutoHandlerType const han); +template +void fullyReleaseEpoch(VirtualProxyType proxy, Index idx, EpochType ep); + +template +NodeType getMappedNodeElm(VirtualProxyType proxy, Index idx); + }}} /* end namespace vt::vrt::collection */ namespace vt { diff --git a/src/vt/vrt/collection/manager.h b/src/vt/vrt/collection/manager.h index 3a4dd0a181..b67ffbbf1b 100644 --- a/src/vt/vrt/collection/manager.h +++ b/src/vt/vrt/collection/manager.h @@ -1525,6 +1525,11 @@ struct CollectionManager template friend struct param::ConstructParams; + template + friend void fullyReleaseEpoch( + VirtualProxyType proxy, Index idx, EpochType ep + ); + /** * \internal \brief Migrate an element out of this node * diff --git a/src/vt/vrt/collection/manager.impl.h b/src/vt/vrt/collection/manager.impl.h index f9258e65af..8b00cb119a 100644 --- a/src/vt/vrt/collection/manager.impl.h +++ b/src/vt/vrt/collection/manager.impl.h @@ -2375,6 +2375,21 @@ template } } +template +void 
fullyReleaseEpoch(VirtualProxyType proxy, Index idx, EpochType ep) { + auto elm_holder = theCollection()->findElmHolder(proxy); + auto const elm_exists = elm_holder->exists(idx); + vtAssertExpr(elm_exists); + auto ptr = elm_holder->lookup(idx).getRawPtr(); + ptr->addReleasedEpoch(ep); + theSched()->fullyReleaseEpoch(ep); +} + +template +NodeType getMappedNodeElm(VirtualProxyType proxy, Index idx) { + return theCollection()->getMappedNode(proxy, idx); +} + }}} /* end namespace vt::vrt::collection */ #include "vt/vrt/collection/collection_builder.impl.h" diff --git a/tests/unit/epoch/test_task_collective.cc b/tests/unit/epoch/test_task_collective.cc new file mode 100644 index 0000000000..59dd94acf4 --- /dev/null +++ b/tests/unit/epoch/test_task_collective.cc @@ -0,0 +1,121 @@ +/* +//@HEADER +// ***************************************************************************** +// +// test_task_collective.cc +// DARMA/vt => Virtual Transport +// +// Copyright 2019-2021 National Technology & Engineering Solutions of Sandia, LLC +// (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. +// Government retains certain rights in this software. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. +// +// * Neither the name of the copyright holder nor the names of its +// contributors may be used to endorse or promote products derived from this +// software without specific prior written permission. 
+// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. +// +// Questions? Contact darma@sandia.gov +// +// ***************************************************************************** +//@HEADER +*/ + +#include + +#include +#include +#include + +#include "test_harness.h" +#include "test_parallel_harness.h" + +namespace vt { namespace tests { namespace unit { + +using TestTaskCollective = TestParallelHarness; + +struct TestGroup { + + void task1(int val) { + vt_print(gen, "val={}, t1={}\n", val, t1); + t1++; + } + + void task2(int val) { + vt_print(gen, "val={}, t1={}, t2={}\n", val, t1, t2); + t2++; + EXPECT_EQ(t1, t2); + } + + void task3(int val) { + vt_print(gen, "val={}, t1={}, t2={}, t3={}\n", val, t1, t2, t3); + t3++; + EXPECT_EQ(t1, t2); + EXPECT_EQ(t2, t3); + } + + void setProxy(objgroup::proxy::Proxy in_proxy) { + proxy_ = in_proxy; + } + +private: + int t1 = 0, t2 = 0, t3 = 0, t4 = 0; + objgroup::proxy::Proxy proxy_; +}; + +TEST_F(TestTaskCollective, test_node_task_collective_1) { + using ChainSetType = messaging::CollectionChainSet; + + auto const num_nodes = theContext()->getNumNodes(); + + auto proxy = theObjGroup()->makeCollective("TestGroup"); + proxy.get()->setProxy(proxy); + + auto chains_ = std::make_unique(); + 
chains_->addIndex(theContext()->getNode()); + + auto tr = chains_->createTaskRegion([&]{ + auto t1 = chains_->taskCollective("task1", [&](auto node, auto t) { + return proxy[node].template send<&TestGroup::task1>(10); + }); + + auto t2 = chains_->taskCollective("task2", [&](auto node, auto t) { + t->dependsOn(node, t1); + t->dependsOn((node+1)%num_nodes, t1); + return proxy[node].template send<&TestGroup::task2>(10); + }); + + /*auto t3 = */chains_->taskCollective("task3", [&](auto node, auto t) { + t->dependsOn(node, t2); + return proxy[node].template send<&TestGroup::task3>(10); + }); + }); + + // Do this a bunch of times + for (int i = 0; i < 100; i++) { + tr->enqueueTasks(); + tr->waitCollective(); + } +} + +}}} // end namespace vt::tests::unit