Skip to content

Commit

Permalink
Fix kafka streaming vcnt and ecnt count
Browse files Browse the repository at this point in the history
  • Loading branch information
thamindumk committed Dec 11, 2024
1 parent bcbae83 commit 3c74c52
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 9 deletions.
6 changes: 3 additions & 3 deletions src/frontend/JasmineGraphFrontEnd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ void *frontendservicesesion(void *dummyPt) {
std::string kafka_server_IP;
cppkafka::Configuration configs;
KafkaConnector *kstream;
Partitioner graphPartitioner(numberOfPartitions, 1, spt::Algorithms::HASH);
Partitioner graphPartitioner(numberOfPartitions, 1, spt::Algorithms::HASH, sqlite);

vector<DataPublisher *> workerClients;
bool workerClientsInitialized = false;
Expand Down Expand Up @@ -1229,13 +1229,13 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c
// create kafka consumer and graph partitioner
kstream = new KafkaConnector(configs);
// Create the Partitioner object.
Partitioner graphPartitioner(numberOfPartitions, 0, spt::Algorithms::FENNEL);
Partitioner graphPartitioner(numberOfPartitions, 0, spt::Algorithms::FENNEL, sqlite);
// Create the KafkaConnector object.
kstream = new KafkaConnector(configs);
// Subscribe to the Kafka topic.
kstream->Subscribe(topic_name_s);
// Create the StreamHandler object.
StreamHandler *stream_handler = new StreamHandler(kstream, numberOfPartitions, workerClients);
StreamHandler *stream_handler = new StreamHandler(kstream, numberOfPartitions, workerClients, sqlite);

string path = "kafka:\\" + topic_name_s + ":" + group_id;
std::time_t time = chrono::system_clock::to_time_t(chrono::system_clock::now());
Expand Down
17 changes: 17 additions & 0 deletions src/partitioner/stream/Partitioner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,23 @@ void Partitioner::printStats() {
}
}

void Partitioner::updateMetaDB() {
double vertexCount = 0;
double edgesCount = 0;
double edgeCutsCount = 0;

Check warning on line 139 in src/partitioner/stream/Partitioner.cpp

View check run for this annotation

Codecov / codecov/patch

src/partitioner/stream/Partitioner.cpp#L136-L139

Added lines #L136 - L139 were not covered by tests
for (auto partition : this->partitions) {
vertexCount += partition.getVertextCount();
edgesCount += partition.getEdgesCount();
edgeCutsCount += partition.edgeCutsCount();
}
double numberOfEdges = edgesCount + edgeCutsCount/2;

Check warning on line 145 in src/partitioner/stream/Partitioner.cpp

View check run for this annotation

Codecov / codecov/patch

src/partitioner/stream/Partitioner.cpp#L144-L145

Added lines #L144 - L145 were not covered by tests
std::string sqlStatement = "UPDATE graph SET vertexcount = '" + std::to_string(vertexCount) +
"' ,centralpartitioncount = '" + std::to_string(this->numberOfPartitions) +
"' ,edgecount = '" + std::to_string(numberOfEdges) +
"' WHERE idgraph = '" + std::to_string(this->graphID) + "'";
this->sqlite->runUpdate(sqlStatement);
streaming_partitioner_logger.info("Successfully updated metaDB");
}

Check warning on line 152 in src/partitioner/stream/Partitioner.cpp

View check run for this annotation

Codecov / codecov/patch

src/partitioner/stream/Partitioner.cpp#L152

Added line #L152 was not covered by tests
/**
* Greedy vertex assignment objectives of minimizing the number of cut edges
and balancing of the partition sizes. Assign the vertext to partition P that maximize the partition score
Expand Down
8 changes: 6 additions & 2 deletions src/partitioner/stream/Partitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <vector>

#include "./Partition.h"
#include "../../metadb/SQLiteDBInterface.h"

typedef std::vector<std::pair<std::string, long>> partitionedEdge;
namespace spt { // spt : Streaming Partitioner
Expand All @@ -29,12 +30,13 @@ class Partitioner {
long totalEdges = 0;
int graphID;
spt::Algorithms algorithmInUse;
SQLiteDBInterface *sqlite;
// perPartitionCap is : Number of vertices that can be store in this partition, This is a dynamic shared pointer
// containing a value depending on the whole graph size and # of partitions

public:
Partitioner(int numberOfPartitions, int graphID, spt::Algorithms alog)
: numberOfPartitions(numberOfPartitions), graphID(graphID), algorithmInUse(alog) {
Partitioner(int numberOfPartitions, int graphID, spt::Algorithms alog, SQLiteDBInterface* sqlite)
: numberOfPartitions(numberOfPartitions), graphID(graphID), algorithmInUse(alog), sqlite(sqlite) {

Check warning on line 39 in src/partitioner/stream/Partitioner.h

View check run for this annotation

Codecov / codecov/patch

src/partitioner/stream/Partitioner.h#L38-L39

Added lines #L38 - L39 were not covered by tests
for (size_t i = 0; i < numberOfPartitions; i++) {
this->partitions.push_back(Partition(i, numberOfPartitions));
};
Expand All @@ -47,6 +49,8 @@ class Partitioner {
partitionedEdge fennelPartitioning(std::pair<std::string, std::string> edge);
partitionedEdge ldgPartitioning(std::pair<std::string, std::string> edge);
static std::pair<long, long> deserialize(std::string data);
void updateMetaDB();
void setGraphID(int graphId){this->graphID = graphId;};

Check warning on line 53 in src/partitioner/stream/Partitioner.h

View check run for this annotation

Codecov / codecov/patch

src/partitioner/stream/Partitioner.h#L53

Added line #L53 was not covered by tests
};

#endif // !JASMINE_PARTITIONER_HEADER
10 changes: 7 additions & 3 deletions src/util/kafka/StreamHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ using namespace std::chrono;
Logger stream_handler_logger;

StreamHandler::StreamHandler(KafkaConnector *kstream, int numberOfPartitions,
vector<DataPublisher *> &workerClients)
vector<DataPublisher *> &workerClients, SQLiteDBInterface* sqlite)

Check warning on line 30 in src/util/kafka/StreamHandler.cpp

View check run for this annotation

Codecov / codecov/patch

src/util/kafka/StreamHandler.cpp#L30

Added line #L30 was not covered by tests
: kstream(kstream),
workerClients(workerClients),
graphPartitioner(numberOfPartitions, 0, spt::Algorithms::HASH),
graphPartitioner(numberOfPartitions, 0, spt::Algorithms::HASH, sqlite),

Check warning on line 33 in src/util/kafka/StreamHandler.cpp

View check run for this annotation

Codecov / codecov/patch

src/util/kafka/StreamHandler.cpp#L33

Added line #L33 was not covered by tests
stream_topic_name("stream_topic_name") { }


Expand Down Expand Up @@ -82,6 +82,10 @@ void StreamHandler::listen_to_kafka_topic() {
stream_handler_logger.error("Edge Rejected. Streaming edge should Include the Graph ID.");
continue;
}

auto prop = edgeJson["properties"];
auto graphID = std::string(prop["graphId"]);
graphPartitioner.setGraphID(stoi(graphID));
auto sourceJson = edgeJson["source"];
auto destinationJson = edgeJson["destination"];
string sId = std::string(sourceJson["id"]);
Expand Down Expand Up @@ -114,6 +118,6 @@ void StreamHandler::listen_to_kafka_topic() {
workerClients.at(temp_d)->publish(obj.dump());
}
}

graphPartitioner.updateMetaDB();

Check warning on line 121 in src/util/kafka/StreamHandler.cpp

View check run for this annotation

Codecov / codecov/patch

src/util/kafka/StreamHandler.cpp#L121

Added line #L121 was not covered by tests
graphPartitioner.printStats();
}
4 changes: 3 additions & 1 deletion src/util/kafka/StreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ limitations under the License.
#include "../../partitioner/stream/Partitioner.h"
#include "../logger/Logger.h"
#include "KafkaCC.h"
#include "../../metadb/SQLiteDBInterface.h"

class StreamHandler {
public:
StreamHandler(KafkaConnector *kstream, int numberOfPartitions, std::vector<DataPublisher *> &workerClients);
StreamHandler(KafkaConnector *kstream, int numberOfPartitions,
std::vector<DataPublisher *> &workerClients, SQLiteDBInterface* sqlite);
void listen_to_kafka_topic();
cppkafka::Message pollMessage();
bool isErrorInMessage(const cppkafka::Message &msg);
Expand Down

0 comments on commit 3c74c52

Please sign in to comment.