From cb6c16393c95fdce8326f7323406c724f4475a2a Mon Sep 17 00:00:00 2001 From: thamindumk Date: Mon, 13 Jan 2025 01:33:24 +0530 Subject: [PATCH] Get isDirected and update to the metaDB --- src/frontend/JasmineGraphFrontEnd.cpp | 47 ++++++++++++++++++++++++-- src/partitioner/stream/Partition.cpp | 25 ++++++++------ src/partitioner/stream/Partition.h | 4 +-- src/partitioner/stream/Partitioner.cpp | 16 +++++++-- src/partitioner/stream/Partitioner.h | 1 + 5 files changed, 76 insertions(+), 17 deletions(-) diff --git a/src/frontend/JasmineGraphFrontEnd.cpp b/src/frontend/JasmineGraphFrontEnd.cpp index f216d6301..3cb2ee06a 100644 --- a/src/frontend/JasmineGraphFrontEnd.cpp +++ b/src/frontend/JasmineGraphFrontEnd.cpp @@ -1279,6 +1279,49 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c *loop_exit_p = true; return; } + result_wr = write(connFd, "\r\n", 2); + if (result_wr < 0) { + frontend_logger.error("Error writing to socket"); + *loop_exit_p = true; + return; + } + string msg_2 = "Is this graph Directed (y/n)? "; + result_wr = write(connFd, msg_2.c_str(), msg_2.length()); + if (result_wr < 0) { + frontend_logger.error("Error writing to socket"); + *loop_exit_p = true; + return; + } + // Get user response. + char isDirected[FRONTEND_DATA_LENGTH + 1]; + bzero(isDirected, FRONTEND_DATA_LENGTH + 1); + read(connFd, isDirected, FRONTEND_DATA_LENGTH); + string is_directed(isDirected); + is_directed = Utils::trim_copy(is_directed); + for (char &c : is_directed) { + c = tolower(c); + } + string direction; + if(is_directed == "y") { + direction = "1"; + } else { + direction = "0"; + } + + string msg_3 = "Graph type received"; + result_wr = write(connFd, msg_3.c_str(), msg_3.length()); + if (result_wr < 0) { + frontend_logger.error("Error writing to socket"); + *loop_exit_p = true; + return; + } + + result_wr = write(connFd, "\r\n", 2); + if (result_wr < 0) { + frontend_logger.error("Error writing to socket"); + *loop_exit_p = true; + return; + } // create kafka consumer and graph partitioner kstream = new KafkaConnector(configs); // Create the Partitioner object. @@ -1295,9 +1338,9 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c string uploadStartTime = ctime(&time); string sqlStatement = "INSERT INTO graph (name,upload_path,upload_start_time,upload_end_time,graph_status_idgraph_status," - "vertexcount,centralpartitioncount,edgecount) VALUES(\"" + + "vertexcount,centralpartitioncount,edgecount,is_directed) VALUES(\"" + topic_name_s + "\", \"" + path + "\", \"" + uploadStartTime + "\", \"\",\"" + - to_string(Conts::GRAPH_STATUS::STREAMING) + "\", \"\", \"\", \"\")"; + to_string(Conts::GRAPH_STATUS::STREAMING) + "\", \"\", \"\", \"\",\"" +direction+"\")"; int newGraphID = sqlite->runInsert(sqlStatement); frontend_logger.info("Start listening to " + topic_name_s); diff --git a/src/partitioner/stream/Partition.cpp b/src/partitioner/stream/Partition.cpp index a2cb2e246..a2f111439 100644 --- a/src/partitioner/stream/Partition.cpp +++ b/src/partitioner/stream/Partition.cpp @@ -23,7 +23,7 @@ Logger streaming_partition_logger; // This addition is unidirectional , Add both items of the pair as keys -void Partition::addEdge(std::pair edge) { +void Partition::addEdge(std::pair edge, bool isDirected) { auto exsistFirstVertext = this->edgeList.find(edge.first); if (exsistFirstVertext != this->edgeList.end()) { this->edgeList[edge.first].insert(edge.second); @@ -35,14 +35,16 @@ void Partition::addEdge(std::pair edge) { } } - auto exsistSecondVertext = this->edgeList.find(edge.second); - if (exsistSecondVertext != this->edgeList.end()) { - this->edgeList[edge.second].insert(edge.first); - } else { - this->edgeList[edge.second] = std::set({edge.first}); + if (!isDirected) { + auto exsistSecondVertext = this->edgeList.find(edge.second); + if (exsistSecondVertext != this->edgeList.end()) { + this->edgeList[edge.second].insert(edge.first); + } else { + this->edgeList[edge.second] = std::set({edge.first}); - if (!isExistInEdgeCuts(edge.second)) { - this->vertexCount += 1; + if (!isExistInEdgeCuts(edge.second)) { + this->vertexCount += 1; + } } } } @@ -57,7 +59,7 @@ std::set Partition::getNeighbors(std::string vertex) { // The number of edges, the cardinality of E, is called the size of graph and denoted by |E|. We usually use m to denote // the size of G. -double Partition::getEdgesCount() { +double Partition::getEdgesCount(bool isDirected) { double total = 0; std::set uniqueEdges; for (auto edge : this->edgeList) { @@ -66,7 +68,10 @@ double Partition::getEdgesCount() { uniqueEdges.insert(edge.first + vertext); } } - return uniqueEdges.size(); + if (isDirected) { + return uniqueEdges.size(); + } + return uniqueEdges.size()/2; } // The number of vertices, the cardinality of V, is called the order of graph and devoted by |V|. We usually use n to diff --git a/src/partitioner/stream/Partition.h b/src/partitioner/stream/Partition.h index 61e3ed826..537b0ad08 100644 --- a/src/partitioner/stream/Partition.h +++ b/src/partitioner/stream/Partition.h @@ -53,10 +53,10 @@ class Partition { } this->vertexCount = 0; }; - void addEdge(std::pair edge); + void addEdge(std::pair edge, bool isDirected = false); std::set getNeighbors(std::string); double partitionScore(std::string vertex); - double getEdgesCount(); + double getEdgesCount(bool isDirected = false); double getVertextCount(); double getVertextCountQuick(); void addToEdgeCuts(std::string resident, std::string foreign, int partitionId); diff --git a/src/partitioner/stream/Partitioner.cpp b/src/partitioner/stream/Partitioner.cpp index 5638f5489..6e3f1342c 100644 --- a/src/partitioner/stream/Partitioner.cpp +++ b/src/partitioner/stream/Partitioner.cpp @@ -109,7 +109,7 @@ partitionedEdge Partitioner::hashPartitioning(std::pair()(edge.second) % this->numberOfPartitions; // Hash partitioning if (firstIndex == secondIndex) { - this->partitions[firstIndex].addEdge(edge); + this->partitions[firstIndex].addEdge(edge, this->getIsDirected()); } else { this->partitions[firstIndex].addToEdgeCuts(edge.first, edge.second, secondIndex); this->partitions[secondIndex].addToEdgeCuts(edge.second, edge.first, firstIndex); @@ -121,7 +121,7 @@ void Partitioner::printStats() { int id = 0; for (auto partition : this->partitions) { double vertexCount = partition.getVertextCount(); - double edgesCount = partition.getEdgesCount(); + double edgesCount = partition.getEdgesCount(this->getIsDirected()); double edgeCutsCount = partition.edgeCutsCount(); double edgeCutRatio = partition.edgeCutsRatio(); streaming_partitioner_logger.info(std::to_string(id) + " => Vertex count = " + std::to_string(vertexCount)); @@ -139,7 +139,7 @@ void Partitioner::updateMetaDB() { double edgeCutsCount = 0; for (auto partition : this->partitions) { vertexCount += partition.getVertextCount(); - edgesCount += partition.getEdgesCount(); + edgesCount += partition.getEdgesCount(this->getIsDirected()); edgeCutsCount += partition.edgeCutsCount(); } double numberOfEdges = edgesCount + edgeCutsCount/2; @@ -150,6 +150,16 @@ void Partitioner::updateMetaDB() { this->sqlite->runUpdate(sqlStatement); streaming_partitioner_logger.info("Successfully updated metaDB"); } + +bool Partitioner::getIsDirected() { + std::string sqlStatement = "SELECT is_directed FROM graph WHERE idgraph = '"+std::to_string(this->graphID)+"'"; + auto result = this->sqlite->runSelect(sqlStatement); + if (result[0][0].second == "0") { + return false; + } + return true; +} + /** * Greedy vertex assignment objectives of minimizing the number of cut edges and balancing of the partition sizes. Assign the vertext to partition P that maximize the partition score diff --git a/src/partitioner/stream/Partitioner.h b/src/partitioner/stream/Partitioner.h index 4ba75e3d0..85be6715f 100644 --- a/src/partitioner/stream/Partitioner.h +++ b/src/partitioner/stream/Partitioner.h @@ -50,6 +50,7 @@ class Partitioner { partitionedEdge ldgPartitioning(std::pair edge); static std::pair deserialize(std::string data); void updateMetaDB(); + bool getIsDirected(); void setGraphID(int graphId){this->graphID = graphId;}; };