Skip to content

Commit

Permalink
Merge pull request #275 from thamindumk/fix/is_directed
Browse files Browse the repository at this point in the history
Get isDirected and update to the metaDB
  • Loading branch information
miyurud authored Jan 14, 2025
2 parents 6bc8c99 + 5bc4050 commit 4e6fbf0
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 20 deletions.
47 changes: 45 additions & 2 deletions src/frontend/JasmineGraphFrontEnd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1056,6 +1056,49 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c
*loop_exit_p = true;
return;
}
result_wr = write(connFd, Conts::CARRIAGE_RETURN_NEW_LINE.c_str(), Conts::CARRIAGE_RETURN_NEW_LINE.size());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}
string checkDirection = "Is this graph Directed (y/n)? ";
result_wr = write(connFd, checkDirection.c_str(), checkDirection.length());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}
// Get user response.
char isDirected[FRONTEND_DATA_LENGTH + 1];
bzero(isDirected, FRONTEND_DATA_LENGTH + 1);
read(connFd, isDirected, FRONTEND_DATA_LENGTH);
string is_directed(isDirected);
is_directed = Utils::trim_copy(is_directed);
for (char &c : is_directed) {
c = tolower(c);
}
string direction;
if (is_directed == "y") {
direction = Conts::DIRECTED;
} else {
direction = Conts::UNDIRECTED;
}

string checkGraphType = "Graph type received";
result_wr = write(connFd, checkGraphType.c_str(), checkGraphType.length());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}

result_wr = write(connFd, Conts::CARRIAGE_RETURN_NEW_LINE.c_str(), Conts::CARRIAGE_RETURN_NEW_LINE.size());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}
// create kafka consumer and graph partitioner
kstream = new KafkaConnector(configs);
// Create the Partitioner object.
Expand All @@ -1072,9 +1115,9 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c
string uploadStartTime = ctime(&time);
string sqlStatement =
"INSERT INTO graph (name,upload_path,upload_start_time,upload_end_time,graph_status_idgraph_status,"
"vertexcount,centralpartitioncount,edgecount) VALUES(\"" +
"vertexcount,centralpartitioncount,edgecount,is_directed) VALUES(\"" +
topic_name_s + "\", \"" + path + "\", \"" + uploadStartTime + "\", \"\",\"" +
to_string(Conts::GRAPH_STATUS::STREAMING) + "\", \"\", \"\", \"\")";
to_string(Conts::GRAPH_STATUS::STREAMING) + "\", \"\", \"\", \"\",\"" +direction+"\")";
int newGraphID = sqlite->runInsert(sqlStatement);

frontend_logger.info("Start listening to " + topic_name_s);
Expand Down
29 changes: 17 additions & 12 deletions src/partitioner/stream/Partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
Logger streaming_partition_logger;

// This addition is unidirectional , Add both items of the pair as keys
void Partition::addEdge(std::pair<std::string, std::string> edge) {
auto exsistFirstVertext = this->edgeList.find(edge.first);
if (exsistFirstVertext != this->edgeList.end()) {
void Partition::addEdge(std::pair<std::string, std::string> edge, bool isDirected) {
auto existFirstVertex = this->edgeList.find(edge.first);
if (existFirstVertex != this->edgeList.end()) {
this->edgeList[edge.first].insert(edge.second);
} else {
this->edgeList[edge.first] = std::set<std::string>({edge.second});
Expand All @@ -35,14 +35,16 @@ void Partition::addEdge(std::pair<std::string, std::string> edge) {
}
}

auto exsistSecondVertext = this->edgeList.find(edge.second);
if (exsistSecondVertext != this->edgeList.end()) {
this->edgeList[edge.second].insert(edge.first);
} else {
this->edgeList[edge.second] = std::set<std::string>({edge.first});
if (!isDirected) {
auto existSecondVertex = this->edgeList.find(edge.second);
if (existSecondVertex != this->edgeList.end()) {
this->edgeList[edge.second].insert(edge.first);
} else {
this->edgeList[edge.second] = std::set<std::string>({edge.first});

if (!isExistInEdgeCuts(edge.second)) {
this->vertexCount += 1;
if (!isExistInEdgeCuts(edge.second)) {
this->vertexCount += 1;
}
}
}
}
Expand All @@ -57,7 +59,7 @@ std::set<std::string> Partition::getNeighbors(std::string vertex) {

// The number of edges, the cardinality of E, is called the size of graph and denoted by |E|. We usually use m to denote
// the size of G.
double Partition::getEdgesCount() {
double Partition::getEdgesCount(bool isDirected) {
double total = 0;
std::set<std::string> uniqueEdges;
for (auto edge : this->edgeList) {
Expand All @@ -66,7 +68,10 @@ double Partition::getEdgesCount() {
uniqueEdges.insert(edge.first + vertext);
}
}
return uniqueEdges.size();
if (isDirected) {
return uniqueEdges.size();
}
return uniqueEdges.size()/2;
}

// The number of vertices, the cardinality of V, is called the order of graph and devoted by |V|. We usually use n to
Expand Down
4 changes: 2 additions & 2 deletions src/partitioner/stream/Partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ class Partition {
}
this->vertexCount = 0;
};
void addEdge(std::pair<std::string, std::string> edge);
void addEdge(std::pair<std::string, std::string> edge, bool isDirected = false);
std::set<std::string> getNeighbors(std::string);
double partitionScore(std::string vertex);
double getEdgesCount();
double getEdgesCount(bool isDirected = false);
double getVertextCount();
double getVertextCountQuick();
void addToEdgeCuts(std::string resident, std::string foreign, int partitionId);
Expand Down
16 changes: 13 additions & 3 deletions src/partitioner/stream/Partitioner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ partitionedEdge Partitioner::hashPartitioning(std::pair<std::string, std::string
int secondIndex = std::hash<std::string>()(edge.second) % this->numberOfPartitions; // Hash partitioning

if (firstIndex == secondIndex) {
this->partitions[firstIndex].addEdge(edge);
this->partitions[firstIndex].addEdge(edge, this->getIsDirected());
} else {
this->partitions[firstIndex].addToEdgeCuts(edge.first, edge.second, secondIndex);
this->partitions[secondIndex].addToEdgeCuts(edge.second, edge.first, firstIndex);
Expand All @@ -121,7 +121,7 @@ void Partitioner::printStats() {
int id = 0;
for (auto partition : this->partitions) {
double vertexCount = partition.getVertextCount();
double edgesCount = partition.getEdgesCount();
double edgesCount = partition.getEdgesCount(this->getIsDirected());
double edgeCutsCount = partition.edgeCutsCount();
double edgeCutRatio = partition.edgeCutsRatio();
streaming_partitioner_logger.info(std::to_string(id) + " => Vertex count = " + std::to_string(vertexCount));
Expand All @@ -139,7 +139,7 @@ void Partitioner::updateMetaDB() {
double edgeCutsCount = 0;
for (auto partition : this->partitions) {
vertexCount += partition.getVertextCount();
edgesCount += partition.getEdgesCount();
edgesCount += partition.getEdgesCount(this->getIsDirected());
edgeCutsCount += partition.edgeCutsCount();
}
double numberOfEdges = edgesCount + edgeCutsCount/2;
Expand All @@ -150,6 +150,16 @@ void Partitioner::updateMetaDB() {
this->sqlite->runUpdate(sqlStatement);
streaming_partitioner_logger.info("Successfully updated metaDB");
}

bool Partitioner::getIsDirected() {
std::string sqlStatement = "SELECT is_directed FROM graph WHERE idgraph = '"+std::to_string(this->graphID)+"'";
auto result = this->sqlite->runSelect(sqlStatement);
if (result[0][0].second == "0") {
return false;
}
return true;
}

/**
* Greedy vertex assignment objectives of minimizing the number of cut edges
and balancing of the partition sizes. Assign the vertext to partition P that maximize the partition score
Expand Down
1 change: 1 addition & 0 deletions src/partitioner/stream/Partitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class Partitioner {
partitionedEdge ldgPartitioning(std::pair<std::string, std::string> edge);
static std::pair<long, long> deserialize(std::string data);
void updateMetaDB();
bool getIsDirected();
void setGraphID(int graphId){this->graphID = graphId;};
};

Expand Down
3 changes: 2 additions & 1 deletion src/util/Conts.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ limitations under the License.

std::string Conts::JASMINEGRAPH_EXECUTABLE = "run.sh";
std::string Conts::JASMINEGRAPH_HOME = "JASMINEGRAPH_HOME";

std::string Conts::DIRECTED = "1";
std::string Conts::UNDIRECTED = "0";
std::string Conts::CARRIAGE_RETURN_NEW_LINE = "\r\n";
std::string Conts::TEMP_GRAPH_FILE_PATH = "/var/tmp/";
std::string Conts::GRAPH_TYPE_RDF = "RDF_GRAPH";
Expand Down
2 changes: 2 additions & 0 deletions src/util/Conts.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ class Conts {
static std::string GRAPH_WITH_TEXT_ATTRIBUTES;
static std::string GRAPH_WITH_JSON_ATTRIBUTES;
static std::string GRAPH_WITH_XML_ATTRIBUTES;
static std::string DIRECTED;
static std::string UNDIRECTED;
static std::string TEMP_GRAPH_FILE_PATH;
static std::string CARRIAGE_RETURN_NEW_LINE;

Expand Down

0 comments on commit 4e6fbf0

Please sign in to comment.