From 128f7b39f187f5afe3a8c5797fb0a06e3d8b7dbc Mon Sep 17 00:00:00 2001 From: muthumala Date: Fri, 10 Jan 2025 15:36:02 +0530 Subject: [PATCH 1/8] Fix localstore file deletion issue rmgr command --- src/server/JasmineGraphInstanceService.cpp | 9 ++++ src/server/JasmineGraphInstanceService.h | 1 + src/util/Utils.cpp | 53 ++++++++++++++++++++++ src/util/Utils.h | 7 ++- 4 files changed, 69 insertions(+), 1 deletion(-) diff --git a/src/server/JasmineGraphInstanceService.cpp b/src/server/JasmineGraphInstanceService.cpp index 1e638048c..bd2ea4566 100644 --- a/src/server/JasmineGraphInstanceService.cpp +++ b/src/server/JasmineGraphInstanceService.cpp @@ -378,6 +378,14 @@ int deleteGraphPartition(std::string graphID, std::string partitionID) { return status; } +int deleteStreamingGraphPartition(std::string graphID, std::string partitionID){ + int status = 0; + string partitionFilePathPattern = Utils::getJasmineGraphProperty("org.jasminegraph.server.instance.datafolder") + + "/g" + graphID + "_p" + partitionID; + status |= Utils::deleteAllMatchingFiles(partitionFilePathPattern); + return status; +} + /** Method for deleting all graph fragments given a graph ID * * @param graphID ID of graph fragments to be deleted in the instance @@ -2153,6 +2161,7 @@ static void delete_graph_command(int connFd, bool *loop_exit_p) { string partitionID = Utils::read_str_wrapper(connFd, data, INSTANCE_DATA_LENGTH, false); instance_logger.info("Received partition ID: " + partitionID); deleteGraphPartition(graphID, partitionID); + deleteStreamingGraphPartition(graphID, partitionID); // pthread_mutex_lock(&file_lock); // TODO :: Update catalog file // pthread_mutex_unlock(&file_lock); diff --git a/src/server/JasmineGraphInstanceService.h b/src/server/JasmineGraphInstanceService.h index 86614ba28..337855ce2 100644 --- a/src/server/JasmineGraphInstanceService.h +++ b/src/server/JasmineGraphInstanceService.h @@ -49,6 +49,7 @@ limitations under the License. void *instanceservicesession(void *dummyPt); void writeCatalogRecord(string record); int deleteGraphPartition(std::string graphID, std::string partitionID); +int deleteStreamingGraphPartition(std::string graphID, std::string partitionID); void removeGraphFragments(std::string graphID); map calculateOutDegreeDist(string graphID, string partitionID, int serverPort, diff --git a/src/util/Utils.cpp b/src/util/Utils.cpp index 07c1bbcf4..3da8bd4e6 100644 --- a/src/util/Utils.cpp +++ b/src/util/Utils.cpp @@ -42,6 +42,7 @@ int jasminegraph_profile = PROFILE_K8S; #endif unordered_map Utils::propertiesMap; +std::mutex Utils::sqliteMutex; std::vector Utils::split(const std::string &s, char delimiter) { std::vector tokens; @@ -266,6 +267,16 @@ int Utils::deleteDirectory(const std::string dirName) { return status; } +int Utils::deleteAllMatchingFiles(const std::string fileNamePattern){ + std::string command = "rm -f " + fileNamePattern + "*"; // Use -f to force deletion + int status = system(command.c_str()); + if (status == 0) + util_logger.info("Deleted All files associated with: "+ fileNamePattern + "* successfully"); + else + util_logger.warn("Deleting files associated with: " + fileNamePattern + "* failed with exit code " + std::to_string(status)); + return status; +} + bool Utils::is_number(const std::string &compareString) { return !compareString.empty() && std::find_if(compareString.begin(), compareString.end(), [](char c) { return !std::isdigit(c); }) == compareString.end(); @@ -1117,3 +1128,45 @@ bool Utils::transferPartition(std::string sourceWorker, int sourceWorkerPort, st close(sockfd); return true; } + +void Utils::assignPartitionToWorker(int graphId, int partitionIndex, string hostname, int port) { + util_logger.info("Assigning partition: " + std::to_string(partitionIndex) + " to worker"); + + auto *sqlite = new SQLiteDBInterface(); + sqlite->init(); + + string workerHost; + if (hostname.find('@') != std::string::npos) { + workerHost = Utils::split(hostname, '@')[1]; + } + + sqliteMutex.lock(); + + try { + std::string workerSearchQuery = + "SELECT idworker FROM worker WHERE ip='" + workerHost + + "' AND server_port='" + std::to_string(port) + "'"; + + std::vector>> results = sqlite->runSelect(workerSearchQuery); + + if (results.empty()) { + util_logger.error("Worker not found in database: " + workerHost); + throw std::runtime_error("Worker not found in database"); + } + + std::string workerID = results[0][0].second; + + std::string partitionToWorkerQuery = + "INSERT INTO worker_has_partition (partition_idpartition, partition_graph_idgraph, worker_idworker) VALUES " + "('" + std::to_string(partitionIndex) + "','" + std::to_string(graphId) + "','" + workerID + "')"; + + sqlite->runInsert(partitionToWorkerQuery); + } catch (const std::exception &ex) { + util_logger.error("Error assigning partition to worker: " + std::string(ex.what())); + } + + sqlite->finalize(); + sqliteMutex.unlock(); + + delete sqlite; +} diff --git a/src/util/Utils.h b/src/util/Utils.h index f8b600914..cb8a5dc39 100644 --- a/src/util/Utils.h +++ b/src/util/Utils.h @@ -36,8 +36,9 @@ using json = nlohmann::json; class Utils { private: static unordered_map propertiesMap; + static std::mutex sqliteMutex; - public: +public: struct worker { std::string workerID; std::string hostname; @@ -82,6 +83,8 @@ class Utils { static int deleteDirectory(const std::string dirName); + static int deleteAllMatchingFiles(const std::string fileNamePattern); + static std::string getFileName(std::string filePath); static int getFileSize(std::string filePath); @@ -117,6 +120,8 @@ class Utils { static int connect_wrapper(int sock, const sockaddr *addr, socklen_t slen); + static void assignPartitionToWorker(int graphId, int partitionIndex, string hostname,int port); + /** * Wrapper to recv(2) to read a string. * From d86859603807bab63a66ff694b10298b33231caf Mon Sep 17 00:00:00 2001 From: muthumala Date: Sun, 12 Jan 2025 07:11:45 +0530 Subject: [PATCH 2/8] Fix cpplint issues --- src/server/JasmineGraphInstanceService.cpp | 2 +- src/util/Utils.cpp | 12 +++++++----- src/util/Utils.h | 4 ++-- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/server/JasmineGraphInstanceService.cpp b/src/server/JasmineGraphInstanceService.cpp index bd2ea4566..2a6990800 100644 --- a/src/server/JasmineGraphInstanceService.cpp +++ b/src/server/JasmineGraphInstanceService.cpp @@ -378,7 +378,7 @@ int deleteGraphPartition(std::string graphID, std::string partitionID) { return status; } -int deleteStreamingGraphPartition(std::string graphID, std::string partitionID){ +int deleteStreamingGraphPartition(std::string graphID, std::string partitionID) { int status = 0; string partitionFilePathPattern = Utils::getJasmineGraphProperty("org.jasminegraph.server.instance.datafolder") + "/g" + graphID + "_p" + partitionID; diff --git a/src/util/Utils.cpp b/src/util/Utils.cpp index 3da8bd4e6..5dbf31761 100644 --- a/src/util/Utils.cpp +++ b/src/util/Utils.cpp @@ -267,13 +267,14 @@ int Utils::deleteDirectory(const std::string dirName) { return status; } -int Utils::deleteAllMatchingFiles(const std::string fileNamePattern){ - std::string command = "rm -f " + fileNamePattern + "*"; // Use -f to force deletion +int Utils::deleteAllMatchingFiles(const std::string fileNamePattern) { + std::string command = "rm -f " + fileNamePattern + "*"; int status = system(command.c_str()); if (status == 0) util_logger.info("Deleted All files associated with: "+ fileNamePattern + "* successfully"); else - util_logger.warn("Deleting files associated with: " + fileNamePattern + "* failed with exit code " + std::to_string(status)); + util_logger.warn("Deleting files associated with: " + fileNamePattern + + "* failed with exit code " + std::to_string(status)); return status; } @@ -1157,8 +1158,9 @@ void Utils::assignPartitionToWorker(int graphId, int partitionIndex, string hos std::string workerID = results[0][0].second; std::string partitionToWorkerQuery = - "INSERT INTO worker_has_partition (partition_idpartition, partition_graph_idgraph, worker_idworker) VALUES " - "('" + std::to_string(partitionIndex) + "','" + std::to_string(graphId) + "','" + workerID + "')"; + "INSERT INTO worker_has_partition (partition_idpartition, partition_graph_idgraph, worker_idworker) " + "VALUES ('" + std::to_string(partitionIndex) + "','" + std::to_string(graphId) + + "','" + workerID + "')"; sqlite->runInsert(partitionToWorkerQuery); } catch (const std::exception &ex) { diff --git a/src/util/Utils.h b/src/util/Utils.h index cb8a5dc39..05a0bcfab 100644 --- a/src/util/Utils.h +++ b/src/util/Utils.h @@ -38,7 +38,7 @@ class Utils { static unordered_map propertiesMap; static std::mutex sqliteMutex; -public: + public: struct worker { std::string workerID; std::string hostname; @@ -120,7 +120,7 @@ class Utils { static int connect_wrapper(int sock, const sockaddr *addr, socklen_t slen); - static void assignPartitionToWorker(int graphId, int partitionIndex, string hostname,int port); + static void assignPartitionToWorker(int graphId, int partitionIndex, string hostname, int port); /** * Wrapper to recv(2) to read a string. From 11cbb627cb5387b396a2c2aa6901cf38ffed0d20 Mon Sep 17 00:00:00 2001 From: muthumala Date: Sun, 12 Jan 2025 10:21:06 +0530 Subject: [PATCH 3/8] Enclose if else with curly brackets --- src/util/Utils.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/util/Utils.cpp b/src/util/Utils.cpp index 5dbf31761..6d7ab9eb0 100644 --- a/src/util/Utils.cpp +++ b/src/util/Utils.cpp @@ -260,10 +260,11 @@ std::vector Utils::getListOfFilesInDirectory(std::string dirName) { int Utils::deleteDirectory(const std::string dirName) { string command = "rm -rf " + dirName; int status = system(command.c_str()); - if (status == 0) + if (status == 0) { util_logger.info(dirName + " deleted successfully"); - else + } else { util_logger.warn("Deleting " + dirName + " failed with exit code " + std::to_string(status)); + } return status; } From 26f9728e98725e86a1bdc4db16249b5135669948 Mon Sep 17 00:00:00 2001 From: Pasindu Muthumala <102838889+muthumala19@users.noreply.github.com> Date: Sun, 12 Jan 2025 10:18:36 +0530 Subject: [PATCH 4/8] Update worker not found error message Co-authored-by: Miyuru Dayarathna --- src/util/Utils.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/util/Utils.cpp b/src/util/Utils.cpp index 6d7ab9eb0..152353cdb 100644 --- a/src/util/Utils.cpp +++ b/src/util/Utils.cpp @@ -1153,7 +1153,7 @@ void Utils::assignPartitionToWorker(int graphId, int partitionIndex, string hos if (results.empty()) { util_logger.error("Worker not found in database: " + workerHost); - throw std::runtime_error("Worker not found in database"); + throw std::runtime_error("Worker not found"); } std::string workerID = results[0][0].second; From c36b7b495a29144ed17e48d8f2d42f634968d71a Mon Sep 17 00:00:00 2001 From: Pasindu Muthumala <102838889+muthumala19@users.noreply.github.com> Date: Sun, 12 Jan 2025 10:19:04 +0530 Subject: [PATCH 5/8] Update worker not found error message Co-authored-by: Miyuru Dayarathna --- src/util/Utils.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/util/Utils.cpp b/src/util/Utils.cpp index 152353cdb..61c649db0 100644 --- a/src/util/Utils.cpp +++ b/src/util/Utils.cpp @@ -1152,7 +1152,7 @@ void Utils::assignPartitionToWorker(int graphId, int partitionIndex, string hos std::vector>> results = sqlite->runSelect(workerSearchQuery); if (results.empty()) { - util_logger.error("Worker not found in database: " + workerHost); + util_logger.error("Worker not found : " + workerHost); throw std::runtime_error("Worker not found"); } From 1625c9a33aa9d2244aa33caffeaab87b800719cd Mon Sep 17 00:00:00 2001 From: Pasindu Muthumala <102838889+muthumala19@users.noreply.github.com> Date: Mon, 13 Jan 2025 22:08:42 +0530 Subject: [PATCH 6/8] Update log when assigning partition to worker Co-authored-by: Miyuru Dayarathna --- src/util/Utils.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/util/Utils.cpp b/src/util/Utils.cpp index 61c649db0..01a2f6d0e 100644 --- a/src/util/Utils.cpp +++ b/src/util/Utils.cpp @@ -1132,7 +1132,7 @@ bool Utils::transferPartition(std::string sourceWorker, int sourceWorkerPort, st } void Utils::assignPartitionToWorker(int graphId, int partitionIndex, string hostname, int port) { - util_logger.info("Assigning partition: " + std::to_string(partitionIndex) + " to worker"); + util_logger.info("Assigning graph ID: " + std::to_string(graphId) + " partition: " + std::to_string(partitionIndex) + " to worker"); auto *sqlite = new SQLiteDBInterface(); sqlite->init(); From 8e8d0788bf24d57aa7de416021c59119b5423ed3 Mon Sep 17 00:00:00 2001 From: muthumala Date: Mon, 13 Jan 2025 22:16:31 +0530 Subject: [PATCH 7/8] Enclose if statement with brackets --- src/util/Utils.cpp | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/util/Utils.cpp b/src/util/Utils.cpp index 01a2f6d0e..23a36116f 100644 --- a/src/util/Utils.cpp +++ b/src/util/Utils.cpp @@ -271,11 +271,12 @@ int Utils::deleteDirectory(const std::string dirName) { int Utils::deleteAllMatchingFiles(const std::string fileNamePattern) { std::string command = "rm -f " + fileNamePattern + "*"; int status = system(command.c_str()); - if (status == 0) - util_logger.info("Deleted All files associated with: "+ fileNamePattern + "* successfully"); - else + if (status == 0) { + util_logger.info("Deleted All files associated with: " + fileNamePattern + "* successfully"); + } else { util_logger.warn("Deleting files associated with: " + fileNamePattern - + "* failed with exit code " + std::to_string(status)); + + "* failed with exit code " + std::to_string(status)); + } return status; } @@ -1132,7 +1133,7 @@ bool Utils::transferPartition(std::string sourceWorker, int sourceWorkerPort, st } void Utils::assignPartitionToWorker(int graphId, int partitionIndex, string hostname, int port) { - util_logger.info("Assigning graph ID: " + std::to_string(graphId) + " partition: " + std::to_string(partitionIndex) + " to worker"); + util_logger.debug("Assigning graph ID: " + std::to_string(graphId) + " partition: " + std::to_string(partitionIndex) + " to worker"); auto *sqlite = new SQLiteDBInterface(); sqlite->init(); From 1ee92ef993eeb66d69fd8b22865e272c121dcbdd Mon Sep 17 00:00:00 2001 From: muthumala Date: Tue, 14 Jan 2025 11:47:47 +0530 Subject: [PATCH 8/8] Fix cpplint issues --- src/util/Utils.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/util/Utils.cpp b/src/util/Utils.cpp index 23a36116f..274f8fd54 100644 --- a/src/util/Utils.cpp +++ b/src/util/Utils.cpp @@ -1133,7 +1133,8 @@ bool Utils::transferPartition(std::string sourceWorker, int sourceWorkerPort, st } void Utils::assignPartitionToWorker(int graphId, int partitionIndex, string hostname, int port) { - util_logger.debug("Assigning graph ID: " + std::to_string(graphId) + " partition: " + std::to_string(partitionIndex) + " to worker"); + util_logger.debug("Assigning graph ID: " + std::to_string(graphId) + " partition: " + + std::to_string(partitionIndex) + " to worker"); auto *sqlite = new SQLiteDBInterface(); sqlite->init();