Skip to content

Commit

Permalink
Merge pull request #228 from FYP-Auto-Scale-JasmineGraph/wrap_recv
Browse files Browse the repository at this point in the history
Use wrapper functions in JasmineGraphServer
  • Loading branch information
miyurud authored Jan 25, 2024
2 parents 0ed821a + a491df3 commit daeda1c
Show file tree
Hide file tree
Showing 11 changed files with 1,708 additions and 3,052 deletions.
24 changes: 8 additions & 16 deletions src/backend/JasmineGraphBackend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ void *backendservicesesion(void *dummyPt) {
backendservicesessionargs *sessionargs = (backendservicesessionargs *)dummyPt;
int connFd = sessionargs->connFd;
SQLiteDBInterface *sqLiteDbInterface = sessionargs->sqlite;
delete sessionargs;
backend_logger.log("Thread No: " + to_string(pthread_self()), "info");
char data[301];
bzero(data, 301);
Expand Down Expand Up @@ -132,35 +133,26 @@ int JasmineGraphBackend::run() {
}

listen(listenFd, 10);
pthread_t threadA[workerCount];

len = sizeof(clntAdd);

int noThread = 0;

while (noThread < workerCount) {
while (true) {
backend_logger.log("Backend Listening", "info");

// This is where a client connects. The server blocks in accept() until a client connection arrives.
int connFd = accept(listenFd, (struct sockaddr *)&clntAdd, &len);

if (connFd < 0) {
backend_logger.log("Cannot accept connection", "error");
return 0;
continue;
}
backend_logger.log("Connection successful", "info");

struct backendservicesessionargs backendservicesessionargs1;
backendservicesessionargs1.sqlite = this->sqlite;
backendservicesessionargs1.connFd = connFd;

pthread_create(&threadA[noThread], NULL, backendservicesesion, &backendservicesessionargs1);

noThread++;
backendservicesessionargs *sessionargs = new backendservicesessionargs;
sessionargs->sqlite = this->sqlite;
sessionargs->connFd = connFd;
pthread_t pt;
pthread_create(&pt, NULL, backendservicesesion, sessionargs);
}

for (int i = 0; i < noThread; i++) {
pthread_join(threadA[i], NULL);
}
return 1;
}
51 changes: 28 additions & 23 deletions src/frontend/JasmineGraphFrontEnd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@ limitations under the License.
#include <set>
#include <thread>

#include "../metadb/SQLiteDBInterface.h"
#include "../nativestore/DataPublisher.h"
#include "../nativestore/RelationBlock.h"
#include "../metadb/SQLiteDBInterface.h"
#include "../ml/trainer/JasmineGraphTrainingSchedular.h"
#include "../partitioner/local/JSONParser.h"
#include "../partitioner/local/MetisPartitioner.h"
#include "../partitioner/local/RDFParser.h"
Expand Down Expand Up @@ -90,8 +89,14 @@ static void start_remote_worker_command(int connFd, bool *loop_exit_p);
static void sla_command(int connFd, SQLiteDBInterface *sqlite, PerformanceSQLiteDBInterface *perfSqlite,
bool *loop_exit_p);

void *frontendservicesesion(std::string masterIP, int connFd, SQLiteDBInterface *sqlite,
PerformanceSQLiteDBInterface *perfSqlite, JobScheduler *jobScheduler) {
void *frontendservicesesion(void *dummyPt) {
frontendservicesessionargs *sessionargs = (frontendservicesessionargs *)dummyPt;
std::string masterIP = sessionargs->masterIP;
int connFd = sessionargs->connFd;
SQLiteDBInterface *sqlite = sessionargs->sqlite;
PerformanceSQLiteDBInterface *perfSqlite = sessionargs->perfSqlite;
JobScheduler *jobScheduler = sessionargs->jobScheduler;
delete sessionargs;
frontend_logger.info("Thread No: " + to_string(pthread_self()));
frontend_logger.info("Master IP: " + masterIP);
char data[FRONTEND_DATA_LENGTH + 1];
Expand Down Expand Up @@ -279,18 +284,18 @@ int JasmineGraphFrontEnd::run() {

if (connFd < 0) {
frontend_logger.error("Cannot accept connection");
return 0;
continue;
}
frontend_logger.info("Connection successful");

frontend_logger.info("Master IP" + masterIP);

// TODO(miyurud):Temporarily commenting this line to enable building the project. Asked tmkasun to provide a
// permanent fix later when he is available.
threadVector.push_back(
std::thread(frontendservicesesion, masterIP, connFd, this->sqlite, this->perfSqlite, this->jobScheduler));
frontend_logger.info("Connection successful from " + std::string(inet_ntoa(clntAdd.sin_addr)));

currentFESession++;
frontendservicesessionargs *sessionargs = new frontendservicesessionargs;
sessionargs->masterIP = masterIP;
sessionargs->connFd = connFd;
sessionargs->sqlite = this->sqlite;
sessionargs->perfSqlite = this->perfSqlite;
sessionargs->jobScheduler = this->jobScheduler;
pthread_t pt;
pthread_create(&pt, NULL, frontendservicesesion, sessionargs);
}
}

Expand Down Expand Up @@ -360,7 +365,7 @@ void JasmineGraphFrontEnd::removeGraph(std::string graphID, SQLiteDBInterface *s
cout << "HOST ID : " << j->first << " Partition ID : " << j->second << endl;
}
sqlite->runUpdate("UPDATE graph SET graph_status_idgraph_status = " + to_string(Conts::GRAPH_STATUS::DELETING) +
" WHERE idgraph = " + graphID);
" WHERE idgraph = " + graphID);

JasmineGraphServer::removeGraph(hostHasPartition, graphID, masterIP);

Expand Down Expand Up @@ -1206,16 +1211,16 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c
*loop_exit_p = true;
return;
}
// create kafka consumer and graph partitioner
// create kafka consumer and graph partitioner
kstream = new KafkaConnector(configs);
// Create the Partitioner object.
// Create the Partitioner object.
Partitioner graphPartitioner(numberOfPartitions, 1, spt::Algorithms::HASH);
// Create the KafkaConnector object.
// Create the KafkaConnector object.
kstream = new KafkaConnector(configs);
// Subscribe to the Kafka topic.
// Subscribe to the Kafka topic.
kstream->Subscribe(topic_name_s);
// Create the StreamHandler object.
StreamHandler* stream_handler = new StreamHandler(kstream, graphPartitioner, workerClients);
// Create the StreamHandler object.
StreamHandler *stream_handler = new StreamHandler(kstream, graphPartitioner, workerClients);

frontend_logger.info("Start listening to " + topic_name_s);
input_stream_handler_thread = thread(&StreamHandler::listen_to_kafka_topic, stream_handler);
Expand Down Expand Up @@ -1736,7 +1741,7 @@ static void train_command(int connFd, SQLiteDBInterface *sqlite, bool *loop_exit
}
return;
}
auto* server = JasmineGraphServer::getInstance();
auto *server = JasmineGraphServer::getInstance();
if (Utils::getJasmineGraphProperty("org.jasminegraph.fl.org.training") == "true") {
frontend_logger.info("Initiate org communication");
JasmineGraphServer::initiateOrgCommunication(graphID, trainData, sqlite, server->masterHost);
Expand Down Expand Up @@ -2184,7 +2189,7 @@ static void sla_command(int connFd, SQLiteDBInterface *sqlite, PerformanceSQLite
std::stringstream ss;
std::vector<vector<pair<string, string>>> v =
perfSqlite->runSelect("SELECT graph_id, partition_count, sla_value FROM graph_sla where id_sla_category in (" +
adjustedIdList + ");");
adjustedIdList + ");");
for (std::vector<vector<pair<string, string>>>::iterator i = v.begin(); i != v.end(); ++i) {
std::stringstream slass;
slass << "|";
Expand Down
5 changes: 4 additions & 1 deletion src/frontend/JasmineGraphFrontEnd.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,11 @@ class JasmineGraphFrontEnd {
};

struct frontendservicesessionargs {
SQLiteDBInterface *sqlite;
std::string masterIP;
int connFd;
SQLiteDBInterface *sqlite;
PerformanceSQLiteDBInterface *perfSqlite;
JobScheduler *jobScheduler;
};

#endif // JASMINGRAPH_JASMINGRAPHFRONTEND_H
32 changes: 21 additions & 11 deletions src/ml/trainer/JasmineGraphTrainingSchedular.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,19 @@ limitations under the License.
using namespace std;
Logger trainScheduler_logger;

// Forward declarations of the helper functions used by the training scheduler.
// They are declared `static`, so they have internal linkage and are only usable
// from within this translation unit; their definitions appear later in this file.

// Looks up the available memory for the host identified by `hostname`.
// NOTE(review): the definition reads this through PerformanceSQLiteDBInterface;
// the unit of the returned value is not visible here - confirm with the query.
static long getAvailableMemory(std::string hostname);
// Returns an approximation of the total memory needed for graph `graph_id`.
// NOTE(review): the first parameter is named `edgeCount` here but `vertexCount`
// in the definition - confirm which quantity callers are expected to pass.
static long estimateMemory(int edgeCount, std::string graph_id);

// Packs the partitions in `partitionMemoryList` against the given `capacity`.
// The definition returns a map it names `partitionToIteration`.
// NOTE(review): presumably each pair is (partition id, memory) - verify against the caller.
static std::map<int, int> packPartitionsToMemory(std::vector<std::pair<int, int>> partitionMemoryList, int capacity);

// @param graphID ID of graph to be trained
// @return Map of host to maps containing schedule for that host given by schedulePartitionsBestFit
static std::map<std::string, std::map<int, std::map<int, int>>> scheduleGradientPassingTraining(std::string graphID);

// @param availableMemory total memory of host (in KB)
// @return Vector of pairs containing partition ids and estimated memory (in KB) for the
//         distributed opt training process
static std::vector<std::pair<int, double>> estimateMemoryDistOpt(std::vector<std::vector<int>> partitionMetadata,
long availableMemory);

// @param capacity total memory of host
// @return Map from worker to maps from partition to order of loading into memory
static std::map<int, std::map<int, int>> schedulePartitionsBestFit(
std::vector<std::pair<int, double>> partitionMemoryList, std::map<int, int> partitionWorkerMap, int capacity);

map<string, std::map<int, int>> JasmineGraphTrainingSchedular::schedulePartitionTraining(std::string graphID) {
map<string, std::map<int, int>> scheduleForEachHost;
vector<pair<string, string>> hostData;
Expand Down Expand Up @@ -98,7 +111,7 @@ map<string, std::map<int, int>> JasmineGraphTrainingSchedular::schedulePartition
return scheduleForEachHost;
}

long JasmineGraphTrainingSchedular::estimateMemory(int vertexCount, string graph_id) {
static long estimateMemory(int vertexCount, string graph_id) {
auto *refToSqlite = new SQLiteDBInterface();
refToSqlite->init();

Expand Down Expand Up @@ -134,8 +147,8 @@ long JasmineGraphTrainingSchedular::estimateMemory(int vertexCount, string graph
return totalMemoryApproximation;
}

long JasmineGraphTrainingSchedular::getAvailableMemory(string hostname) {
PerformanceSQLiteDBInterface *refToPerfDb = new PerformanceSQLiteDBInterface();
static long getAvailableMemory(string hostname) {
auto *refToPerfDb = new PerformanceSQLiteDBInterface();
refToPerfDb->init();
trainScheduler_logger.log("Fetching available host " + hostname + " memory", "info");
string perfSqlStatement =
Expand All @@ -151,8 +164,7 @@ long JasmineGraphTrainingSchedular::getAvailableMemory(string hostname) {
return availableMemory;
}

std::map<int, int> JasmineGraphTrainingSchedular::packPartitionsToMemory(vector<pair<int, int>> partitionMemoryList,
int capacity) {
static std::map<int, int> packPartitionsToMemory(vector<pair<int, int>> partitionMemoryList, int capacity) {
std::map<int, int> partitionToIteration;

int res = 0;
Expand Down Expand Up @@ -188,8 +200,7 @@ std::map<int, int> JasmineGraphTrainingSchedular::packPartitionsToMemory(vector<
* @param graphID ID of graph to be trained
* @return Map of host to maps containing schedule for that host given by schedulePartitionsBestFit method
*/
map<string, std::map<int, map<int, int>>> JasmineGraphTrainingSchedular::scheduleGradientPassingTraining(
std::string graphID) {
static map<string, std::map<int, map<int, int>>> scheduleGradientPassingTraining(std::string graphID) {
map<string, map<int, map<int, int>>> scheduleForEachHost;
vector<pair<string, string>> hostData;
auto *refToSqlite = new SQLiteDBInterface();
Expand Down Expand Up @@ -313,8 +324,7 @@ map<string, std::map<int, map<int, int>>> JasmineGraphTrainingSchedular::schedul
* @param availableMemory total memory of host (in KB)
* @return Vector of pairs containing partition ids and estimated memory (in KB) for distributed opt training process
*/
vector<pair<int, double>> JasmineGraphTrainingSchedular::estimateMemoryDistOpt(vector<vector<int>> partitionMetadata,
long availableMemory) {
static vector<pair<int, double>> estimateMemoryDistOpt(vector<vector<int>> partitionMetadata, long availableMemory) {
vector<pair<int, double>> partitionMemoryList; // Vector of estimated sizes of partitions

trainScheduler_logger.log("Estimating host partition size in memory", "info");
Expand Down Expand Up @@ -350,8 +360,8 @@ vector<pair<int, double>> JasmineGraphTrainingSchedular::estimateMemoryDistOpt(v
* @param capacity total memory of host
* @return Map from worker to maps from partition to order of loading into memory
*/
map<int, map<int, int>> JasmineGraphTrainingSchedular::schedulePartitionsBestFit(
vector<pair<int, double>> partitionMemoryList, map<int, int> partitionWorkerMap, int capacity) {
static map<int, map<int, int>> schedulePartitionsBestFit(vector<pair<int, double>> partitionMemoryList,
map<int, int> partitionWorkerMap, int capacity) {
std::map<int, map<int, int>> schedule; // Host schedule per worker and what partitions to load in which order
cout << "Host memory " << capacity << endl;
// Initialize the state of host workers as free
Expand Down
15 changes: 1 addition & 14 deletions src/ml/trainer/JasmineGraphTrainingSchedular.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,7 @@ limitations under the License.

class JasmineGraphTrainingSchedular {
public:
std::map<std::string, std::map<int, int>> schedulePartitionTraining(std::string graphID);

long getAvailableMemory(std::string hostname);
long estimateMemory(int edgeCount, std::string graph_id);

std::map<int, int> packPartitionsToMemory(std::vector<std::pair<int, int>> partitionMemoryList, int capacity);

std::map<std::string, std::map<int, std::map<int, int>>> scheduleGradientPassingTraining(std::string graphID);

std::vector<std::pair<int, double>> estimateMemoryDistOpt(std::vector<std::vector<int>> partitionMetadata,
long availableMemory);

std::map<int, std::map<int, int>> schedulePartitionsBestFit(std::vector<std::pair<int, double>> partitionMemoryList,
std::map<int, int> partitionWorkerMap, int capacity);
static std::map<std::string, std::map<int, int>> schedulePartitionTraining(std::string graphID);
};

#endif // JASMINEGRAPH_JASMINEGRAPHTRAININGSCHEDULAR_H
26 changes: 8 additions & 18 deletions src/server/JasmineGraphInstanceFileTransferService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pthread_mutex_t thread_lock = PTHREAD_MUTEX_INITIALIZER;
void *filetransferservicesession(void *dummyPt) {
filetransferservicesessionargs *sessionargs = (filetransferservicesessionargs *)dummyPt;
int connFd = sessionargs->connFd;
delete sessionargs;
char data[301];
bzero(data, 301);
read(connFd, data, 300);
Expand Down Expand Up @@ -82,29 +83,18 @@ void JasmineGraphInstanceFileTransferService::run(int dataPort) {

len = sizeof(clntAdd);

int connectionCounter = 0;
pthread_t threadA[100];

// TODO :: What is the maximum number of connections allowed?? Considered as 100 for now
while (connectionCounter < 100) {
while (true) {
file_service_logger.log("Worker FileTransfer Service listening on port " + to_string(dataPort), "info");
connFd = accept(listenFd, (struct sockaddr *)&clntAdd, &len);

if (connFd < 0) {
file_service_logger.log("Cannot accept connection to port " + to_string(dataPort), "error");
} else {
file_service_logger.log("Connection successful to port " + to_string(dataPort), "info");
struct filetransferservicesessionargs filetransferservicesessionargs1;
filetransferservicesessionargs1.connFd = connFd;

pthread_create(&threadA[connectionCounter], NULL, filetransferservicesession,
&filetransferservicesessionargs1);
connectionCounter++;
continue;
}
}

for (int i = 0; i < connectionCounter; i++) {
pthread_join(threadA[i], NULL);
std::cout << "FT Threads joined" << std::endl;
file_service_logger.log("Connection successful to port " + to_string(dataPort), "info");
filetransferservicesessionargs *sessionargs = new filetransferservicesessionargs;
sessionargs->connFd = connFd;
pthread_t pt;
pthread_create(&pt, NULL, filetransferservicesession, sessionargs);
}
}
1 change: 0 additions & 1 deletion src/server/JasmineGraphInstanceProtocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ const int INSTANCE_DATA_LENGTH = 300;
const int FED_DATA_LENGTH = 300;
const int INSTANCE_LONG_DATA_LENGTH = 1024;
const int INSTANCE_FILE_BUFFER_LENGTH = 1024;
const int MAX_CONNECTION_COUNT = 300;
const int MAX_STREAMING_DATA_LENGTH = 1024;

const int TOP_K_PAGE_RANK = 100;
Expand Down
Loading

0 comments on commit daeda1c

Please sign in to comment.