From 3c74c527b5295dd410dc1fa6e7e9ad8fe5628a18 Mon Sep 17 00:00:00 2001 From: thamindumk Date: Wed, 11 Dec 2024 11:11:22 +0530 Subject: [PATCH] Fix kafka streaming vcnt and ecnt count --- src/frontend/JasmineGraphFrontEnd.cpp | 6 +++--- src/partitioner/stream/Partitioner.cpp | 17 +++++++++++++++++ src/partitioner/stream/Partitioner.h | 8 ++++++-- src/util/kafka/StreamHandler.cpp | 10 +++++++--- src/util/kafka/StreamHandler.h | 4 +++- 5 files changed, 36 insertions(+), 9 deletions(-) diff --git a/src/frontend/JasmineGraphFrontEnd.cpp b/src/frontend/JasmineGraphFrontEnd.cpp index d9d02bdd8..ba2afb672 100644 --- a/src/frontend/JasmineGraphFrontEnd.cpp +++ b/src/frontend/JasmineGraphFrontEnd.cpp @@ -134,7 +134,7 @@ void *frontendservicesesion(void *dummyPt) { std::string kafka_server_IP; cppkafka::Configuration configs; KafkaConnector *kstream; - Partitioner graphPartitioner(numberOfPartitions, 1, spt::Algorithms::HASH); + Partitioner graphPartitioner(numberOfPartitions, 1, spt::Algorithms::HASH, sqlite); vector workerClients; bool workerClientsInitialized = false; @@ -1229,13 +1229,13 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c // create kafka consumer and graph partitioner kstream = new KafkaConnector(configs); // Create the Partitioner object. - Partitioner graphPartitioner(numberOfPartitions, 0, spt::Algorithms::FENNEL); + Partitioner graphPartitioner(numberOfPartitions, 0, spt::Algorithms::FENNEL, sqlite); // Create the KafkaConnector object. kstream = new KafkaConnector(configs); // Subscribe to the Kafka topic. kstream->Subscribe(topic_name_s); // Create the StreamHandler object. - StreamHandler *stream_handler = new StreamHandler(kstream, numberOfPartitions, workerClients); + StreamHandler *stream_handler = new StreamHandler(kstream, numberOfPartitions, workerClients, sqlite); string path = "kafka:\\" + topic_name_s + ":" + group_id; std::time_t time = chrono::system_clock::to_time_t(chrono::system_clock::now()); diff --git a/src/partitioner/stream/Partitioner.cpp b/src/partitioner/stream/Partitioner.cpp index 7eff8eefc..5638f5489 100644 --- a/src/partitioner/stream/Partitioner.cpp +++ b/src/partitioner/stream/Partitioner.cpp @@ -133,6 +133,23 @@ void Partitioner::printStats() { } } +void Partitioner::updateMetaDB() { + double vertexCount = 0; + double edgesCount = 0; + double edgeCutsCount = 0; + for (auto partition : this->partitions) { + vertexCount += partition.getVertextCount(); + edgesCount += partition.getEdgesCount(); + edgeCutsCount += partition.edgeCutsCount(); + } + double numberOfEdges = edgesCount + edgeCutsCount/2; + std::string sqlStatement = "UPDATE graph SET vertexcount = '" + std::to_string(vertexCount) + + "' ,centralpartitioncount = '" + std::to_string(this->numberOfPartitions) + + "' ,edgecount = '" + std::to_string(numberOfEdges) + + "' WHERE idgraph = '" + std::to_string(this->graphID) + "'"; + this->sqlite->runUpdate(sqlStatement); + streaming_partitioner_logger.info("Successfully updated metaDB"); +} /** * Greedy vertex assignment objectives of minimizing the number of cut edges and balancing of the partition sizes. Assign the vertext to partition P that maximize the partition score diff --git a/src/partitioner/stream/Partitioner.h b/src/partitioner/stream/Partitioner.h index 55b1ad5ab..4ba75e3d0 100644 --- a/src/partitioner/stream/Partitioner.h +++ b/src/partitioner/stream/Partitioner.h @@ -15,6 +15,7 @@ #include #include "./Partition.h" +#include "../../metadb/SQLiteDBInterface.h" typedef std::vector> partitionedEdge; namespace spt { // spt : Streaming Partitioner @@ -29,12 +30,13 @@ class Partitioner { long totalEdges = 0; int graphID; spt::Algorithms algorithmInUse; + SQLiteDBInterface *sqlite; // perPartitionCap is : Number of vertices that can be store in this partition, This is a dynamic shared pointer // containing a value depending on the whole graph size and # of partitions public: - Partitioner(int numberOfPartitions, int graphID, spt::Algorithms alog) - : numberOfPartitions(numberOfPartitions), graphID(graphID), algorithmInUse(alog) { + Partitioner(int numberOfPartitions, int graphID, spt::Algorithms alog, SQLiteDBInterface* sqlite) + : numberOfPartitions(numberOfPartitions), graphID(graphID), algorithmInUse(alog), sqlite(sqlite) { for (size_t i = 0; i < numberOfPartitions; i++) { this->partitions.push_back(Partition(i, numberOfPartitions)); }; @@ -47,6 +49,8 @@ class Partitioner { partitionedEdge fennelPartitioning(std::pair edge); partitionedEdge ldgPartitioning(std::pair edge); static std::pair deserialize(std::string data); + void updateMetaDB(); + void setGraphID(int graphId){this->graphID = graphId;}; }; #endif // !JASMINE_PARTITIONER_HEADER diff --git a/src/util/kafka/StreamHandler.cpp b/src/util/kafka/StreamHandler.cpp index bf788ea94..2b00edc40 100644 --- a/src/util/kafka/StreamHandler.cpp +++ b/src/util/kafka/StreamHandler.cpp @@ -27,10 +27,10 @@ using namespace std::chrono; Logger stream_handler_logger; StreamHandler::StreamHandler(KafkaConnector *kstream, int numberOfPartitions, - vector &workerClients) + vector &workerClients, SQLiteDBInterface* sqlite) : kstream(kstream), workerClients(workerClients), - graphPartitioner(numberOfPartitions, 0, spt::Algorithms::HASH), + graphPartitioner(numberOfPartitions, 0, spt::Algorithms::HASH, sqlite), stream_topic_name("stream_topic_name") { } @@ -82,6 +82,10 @@ void StreamHandler::listen_to_kafka_topic() { stream_handler_logger.error("Edge Rejected. Streaming edge should Include the Graph ID."); continue; } + + auto prop = edgeJson["properties"]; + auto graphID = std::string(prop["graphId"]); + graphPartitioner.setGraphID(stoi(graphID)); auto sourceJson = edgeJson["source"]; auto destinationJson = edgeJson["destination"]; string sId = std::string(sourceJson["id"]); @@ -114,6 +118,6 @@ void StreamHandler::listen_to_kafka_topic() { workerClients.at(temp_d)->publish(obj.dump()); } } - + graphPartitioner.updateMetaDB(); graphPartitioner.printStats(); } diff --git a/src/util/kafka/StreamHandler.h b/src/util/kafka/StreamHandler.h index 7190be4f0..6370fcda8 100644 --- a/src/util/kafka/StreamHandler.h +++ b/src/util/kafka/StreamHandler.h @@ -20,10 +20,12 @@ limitations under the License. #include "../../partitioner/stream/Partitioner.h" #include "../logger/Logger.h" #include "KafkaCC.h" +#include "../../metadb/SQLiteDBInterface.h" class StreamHandler { public: - StreamHandler(KafkaConnector *kstream, int numberOfPartitions, std::vector &workerClients); + StreamHandler(KafkaConnector *kstream, int numberOfPartitions, + std::vector &workerClients, SQLiteDBInterface* sqlite); void listen_to_kafka_topic(); cppkafka::Message pollMessage(); bool isErrorInMessage(const cppkafka::Message &msg);