diff --git a/include/GraphParallelDFS.h b/include/GraphParallelDFS.h index 94f9e6d..6475626 100644 --- a/include/GraphParallelDFS.h +++ b/include/GraphParallelDFS.h @@ -34,14 +34,19 @@ class GraphParallelDFS { friend ostream& operator<<(ostream& os, GraphParallelDFS& graphParallelDfs); + // create output file + void saveTo(const string& filename); + // getters here int getNNodes() const; - const vector &getAp() const; + const vector &getAp_dag() const; + + const vector &getAi_dag() const; - const vector &getAi() const; + const vector &getAp_dt() const; - const vector &getRoots() const; + const vector &getAi_dt() const; const vector &getEV() const; @@ -58,8 +63,11 @@ class GraphParallelDFS { void computeRanks(); int n_nodes; - vector Ap; - vector Ai; + vector Ap_dag; + vector Ai_dag; + + vector Ap_dt; + vector Ai_dt; // precomputed leaves and roots vector leaves; @@ -76,8 +84,13 @@ class GraphParallelDFS { vector gamma; vector gamma_tilde; - vector parents; vector post_order; + + // contains the parent of each node in the dt + vector parents; + + // contains all the parents of each node in the dag + vector> parents_dag; }; diff --git a/include/OutputFileException.h b/include/OutputFileException.h new file mode 100644 index 0000000..463548c --- /dev/null +++ b/include/OutputFileException.h @@ -0,0 +1,22 @@ +// +// Created by claud on 23/06/2020. +// + +#ifndef PARALLEL_DFS_DAG_OUTPUTFILEEXCEPTION_H +#define PARALLEL_DFS_DAG_OUTPUTFILEEXCEPTION_H + +using namespace std; + +class OutputFileException : exception { +public: + OutputFileException() : message("Unable to open the output file") {} + explicit OutputFileException(const char * message) : message(message) {} + + const char * what() { + return message; + } +private: + const char * message; +}; + +#endif //PARALLEL_DFS_DAG_OUTPUTFILEEXCEPTION_H diff --git a/src/GraphParallelDFS.cpp b/src/GraphParallelDFS.cpp index 3dd312b..7d18e09 100644 --- a/src/GraphParallelDFS.cpp +++ b/src/GraphParallelDFS.cpp @@ -2,7 +2,9 @@ #include #include #include +#include #include "InvalidGraphInputFile.h" +#include "OutputFileException.h" #include "GraphParallelDFS.h" using namespace std; @@ -33,12 +35,12 @@ GraphParallelDFS::GraphParallelDFS(const string &filename) : n_nodes(0){ char c_buffer; istringstream stream(buffer); - // save node index in Ap - this->Ap.push_back(this->Ai.size()); + // save node index in Ap_dt + this->Ap_dag.push_back(this->Ai_dag.size()); // read the first node stream >> node; - this->Ai.push_back(node); + this->Ai_dag.push_back(node); while(stream >> c_buffer && c_buffer != '#'){ if(isdigit(c_buffer)){ @@ -52,7 +54,7 @@ GraphParallelDFS::GraphParallelDFS(const string &filename) : n_nodes(0){ //check the validity of the node value if(child < n_nodes){ - this->Ai.push_back(child); + this->Ai_dag.push_back(child); }else{ throw InvalidGraphInputFile("Out of bound node"); } @@ -64,12 +66,12 @@ GraphParallelDFS::GraphParallelDFS(const string &filename) : n_nodes(0){ } //check the saved number of nodes - if(this->Ap.size() != n_nodes){ + if(this->Ap_dag.size() != n_nodes){ throw InvalidGraphInputFile("The number of read nodes doesn't match the declared one"); } - // mark end of Ai array - this->Ap.push_back(Ai.size()); + // mark end of Ai_dt array + this->Ap_dag.push_back(Ai_dag.size()); //work out roots of the graph for(unsigned int i = 0; i < this->n_nodes; i++){ @@ -83,16 +85,20 @@ int GraphParallelDFS::getNNodes() const { return n_nodes; } -const vector &GraphParallelDFS::getAp() const { - return Ap; +const vector &GraphParallelDFS::getAp_dag() const { + return Ap_dag; } -const vector &GraphParallelDFS::getAi() const { - return Ai; +const vector &GraphParallelDFS::getAi_dag() const { + return Ai_dag; } -const vector &GraphParallelDFS::getRoots() const { - return roots; +const vector &GraphParallelDFS::getAp_dt() const { + return Ap_dt; +} + +const vector &GraphParallelDFS::getAi_dt() const { + return Ai_dt; } const vector &GraphParallelDFS::getEV() const { @@ -123,6 +129,9 @@ void GraphParallelDFS::convertToDT() { vector P; + // store the list of the parents of the dag for each node + parents_dag.resize(n_nodes); + while(!Q.empty()){ node_futures.clear(); P = vector(); @@ -131,8 +140,8 @@ void GraphParallelDFS::convertToDT() { // create and launch a task for each node in Q packaged_task task([this, &paths, &node_mutexes, &mP, &P](int node) { // iterate over the children of node - int first_child = this->Ap[node] + 1; - int ending_child = this->Ap[node + 1]; + int first_child = this->Ap_dag[node] + 1; + int ending_child = this->Ap_dag[node + 1]; // path to the current node with node itself (used in child for comparison) vector Br = paths[node]; @@ -144,7 +153,7 @@ void GraphParallelDFS::convertToDT() { // create and launch a task for each child of node for(int i=first_child; i task_child([this, &paths, &node_mutexes, &mP, &P, &Br](int index, int current_parent){ - int child = this->Ai[index]; + int child = this->Ai_dag[index]; // existing path vector Qr = paths[child]; @@ -163,6 +172,9 @@ void GraphParallelDFS::convertToDT() { // decrement the count of incoming edges which needs to be visited yet for the node child int remaining = --(this->incoming_edges[child]); + // store the current parent of the current child + this->parents_dag[child].push_back(current_parent); + node_mutexes[child].unlock(); // if all incoming edges into this child have been visited, add it to P @@ -199,10 +211,9 @@ void GraphParallelDFS::convertToDT() { Q = move(P); } - // calculate new Ai, new Ap, number of outgoing edges, leaves and their gamma (obvious: it's 1) + // calculate new Ai_dt, new Ap_dt, number of outgoing edges, leaves and their gamma (obvious: it's 1) this->outgoing_edges.resize(n_nodes); - vector Ap_dt = vector(n_nodes + 1, 0); - vector Ai_dt = vector(); + Ap_dt.resize(n_nodes + 1, 0); this->gamma.resize(this->n_nodes, 0); @@ -210,12 +221,12 @@ void GraphParallelDFS::convertToDT() { Ap_dt[i] = Ai_dt.size(); Ai_dt.push_back(i); - int first_child = Ap[i] + 1; - int end_child = Ap[i + 1]; + int first_child = Ap_dag[i] + 1; + int end_child = Ap_dag[i + 1]; // iterate over child of current node i for(int j=first_child; jparents[child] == i) @@ -233,7 +244,7 @@ void GraphParallelDFS::convertToDT() { } } - // Add Ai dimension to dt + // Add Ai_dt dimension to dt Ap_dt[this->n_nodes] = Ai_dt.size(); // update of the number of outgoing edges for the last node (since in the previous cycle it is @@ -244,8 +255,6 @@ void GraphParallelDFS::convertToDT() { this->gamma[n_nodes - 1] = 1; } - this->Ap = move(Ap_dt); - this->Ai = move(Ai_dt); } void GraphParallelDFS::computePostOrder() { @@ -254,7 +263,6 @@ void GraphParallelDFS::computePostOrder() { // Use the precomputed roots // Move them for performance reason since they won't be used anymore - // TODO: if we decide to move remember to remove the getRoot() vector Q = move(this->roots); //mutex which protect P vector modifications @@ -274,8 +282,8 @@ void GraphParallelDFS::computePostOrder() { packaged_task task([this, &mP, &P](int node) { int post = this->post_order[node]; - int children_start = this->Ap[node]+1; - int children_end = this->Ap[node+1]; + int children_start = this->Ap_dt[node] + 1; + int children_end = this->Ap_dt[node + 1]; // vector which collect the children futures vector> child_futures; @@ -284,7 +292,7 @@ void GraphParallelDFS::computePostOrder() { for(int i = children_start; i < children_end; i++){ // create and launch task for each child of the node packaged_task task_child([this, &post, &mP, &P](int index){ - int child = this->Ai[index]; + int child = this->Ai_dt[index]; // pre-compute post-order post_order[child] = post + this->gamma_tilde[child]; @@ -362,7 +370,7 @@ void GraphParallelDFS::computeSubGraphSize(){ // directed tree, hence each node will have at most one parent int parent = this->parents[node]; - // verify that it is actually a parent, i. e. it is not -1 + // verify that it has actually a parent, i. e. it is not -1 if(parent != -1){ // check if no more outgoing edges needs to be visited yet @@ -390,18 +398,18 @@ void GraphParallelDFS::computeSubGraphSize(){ for(int p : C) { - // create and launch a task for each node in P + // create and launch a task for each node in C packaged_task task([this](int p) { - int first_child = Ap[p] + 1; - int end_child = Ap[p + 1]; + int first_child = Ap_dt[p] + 1; + int end_child = Ap_dt[p + 1]; // order children of P - // no race condition since each task will operate on a different section of Ai - sort(this->Ai.begin() + first_child, this->Ai.begin() + end_child); + // no race condition since each task will operate on a different section of Ai_dt + sort(this->Ai_dt.begin() + first_child, this->Ai_dt.begin() + end_child); // iterate over children of P for (int i = first_child; i < end_child; i++) { - int child = Ai[i]; + int child = Ai_dt[i]; this->gamma_tilde[child] = gamma[p]; this->gamma[p] += gamma[child]; @@ -426,7 +434,122 @@ void GraphParallelDFS::computeSubGraphSize(){ } } -void GraphParallelDFS::computeRanks(){} +void GraphParallelDFS::computeRanks(){ + this->e_v.resize(this->n_nodes, 0); + this->s_v.resize(this->n_nodes, 0); + + vector P; + + // mutex to protect P modifications + mutex mP; + + // move leaves since they won't be used anymore + vector Q; + + // list to collect children futures + vector> node_futures; + + // initialize vector of atomic variables with the precomputed outgoing_edges + // calculate also leaves of the dag + vector outgoing(this->n_nodes); + for(int i=0; in_nodes; i++){ + int start_child = this->Ap_dag[i] + 1; + int end_child = this->Ap_dag[i+1]; + + int n_children = end_child - start_child; + outgoing[i].store(n_children); + + if(n_children == 0) + Q.push_back(i); + } + + while(!Q.empty()){ + P = vector(); + node_futures.clear(); + + for(int node : Q){ + // create and launch a task for each node in Q + packaged_task task([this, &outgoing, &mP, &P](int node) { + // e_v is, for definition, the corresponding post order + 1 + this->e_v[node] = this->post_order[node] + 1; + + int first_child = Ap_dag[node] + 1; + int end_child = Ap_dag[node + 1]; + + int min = INT_MAX; + + if(first_child == end_child){ + // this is a leaf so e_v and s_v will correspond + s_v[node] = e_v[node]; + } else { + // iterate over children of node + + // we compute the s_v in this case as the minimum of the s_v of the children + // since they s_v is: + // - equal to e_v in case they are a leaf + // - equal to the mimimum of the s_v of the children otherwise + for (int i = first_child; i < end_child; i++) { + // no parallelization in this cycle: min should be protected in that case + // which would reduce the benefits of the parallelization, considering also that + // the actual parallelizable content of the cycle is almost none + int child = Ai_dag[i]; + + // once here we visited the children of the node node + if(this->s_v[child] < min) min = s_v[child]; + } + + s_v[node] = min; + } + + // update the count of the visited outgoing edges for the parentS of the current node + // NOTE: root nodes will never enter this cycle + for(int parent : parents_dag[node]){ + int remaining = outgoing[parent].fetch_sub(1); + + // check that no more children (of this parent) needs to be visited yet + if(remaining == 1){ + mP.lock(); + P.push_back(parent); + mP.unlock(); + } + } + }); + + node_futures.push_back(move(task.get_future())); + + // actually launch task + task(node); + + } + + // wait for all launched tasks to terminate + for(auto& node_future : node_futures) + node_future.get(); + + // move all content of P to Q + Q = move(P); + } +} + +ostream& operator<<(ostream &os, GraphParallelDFS &graphParallelDfs) { + + for(int node = 0; node < graphParallelDfs.getNNodes(); node++){ + os << node << " " << graphParallelDfs.s_v[node] << " " << graphParallelDfs.e_v[node] << endl; + } + + return os; +} + +void GraphParallelDFS::saveTo(const string& filename){ + ofstream outputFile(filename); + + // check errors in file opening + if(!outputFile.is_open()){ + throw OutputFileException(); + } + + outputFile << *this; +} void GraphParallelDFS::computeLabels() { // phase 1