diff --git a/src/server/JasmineGraphInstanceService.cpp b/src/server/JasmineGraphInstanceService.cpp index 1e638048c..2a6990800 100644 --- a/src/server/JasmineGraphInstanceService.cpp +++ b/src/server/JasmineGraphInstanceService.cpp @@ -378,6 +378,14 @@ int deleteGraphPartition(std::string graphID, std::string partitionID) { return status; } +int deleteStreamingGraphPartition(std::string graphID, std::string partitionID) { + int status = 0; + string partitionFilePathPattern = Utils::getJasmineGraphProperty("org.jasminegraph.server.instance.datafolder") + + "/g" + graphID + "_p" + partitionID; + status |= Utils::deleteAllMatchingFiles(partitionFilePathPattern); + return status; +} + /** Method for deleting all graph fragments given a graph ID * * @param graphID ID of graph fragments to be deleted in the instance @@ -2153,6 +2161,7 @@ static void delete_graph_command(int connFd, bool *loop_exit_p) { string partitionID = Utils::read_str_wrapper(connFd, data, INSTANCE_DATA_LENGTH, false); instance_logger.info("Received partition ID: " + partitionID); deleteGraphPartition(graphID, partitionID); + deleteStreamingGraphPartition(graphID, partitionID); // pthread_mutex_lock(&file_lock); // TODO :: Update catalog file // pthread_mutex_unlock(&file_lock); diff --git a/src/server/JasmineGraphInstanceService.h b/src/server/JasmineGraphInstanceService.h index 86614ba28..337855ce2 100644 --- a/src/server/JasmineGraphInstanceService.h +++ b/src/server/JasmineGraphInstanceService.h @@ -49,6 +49,7 @@ limitations under the License. void *instanceservicesession(void *dummyPt); void writeCatalogRecord(string record); int deleteGraphPartition(std::string graphID, std::string partitionID); +int deleteStreamingGraphPartition(std::string graphID, std::string partitionID); void removeGraphFragments(std::string graphID); map<long, long> calculateOutDegreeDist(string graphID, string partitionID, int serverPort, diff --git a/src/util/Utils.cpp b/src/util/Utils.cpp index 07c1bbcf4..274f8fd54 100644 --- a/src/util/Utils.cpp +++ b/src/util/Utils.cpp @@ -42,6 +42,7 @@ int jasminegraph_profile = PROFILE_K8S; #endif unordered_map<std::string, std::string> Utils::propertiesMap; +std::mutex Utils::sqliteMutex; std::vector<std::string> Utils::split(const std::string &s, char delimiter) { std::vector<std::string> tokens; @@ -259,10 +260,23 @@ std::vector<std::string> Utils::getListOfFilesInDirectory(std::string dirName) { int Utils::deleteDirectory(const std::string dirName) { string command = "rm -rf " + dirName; int status = system(command.c_str()); - if (status == 0) + if (status == 0) { util_logger.info(dirName + " deleted successfully"); - else + } else { util_logger.warn("Deleting " + dirName + " failed with exit code " + std::to_string(status)); + } + return status; +} + +int Utils::deleteAllMatchingFiles(const std::string fileNamePattern) { + std::string command = "rm -f " + fileNamePattern + "*"; + int status = system(command.c_str()); + if (status == 0) { + util_logger.info("Deleted All files associated with: " + fileNamePattern + "* successfully"); + } else { + util_logger.warn("Deleting files associated with: " + fileNamePattern + + "* failed with exit code " + std::to_string(status)); + } return status; } @@ -1117,3 +1131,47 @@ bool Utils::transferPartition(std::string sourceWorker, int sourceWorkerPort, st close(sockfd); return true; } + +void Utils::assignPartitionToWorker(int graphId, int partitionIndex, string hostname, int port) { + util_logger.debug("Assigning graph ID: " + std::to_string(graphId) + " partition: " + + std::to_string(partitionIndex) + " to worker"); + + auto *sqlite = new SQLiteDBInterface(); + sqlite->init(); + + string workerHost; + if (hostname.find('@') != std::string::npos) { + workerHost = Utils::split(hostname, '@')[1]; + } + + sqliteMutex.lock(); + + try { + std::string workerSearchQuery = + "SELECT idworker FROM worker WHERE ip='" + workerHost + + "' AND server_port='" + std::to_string(port) + "'"; + + std::vector<std::vector<std::pair<std::string, std::string>>> results = sqlite->runSelect(workerSearchQuery); + + if (results.empty()) { + util_logger.error("Worker not found : " + workerHost); + throw std::runtime_error("Worker not found"); + } + + std::string workerID = results[0][0].second; + + std::string partitionToWorkerQuery = + "INSERT INTO worker_has_partition (partition_idpartition, partition_graph_idgraph, worker_idworker) " + "VALUES ('" + std::to_string(partitionIndex) + "','" + std::to_string(graphId) + + "','" + workerID + "')"; + + sqlite->runInsert(partitionToWorkerQuery); + } catch (const std::exception &ex) { + util_logger.error("Error assigning partition to worker: " + std::string(ex.what())); + } + + sqlite->finalize(); + sqliteMutex.unlock(); + + delete sqlite; +} diff --git a/src/util/Utils.h b/src/util/Utils.h index f8b600914..05a0bcfab 100644 --- a/src/util/Utils.h +++ b/src/util/Utils.h @@ -36,6 +36,7 @@ using json = nlohmann::json; class Utils { private: static unordered_map<std::string, std::string> propertiesMap; + static std::mutex sqliteMutex; public: struct worker { @@ -82,6 +83,8 @@ class Utils { static int deleteDirectory(const std::string dirName); + static int deleteAllMatchingFiles(const std::string fileNamePattern); + static std::string getFileName(std::string filePath); static int getFileSize(std::string filePath); @@ -117,6 +120,8 @@ class Utils { static int connect_wrapper(int sock, const sockaddr *addr, socklen_t slen); + static void assignPartitionToWorker(int graphId, int partitionIndex, string hostname, int port); + /** * Wrapper to recv(2) to read a string. *