From cb6c16393c95fdce8326f7323406c724f4475a2a Mon Sep 17 00:00:00 2001 From: thamindumk Date: Mon, 13 Jan 2025 01:33:24 +0530 Subject: [PATCH 1/4] Get isDirected and update to the metaDB --- src/frontend/JasmineGraphFrontEnd.cpp | 47 ++++++++++++++++++++++++-- src/partitioner/stream/Partition.cpp | 25 ++++++++------ src/partitioner/stream/Partition.h | 4 +-- src/partitioner/stream/Partitioner.cpp | 16 +++++++-- src/partitioner/stream/Partitioner.h | 1 + 5 files changed, 76 insertions(+), 17 deletions(-) diff --git a/src/frontend/JasmineGraphFrontEnd.cpp b/src/frontend/JasmineGraphFrontEnd.cpp index f216d6301..3cb2ee06a 100644 --- a/src/frontend/JasmineGraphFrontEnd.cpp +++ b/src/frontend/JasmineGraphFrontEnd.cpp @@ -1279,6 +1279,49 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c *loop_exit_p = true; return; } + result_wr = write(connFd, "\r\n", 2); + if (result_wr < 0) { + frontend_logger.error("Error writing to socket"); + *loop_exit_p = true; + return; + } + string msg_2 = "Is this graph Directed (y/n)? "; + result_wr = write(connFd, msg_2.c_str(), msg_2.length()); + if (result_wr < 0) { + frontend_logger.error("Error writing to socket"); + *loop_exit_p = true; + return; + } + // Get user response. + char isDirected[FRONTEND_DATA_LENGTH + 1]; + bzero(isDirected, FRONTEND_DATA_LENGTH + 1); + read(connFd, isDirected, FRONTEND_DATA_LENGTH); + string is_directed(isDirected); + is_directed = Utils::trim_copy(is_directed); + for (char &c : is_directed) { + c = tolower(c); + } + string direction; + if(is_directed == "y") { + direction = "1"; + } else { + direction = "0"; + } + + string msg_3 = "Graph type received"; + result_wr = write(connFd, msg_3.c_str(), msg_3.length()); + if (result_wr < 0) { + frontend_logger.error("Error writing to socket"); + *loop_exit_p = true; + return; + } + + result_wr = write(connFd, "\r\n", 2); + if (result_wr < 0) { + frontend_logger.error("Error writing to socket"); + *loop_exit_p = true; + return; + } // create kafka consumer and graph partitioner kstream = new KafkaConnector(configs); // Create the Partitioner object. @@ -1295,9 +1338,9 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c string uploadStartTime = ctime(&time); string sqlStatement = "INSERT INTO graph (name,upload_path,upload_start_time,upload_end_time,graph_status_idgraph_status," - "vertexcount,centralpartitioncount,edgecount) VALUES(\"" + + "vertexcount,centralpartitioncount,edgecount,is_directed) VALUES(\"" + topic_name_s + "\", \"" + path + "\", \"" + uploadStartTime + "\", \"\",\"" + - to_string(Conts::GRAPH_STATUS::STREAMING) + "\", \"\", \"\", \"\")"; + to_string(Conts::GRAPH_STATUS::STREAMING) + "\", \"\", \"\", \"\",\"" +direction+"\")"; int newGraphID = sqlite->runInsert(sqlStatement); frontend_logger.info("Start listening to " + topic_name_s); diff --git a/src/partitioner/stream/Partition.cpp b/src/partitioner/stream/Partition.cpp index a2cb2e246..a2f111439 100644 --- a/src/partitioner/stream/Partition.cpp +++ b/src/partitioner/stream/Partition.cpp @@ -23,7 +23,7 @@ Logger streaming_partition_logger; // This addition is unidirectional , Add both items of the pair as keys -void Partition::addEdge(std::pair edge) { +void Partition::addEdge(std::pair edge, bool isDirected) { auto exsistFirstVertext = this->edgeList.find(edge.first); if (exsistFirstVertext != this->edgeList.end()) { this->edgeList[edge.first].insert(edge.second); @@ -35,14 +35,16 @@ void Partition::addEdge(std::pair edge) { } } - auto exsistSecondVertext = this->edgeList.find(edge.second); - if (exsistSecondVertext != this->edgeList.end()) { - this->edgeList[edge.second].insert(edge.first); - } else { - this->edgeList[edge.second] = std::set({edge.first}); + if (!isDirected) { + auto exsistSecondVertext = this->edgeList.find(edge.second); + if (exsistSecondVertext != this->edgeList.end()) { + this->edgeList[edge.second].insert(edge.first); + } else { + this->edgeList[edge.second] = std::set({edge.first}); - if (!isExistInEdgeCuts(edge.second)) { - this->vertexCount += 1; + if (!isExistInEdgeCuts(edge.second)) { + this->vertexCount += 1; + } } } } @@ -57,7 +59,7 @@ std::set Partition::getNeighbors(std::string vertex) { // The number of edges, the cardinality of E, is called the size of graph and denoted by |E|. We usually use m to denote // the size of G. -double Partition::getEdgesCount() { +double Partition::getEdgesCount(bool isDirected) { double total = 0; std::set uniqueEdges; for (auto edge : this->edgeList) { @@ -66,7 +68,10 @@ double Partition::getEdgesCount() { uniqueEdges.insert(edge.first + vertext); } } - return uniqueEdges.size(); + if (isDirected) { + return uniqueEdges.size(); + } + return uniqueEdges.size()/2; } // The number of vertices, the cardinality of V, is called the order of graph and devoted by |V|. We usually use n to diff --git a/src/partitioner/stream/Partition.h b/src/partitioner/stream/Partition.h index 61e3ed826..537b0ad08 100644 --- a/src/partitioner/stream/Partition.h +++ b/src/partitioner/stream/Partition.h @@ -53,10 +53,10 @@ class Partition { } this->vertexCount = 0; }; - void addEdge(std::pair edge); + void addEdge(std::pair edge, bool isDirected = false); std::set getNeighbors(std::string); double partitionScore(std::string vertex); - double getEdgesCount(); + double getEdgesCount(bool isDirected = false); double getVertextCount(); double getVertextCountQuick(); void addToEdgeCuts(std::string resident, std::string foreign, int partitionId); diff --git a/src/partitioner/stream/Partitioner.cpp b/src/partitioner/stream/Partitioner.cpp index 5638f5489..6e3f1342c 100644 --- a/src/partitioner/stream/Partitioner.cpp +++ b/src/partitioner/stream/Partitioner.cpp @@ -109,7 +109,7 @@ partitionedEdge Partitioner::hashPartitioning(std::pair()(edge.second) % this->numberOfPartitions; // Hash partitioning if (firstIndex == secondIndex) { - this->partitions[firstIndex].addEdge(edge); + this->partitions[firstIndex].addEdge(edge, this->getIsDirected()); } else { this->partitions[firstIndex].addToEdgeCuts(edge.first, edge.second, secondIndex); this->partitions[secondIndex].addToEdgeCuts(edge.second, edge.first, firstIndex); @@ -121,7 +121,7 @@ void Partitioner::printStats() { int id = 0; for (auto partition : this->partitions) { double vertexCount = partition.getVertextCount(); - double edgesCount = partition.getEdgesCount(); + double edgesCount = partition.getEdgesCount(this->getIsDirected()); double edgeCutsCount = partition.edgeCutsCount(); double edgeCutRatio = partition.edgeCutsRatio(); streaming_partitioner_logger.info(std::to_string(id) + " => Vertex count = " + std::to_string(vertexCount)); @@ -139,7 +139,7 @@ void Partitioner::updateMetaDB() { double edgeCutsCount = 0; for (auto partition : this->partitions) { vertexCount += partition.getVertextCount(); - edgesCount += partition.getEdgesCount(); + edgesCount += partition.getEdgesCount(this->getIsDirected()); edgeCutsCount += partition.edgeCutsCount(); } double numberOfEdges = edgesCount + edgeCutsCount/2; @@ -150,6 +150,16 @@ void Partitioner::updateMetaDB() { this->sqlite->runUpdate(sqlStatement); streaming_partitioner_logger.info("Successfully updated metaDB"); } + +bool Partitioner::getIsDirected() { + std::string sqlStatement = "SELECT is_directed FROM graph WHERE idgraph = '"+std::to_string(this->graphID)+"'"; + auto result = this->sqlite->runSelect(sqlStatement); + if (result[0][0].second == "0") { + return false; + } + return true; +} + /** * Greedy vertex assignment objectives of minimizing the number of cut edges and balancing of the partition sizes. Assign the vertext to partition P that maximize the partition score diff --git a/src/partitioner/stream/Partitioner.h b/src/partitioner/stream/Partitioner.h index 4ba75e3d0..85be6715f 100644 --- a/src/partitioner/stream/Partitioner.h +++ b/src/partitioner/stream/Partitioner.h @@ -50,6 +50,7 @@ class Partitioner { partitionedEdge ldgPartitioning(std::pair edge); static std::pair deserialize(std::string data); void updateMetaDB(); + bool getIsDirected(); void setGraphID(int graphId){this->graphID = graphId;}; }; From 92b12f18931b4d35ccd8321b406c4b52b90f87b7 Mon Sep 17 00:00:00 2001 From: thamindumk Date: Mon, 13 Jan 2025 01:38:15 +0530 Subject: [PATCH 2/4] Fix a sonar lint issue --- src/frontend/JasmineGraphFrontEnd.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/frontend/JasmineGraphFrontEnd.cpp b/src/frontend/JasmineGraphFrontEnd.cpp index 3cb2ee06a..7f0a315c6 100644 --- a/src/frontend/JasmineGraphFrontEnd.cpp +++ b/src/frontend/JasmineGraphFrontEnd.cpp @@ -1302,7 +1302,7 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c c = tolower(c); } string direction; - if(is_directed == "y") { + if (is_directed == "y") { direction = "1"; } else { direction = "0"; From 705149455b0500cdd90c3df93353f6c3b91c4034 Mon Sep 17 00:00:00 2001 From: thamindumk Date: Mon, 13 Jan 2025 14:34:30 +0530 Subject: [PATCH 3/4] Use Conts file for some constant values --- src/frontend/JasmineGraphFrontEnd.cpp | 16 ++++++++-------- src/util/Conts.cpp | 3 ++- src/util/Conts.h | 3 ++- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/src/frontend/JasmineGraphFrontEnd.cpp b/src/frontend/JasmineGraphFrontEnd.cpp index 7f0a315c6..a5d841bfb 100644 --- a/src/frontend/JasmineGraphFrontEnd.cpp +++ b/src/frontend/JasmineGraphFrontEnd.cpp @@ -1279,14 +1279,14 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c *loop_exit_p = true; return; } - result_wr = write(connFd, "\r\n", 2); + result_wr = write(connFd, Conts::CARRIAGE_RETURN_NEW_LINE.c_str(), Conts::CARRIAGE_RETURN_NEW_LINE.size()); if (result_wr < 0) { frontend_logger.error("Error writing to socket"); *loop_exit_p = true; return; } - string msg_2 = "Is this graph Directed (y/n)? "; - result_wr = write(connFd, msg_2.c_str(), msg_2.length()); + string checkDirection = "Is this graph Directed (y/n)? "; + result_wr = write(connFd, checkDirection.c_str(), checkDirection.length()); if (result_wr < 0) { frontend_logger.error("Error writing to socket"); *loop_exit_p = true; @@ -1303,20 +1303,20 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c } string direction; if (is_directed == "y") { - direction = "1"; + direction = Conts::DIRECTED; } else { - direction = "0"; + direction = Conts::UNDIRECTED; } - string msg_3 = "Graph type received"; - result_wr = write(connFd, msg_3.c_str(), msg_3.length()); + string checkGraphType = "Graph type received"; + result_wr = write(connFd, checkGraphType.c_str(), checkGraphType.length()); if (result_wr < 0) { frontend_logger.error("Error writing to socket"); *loop_exit_p = true; return; } - result_wr = write(connFd, "\r\n", 2); + result_wr = write(connFd, Conts::CARRIAGE_RETURN_NEW_LINE.c_str(), Conts::CARRIAGE_RETURN_NEW_LINE.size()); if (result_wr < 0) { frontend_logger.error("Error writing to socket"); *loop_exit_p = true; diff --git a/src/util/Conts.cpp b/src/util/Conts.cpp index 49ec2f108..c236af732 100644 --- a/src/util/Conts.cpp +++ b/src/util/Conts.cpp @@ -15,7 +15,8 @@ limitations under the License. std::string Conts::JASMINEGRAPH_EXECUTABLE = "run.sh"; std::string Conts::JASMINEGRAPH_HOME = "JASMINEGRAPH_HOME"; - +std::string Conts::DIRECTED = "1"; +std::string Conts::UNDIRECTED = "0"; std::string Conts::CARRIAGE_RETURN_NEW_LINE = "\r\n"; std::string Conts::GRAPH_TYPE_RDF = "RDF_GRAPH"; diff --git a/src/util/Conts.h b/src/util/Conts.h index 79fb9234e..c26f77ed0 100644 --- a/src/util/Conts.h +++ b/src/util/Conts.h @@ -66,7 +66,8 @@ class Conts { static std::string GRAPH_WITH_TEXT_ATTRIBUTES; static std::string GRAPH_WITH_JSON_ATTRIBUTES; static std::string GRAPH_WITH_XML_ATTRIBUTES; - + static std::string DIRECTED; + static std::string UNDIRECTED; static std::string CARRIAGE_RETURN_NEW_LINE; static std::string From 9cb9d3dfb0a59b8cbccfc60b3c4e16f5bf071c21 Mon Sep 17 00:00:00 2001 From: thamindumk Date: Tue, 14 Jan 2025 17:31:08 +0530 Subject: [PATCH 4/4] Correct spelling error of the variables --- src/partitioner/stream/Partition.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/partitioner/stream/Partition.cpp b/src/partitioner/stream/Partition.cpp index a2f111439..e05f8442c 100644 --- a/src/partitioner/stream/Partition.cpp +++ b/src/partitioner/stream/Partition.cpp @@ -24,8 +24,8 @@ Logger streaming_partition_logger; // This addition is unidirectional , Add both items of the pair as keys void Partition::addEdge(std::pair edge, bool isDirected) { - auto exsistFirstVertext = this->edgeList.find(edge.first); - if (exsistFirstVertext != this->edgeList.end()) { + auto existFirstVertex = this->edgeList.find(edge.first); + if (existFirstVertex != this->edgeList.end()) { this->edgeList[edge.first].insert(edge.second); } else { this->edgeList[edge.first] = std::set({edge.second}); @@ -36,8 +36,8 @@ void Partition::addEdge(std::pair edge, bool isDirecte } if (!isDirected) { - auto exsistSecondVertext = this->edgeList.find(edge.second); - if (exsistSecondVertext != this->edgeList.end()) { + auto existSecondVertex = this->edgeList.find(edge.second); + if (existSecondVertex != this->edgeList.end()) { this->edgeList[edge.second].insert(edge.first); } else { this->edgeList[edge.second] = std::set({edge.first});