Skip to content

Commit

Permalink
cm_data: Added code to synchronize at the end of each iteration via s…
Browse files Browse the repository at this point in the history
…ocket communication.

Refactored sockets communication code and moved it from complex folder to common folder for reuse,
added sychroniztion via sockets communication and added command line option "-q" to specify service port.

Signed-off-by: Shantonu Hossain <[email protected]>
  • Loading branch information
shantonu committed Jul 25, 2016
1 parent ba4bee1 commit 74a5e3d
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 171 deletions.
159 changes: 159 additions & 0 deletions common/shared.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ char test_name[50] = "custom";
int timeout = -1;
struct timespec start, end;

int listen_sock = -1;
int sock = -1;

struct fi_av_attr av_attr = {
.type = FI_AV_MAP,
.count = 1
Expand Down Expand Up @@ -1889,3 +1892,159 @@ int send_recv_greeting(struct fid_ep *ep)

return 0;
}

int ft_sock_listen(char *service)
{
struct addrinfo *ai, hints;
int val, ret;

memset(&hints, 0, sizeof hints);
hints.ai_flags = AI_PASSIVE;

ret = getaddrinfo(NULL, service, &hints, &ai);
if (ret) {
fprintf(stderr, "getaddrinfo() %s\n", gai_strerror(ret));
return ret;
}

listen_sock = socket(ai->ai_family, SOCK_STREAM, 0);
if (listen_sock < 0) {
perror("socket");
ret = listen_sock;
goto out;
}

val = 1;
ret = setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
if (ret) {
perror("setsockopt SO_REUSEADDR");
goto out;
}

ret = bind(listen_sock, ai->ai_addr, ai->ai_addrlen);
if (ret) {
perror("bind");
goto out;
}

ret = listen(listen_sock, 0);
if (ret)
perror("listen");

out:
if (ret && listen_sock >= 0)
close(listen_sock);
freeaddrinfo(ai);
return ret;
}

int ft_sock_connect(char *node, char *service)
{
struct addrinfo *ai;
int ret;

ret = getaddrinfo(node, service, NULL, &ai);
if (ret) {
perror("getaddrinfo");
return ret;
}

sock = socket(ai->ai_family, SOCK_STREAM, 0);
if (sock < 0) {
perror("socket");
ret = sock;
goto free;
}

ret = 1;
ret = setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *) &ret, sizeof(ret));
if (ret)
perror("setsockopt");

ret = connect(sock, ai->ai_addr, ai->ai_addrlen);
if (ret) {
perror("connect");
close(sock);
}

free:
freeaddrinfo(ai);
return ret;
}

int ft_sock_accept()
{
int ret, op;

sock = accept(listen_sock, NULL, 0);
if (sock < 0) {
ret = sock;
perror("accept");
return ret;
}

op = 1;
ret = setsockopt(sock, IPPROTO_TCP, TCP_NODELAY,
(void *) &op, sizeof(op));
if (ret)
perror("setsockopt");

return 0;
}

int ft_sock_send(int fd, void *msg, size_t len)
{
int ret;

ret = send(fd, msg, len, 0);
if (ret == len) {
return 0;
} else if (ret < 0) {
perror("send");
return -errno;
} else {
perror("send aborted");
return -FI_ECONNABORTED;
}
}

int ft_sock_recv(int fd, void *msg, size_t len)
{
int ret;

ret = recv(fd, msg, len, MSG_WAITALL);
if (ret == len) {
return 0;
} else if (ret == 0) {
return -FI_ENOTCONN;
} else if (ret < 0) {
FT_PRINTERR("ft_fw_recv", ret);
perror("recv");
return -errno;
} else {
perror("recv aborted");
return -FI_ECONNABORTED;
}
}

int ft_sock_sync(int value)
{
int result = -FI_EOTHER;

if (listen_sock < 0) {
ft_sock_send(sock, &value, sizeof value);
ft_sock_recv(sock, &result, sizeof result);
} else {
ft_sock_recv(sock, &result, sizeof result);
ft_sock_send(sock, &value, sizeof value);
}

return result;
}

void ft_sock_shutdown(int fd)
{
shutdown(fd, SHUT_RDWR);
close(fd);
}

4 changes: 2 additions & 2 deletions complex/ft_comm.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,11 @@ static int ft_load_av(void)
}

msg.len = (uint32_t) len;
ret = ft_fw_send(sock, &msg, sizeof msg);
ret = ft_sock_send(sock, &msg, sizeof msg);
if (ret)
return ret;

ret = ft_fw_recv(sock, &msg, sizeof msg);
ret = ft_sock_recv(sock, &msg, sizeof msg);
if (ret)
return ret;

Expand Down
Loading

0 comments on commit 74a5e3d

Please sign in to comment.