Fix kafka streaming vcnt and ecnt count #264

Merged · 1 commit · Dec 12, 2024
6 changes: 3 additions & 3 deletions src/frontend/JasmineGraphFrontEnd.cpp
@@ -134,7 +134,7 @@ void *frontendservicesesion(void *dummyPt) {
     std::string kafka_server_IP;
     cppkafka::Configuration configs;
     KafkaConnector *kstream;
-    Partitioner graphPartitioner(numberOfPartitions, 1, spt::Algorithms::HASH);
+    Partitioner graphPartitioner(numberOfPartitions, 1, spt::Algorithms::HASH, sqlite);

     vector<DataPublisher *> workerClients;
     bool workerClientsInitialized = false;
@@ -1229,13 +1229,13 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c
     // create kafka consumer and graph partitioner
     kstream = new KafkaConnector(configs);
     // Create the Partitioner object.
-    Partitioner graphPartitioner(numberOfPartitions, 0, spt::Algorithms::FENNEL);
+    Partitioner graphPartitioner(numberOfPartitions, 0, spt::Algorithms::FENNEL, sqlite);
     // Create the KafkaConnector object.
     kstream = new KafkaConnector(configs);
     // Subscribe to the Kafka topic.
     kstream->Subscribe(topic_name_s);
     // Create the StreamHandler object.
-    StreamHandler *stream_handler = new StreamHandler(kstream, numberOfPartitions, workerClients);
+    StreamHandler *stream_handler = new StreamHandler(kstream, numberOfPartitions, workerClients, sqlite);

     string path = "kafka:\\" + topic_name_s + ":" + group_id;
     std::time_t time = chrono::system_clock::to_time_t(chrono::system_clock::now());
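
Both call sites above now thread the frontend's metadata-DB handle (sqlite) into the partitioning path. As a minimal, self-contained sketch of the dependency this introduces — using a hypothetical stand-in for SQLiteDBInterface, reduced to the single runUpdate(std::string) method the new code calls:

    #include <iostream>
    #include <string>

    // Hypothetical stand-in for src/metadb/SQLiteDBInterface.h, reduced to the
    // one method this PR's new code path invokes; the real class wraps SQLite.
    struct SQLiteDBInterface {
        void runUpdate(const std::string &sql) { std::cout << "SQL> " << sql << "\n"; }
    };

    int main() {
        SQLiteDBInterface metaDB;
        // Mirrors the kind of UPDATE that Partitioner::updateMetaDB builds
        // (see src/partitioner/stream/Partitioner.cpp below); values invented.
        metaDB.runUpdate("UPDATE graph SET vertexcount = '3.000000' WHERE idgraph = '1'");
        return 0;
    }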
17 changes: 17 additions & 0 deletions src/partitioner/stream/Partitioner.cpp
@@ -133,6 +133,23 @@
     }
 }

+void Partitioner::updateMetaDB() {
+    double vertexCount = 0;
+    double edgesCount = 0;
+    double edgeCutsCount = 0;
+    for (auto partition : this->partitions) {
+        vertexCount += partition.getVertextCount();
+        edgesCount += partition.getEdgesCount();
+        edgeCutsCount += partition.edgeCutsCount();
+    }
+    double numberOfEdges = edgesCount + edgeCutsCount / 2;
+    std::string sqlStatement = "UPDATE graph SET vertexcount = '" + std::to_string(vertexCount) +
+                               "' ,centralpartitioncount = '" + std::to_string(this->numberOfPartitions) +
+                               "' ,edgecount = '" + std::to_string(numberOfEdges) +
+                               "' WHERE idgraph = '" + std::to_string(this->graphID) + "'";
+    this->sqlite->runUpdate(sqlStatement);
+    streaming_partitioner_logger.info("Successfully updated metaDB");
+}
+
 /**
  * Greedy vertex assignment objectives of minimizing the number of cut edges
  and balancing of the partition sizes. Assign the vertext to partition P that maximize the partition score

Codecov / codecov/patch: added lines 136-139, 144-145, and 152 in src/partitioner/stream/Partitioner.cpp are not covered by tests.
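
A note on the arithmetic in updateMetaDB above: summing edgeCutsCount() over all partitions counts each cut edge twice — once from the partition of each endpoint — which is presumably why the total is halved. A self-contained worked example with invented counts:

    #include <iostream>
    #include <string>

    int main() {
        // Invented two-partition example: one local edge per partition plus a
        // single edge cut between them. The cut edge shows up in both
        // partitions' edge-cut counts, so the raw sum is 2 and must be halved.
        double edgesCount = 1 + 1;     // intra-partition edges, summed
        double edgeCutsCount = 1 + 1;  // one cut edge, seen from both sides
        double numberOfEdges = edgesCount + edgeCutsCount / 2;  // 2 + 1 = 3

        // std::to_string on a double renders fixed notation, so the metaDB
        // columns receive strings like "3.000000" rather than integers.
        std::cout << std::to_string(numberOfEdges) << std::endl;  // 3.000000
        return 0;
    }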
8 changes: 6 additions & 2 deletions src/partitioner/stream/Partitioner.h
@@ -15,6 +15,7 @@
 #include <vector>

 #include "./Partition.h"
+#include "../../metadb/SQLiteDBInterface.h"

 typedef std::vector<std::pair<std::string, long>> partitionedEdge;
 namespace spt { // spt : Streaming Partitioner
@@ -29,12 +30,13 @@
     long totalEdges = 0;
     int graphID;
     spt::Algorithms algorithmInUse;
+    SQLiteDBInterface *sqlite;
     // perPartitionCap is : Number of vertices that can be store in this partition, This is a dynamic shared pointer
     // containing a value depending on the whole graph size and # of partitions

 public:
-    Partitioner(int numberOfPartitions, int graphID, spt::Algorithms alog)
-        : numberOfPartitions(numberOfPartitions), graphID(graphID), algorithmInUse(alog) {
+    Partitioner(int numberOfPartitions, int graphID, spt::Algorithms alog, SQLiteDBInterface* sqlite)
+        : numberOfPartitions(numberOfPartitions), graphID(graphID), algorithmInUse(alog), sqlite(sqlite) {
         for (size_t i = 0; i < numberOfPartitions; i++) {
             this->partitions.push_back(Partition(i, numberOfPartitions));
         };
@@ -47,6 +49,8 @@
     partitionedEdge fennelPartitioning(std::pair<std::string, std::string> edge);
     partitionedEdge ldgPartitioning(std::pair<std::string, std::string> edge);
     static std::pair<long, long> deserialize(std::string data);
+    void updateMetaDB();
+    void setGraphID(int graphId){this->graphID = graphId;};
 };

 #endif  // !JASMINE_PARTITIONER_HEADER

Codecov / codecov/patch: added lines 38-39 and 53 in src/partitioner/stream/Partitioner.h are not covered by tests.
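
A hedged usage sketch of the extended interface (assumes the JasmineGraph headers are on the include path; the partition count and graph ID 5 are invented for illustration):

    #include "metadb/SQLiteDBInterface.h"
    #include "partitioner/stream/Partitioner.h"

    void ingestionExample(SQLiteDBInterface *sqlite) {
        // The partitioner stores sqlite as a non-owning pointer; the caller
        // keeps ownership of the metadata-DB handle.
        Partitioner partitioner(4, 0, spt::Algorithms::HASH, sqlite);
        // A stream's graph ID is not known up front; setGraphID binds it once
        // the first streamed edge reveals it (see StreamHandler.cpp below).
        partitioner.setGraphID(5);
        // ... partition streamed edges ...
        partitioner.updateMetaDB();  // persists vertex/edge counts for graph 5
    }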
10 changes: 7 additions & 3 deletions src/util/kafka/StreamHandler.cpp
@@ -27,10 +27,10 @@
 Logger stream_handler_logger;

 StreamHandler::StreamHandler(KafkaConnector *kstream, int numberOfPartitions,
-                             vector<DataPublisher *> &workerClients)
+                             vector<DataPublisher *> &workerClients, SQLiteDBInterface* sqlite)
     : kstream(kstream),
       workerClients(workerClients),
-      graphPartitioner(numberOfPartitions, 0, spt::Algorithms::HASH),
+      graphPartitioner(numberOfPartitions, 0, spt::Algorithms::HASH, sqlite),
       stream_topic_name("stream_topic_name") { }
@@ -82,6 +82,10 @@
             stream_handler_logger.error("Edge Rejected. Streaming edge should Include the Graph ID.");
             continue;
         }
+
+        auto prop = edgeJson["properties"];
+        auto graphID = std::string(prop["graphId"]);
+        graphPartitioner.setGraphID(stoi(graphID));
         auto sourceJson = edgeJson["source"];
         auto destinationJson = edgeJson["destination"];
         string sId = std::string(sourceJson["id"]);
@@ -114,6 +118,6 @@
                 workerClients.at(temp_d)->publish(obj.dump());
             }
         }
-
+    graphPartitioner.updateMetaDB();
     graphPartitioner.printStats();
 }

Codecov / codecov/patch: added lines 30, 33, and 121 in src/util/kafka/StreamHandler.cpp are not covered by tests.
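
For reference, a self-contained sketch of the per-message graph-ID extraction added above, assuming — as the std::string() and stoi() conversions suggest — an nlohmann-style JSON payload with graphId carried as a string; the sample edge is invented:

    #include <iostream>
    #include <string>
    #include <nlohmann/json.hpp>

    int main() {
        // Invented streamed-edge payload; the real schema is set by the producer.
        auto edgeJson = nlohmann::json::parse(R"({
            "properties":  {"graphId": "5"},
            "source":      {"id": "100"},
            "destination": {"id": "200"}
        })");
        auto prop = edgeJson["properties"];
        auto graphID = std::string(prop["graphId"]);
        std::cout << "graph " << std::stoi(graphID) << std::endl;  // graph 5
        return 0;
    }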
4 changes: 3 additions & 1 deletion src/util/kafka/StreamHandler.h
@@ -20,10 +20,12 @@ limitations under the License.
 #include "../../partitioner/stream/Partitioner.h"
 #include "../logger/Logger.h"
 #include "KafkaCC.h"
+#include "../../metadb/SQLiteDBInterface.h"

 class StreamHandler {
  public:
-    StreamHandler(KafkaConnector *kstream, int numberOfPartitions, std::vector<DataPublisher *> &workerClients);
+    StreamHandler(KafkaConnector *kstream, int numberOfPartitions,
+                  std::vector<DataPublisher *> &workerClients, SQLiteDBInterface* sqlite);
     void listen_to_kafka_topic();
     cppkafka::Message pollMessage();
     bool isErrorInMessage(const cppkafka::Message &msg);