Skip to content

Commit

Permalink
feat: threadsafe rpc client
Browse files Browse the repository at this point in the history
  • Loading branch information
kevmo314 committed Sep 13, 2024
1 parent 55d9c03 commit e6e537c
Showing 1 changed file with 44 additions and 8 deletions.
52 changes: 44 additions & 8 deletions client.cu
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <nvml.h>
#include <unistd.h>
#include <pthread.h>
#include <atomic>

int sockfd;

Expand All @@ -28,8 +29,6 @@ int open_rpc_client()
}

bzero(&servaddr, sizeof(servaddr));

// assign IP, PORT
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");
servaddr.sin_port = htons(14833);
Expand All @@ -41,21 +40,58 @@ int open_rpc_client()
return sockfd;
}

int nextRequestId = 0, responseId = -1;

int send_rpc_message(const char *op, const void *args, const int argslen)
int send_rpc_message(void **response, const char *op, const void *args, const int argslen)
{
char oplen = (char)strlen(op);
if (write(sockfd, (void *)&nextRequestId, sizeof(int)) < 0)
static int next_request_id = 0, active_response_id = -1;
static pthread_mutex_t mutex;
static pthread_cond_t cond;

// write the request atomically
pthread_mutex_lock(&mutex);

int request_id = next_request_id++;

uint8_t oplen = (uint8_t)strlen(op);
if (write(sockfd, (void *)&request_id, sizeof(int)) < 0)
return -1;
if (write(sockfd, (void *)&oplen, sizeof(char)) < 0)

if (write(sockfd, (void *)&oplen, sizeof(uint8_t)) < 0)
return -1;
if (write(sockfd, op, oplen) < 0)
return -1;
if (write(sockfd, (void *)&argslen, sizeof(int)) < 0)
return -1;
if (write(sockfd, args, argslen) < 0)
return -1;

// wait for the response
while (true)
{
while (active_response_id != request_id && active_response_id != -1)
pthread_cond_wait(&cond, &mutex);

// we currently own mutex. if active response id is -1, read the response id
if (active_response_id == -1)
{
if (read(sockfd, (void *)&active_response_id, sizeof(int)) < 0)
return -1;
continue;
}
else
{
// it's our turn to read the response.
int len;
if (read(sockfd, (void *)&len, sizeof(int)) < 0)
return -1;
*response = malloc(len);
if (read(sockfd, *response, len) < 0)
return -1;

// we are done, unlock and return.
pthread_mutex_unlock(&mutex);
return len;
}
}
}

void close_rpc_client()
Expand Down

0 comments on commit e6e537c

Please sign in to comment.