Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#410: Dependent Epochs rewritten #2204

Open
wants to merge 20 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
ccd5afd
#410: epoch: change unused InsertEpoch to DependentEpoch
lifflander Jun 17, 2019
b393469
#410: epoch: add function to bit-combine epoch category bits
lifflander Jun 19, 2019
9a1efe8
#410: termination: add isDep check
lifflander Sep 28, 2023
72fbfec
#410: term: implement dependent epochs
lifflander Jun 20, 2019
5cd8599
#410: test: add release dependent epoch test
lifflander Jun 20, 2019
10147f4
#410: reduce: fix warning
lifflander Oct 12, 2023
1db25fa
#410: epoch: add test, move pending epochs to scheduler
lifflander Oct 12, 2023
a9816c6
#410: epoch: rework deps, objgroup dep epochs, scheduler buffers
lifflander Oct 16, 2023
4de8a74
#410: objgroup: implement objgroup proxy functions for dependent epochs
lifflander Oct 17, 2023
bfdc3f4
#410: collection: add dependent epochs to collections, system message…
lifflander Oct 18, 2023
3c1ac7f
#410: test: add new test for dep epochs and collections
lifflander Aug 16, 2019
cdf69ab
#410: collection: add missing header include
lifflander Oct 18, 2023
9764708
#410: tests: cleanup tests, fix name collison
lifflander Oct 18, 2023
a974aba
#410: tests: fix license
lifflander Oct 18, 2023
3e76af5
#410: tests: fix some small compilation errors
lifflander Oct 18, 2023
3d2b117
#410: collection: switch broadcast after system broadcast to user msg
lifflander Oct 18, 2023
f746269
#410: collection: fix missing system message designation
lifflander Oct 19, 2023
cb2b519
#410: tests: rewrite dep epoch test to fix logic error
lifflander Oct 19, 2023
4b3a9c6
#410: termination: remove unneeded code, cleanup scheduler
lifflander Oct 31, 2023
48f7a8c
#410: termination: cleanup more code---remove unecessary condition
lifflander Oct 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/vt/collective/reduce/reduce_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@

namespace vt { namespace collective { namespace reduce {

static std::unique_ptr<Reduce> makeReduceScope(detail::ReduceScope const& scope) {
return std::make_unique<Reduce>(scope);
}

ReduceManager::ReduceManager()
: reducers_( // default cons reducer for non-group
[](detail::ReduceScope const& scope) {
return std::make_unique<Reduce>(scope);
}
)
: reducers_(makeReduceScope)
{
// insert the default reducer scope
reducers_.make(
Expand Down
7 changes: 7 additions & 0 deletions src/vt/context/runnable_context/td.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ struct TD {
*/
void resume();

/**
* \brief Get epoch for this context
*
* \return the epoch
*/
EpochType getEpoch() const { return ep_; }

private:
EpochType ep_ = no_epoch; /**< The epoch for the task */
#if vt_check_enabled(fcontext)
Expand Down
2 changes: 1 addition & 1 deletion src/vt/epoch/epoch.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ static constexpr BitCountType const epoch_category_num_bits = 2;
*/
enum struct eEpochCategory : int8_t {
NoCategoryEpoch = 0x0,
InsertEpoch = 0x1,
DependentEpoch = 0x1,
DijkstraScholtenEpoch = 0x2
};

Expand Down
64 changes: 43 additions & 21 deletions src/vt/epoch/epoch_manip.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,6 @@ namespace vt { namespace epoch {

static EpochType const arch_epoch_coll = makeEpochZero();

EpochManip::EpochManip()
: terminated_collective_epochs_(
std::make_unique<EpochWindow>(arch_epoch_coll)
)
{ }

/*static*/ EpochType EpochManip::generateEpoch(
bool const& is_rooted, NodeType const& root_node,
eEpochCategory const& category
Expand Down Expand Up @@ -123,22 +117,19 @@ EpochType EpochManip::getArchetype(EpochType epoch) const {

EpochWindow* EpochManip::getTerminatedWindow(EpochType epoch) {
auto const is_rooted = isRooted(epoch);
if (is_rooted and epoch != term::any_epoch_sentinel) {
auto const& arch_epoch = getArchetype(epoch);
auto iter = terminated_epochs_.find(arch_epoch);
if (iter == terminated_epochs_.end()) {
terminated_epochs_.emplace(
std::piecewise_construct,
std::forward_as_tuple(arch_epoch),
std::forward_as_tuple(std::make_unique<EpochWindow>(arch_epoch))
);
iter = terminated_epochs_.find(arch_epoch);
}
return iter->second.get();
} else {
vtAssertExpr(terminated_collective_epochs_ != nullptr);
return terminated_collective_epochs_.get();
auto& container = is_rooted and epoch != term::any_epoch_sentinel ?
terminated_epochs_ : terminated_collective_epochs_;
auto const& arch_epoch = getArchetype(epoch);
auto iter = container.find(arch_epoch);
if (iter == container.end()) {
container.emplace(
std::piecewise_construct,
std::forward_as_tuple(arch_epoch),
std::forward_as_tuple(std::make_unique<EpochWindow>(arch_epoch))
);
iter = container.find(arch_epoch);
}
return iter->second.get();
}

/*static*/ bool EpochManip::isRooted(EpochType const& epoch) {
Expand All @@ -148,6 +139,29 @@ EpochWindow* EpochManip::getTerminatedWindow(EpochType epoch) {
return BitPackerType::boolGetField<field,size,ImplType>(*epoch);
}

/*static*/ bool EpochManip::isDS(EpochType epoch) {
using T = typename std::underlying_type<epoch::eEpochCategory>::type;
if (isRooted(epoch)) {
auto const ds_bit = epoch::eEpochCategory::DijkstraScholtenEpoch;
auto const cat = category(epoch);
bool const is_ds = static_cast<T>(cat) & static_cast<T>(ds_bit);
return is_ds;
} else {
return false;
}
}

/*static*/ bool EpochManip::isDep(EpochType epoch) {
using T = typename std::underlying_type<epoch::eEpochCategory>::type;
if (epoch == no_epoch or epoch == term::any_epoch_sentinel) {
return false;
}
auto const dep_bit = epoch::eEpochCategory::DependentEpoch;
auto const cat = epoch::EpochManip::category(epoch);
bool const is_dep = static_cast<T>(cat) & static_cast<T>(dep_bit);
return is_dep;
}

/*static*/ eEpochCategory EpochManip::category(EpochType const& epoch) {
return BitPackerType::getField<
eEpochRoot::rEpochCategory, epoch_category_num_bits, eEpochCategory
Expand Down Expand Up @@ -190,6 +204,14 @@ void EpochManip::setCategory(EpochType& epoch, eEpochCategory const cat) {
>(*epoch,cat);
}

/*static*/ eEpochCategory EpochManip::makeCat(
eEpochCategory c1, eEpochCategory c2
) {
using T = typename std::underlying_type<eEpochCategory>::type;
auto ret = static_cast<T>(c1) | static_cast<T>(c2);
return static_cast<eEpochCategory>(ret);
}

/*static*/
void EpochManip::setNode(EpochType& epoch, NodeType const node) {
vtAssert(isRooted(epoch), "Must be rooted to manipulate the node");
Expand Down
38 changes: 33 additions & 5 deletions src/vt/epoch/epoch_manip.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ namespace vt { namespace epoch {
struct EpochManip : runtime::component::Component<EpochManip> {
using CapturedContextType = term::ParentEpochCapture;

EpochManip();
EpochManip() = default;

std::string name() override { return "EpochManip"; }

Expand All @@ -89,6 +89,24 @@ struct EpochManip : runtime::component::Component<EpochManip> {
*/
static bool isRooted(EpochType const& epoch);

/**
* \brief Gets whether an epoch is DS or onot
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo.

*
* \param[in] epoch the epoch
*
* \return whether it is DS
*/
static bool isDS(EpochType epoch);

/**
* \brief Gets whether an epoch is dependent or onot
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo.

*
* \param[in] epoch the epoch
*
* \return whether it is dependent
*/
static bool isDep(EpochType epoch);

/**
* \brief Gets the \c eEpochCategory of a given epoch
*
Expand Down Expand Up @@ -152,6 +170,14 @@ struct EpochManip : runtime::component::Component<EpochManip> {
*/
static void setSeq(EpochType& epoch, EpochType::ImplType const seq);

/**
* \brief Combine eEpochCategory elements
*
* \param[in] c1 category 1
* \param[in] c2 category 2
*/
static eEpochCategory makeCat(eEpochCategory c1, eEpochCategory c2);

/*
* General (stateless) methods for creating a epoch with certain properties
* based on a current sequence number
Expand Down Expand Up @@ -250,10 +276,12 @@ struct EpochManip : runtime::component::Component<EpochManip> {
}

private:
// epoch window container for specific archetyped epochs
std::unordered_map<EpochType,std::unique_ptr<EpochWindow>> terminated_epochs_;
// epoch window for basic collective epochs
std::unique_ptr<EpochWindow> terminated_collective_epochs_ = nullptr;
/// epoch window container for specific archetyped epochs
std::unordered_map<EpochType, std::unique_ptr<EpochWindow>> terminated_epochs_;
/// epoch window for basic collective epochs
std::unordered_map<
EpochType, std::unique_ptr<EpochWindow>
> terminated_collective_epochs_;
};

}} /* end namespace vt::epoch */
Expand Down
13 changes: 7 additions & 6 deletions src/vt/messaging/active.cc
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,7 @@ void ActiveMessenger::finishPendingDataMsgAsyncRecv(InProgressDataIRecv* irecv)
theTerm()->consume(term::any_epoch_sentinel,1,sender);
theTerm()->hangDetectRecv();
};
theSched()->enqueue(irecv->priority, run);
theSched()->enqueueLambda(irecv->priority, run);
}
}

Expand Down Expand Up @@ -934,12 +934,13 @@ void ActiveMessenger::prepareActiveMsgToRun(
using MsgType = ShortMessage;
auto msg = base.to<MsgType>().get();

auto const is_term = envelopeIsTerm(msg->env);
auto const is_bcast = envelopeIsBcast(msg->env);
auto const dest = envelopeGetDest(msg->env);
auto const handler = envelopeGetHandler(msg->env);
auto const epoch = envelopeIsEpochType(msg->env) ?
auto const is_term = envelopeIsTerm(msg->env);
auto const is_bcast = envelopeIsBcast(msg->env);
auto const dest = envelopeGetDest(msg->env);
auto const handler = envelopeGetHandler(msg->env);
auto const epoch = envelopeIsEpochType(msg->env) ?
envelopeGetEpoch(msg->env) : term::any_epoch_sentinel;

auto const from_node = is_bcast ? dest : in_from_node;

if (!is_term || vt_check_enabled(print_term_msgs)) {
Expand Down
2 changes: 1 addition & 1 deletion src/vt/messaging/active.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ struct BufferedActiveMsg {

BufferedActiveMsg(
MessageType const& in_buffered_msg, NodeType const& in_from_node,
ActionType in_cont
ActionType in_cont = nullptr
) : buffered_msg(in_buffered_msg), from_node(in_from_node), cont(in_cont)
{ }

Expand Down
8 changes: 8 additions & 0 deletions src/vt/runnable/runnable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,14 @@ void RunnableNew::resume(TimeType time) {
#endif
}

EpochType RunnableNew::getEpoch() const {
if (contexts_.has_td) {
return contexts_.td.getEpoch();
} else {
return no_epoch;
}
}

void RunnableNew::send(elm::ElementIDStruct elm, MsgSizeType bytes) {
if (contexts_.has_lb) contexts_.lb.send(elm, bytes);
}
Expand Down
7 changes: 7 additions & 0 deletions src/vt/runnable/runnable.h
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,13 @@ struct RunnableNew {
*/
static void operator delete(void* ptr);

/**
* \brief Get the epoch for a runnable
*
* \return the epoch
*/
EpochType getEpoch() const;

private:
detail::Contexts contexts_; /**< The contexts */
MsgSharedPtr<BaseMsgType> msg_ = nullptr; /**< The associated message */
Expand Down
2 changes: 1 addition & 1 deletion src/vt/runtime/runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ bool Runtime::initialize(bool const force_now) {
printStartupBanner();
// Enqueue a check for later in case arguments are modified before work
// actually executes
theSched->enqueue([this]{
theSched->enqueueLambda([this]{
this->checkForArgumentErrors();
});

Expand Down
2 changes: 1 addition & 1 deletion src/vt/runtime/runtime_banner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -939,7 +939,7 @@ void Runtime::printStartupBanner() {

// Enqueue a check for later in case arguments are modified before work
// actually executes
theSched->enqueue([this]{
theSched->enqueueLambda([this]{
this->checkForArgumentErrors();
});
}
Expand Down
11 changes: 11 additions & 0 deletions src/vt/scheduler/scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,17 @@ void Scheduler::resume(ThreadIDType tid) {
suspended_.resumeRunnable(tid);
}

void Scheduler::releaseEpoch(EpochType ep) {
auto iter = pending_work_.find(ep);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another possible use of pending_work_.extract(ep)

if (iter != pending_work_.end()) {
auto& container = iter->second;
while (container.size() > 0) {
work_queue_.emplace(container.pop());
}
pending_work_.erase(iter);
}
}

#if vt_check_enabled(fcontext)
ThreadManager* Scheduler::getThreadManager() {
return thread_manager_.get();
Expand Down
31 changes: 30 additions & 1 deletion src/vt/scheduler/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,24 @@ struct Scheduler : runtime::component::Component<Scheduler> {
template <typename RunT>
void enqueue(RunT r);

/**
* \brief Enqueue a callable to execute later with the default priority
* \c default_priority
*
* \param[in] r action to execute
*/
template <typename Callable>
void enqueueLambda(Callable&& c);

/**
* \brief Enqueue a callable to execute later with a priority
*
* \param[in] priority the priority of the action
* \param[in] r action to execute
*/
template <typename Callable>
void enqueueLambda(PriorityType priority, Callable&& c);

/**
* \brief Enqueue an runnable with a priority to execute later
*
Expand Down Expand Up @@ -352,6 +370,13 @@ struct Scheduler : runtime::component::Component<Scheduler> {
ThreadManager* getThreadManager();
#endif

/**
* \brief Release an epoch to run
*
* \param[in] ep the epoch to release
*/
void releaseEpoch(EpochType ep);

template <typename SerializerT>
void serialize(SerializerT& s) {
s | work_queue_
Expand Down Expand Up @@ -381,7 +406,8 @@ struct Scheduler : runtime::component::Component<Scheduler> {
| vtLiveTime
| schedLoopTime
| idleTime
| idleTimeMinusTerm;
| idleTimeMinusTerm
| pending_work_;
}

private:
Expand Down Expand Up @@ -419,6 +445,9 @@ struct Scheduler : runtime::component::Component<Scheduler> {
Queue<UnitType> work_queue_;
# endif

/// Unreleased work pending an epoch release
std::unordered_map<EpochType, Queue<UnitType>> pending_work_;

#if vt_check_enabled(fcontext)
std::unique_ptr<ThreadManager> thread_manager_ = nullptr;
#endif
Expand Down
Loading