Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support network Quality of Service #23

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
*.a
*.d
tracer/traceR
bin/traceR

# hide backups
*~
Expand Down
2 changes: 1 addition & 1 deletion tracer/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ all: traceR

PREFIX = ..

SRCS = p2p-events.C coll-events.C tracer-driver.C
SRCS = p2p-events.C coll-events.C tracer-driver.C qos-manager.C
OBJS = ${SRCS:.C=.o}

traceR: ${OBJS} components
Expand Down
8 changes: 8 additions & 0 deletions tracer/coll-events.C
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

extern "C" {
#include "codes/model-net.h"
#include "codes/model-net-sched.h"
#include "codes/lp-io.h"
#include "codes/codes.h"
#include "codes/codes_mapping.h"
Expand All @@ -16,6 +17,9 @@ extern "C" {
}

#include "tracer-driver.h"
#include "qos-manager.h"

extern QoSManager qosManager;

// the indexing should match between the #define and the lookUpTable
#define TRACER_A2A 0
Expand Down Expand Up @@ -155,6 +159,8 @@ void enqueue_coll_msg(
m_local.proc_event_type = lookUpTable[index].local_event;
m_local.executed.taskid = ns->my_pe->currentCollTask;

int prio = qosManager.getServiceLevel(ns->my_job, lpid_to_pe(lp->id), dest);
model_net_set_msg_param(MN_MSG_PARAM_SCHED, MN_SCHED_PARAM_PRIO, (void*)&prio);
model_net_event(net_id, "coll", pe_to_lpid(dest, ns->my_job), size,
sendOffset + copyTime*(isEager?1:0), sizeof(proc_msg),
(const void*)&m_remote, sizeof(proc_msg), &m_local, lp);
Expand Down Expand Up @@ -241,6 +247,8 @@ void handle_coll_recv_post_event(
size = m->msgId.size;
m_remote.msgId.size = size;
}
int prio = qosManager.getServiceLevel(ns->my_job, lpid_to_pe(lp->id), m->msgId.pe);
model_net_set_msg_param(MN_MSG_PARAM_SCHED, MN_SCHED_PARAM_PRIO, (void*)&prio);
model_net_event(net_id, "coll", pe_to_lpid(m->msgId.pe, ns->my_job),
size, nic_delay, sizeof(proc_msg),
(const void*)&m_remote, sizeof(proc_msg), &m_local, lp);
Expand Down
8 changes: 8 additions & 0 deletions tracer/p2p-events.C
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

extern "C" {
#include "codes/model-net.h"
#include "codes/model-net-sched.h"
#include "codes/lp-io.h"
#include "codes/codes.h"
#include "codes/codes_mapping.h"
Expand All @@ -16,6 +17,9 @@ extern "C" {
}

#include "tracer-driver.h"
#include "qos-manager.h"

extern QoSManager qosManager;

void handle_recv_event(
proc_state * ns,
Expand Down Expand Up @@ -841,6 +845,8 @@ int send_msg(
#endif
m_remote.iteration = iter;

int prio = qosManager.getServiceLevel(ns->my_job, lpid_to_pe(lp->id), lpid_to_pe(dest_id));
model_net_set_msg_param(MN_MSG_PARAM_SCHED, MN_SCHED_PARAM_PRIO, (void*)&prio);
/* model_net_event params:
int net_id, char* category, tw_lpid final_dest_lp,
uint64_t message_size, tw_stime offset, int remote_event_size,
Expand Down Expand Up @@ -877,6 +883,8 @@ void enqueue_msg(
#endif
m_remote.iteration = iter;

int prio = qosManager.getServiceLevel(ns->my_job, lpid_to_pe(lp->id), lpid_to_pe(dest_id));
model_net_set_msg_param(MN_MSG_PARAM_SCHED, MN_SCHED_PARAM_PRIO, (void*)&prio);
model_net_event(net_id, "p2p", dest_id, size, sendOffset,
sizeof(proc_msg), (const void*)&m_remote, sizeof(proc_msg), m_local,
lp);
Expand Down
48 changes: 48 additions & 0 deletions tracer/qos-manager.C
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#include "qos-manager.h"

#include <fstream>
#include <stdio.h>

using namespace std;

bool QoSManager::readQoSFileForJob(int job, const char filename[]) {
ifstream qosFile(filename);
if (!qosFile) {
fprintf(stderr, "Cannot read QoS file %s\n", filename);
return false;
}
ServiceLevel defaultSL;
qosFile >> defaultSL;
if(qosFile.fail()) {
fprintf(stderr, "Error reading default service level from QoS file %s\n", filename);
return false;
}
jobs[job].defaultSL = defaultSL;

Rank rank;
ServiceLevel sl;
while(qosFile >> rank >> sl) {
jobs[job].serviceLevels[rank] = sl;
}
return true;
}

/*
 * Resolve the service level for a message of job `j` sent from rank `s`
 * to rank `d`.
 *
 * Lookup order:
 *   1. the entry registered for the source rank of that job,
 *   2. the entry registered for the destination rank of that job,
 *   3. the job's own default service level,
 *   4. the overall default, when nothing is known about the job.
 */
QoSManager::ServiceLevel QoSManager::getServiceLevel(Job j, Rank s, Rank d) {
  JobQoSMap::iterator jobEntry = jobs.find(j);
  if (jobEntry == jobs.end()) {
    // No QoS information was loaded for this job.
    return overallDefaultSL;
  }

  JobQoS &qos = jobEntry->second;

  // The source rank takes precedence over the destination rank.
  ServiceLevelMap::iterator bySource = qos.serviceLevels.find(s);
  if (bySource != qos.serviceLevels.end()) {
    return bySource->second;
  }

  ServiceLevelMap::iterator byDest = qos.serviceLevels.find(d);
  if (byDest != qos.serviceLevels.end()) {
    return byDest->second;
  }

  return qos.defaultSL;
}
30 changes: 30 additions & 0 deletions tracer/qos-manager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#ifndef _QOS_MANAGER_H_
#define _QOS_MANAGER_H_

#include <map>

/*
 * Keeps track of the service level to use for messages, per job and per
 * rank within a job.
 *
 * Each job may have a default service level and a set of per-rank
 * overrides (loaded with readQoSFileForJob). Jobs without any loaded
 * information fall back to an overall default service level.
 */
class QoSManager {
public:
  typedef int Job;           // job identifier
  typedef int Rank;          // rank (process) identifier within a job
  typedef int ServiceLevel;  // service level value handed back to callers

private:
  typedef std::map<Rank, ServiceLevel> ServiceLevelMap;

  // QoS settings of a single job.
  struct JobQoS {
    ServiceLevel defaultSL;         // used when a rank has no entry of its own
    ServiceLevelMap serviceLevels;  // per-rank overrides
  };
  typedef std::map<Job, JobQoS> JobQoSMap;

  ServiceLevel overallDefaultSL;  // used for jobs that have no JobQoS entry
  JobQoSMap jobs;

public:
  // Create a manager whose overall default service level is
  // `overallDefaultSL`. Marked explicit so that a plain integer is never
  // silently converted into a QoSManager.
  explicit QoSManager(ServiceLevel overallDefaultSL): overallDefaultSL(overallDefaultSL) {}

  // Replace the overall default service level.
  void setDefaultServiceLevel(ServiceLevel defaultSL) { overallDefaultSL = defaultSL; }

  // Load the default service level and per-rank service levels of `job`
  // from `filename`. Returns false if the file cannot be opened or its
  // leading default service level cannot be parsed.
  bool readQoSFileForJob(Job job, const char filename[]);

  // Service level for a message of `job` going from rank `src` to rank
  // `dest`: the source rank's entry if present, otherwise the destination
  // rank's entry, otherwise the job default, otherwise the overall default.
  ServiceLevel getServiceLevel(Job job, Rank src, Rank dest);
};

#endif
42 changes: 36 additions & 6 deletions tracer/tracer-driver.C
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@ extern "C" {
}

#include "tracer-driver.h"
#include "qos-manager.h"

char tracer_input[256]; /* filename for tracer input file */

CoreInf *global_rank; /* core to job ID and process ID */
JobInf *jobs;
QoSManager qosManager(0);
int default_mapping;
int total_ranks;
tw_stime *jobTimes;
Expand Down Expand Up @@ -179,6 +181,14 @@ int main(int argc, char **argv)
printf("Eager limit is %f bytes\n", eager_limit);

int ret;
int default_sl;
ret = configuration_get_value_int(&config, "PARAMS", "default_sl", NULL,
&default_sl);
if (ret == 0) {
qosManager.setDefaultServiceLevel(default_sl);
printf("Default service level is %d\n", default_sl);
}

/* set up the output directory */
if(lp_io_dir[0]) {
ret = lp_io_prepare(lp_io_dir, 0, &handle, MPI_COMM_WORLD);
Expand Down Expand Up @@ -241,24 +251,43 @@ int main(int argc, char **argv)
total_ranks = 0;

/* read per job information */
#define LINE_SIZE 1024
char line[LINE_SIZE];
fgets(line, LINE_SIZE, jobIn); // Eat everything to the next newline
for(int i = 0; i < num_jobs; i++) {
fgets(line, LINE_SIZE, jobIn);
FILE * lineStream = fmemopen(line, strlen(line), "r");
#if TRACER_BIGSIM_TRACES
char tempTrace[200];
fscanf(jobIn, "%s", tempTrace);
fscanf(lineStream, "%s", tempTrace);
sprintf(jobs[i].traceDir, "%s%s", tempTrace, "/bgTrace");
#else
fscanf(jobIn, "%s", jobs[i].traceDir);
fscanf(lineStream, "%s", jobs[i].traceDir);
#endif
fscanf(jobIn, "%s", jobs[i].map_file);
fscanf(jobIn, "%d", &jobs[i].numRanks); /* number of processes */
fscanf(jobIn, "%d", &jobs[i].numIters); /* number of repetitions */
fscanf(lineStream, "%s", jobs[i].map_file);
fscanf(lineStream, "%d", &jobs[i].numRanks); /* number of processes */
fscanf(lineStream, "%d", &jobs[i].numIters); /* number of repetitions */

// See if there is a QoS file
char qosFile[LINE_SIZE] = "";
if (fscanf(lineStream, "%s", qosFile) == 1) {
if(!qosManager.readQoSFileForJob(i, qosFile)) {
// The QoS Manager already printed an error message, so just quit
MPI_Abort(MPI_COMM_WORLD, 1);
}
}

total_ranks += jobs[i].numRanks;
jobs[i].rankMap = (int*) malloc(jobs[i].numRanks * sizeof(int));
jobs[i].skipMsgId = -1;
jobTimes[i] = 0;
if(!rank) {
printf("Job %d - ranks %d, trace folder %s, rank file %s, iters %d\n",
printf("Job %d - ranks %d, trace folder %s, rank file %s, iters %d",
i, jobs[i].numRanks, jobs[i].traceDir, jobs[i].map_file, jobs[i].numIters);
if(qosFile[0] != '\0') {
printf(", QoS file %s", qosFile);
}
printf("\n");
}
}

Expand Down Expand Up @@ -322,6 +351,7 @@ int main(int argc, char **argv)
next = ' ';
fscanf(jobIn, "%c", &next);
}
fclose(jobIn);

int ranks_till_now = 0;
for(int i = 0; i < num_jobs && !dump_topo_only; i++) {
Expand Down