Skip to content

Commit

Permalink
Merge pull request #2278 from DARMA-tasking/2201-implement-memory-awa…
Browse files Browse the repository at this point in the history
…re-temperedlb-in-vt-rebased

#2201: implement memory aware temperedlb in vt rebased (new version)
  • Loading branch information
lifflander authored Sep 17, 2024
2 parents ed311fe + a83c66a commit 2720c16
Show file tree
Hide file tree
Showing 35 changed files with 2,587 additions and 102 deletions.
9 changes: 5 additions & 4 deletions scripts/JSON_data_files_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,13 +434,14 @@ def validate_comm_links(all_jsons):

for data in all_jsons:
tasks = data["phases"][n]["tasks"]
id_key = "id" if "id" in tasks[0]["entity"] else "seq_id"
task_ids.update({int(task["entity"][id_key]) for task in tasks})
task_ids.update(
{int(task["entity"].get("id", task["entity"].get("seq_id"))) for task in tasks}
)

if data["phases"][n].get("communications") is not None:
comms = data["phases"][n]["communications"]
comm_ids.update({int(comm["from"][id_key]) for comm in comms})
comm_ids.update({int(comm["to"][id_key]) for comm in comms})
comm_ids.update({int(comm["from"].get("id", comm["from"].get("seq_id"))) for comm in comms})
comm_ids.update({int(comm["to"].get("id", comm["to"].get("seq_id"))) for comm in comms})

if not comm_ids.issubset(task_ids):
logging.error(
Expand Down
17 changes: 12 additions & 5 deletions scripts/LBDatafile_schema.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
from schema import And, Optional, Schema

def validate_id_and_seq_id(field):
"""Ensure that either seq_id or id is provided."""
def validate_ids(field):
"""
Ensure that 1) either seq_id or id is provided,
and 2) if an object is migratable, collection_id has been set.
"""
if 'seq_id' not in field and 'id' not in field:
raise ValueError('Either id (bit-encoded) or seq_id must be provided.')

if field['migratable'] and 'seq_id' in field and 'collection_id' not in field:
raise ValueError('If an entity is migratable, it must have a collection_id')

return field

LBDatafile_schema = Schema(
Expand Down Expand Up @@ -45,7 +52,7 @@ def validate_id_and_seq_id(field):
'type': str,
'migratable': bool,
Optional('objgroup_id'): int
}, validate_id_and_seq_id),
}, validate_ids),
'node': int,
'resource': str,
Optional('subphases'): [
Expand All @@ -71,7 +78,7 @@ def validate_id_and_seq_id(field):
Optional('migratable'): bool,
Optional('index'): [int],
Optional('objgroup_id'): int,
}, validate_id_and_seq_id),
}, validate_ids),
'messages': int,
'from': And({
'type': str,
Expand All @@ -82,7 +89,7 @@ def validate_id_and_seq_id(field):
Optional('migratable'): bool,
Optional('index'): [int],
Optional('objgroup_id'): int,
}, validate_id_and_seq_id),
}, validate_ids),
'bytes': float
}
],
Expand Down
1 change: 1 addition & 0 deletions src/vt/configs/arguments/app_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ struct AppConfig {
bool vt_lb_self_migration = false;
bool vt_lb_spec = false;
std::string vt_lb_spec_file = "";
bool vt_lb_run_lb_first_phase = false;


bool vt_no_detect_hang = false;
Expand Down
3 changes: 3 additions & 0 deletions src/vt/configs/arguments/args.cc
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,7 @@ void addLbArgs(CLI::App& app, AppConfig& appConfig) {
auto lb_self_migration = "Allow load balancer to migrate objects to the same node";
auto lb_spec = "Enable LB spec file (defines which phases output LB data)";
auto lb_spec_file = "File containing LB spec; --vt_lb_spec to enable";
auto lb_first_phase_info = "Force LB to run on the first phase (phase 0)";
auto s = app.add_flag("--vt_lb", appConfig.vt_lb, lb);
auto t1 = app.add_flag("--vt_lb_quiet", appConfig.vt_lb_quiet, lb_quiet);
auto u = app.add_option("--vt_lb_file_name", appConfig.vt_lb_file_name, lb_file_name)->capture_default_str()->check(CLI::ExistingFile);
Expand All @@ -935,6 +936,7 @@ void addLbArgs(CLI::App& app, AppConfig& appConfig) {
auto lbasm = app.add_flag("--vt_lb_self_migration", appConfig.vt_lb_self_migration, lb_self_migration);
auto lbspec = app.add_flag("--vt_lb_spec", appConfig.vt_lb_spec, lb_spec);
auto lbspecfile = app.add_option("--vt_lb_spec_file", appConfig.vt_lb_spec_file, lb_spec_file)->capture_default_str()->check(CLI::ExistingFile);
auto lb_first_phase = app.add_flag("--vt_lb_run_lb_first_phase", appConfig.vt_lb_run_lb_first_phase, lb_first_phase_info);

// --vt_lb_name excludes --vt_lb_file_name, and vice versa
v->excludes(u);
Expand Down Expand Up @@ -963,6 +965,7 @@ void addLbArgs(CLI::App& app, AppConfig& appConfig) {
lbasm->group(debugLB);
lbspec->group(debugLB);
lbspecfile->group(debugLB);
lb_first_phase->group(debugLB);

// help options deliberately omitted from the debugLB group above so that
// they appear grouped with --vt_help when --vt_help is used
Expand Down
1 change: 1 addition & 0 deletions src/vt/configs/types/types_sentinels.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ static constexpr SequentialIDType const first_seq_id = 1;
static constexpr PriorityType const no_priority = 0;
static constexpr PriorityLevelType const no_priority_level = 0;
static constexpr ThreadIDType const no_thread_id = 0;
static constexpr SharedIDType const no_shared_id = -1;

} // end namespace vt

Expand Down
2 changes: 2 additions & 0 deletions src/vt/configs/types/types_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ using PriorityLevelType = uint8_t;
using ComponentIDType = uint32_t;
/// Used to hold a unique ID for a user-level thread on a particular node
using ThreadIDType = uint64_t;
/// Used to hold a shared ID
using SharedIDType = int;

// Action types for attaching a closure to a runtime function
/// Used for generically store an action to perform
Expand Down
28 changes: 24 additions & 4 deletions src/vt/elm/elm_comm.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#if !defined INCLUDED_VT_ELM_ELM_COMM_H
#define INCLUDED_VT_ELM_ELM_COMM_H

#include "vt/configs/types/types_type.h"
#include "vt/elm/elm_id.h"

#include <unordered_map>
Expand All @@ -58,7 +59,9 @@ enum struct CommCategory : int8_t {
CollectionToNodeBcast = 5,
NodeToCollectionBcast = 6,
CollectiveToCollectionBcast = 7,
LocalInvoke = 8
LocalInvoke = 8,
WriteShared = 9,
ReadOnlyShared = 10
};

inline NodeType objGetNode(ElementIDStruct const id) {
Expand All @@ -71,6 +74,8 @@ struct CommKey {
struct CollectionTag { };
struct CollectionToNodeTag { };
struct NodeToCollectionTag { };
struct WriteSharedTag { };
struct ReadOnlySharedTag { };

CommKey() = default;
CommKey(CommKey const&) = default;
Expand Down Expand Up @@ -107,12 +112,25 @@ struct CommKey {
cat_(bcast ? CommCategory::NodeToCollectionBcast : CommCategory::NodeToCollection)
{ }

CommKey(
WriteSharedTag,
NodeType in_home, int in_shared_id
) : nto_(in_home), shared_id_(in_shared_id), cat_(CommCategory::WriteShared)
{ }

CommKey(
ReadOnlySharedTag,
NodeType in_home, int in_shared_id
) : nto_(in_home), shared_id_(in_shared_id), cat_(CommCategory::ReadOnlyShared)
{ }

ElementIDStruct from_ = {};
ElementIDStruct to_ = {};

ElementIDStruct edge_id_ = {};
NodeType nfrom_ = uninitialized_destination;
NodeType nto_ = uninitialized_destination;
SharedIDType shared_id_ = no_shared_id;
CommCategory cat_ = CommCategory::SendRecv;

ElementIDStruct fromObj() const { return from_; }
Expand All @@ -121,6 +139,7 @@ struct CommKey {
ElementIDType toNode() const { return nto_; }
ElementIDStruct edgeID() const { return edge_id_; }
CommCategory commCategory() const { return cat_; }
int sharedID() const { return shared_id_; }

bool selfEdge() const { return cat_ == CommCategory::SendRecv and from_ == to_; }
bool offNode() const {
Expand All @@ -140,12 +159,12 @@ struct CommKey {
return
k.from_ == from_ and k.to_ == to_ and
k.nfrom_ == nfrom_ and k.nto_ == nto_ and
k.cat_ == cat_;
k.cat_ == cat_ and k.shared_id_ == shared_id_;
}

template <typename SerializerT>
void serialize(SerializerT& s) {
s | from_ | to_ | nfrom_ | nto_ | cat_ | edge_id_;
s | from_ | to_ | nfrom_ | nto_ | cat_ | edge_id_ | shared_id_;
}
};

Expand Down Expand Up @@ -189,7 +208,8 @@ struct hash<vt::elm::CommKey> {
size_t operator()(vt::elm::CommKey const& in) const {
return std::hash<uint64_t>()(
std::hash<vt::elm::ElementIDStruct>()(in.from_) ^
std::hash<vt::elm::ElementIDStruct>()(in.to_) ^ in.nfrom_ ^ in.nto_
std::hash<vt::elm::ElementIDStruct>()(in.to_) ^ in.nfrom_ ^ in.nto_ ^
in.shared_id_
);
}
};
Expand Down
5 changes: 5 additions & 0 deletions src/vt/elm/elm_id.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
//@HEADER
*/

#include "vt/context/context.h"
#include "vt/elm/elm_id.h"
#include "vt/elm/elm_id_bits.h"

Expand All @@ -58,4 +59,8 @@ NodeType ElementIDStruct::getCurrNode() const {
return curr_node;
}

bool ElementIDStruct::isLocatedOnThisNode() const {
return theContext()->getNode() == curr_node and not isMigratable();
}

}} /* end namespace vt::elm */
1 change: 1 addition & 0 deletions src/vt/elm/elm_id.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ struct ElementIDStruct {
bool isMigratable() const;
NodeType getHomeNode() const;
NodeType getCurrNode() const;
bool isLocatedOnThisNode() const;
};


Expand Down
16 changes: 16 additions & 0 deletions src/vt/elm/elm_lb_data.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,22 @@ void ElementLBData::sendToEntity(
sendComm(key, bytes);
}

void ElementLBData::addWritableSharedID(
NodeType home, int shared_id, double bytes
) {
sendComm(
elm::CommKey{elm::CommKey::WriteSharedTag{}, home, shared_id}, bytes
);
}

void ElementLBData::addReadOnlySharedID(
NodeType home, int shared_id, double bytes
) {
sendComm(
elm::CommKey{elm::CommKey::ReadOnlySharedTag{}, home, shared_id}, bytes
);
}

void ElementLBData::sendComm(elm::CommKey key, double bytes) {
phase_comm_[cur_phase_][key].sendMsg(bytes);
subphase_comm_[cur_phase_].resize(cur_subphase_ + 1);
Expand Down
3 changes: 3 additions & 0 deletions src/vt/elm/elm_lb_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ struct ElementLBData {
void sendToEntity(ElementIDStruct to, ElementIDStruct from, double bytes);
void sendComm(elm::CommKey key, double bytes);

void addWritableSharedID(NodeType home, int shared_id, double bytes);
void addReadOnlySharedID(NodeType home, int shared_id, double bytes);

void recvComm(elm::CommKey key, double bytes);
void recvObjData(
ElementIDStruct to_perm,
Expand Down
13 changes: 13 additions & 0 deletions src/vt/messaging/active.h
Original file line number Diff line number Diff line change
Expand Up @@ -1722,6 +1722,19 @@ struct ActiveMessenger : runtime::component::PollableComponent<ActiveMessenger>
MsgSizeType const msg_size
);

public:
/**
* \brief Get the rank-based LB data along with element ID for rank-based work
*
* \return tuple with pointers to each one
*/
auto getRankLBData() {
return std::make_tuple(
&bare_handler_dummy_elm_id_for_lb_data_,
&bare_handler_lb_data_
);
}

private:
# if vt_check_enabled(trace_enabled)
trace::UserEventIDType trace_irecv = trace::no_user_event_id;
Expand Down
3 changes: 3 additions & 0 deletions src/vt/vrt/collection/balance/baselb/baselb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ std::shared_ptr<const balance::Reassignment> BaseLB::normalizeReassignments() {
auto const new_node = std::get<1>(transfer);
auto const current_node = obj_id.curr_node;

vtAbortIf(
not obj_id.isMigratable(), "Transfering object that is not migratable"
);
if (current_node == new_node) {
vt_debug_print(
verbose, lb, "BaseLB::normalizeReassignments(): self migration\n"
Expand Down
43 changes: 40 additions & 3 deletions src/vt/vrt/collection/balance/lb_data_holder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,7 @@ std::unique_ptr<nlohmann::json> LBDataHolder::toJson(PhaseType phase) const {

i = 0;
if (node_comm_.find(phase) != node_comm_.end()) {
for (auto&& elm : node_comm_.at(phase)) {
auto volume = elm.second;
auto const& key = elm.first;
for (auto const& [key, volume] : node_comm_.at(phase)) {
j["communications"][i]["bytes"] = volume.bytes;
j["communications"][i]["messages"] = volume.messages;

Expand Down Expand Up @@ -296,6 +294,17 @@ std::unique_ptr<nlohmann::json> LBDataHolder::toJson(PhaseType phase) const {
outputEntity(j["communications"][i]["from"], key.fromObj());
break;
}
case elm::CommCategory::ReadOnlyShared:
case elm::CommCategory::WriteShared: {
j["communications"][i]["type"] =
(key.cat_ == elm::CommCategory::ReadOnlyShared) ?
"ReadOnlyShared" : "WriteShared";
j["communications"][i]["to"]["type"] = "node";
j["communications"][i]["to"]["id"] = key.toNode();
j["communications"][i]["from"]["type"] = "shared_id";
j["communications"][i]["from"]["id"] = key.sharedID();
break;
}
case elm::CommCategory::LocalInvoke:
case elm::CommCategory::CollectiveToCollectionBcast:
// not currently supported
Expand Down Expand Up @@ -476,6 +485,34 @@ LBDataHolder::LBDataHolder(nlohmann::json const& j)
);
CommVolume vol{bytes, messages};
this->node_comm_[id][key] = vol;
} else if (
type == "ReadOnlyShared" or type == "WriteShared"
) {
vtAssertExpr(comm["from"]["type"] == "shared_id");
vtAssertExpr(comm["to"]["type"] == "node");

CommVolume vol{bytes, messages};
auto to_node = comm["to"]["id"];
vtAssertExpr(to_node.is_number());

auto from_shared_id = comm["from"]["id"];
vtAssertExpr(from_shared_id.is_number());

if (type == "ReadOnlyShared") {
CommKey key(
CommKey::ReadOnlySharedTag{},
static_cast<NodeType>(to_node),
static_cast<int>(from_shared_id)
);
this->node_comm_[id][key] = vol;
} else {
CommKey key(
CommKey::WriteSharedTag{},
static_cast<NodeType>(to_node),
static_cast<int>(from_shared_id)
);
this->node_comm_[id][key] = vol;
}
}
}
}
Expand Down
3 changes: 0 additions & 3 deletions src/vt/vrt/collection/balance/lb_data_holder.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,9 @@

#include "vt/config.h"
#include "vt/vrt/collection/balance/lb_common.h"
#include "vt/elm/elm_comm.h"

#include <unordered_map>
#include <memory>
#include <variant>
#include <string>

#include <nlohmann/json_fwd.hpp>

Expand Down
6 changes: 5 additions & 1 deletion src/vt/vrt/collection/balance/lb_invoke/lb_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,11 @@ LBType LBManager::decideLBToRun(PhaseType phase, bool try_file) {
} else {
auto interval = theConfig()->vt_lb_interval;
vtAssert(interval != 0, "LB Interval must not be 0");
if (phase % interval == 1 || (interval == 1 && phase != 0)) {
vt::PhaseType offset = theConfig()->vt_lb_run_lb_first_phase ? 0 : 1;
if (
phase % interval == offset ||
(interval == 1 && phase != 0)
) {
bool name_match = false;
for (auto&& elm : get_lb_names()) {
if (elm.second == theConfig()->vt_lb_name) {
Expand Down
4 changes: 4 additions & 0 deletions src/vt/vrt/collection/balance/model/composed_model.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,8 @@ int ComposedModel::getNumSubphases() const {
return base_->getNumSubphases();
}

CommMapType ComposedModel::getComm(PhaseOffset when) const {
return base_->getComm(when);
}

}}}}
1 change: 1 addition & 0 deletions src/vt/vrt/collection/balance/model/composed_model.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class ComposedModel : public LoadModel
bool hasUserData() const override;
ElmUserDataType getUserData(ElementIDStruct object, PhaseOffset when) const override;
unsigned int getNumPastPhasesNeeded(unsigned int look_back) const override;
CommMapType getComm(PhaseOffset offset) const override;

ObjectIterator begin() const override;

Expand Down
Loading

0 comments on commit 2720c16

Please sign in to comment.