Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue/rmgr #273

Merged
merged 8 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/server/JasmineGraphInstanceService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,14 @@
return status;
}

int deleteStreamingGraphPartition(std::string graphID, std::string partitionID) {
int status = 0;

Check warning on line 382 in src/server/JasmineGraphInstanceService.cpp

View check run for this annotation

Codecov / codecov/patch

src/server/JasmineGraphInstanceService.cpp#L381-L382

Added lines #L381 - L382 were not covered by tests
string partitionFilePathPattern = Utils::getJasmineGraphProperty("org.jasminegraph.server.instance.datafolder")
+ "/g" + graphID + "_p" + partitionID;
status |= Utils::deleteAllMatchingFiles(partitionFilePathPattern);
return status;
}

Check warning on line 387 in src/server/JasmineGraphInstanceService.cpp

View check run for this annotation

Codecov / codecov/patch

src/server/JasmineGraphInstanceService.cpp#L386-L387

Added lines #L386 - L387 were not covered by tests

/** Method for deleting all graph fragments given a graph ID
*
* @param graphID ID of graph fragments to be deleted in the instance
Expand Down Expand Up @@ -2153,6 +2161,7 @@
string partitionID = Utils::read_str_wrapper(connFd, data, INSTANCE_DATA_LENGTH, false);
instance_logger.info("Received partition ID: " + partitionID);
deleteGraphPartition(graphID, partitionID);
deleteStreamingGraphPartition(graphID, partitionID);
// pthread_mutex_lock(&file_lock);
// TODO :: Update catalog file
// pthread_mutex_unlock(&file_lock);
Expand Down
1 change: 1 addition & 0 deletions src/server/JasmineGraphInstanceService.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ limitations under the License.
void *instanceservicesession(void *dummyPt);
void writeCatalogRecord(string record);
int deleteGraphPartition(std::string graphID, std::string partitionID);
int deleteStreamingGraphPartition(std::string graphID, std::string partitionID);
void removeGraphFragments(std::string graphID);

map<long, long> calculateOutDegreeDist(string graphID, string partitionID, int serverPort,
Expand Down
60 changes: 58 additions & 2 deletions src/util/Utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#endif

unordered_map<std::string, std::string> Utils::propertiesMap;
std::mutex Utils::sqliteMutex;

std::vector<std::string> Utils::split(const std::string &s, char delimiter) {
std::vector<std::string> tokens;
Expand Down Expand Up @@ -259,10 +260,22 @@
int Utils::deleteDirectory(const std::string dirName) {
string command = "rm -rf " + dirName;
int status = system(command.c_str());
if (status == 0)
if (status == 0) {
util_logger.info(dirName + " deleted successfully");
else
} else {
util_logger.warn("Deleting " + dirName + " failed with exit code " + std::to_string(status));
}
return status;
}

Check warning on line 269 in src/util/Utils.cpp

View check run for this annotation

Codecov / codecov/patch

src/util/Utils.cpp#L268-L269

Added lines #L268 - L269 were not covered by tests

int Utils::deleteAllMatchingFiles(const std::string fileNamePattern) {

Check warning on line 271 in src/util/Utils.cpp

View check run for this annotation

Codecov / codecov/patch

src/util/Utils.cpp#L271

Added line #L271 was not covered by tests
std::string command = "rm -f " + fileNamePattern + "*";
int status = system(command.c_str());
if (status == 0)
miyurud marked this conversation as resolved.
Show resolved Hide resolved
util_logger.info("Deleted All files associated with: "+ fileNamePattern + "* successfully");
else
util_logger.warn("Deleting files associated with: " + fileNamePattern
+ "* failed with exit code " + std::to_string(status));
return status;
}

Expand Down Expand Up @@ -1117,3 +1130,46 @@
close(sockfd);
return true;
}

void Utils::assignPartitionToWorker(int graphId, int partitionIndex, string hostname, int port) {

Check warning on line 1134 in src/util/Utils.cpp

View check run for this annotation

Codecov / codecov/patch

src/util/Utils.cpp#L1134

Added line #L1134 was not covered by tests
util_logger.info("Assigning partition: " + std::to_string(partitionIndex) + " to worker");
miyurud marked this conversation as resolved.
Show resolved Hide resolved
miyurud marked this conversation as resolved.
Show resolved Hide resolved

auto *sqlite = new SQLiteDBInterface();
sqlite->init();

string workerHost;

Check warning on line 1140 in src/util/Utils.cpp

View check run for this annotation

Codecov / codecov/patch

src/util/Utils.cpp#L1140

Added line #L1140 was not covered by tests
if (hostname.find('@') != std::string::npos) {
workerHost = Utils::split(hostname, '@')[1];
}

sqliteMutex.lock();

try {
std::string workerSearchQuery =
"SELECT idworker FROM worker WHERE ip='" + workerHost +
"' AND server_port='" + std::to_string(port) + "'";

std::vector<std::vector<std::pair<std::string, std::string>>> results = sqlite->runSelect(workerSearchQuery);

if (results.empty()) {
util_logger.error("Worker not found : " + workerHost);
throw std::runtime_error("Worker not found");
}

std::string workerID = results[0][0].second;

std::string partitionToWorkerQuery =
"INSERT INTO worker_has_partition (partition_idpartition, partition_graph_idgraph, worker_idworker) "
"VALUES ('" + std::to_string(partitionIndex) + "','" + std::to_string(graphId)
+ "','" + workerID + "')";

sqlite->runInsert(partitionToWorkerQuery);
} catch (const std::exception &ex) {
util_logger.error("Error assigning partition to worker: " + std::string(ex.what()));
}

Check warning on line 1169 in src/util/Utils.cpp

View check run for this annotation

Codecov / codecov/patch

src/util/Utils.cpp#L1169

Added line #L1169 was not covered by tests

sqlite->finalize();
sqliteMutex.unlock();

Check warning on line 1172 in src/util/Utils.cpp

View check run for this annotation

Codecov / codecov/patch

src/util/Utils.cpp#L1172

Added line #L1172 was not covered by tests

delete sqlite;
}

Check warning on line 1175 in src/util/Utils.cpp

View check run for this annotation

Codecov / codecov/patch

src/util/Utils.cpp#L1175

Added line #L1175 was not covered by tests
5 changes: 5 additions & 0 deletions src/util/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ using json = nlohmann::json;
class Utils {
private:
static unordered_map<std::string, std::string> propertiesMap;
static std::mutex sqliteMutex;

public:
struct worker {
Expand Down Expand Up @@ -82,6 +83,8 @@ class Utils {

static int deleteDirectory(const std::string dirName);

static int deleteAllMatchingFiles(const std::string fileNamePattern);

static std::string getFileName(std::string filePath);

static int getFileSize(std::string filePath);
Expand Down Expand Up @@ -117,6 +120,8 @@ class Utils {

static int connect_wrapper(int sock, const sockaddr *addr, socklen_t slen);

static void assignPartitionToWorker(int graphId, int partitionIndex, string hostname, int port);

/**
* Wrapper to recv(2) to read a string.
*
Expand Down
Loading