Skip to content

Commit

Permalink
Styles, minorities, move more math to core and WIP on HTTP/WS/SSE client
Browse files Browse the repository at this point in the history
  • Loading branch information
boazsegev committed Oct 14, 2024
1 parent 2a8403d commit f7903b2
Show file tree
Hide file tree
Showing 27 changed files with 4,035 additions and 2,573 deletions.
25 changes: 18 additions & 7 deletions examples/chat.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,15 @@ FIO_SFUNC void on_close(void *udata) {

/** Performs "login" logic (saves user handle) */
FIO_SFUNC void on_data_first_line(fio_s *io, char *name, size_t len) {
if (!len)
goto error_name_too_short;
do
--len;
while (len && (name[len] == '\r' || name[len] == '\n' || name[len] == ' ' ||
name[len] == '\t'));
++len;
if (len < 2)
goto error_name_too_short;
--len;
if (len > 30)
goto error_name_too_long;
client_s *c = fio_udata_get(io);
Expand Down Expand Up @@ -196,6 +202,10 @@ FIO_SFUNC void on_shutdown(fio_s *io) {
Starting the program - main()
***************************************************************************** */

static void print_chat(fio_msg_s *m) {
printf("%.*s", (int)m->message.len, m->message.buf);
}

int main(int argc, char const *argv[]) {
// FIO_NAME_TEST(stl, letter)();
/* initialize the CLI options */
Expand All @@ -215,15 +225,16 @@ int main(int argc, char const *argv[]) {
"\tNAME tcp://localhost:3000/\n"
"\tNAME localhost://3000\n",
FIO_CLI_BOOL("--verbose -V -d print out debugging messages."),
FIO_CLI_BOOL("--log -v log HTTP messages."),
FIO_CLI_INT("--workers -w (4) number of worker processes to use."),
FIO_CLI_PRINT_LINE(
"NOTE: requests are limited to 32Kb and 16 headers each."));
FIO_CLI_BOOL("--log -v logs chat messages."),
FIO_CLI_INT("--workers -w (2) number of worker processes to use."));

/* review CLI for logging */
if (fio_cli_get_bool("-V")) {
if (fio_cli_get_bool("-V"))
FIO_LOG_LEVEL = FIO_LOG_LEVEL_DEBUG;
}

if (fio_cli_get_bool("-v"))
fio_subscribe(.on_message = print_chat, .master_only = 1);

/* review CLI connection address (in URL format) */
FIO_ASSERT(fio_srv_listen(.url = fio_cli_unnamed(0),
.protocol = &CHAT_PROTOCOL_LOGIN),
Expand Down
214 changes: 183 additions & 31 deletions examples/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ Feel free to copy, use and enjoy according to the license provided.
***************************************************************************** */

/* *****************************************************************************
This is a simple TCP/IP and Unix Socket client example. UDP is also available
but untested.
This is a simple TCP/IP and Unix Socket client example with support for HTTP,
WebSocket and SSE client connections. UDP is also available but untested.
Note that this program uses a single thread, which allows it to ignore some
possible race conditions.
This program uses a single thread, which reduces complexity.
***************************************************************************** */

/* include some of the modules we use... */
Expand All @@ -36,14 +35,37 @@ static fio_protocol_s CLIENT_PROTOCOL = {
.timeout = 30,
};

/** Called there's incoming data (from STDIN / the client socket). */
/** Callback for HTTP requests (server) or responses (client). */
FIO_SFUNC void client_on_http(fio_http_s *h);
/** Called once a WebSocket / SSE connection upgrade is complete. */
FIO_SFUNC void client_on_open(fio_http_s *h);

/** Called when a WebSocket message is received. */
FIO_SFUNC void client_on_message(fio_http_s *h,
fio_buf_info_s msg,
uint8_t is_text);
/** Called when an EventSource event is received. */
FIO_SFUNC void client_on_eventsource(fio_http_s *h,
fio_buf_info_s id,
fio_buf_info_s event,
fio_buf_info_s data);
/** Called after a WebSocket / SSE connection is closed (for cleanup). */
FIO_SFUNC void client_on_close(fio_http_s *h);

/** Called for show... when the outgoing buffer appears empty. */
FIO_SFUNC void client_on_ready(fio_http_s *h);

/** Called there's incoming data from STDIN. */
FIO_SFUNC void on_input(fio_s *io);
/** Called when STDIN closed. */
FIO_SFUNC void on_input_closed(void *udata);
/** Called if connection failed to establish. */
FIO_SFUNC void on_failed(void *arg);

/** STDIN protocol (REPL) */
static fio_protocol_s STDIN_PROTOCOL = {
.on_data = on_input,
.on_close = on_input_closed,
};

/* Opens the client connection after the server starts (avoid SIGPIPE) */
Expand Down Expand Up @@ -73,11 +95,13 @@ int main(int argc, char const *argv[]) {
"\tNAME udp://localhost:3000/\n",
FIO_CLI_INT("--timeout -t (50) ongoing connection timeout in seconds."),
FIO_CLI_INT("--wait -w (5) connection attempt timeout in seconds."),
FIO_CLI_BOOL("--body -b print out body only, ignore headers."),
FIO_CLI_BOOL("--verbose -V -d print out debugging messages."));

/* review CLI for logging */
if (fio_cli_get_bool("-V"))
if (fio_cli_get_bool("-V")) {
FIO_LOG_LEVEL = FIO_LOG_LEVEL_DEBUG;
}
/* review connection timeout */
if (fio_cli_get_i("-t") > 0)
CLIENT_PROTOCOL.timeout = (uint32_t)fio_cli_get_i("-t") * 1000;
Expand Down Expand Up @@ -111,7 +135,6 @@ int main(int argc, char const *argv[]) {
fio_state_callback_add(FIO_CALL_ON_START, open_client_connection, is_http);
/* start server, connection termination will stop it. */
fio_srv_start(0);
FIO_LOG_INFO("* connection terminated.\n");
return 0;
}

Expand All @@ -122,12 +145,21 @@ Opening the client connection.
FIO_SFUNC void open_client_connection(void *is_http) {

if (is_http) {
/* TODO! HTTP / WebSocket / SSE client */
FIO_LOG_FATAL("HTTP, WebSocket and SSE clients aren't supported yet"
"\n\t\tfor URL: %s",
fio_cli_unnamed(0));
fio_srv_stop();
/* HTTP / WebSocket / SSE Client */
FIO_ASSERT(fio_http_connect(fio_cli_unnamed(0),
NULL,
.on_http = client_on_http,
.on_open = client_on_open,
.on_message = client_on_message,
.on_ready = client_on_ready,
.on_eventsource = client_on_eventsource,
.on_close = client_on_close,
.on_finish = client_on_close,
.timeout = (fio_cli_get_i("-t") * 1000),
.ws_timeout = (fio_cli_get_i("-w") * 1000)),
"HTTP/WS Connection error!");
} else {
/* Raw TCP/IP / UDP Client */
FIO_ASSERT(fio_srv_connect(fio_cli_unnamed(0),
.protocol = &CLIENT_PROTOCOL,
.on_failed = on_failed,
Expand All @@ -136,6 +168,52 @@ FIO_SFUNC void open_client_connection(void *is_http) {
}
}

/* *****************************************************************************
Input from STDIN - directed to the client's socket using pub/sub
***************************************************************************** */

/** Called there's incoming data (from STDIN / the client socket). */
FIO_SFUNC void on_input(fio_s *io) {
struct {
size_t len;
char buf[4080];
} info;
for (; FIO_SOCK_WAIT_R(fileno(stdin), 50) == POLLIN;) { /* read until done */
FIO_LOG_DEBUG2("reading from STDIN...");
info.len = fread(info.buf, 1, 4080, stdin);
if (!info.len)
return;
FIO_LOG_DEBUG2("Publishing: %.*s", (int)info.len, info.buf);
fio_publish(.from = io,
.channel = FIO_BUF_INFO1("client"),
.message = FIO_BUF_INFO2(info.buf, info.len));
}
}

/** Called when STDIN closed. */
FIO_SFUNC void on_input_closed(void *udata) {
FIO_LOG_DEBUG2("STDIN input stream closed.");
fio_srv_stop();
(void)udata;
}

/* Debug messages for STDIN round-trip */
void debug_subscriber(fio_msg_s *msg) {
FIO_LOG_DEBUG2("Subscriber received: %.*s",
msg->message.len,
msg->message.buf);
}

/* Attach STDIN */
FIO_SFUNC void attach_stdin(void) {
FIO_LOG_DEBUG2("listening to user input on STDIN.");
fio_srv_attach_fd(fileno(stdin), &STDIN_PROTOCOL, NULL, NULL);
if (fio_cli_get_bool("-V"))
fio_subscribe(.channel = FIO_BUF_INFO1("client"),
.on_message = debug_subscriber,
.master_only = 1);
}

/* *****************************************************************************
IO callback(s)
***************************************************************************** */
Expand All @@ -144,13 +222,11 @@ IO callback(s)
FIO_SFUNC void on_attach(fio_s *io) {
fio_subscribe(.io = io, .channel = FIO_BUF_INFO1("client"));
fio_udata_set(io, (void *)1);
FIO_LOG_INFO("* connection established.\n");
FIO_LOG_DEBUG2("* connection established.\n");
FIO_LOG_DEBUG2("Connected client IO to pub/sub");
/* attach STDIN */
FIO_LOG_DEBUG2("listening to user input on STDIN.");
fio_srv_attach_fd(fileno(stdin), &STDIN_PROTOCOL, NULL, NULL);
attach_stdin();
}
/** Called there's incoming data (from STDIN / the client socket. */
/** Called there's incoming data from the client socket. */
FIO_SFUNC void on_data(fio_s *io) {
FIO_LOG_DEBUG2("on_data callback called for: %p", io);
char buf[4080];
Expand All @@ -174,19 +250,95 @@ FIO_SFUNC void on_failed(void *arg) {
FIO_LOG_ERROR("Connection failed / no data received: %s", fio_cli_unnamed(0));
on_close(arg);
}
/** Called there's incoming data (from STDIN / the client socket). */
FIO_SFUNC void on_input(fio_s *io) {
struct {
size_t len;
char buf[4080];
} info;
for (;;) { /* read until done */
info.len = fread(info.buf, 1, 4080, stdin);
if (!info.len)
return;
FIO_LOG_DEBUG2("Publishing: %.*s", (int)info.len, info.buf);
fio_publish(.from = io,
.channel = FIO_BUF_INFO1("client"),
.message = FIO_BUF_INFO2(info.buf, info.len));

/* *****************************************************************************
HTTP callback(s)
***************************************************************************** */

FIO_SFUNC int client_print_header(fio_http_s *h,
fio_str_info_s k,
fio_str_info_s v,
void *_) {
printf("%s:%s\n", k.buf, v.buf);
return 0;
(void)_, (void)h;
}

FIO_SFUNC void client_print_response_headers(fio_http_s *h) {
if (fio_cli_get_bool("-b"))
return;
FIO_LOG_DEBUG2("HTTP response received");
printf("%zu %s %s\n",
fio_http_status(h),
fio_http_status2str(fio_http_status(h)).buf,
fio_http_version(h).buf);
fio_http_response_header_each(h, client_print_header, NULL);
printf("\n");
}

/** Callback for HTTP requests (server) or responses (client). */
FIO_SFUNC void client_on_http(fio_http_s *h) {
client_print_response_headers(h);
for (;;) {
fio_str_info_s buf = fio_http_body_read(h, 1024);
if (!buf.len)
break;
printf("%.*s", (int)buf.len, buf.buf);
}

fio_srv_stop();
}

/** Called once a WebSocket / SSE connection upgrade is complete. */
FIO_SFUNC void client_on_open(fio_http_s *h) {
client_print_response_headers(h);
FIO_LOG_DEBUG2("Connection Established with: %s", fio_http_path(h).buf);
/* WebSocket only code - read from STDIN and publish to WebSocket. */
if (!fio_http_is_websocket(h))
return;
attach_stdin();
fio_http_subscribe(h, .channel = FIO_BUF_INFO1("client"));
(void)h;
}

/** Called when a WebSocket message is received. */
FIO_SFUNC void client_on_message(fio_http_s *h,
fio_buf_info_s msg,
uint8_t is_text) {
msg.len -= (msg.len > 0 && msg.buf[msg.len - 1] == '\n');
msg.len -= (msg.len > 0 && msg.buf[msg.len - 1] == '\r');
printf("Received (%s): %.*s\n",
(is_text ? "txt" : "binary"),
(int)msg.len,
msg.buf);
(void)h;
}

/** Called when an EventSource event is received. */
FIO_SFUNC void client_on_eventsource(fio_http_s *h,
fio_buf_info_s id,
fio_buf_info_s event,
fio_buf_info_s data) {
printf("Received SSE:\nid: %.*s\nevent: %.*s\ndata: %.*s\n\n",
(int)id.len,
id.buf,
(int)event.len,
event.buf,
(int)data.len,
data.buf);
(void)h;
}
/** Called after a WebSocket / SSE connection is closed (for cleanup). */
FIO_SFUNC void client_on_close(fio_http_s *h) {
if (!fio_cli_get_bool("-b"))
FIO_LOG_INFO("Connection Closed");
fio_srv_stop();
(void)h;
}

/** Called for show. */
FIO_SFUNC void client_on_ready(fio_http_s *h) {
FIO_LOG_DEBUG2("ON_READY Called! %zu bytes in outgoing buffer.",
fio_srv_backlog(fio_http_io(h)));
(void)h;
}
Loading

0 comments on commit f7903b2

Please sign in to comment.