Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multitread version with dynamic resolving #4

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

build:
make clean
gcc src/hostname_resolver.c src/argparse.c src/socket.c src/udp_load_balancer.c -o build/udp_load_balancer
gcc src/hostname_resolver.c src/argparse.c src/socket.c src/udp_load_balancer.c -lpthread -o build/udp_load_balancer

install:
make build
Expand Down
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ Installation
------------
Compile it with ``make build`` or ``make install`` if you want to place the generated binary in ``/usr/local/bin/``

Features
--------
Support dynamic resolving of target servers in every 5 seconds. If subsequent resolv fails - use current servers until good resolv.

Usage
-----

udp_load_balancer [-h] [--port PORT] [--servers SERVERS]
udp_load_balancer [-hvd] [--port PORT] [--servers SERVERS]

The options are as follows:
-h, --help show this help message and exit
Expand All @@ -23,3 +27,6 @@ Usage
Servers list to balance the UDP messages
Example: "127.0.0.1:8123, 127.0.0.1:8124"
Example: "127.0.0.1:8123, localhost:8124, example.com:8123"
-v, --verbose Be verbose
-d, --debug Debug output enabled

1 change: 1 addition & 0 deletions TODO
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* run_resolver(): change servers_list_resolved only when IP adresses have changed for a server
29 changes: 26 additions & 3 deletions src/argparse.c
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>

#include "udp_load_balancer.h"


unsigned int is_help_required(int argc, char **argv) {
unsigned int is_help_required_or_flags_set(int argc, char **argv) {
unsigned int i;
for (i = 0; i < argc; ++i)
{
if(strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
return 1;
} else if ( strcmp(argv[i], "-v") == 0 || strcmp(argv[i], "--verbose") == 0) {
verb_flag = 1;
} else if ( strcmp(argv[i], "-V") == 0 || strcmp(argv[i], "--version") == 0) {
version_flag = 1;
} else if ( strcmp(argv[i], "-d") == 0 || strcmp(argv[i], "--debug") == 0) {
verb_flag = 1;
debug_flag = 1;
}
}

Expand Down Expand Up @@ -100,14 +109,23 @@ char **get_servers_hosts(char **servers_list, int servers_amount) {
return servers_hosts;
}

void free_servers_hosts(char **servers_hosts, int servers_amount) {
unsigned int i;
for(i = 0; i < servers_amount; i++) {
free(servers_hosts[i]);
}
free(servers_hosts);
}

int *get_servers_ports(char **servers_list, int servers_amount) {
char *server_raw, *port_string;
char *server_raw, *free_pointer, *port_string;
int server_port;
int *servers_ports = malloc(MAX_SERVERS * sizeof(int*));

unsigned int i, j;
for(i = 0; i < servers_amount; i++) {
server_raw = strdup(servers_list[i]);
free_pointer = strdup(servers_list[i]);
server_raw = free_pointer;
strsep(&server_raw, ":");
port_string = strsep(&server_raw, ":");

Expand All @@ -117,7 +135,12 @@ int *get_servers_ports(char **servers_list, int servers_amount) {
}

servers_ports[i] = server_port;
free(free_pointer);
}

return servers_ports;
}

void free_servers_ports(int *servers_ports, int servers_amount) {
free(servers_ports);
}
8 changes: 6 additions & 2 deletions src/argparse.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
unsigned int is_help_required(int argc, char **argv);
unsigned int is_help_required_or_flags_set(int argc, char **argv);

unsigned int get_udp_load_balancer_port(int argc, char **argv);

Expand All @@ -8,4 +8,8 @@ char **get_servers_list(int argc, char **argv);

char **get_servers_hosts(char **servers_list, int servers_amount);

int *get_servers_ports(char **servers_list, int servers_amount);
void free_servers_hosts(char **servers_hosts, int servers_amount);

int *get_servers_ports(char **servers_list, int servers_amount);

void free_servers_ports(int *servers_ports, int servers_amount);
5 changes: 3 additions & 2 deletions src/hostname_resolver.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ int is_ip(char *address)
return result != 0;
}

char *get_ip_from_hostname(char *hostname)
char *get_one_ip_from_hostname(char *hostname)
{
struct hostent *he = gethostbyname(hostname);

Expand All @@ -27,4 +27,5 @@ char *get_ip_from_hostname(char *hostname)

/* return the first one, because one host can have multiple addresses */
return inet_ntoa(*addr_list[0]);
}

}
2 changes: 1 addition & 1 deletion src/hostname_resolver.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
int is_ip(char *address);

char *get_ip_from_hostname(char *hostname);
char *get_one_ip_from_hostname(char *hostname);
170 changes: 138 additions & 32 deletions src/udp_load_balancer.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
#include "socket.h"
#include "hostname_resolver.h"

#include <pthread.h>
#include <unistd.h>
#include <netdb.h>

unsigned int servers_amount_resolved = 0;

unsigned int get_servers_amount(char **servers_list) {
unsigned int i = 0;
Expand All @@ -21,36 +26,130 @@ unsigned int get_servers_amount(char **servers_list) {
}
}

void run_dispatcher(char **servers_list, int socket_)
void* run_resolver( void* call_arg )
{
char message[MAX_PAYLOAD_LENGTH];
unsigned int message_length;

char **servers_list = call_arg;
char **servers_list_resolved_tmp;
char **servers_list_resolved_free;
int *servers_ports_resolved_tmp;
int *servers_ports_resolved_free;
servers_list_resolved = NULL;
servers_ports_resolved = NULL;
int first_run = 1;
unsigned int servers_amount_resolved_prev;
unsigned int servers_amount = get_servers_amount(servers_list);

char **servers_hosts = get_servers_hosts(servers_list, servers_amount);
int *servers_ports = get_servers_ports(servers_list, servers_amount);

unsigned int i;

for (i = 0; i < servers_amount; i++) {
if (is_ip(servers_hosts[i]) == 0) {
servers_hosts[i] = get_ip_from_hostname(servers_hosts[i]);
while(1) {
servers_amount_resolved_prev = servers_amount_resolved;
servers_list_resolved_tmp = calloc(sizeof (char*), servers_amount);
servers_ports_resolved_tmp = calloc(sizeof (int), servers_amount);
if ( verb_flag ) printf("\n\nBalancing to servers:\n");
unsigned int i;
unsigned int j = 0;
for (i = 0; i < servers_amount; ++i) {
if ( verb_flag ) printf(" %s\n", servers_list[i]);
if (is_ip(servers_hosts[i]) == 0) {

struct hostent *he = gethostbyname(servers_hosts[i]);

if (he == NULL) {
printf("Host \"%s\" not found\n", servers_hosts[i]);
continue;
}

struct in_addr **addr_list = (struct in_addr **)he->h_addr_list;

while ( *addr_list ) {
if ( j >= servers_amount ) {
servers_list_resolved_tmp = realloc(servers_list_resolved_tmp, sizeof (char*) * (j+1));
servers_ports_resolved_tmp = realloc(servers_ports_resolved_tmp, sizeof (int) * (j+1));
}
servers_list_resolved_tmp[j] = strdup(inet_ntoa(**addr_list));
servers_ports_resolved_tmp[j] = servers_ports[i];
if ( verb_flag ) printf(" %s %d\n",servers_list_resolved_tmp[j], servers_ports_resolved_tmp[j]);
j++;
addr_list += 1;
}
} else {
if ( j >= servers_amount ) {
servers_list_resolved_tmp = realloc(servers_list_resolved_tmp, sizeof (char*) * (j+1));
servers_ports_resolved_tmp = realloc(servers_ports_resolved_tmp, sizeof (int) * (j+1));
}
servers_list_resolved_tmp[j] = strdup(servers_hosts[i]);
servers_ports_resolved_tmp[j] = servers_ports[i];
j++;
}
servers_amount_resolved = j;
}
if ( j < 1 ) {
printf( "Can't resolve any of servers, waiting for 5sec\n" );
sleep(5);
continue ;
}
if ( first_run == 0 ) {
pthread_mutex_lock(&resolv_mutex);
}
first_run = 0;
servers_list_resolved_free = servers_list_resolved;
servers_list_resolved = servers_list_resolved_tmp;
servers_ports_resolved_free = servers_ports_resolved;
servers_ports_resolved = servers_ports_resolved_tmp;
pthread_mutex_unlock(&resolv_mutex);
if ( debug_flag ) {
printf("servers_list_resolved array:\n");
for ( j = 0 ; j < servers_amount_resolved ; j++ ) {
printf(" (%u %s:%d)", j, servers_list_resolved_tmp[j], servers_ports_resolved_tmp[j]);
printf("\n");
dperfilyev marked this conversation as resolved.
Show resolved Hide resolved
}
}
free_servers_hosts(servers_list_resolved_free,servers_amount_resolved_prev);
free(servers_ports_resolved_free);

usleep(5000000); // resolve every 5sec
}
free_servers_hosts(servers_hosts,servers_amount);
free_servers_ports(servers_ports,servers_amount);
}

while(1) {
for(i = 0; i < servers_amount; i++) {
listen_udp_packet(message, &message_length, socket_);
void* run_dispatcher( void* call_arg )
{
dispatcher_arg_struct_t *arg = call_arg;
char message[MAX_PAYLOAD_LENGTH];
unsigned int message_length;
char srv[16];
int port;

send_udp_packet(servers_hosts[i], servers_ports[i], message, message_length, socket_);
unsigned int i;

i = 0;
while(1) {
if ( debug_flag ) printf("while waiting for a resolv_mutex\n");
listen_udp_packet(message, &message_length, arg->socket_);
if ( debug_flag ) printf("message:%s ", message );
dperfilyev marked this conversation as resolved.
Show resolved Hide resolved
pthread_mutex_lock(&resolv_mutex);
/* If number of servers' IPs has been decreased by resolver thread - begin from first IP */
if ( i >= servers_amount_resolved ) {
i = 0;
}
/* use srv & port temp variables for call send_udp_packet() outside of pthread_mutex_unlock() */
strcpy(srv,servers_list_resolved[i]);
port = servers_ports_resolved[i];
pthread_mutex_unlock(&resolv_mutex);
if ( debug_flag ) printf("%s\n", srv );
send_udp_packet(srv, port, message, message_length, arg->socket_);
i++;
}
}

void show_version() {
printf("udp_load_balancer %s\n",UDP_LOAD_BALANCER_VERSION);
}

void show_help() {
printf("\n"
"Usage: udp_load_balancer [-h] [--port PORT] [--servers SERVERS]\n"
"Usage: udp_load_balancer [-hvd] [--port PORT] [--servers SERVERS]\n"
"\n"
"The options are as follows:\n"
" -h, --help show this help message and exit\n"
Expand All @@ -61,16 +160,27 @@ void show_help() {
" -s, --servers SERVERS\n"
" Servers list to balance the UDP messages\n"
" Example: \"127.0.0.1:8123, localhost:8124, example.com:8123\"\n"
" -v, --verbose Be verbose\n"
" -V, --version Show version\n"
" -d, --debug Debug output enabled\n"
);
}

int main(int argc, char **argv)
{
if (is_help_required(argc, argv)) {
verb_flag = 0;
version_flag = 0;
debug_flag = 0;
if (is_help_required_or_flags_set(argc, argv)) {
show_help();
exit(0);
}

if ( version_flag ) {
show_version();
exit(0);
}

unsigned int udp_load_balancer_port = get_udp_load_balancer_port(argc, argv);
char **servers_list = get_servers_list(argc, argv);

Expand All @@ -79,30 +189,26 @@ int main(int argc, char **argv)
exit(2);
}

unsigned int servers_amount = get_servers_amount(servers_list);
char *udp_load_balancer_ip = get_udp_load_balancer_host(argc, argv);

printf("Listening on %s:%d", udp_load_balancer_ip, udp_load_balancer_port);
if ( verb_flag )printf("Listening on %s:%d\n", udp_load_balancer_ip, udp_load_balancer_port);
if(is_ip(udp_load_balancer_ip) == 0) {
udp_load_balancer_ip = get_ip_from_hostname(udp_load_balancer_ip);
printf(" (%s:%d)", udp_load_balancer_ip, udp_load_balancer_port);
udp_load_balancer_ip = get_one_ip_from_hostname(udp_load_balancer_ip);
if ( verb_flag ) printf(" (%s:%d)\n", udp_load_balancer_ip, udp_load_balancer_port);
}

printf("\n\nBalancing to servers:\n");
char **servers_hosts = get_servers_hosts(servers_list, servers_amount);
int *servers_ports = get_servers_ports(servers_list, servers_amount);
unsigned int i;
for (i = 0; i < servers_amount; ++i) {
printf(" %s", servers_list[i]);
if(is_ip(servers_hosts[i]) == 0) {
printf(" (%s:%d)", get_ip_from_hostname(servers_hosts[i]), servers_ports[i]);
}
printf("\n");
}
pthread_mutex_init (&resolv_mutex , NULL);
pthread_mutex_lock(&resolv_mutex); // initially lock resolv_mutex - unlock when resolv will be done

int socket_ = create_socket(udp_load_balancer_ip, udp_load_balancer_port);

run_dispatcher(servers_list, socket_);
pthread_t dispatcher_thread_id;
pthread_t resolver_thread_id;
dispatcher_arg_struct_t dispatcher_arg_struct = { servers_list , socket_ };
pthread_create(&resolver_thread_id, NULL, run_resolver, (void *) servers_list);
pthread_create(&dispatcher_thread_id, NULL, run_dispatcher, (void *) &dispatcher_arg_struct);
pthread_join(dispatcher_thread_id, NULL);
if ( verb_flag ) printf("main exit\n");

return 0;
}
25 changes: 24 additions & 1 deletion src/udp_load_balancer.h
Original file line number Diff line number Diff line change
@@ -1,2 +1,25 @@
#ifndef _DPLB_UDP_LOAD_BALANCER_
dperfilyev marked this conversation as resolved.
Show resolved Hide resolved
#define _DPLB_UDP_LOAD_BALANCER_

#define udp_load_balancer_version 1000000
#define UDP_LOAD_BALANCER_VERSION "1.0.0"
dperfilyev marked this conversation as resolved.
Show resolved Hide resolved

#define MAX_PAYLOAD_LENGTH 65507 // 65535 bytes − 8 byte UDP header − 20 byte IP header
#define MAX_SERVERS 1024
#define MAX_SERVERS 1024

pthread_mutex_t resolv_mutex;

int verb_flag;
int version_flag;
int debug_flag;

char **servers_list_resolved;
int *servers_ports_resolved;


typedef struct {
char **servers_list;
int socket_;
} dispatcher_arg_struct_t;

#endif /* _DPLB_UDP_LOAD_BALANCER_ */