Skip to content

Commit

Permalink
fix IPC for pub/sub protocol - buffer size info missing + minor stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
boazsegev committed Nov 27, 2024
1 parent b422dee commit 3a1f84a
Show file tree
Hide file tree
Showing 9 changed files with 396 additions and 251 deletions.
81 changes: 41 additions & 40 deletions examples/chat.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,21 @@ Callbacks and object used by main()
***************************************************************************** */

/** Called when a new connection is created and login process starts. */
FIO_SFUNC void on_login_start(fio_s *io);
FIO_SFUNC void on_login_start(fio_io_s *io);
/** Called there's incoming data (from STDIN / the client socket. */
FIO_SFUNC void on_data_login(fio_s *io);
FIO_SFUNC void on_data_chat(fio_s *io);
FIO_SFUNC void on_data_login(fio_io_s *io);
FIO_SFUNC void on_data_chat(fio_io_s *io);
/** Called when a login process should be performed. */
FIO_SFUNC void on_shutdown(fio_s *io);
FIO_SFUNC void on_shutdown(fio_io_s *io);
/** Called when the monitored IO is closed or has a fatal error. */
FIO_SFUNC void on_close(void *udata);
FIO_SFUNC void on_close(void *buf, void *udata);

static fio_protocol_s CHAT_PROTOCOL_LOGIN = {
static fio_io_protocol_s CHAT_PROTOCOL_LOGIN = {
.on_attach = on_login_start,
.on_data = on_data_login,
.on_close = on_close,
};
static fio_protocol_s CHAT_PROTOCOL_CHAT = {
static fio_io_protocol_s CHAT_PROTOCOL_CHAT = {
.on_data = on_data_chat,
.on_close = on_close,
.on_shutdown = on_shutdown,
Expand Down Expand Up @@ -70,8 +70,8 @@ FIO_IFUNC void client_free(client_s *c) {
}

/** Called when a new connection is created and login process starts. */
FIO_SFUNC void on_login_start(fio_s *io) {
fio_udata_set(io, client_new());
FIO_SFUNC void on_login_start(fio_io_s *io) {
fio_io_udata_set(io, client_new());
FIO_STR_INFO_TMP_VAR(node_msg, 1024);
fio_string_write2(
&node_msg,
Expand All @@ -80,13 +80,13 @@ FIO_SFUNC void on_login_start(fio_s *io) {
FIO_STRING_WRITE_UNUM(getpid()),
FIO_STRING_WRITE_STR1(
"\nPlease enter a login handle (up to 30 characters long)\n"));
fio_write(io, node_msg.buf, node_msg.len);
fio_io_write(io, node_msg.buf, node_msg.len);
if (fio_cli_get_bool("-v"))
FIO_LOG_INFO("(%d) %p connected", getpid(), (void *)io);
}

/** Called when the monitored IO is closed or has a fatal error. */
FIO_SFUNC void on_close(void *udata) {
FIO_SFUNC void on_close(void *buf, void *udata) {
FIO_STR_INFO_TMP_VAR(s, CHAT_MAX_HANDLE_LEN + 32);
client_s *c = udata;
fio_string_write2(&s,
Expand All @@ -95,10 +95,11 @@ FIO_SFUNC void on_close(void *udata) {
FIO_STRING_WRITE_STR1(" left the chat.\n"));
fio_publish(.message = FIO_STR2BUF_INFO(s));
client_free(c);
(void)buf;
}

/** Performs "login" logic (saves user handle) */
FIO_SFUNC void on_data_first_line(fio_s *io, char *name, size_t len) {
FIO_SFUNC void on_data_first_line(fio_io_s *io, char *name, size_t len) {
if (!len)
goto error_name_too_short;
do
Expand All @@ -110,7 +111,7 @@ FIO_SFUNC void on_data_first_line(fio_s *io, char *name, size_t len) {
goto error_name_too_short;
if (len > 30)
goto error_name_too_long;
client_s *c = fio_udata(io);
client_s *c = fio_io_udata(io);
memcpy(c->name, name, len);
c->name[len] = 0;
c->name[31] = (char)len;
Expand All @@ -122,23 +123,23 @@ FIO_SFUNC void on_data_first_line(fio_s *io, char *name, size_t len) {
FIO_STRING_WRITE_STR1("You are connected to node: "),
FIO_STRING_WRITE_UNUM(getpid()),
FIO_STRING_WRITE_STR1("\n"));
fio_write2(io,
.buf = welcome,
.len = fio_bstr_len(welcome),
.dealloc = (void (*)(void *))fio_bstr_free);
fio_io_write2(io,
.buf = welcome,
.len = fio_bstr_len(welcome),
.dealloc = (void (*)(void *))fio_bstr_free);
return;
error_name_too_long:
fio_write(io, "ERROR! login handle too long. Goodbye.\n", 39);
fio_close(io);
fio_io_write(io, "ERROR! login handle too long. Goodbye.\n", 39);
fio_io_close(io);
return;
error_name_too_short:
fio_write(io, "ERROR! login handle too short (empty?). Goodbye.\n", 49);
fio_close(io);
fio_io_write(io, "ERROR! login handle too short (empty?). Goodbye.\n", 49);
fio_io_close(io);
}

/** Manages chat messages */
FIO_SFUNC void on_data_message_line(fio_s *io, char *msg, size_t len) {
client_s *c = fio_udata(io);
FIO_SFUNC void on_data_message_line(fio_io_s *io, char *msg, size_t len) {
client_s *c = fio_io_udata(io);
char *buf = fio_bstr_write2(NULL,
FIO_STRING_WRITE_STR2(c->name, c->name[31]),
FIO_STRING_WRITE_STR2(": ", 2),
Expand All @@ -153,27 +154,27 @@ FIO_SFUNC void on_data_message_line(fio_s *io, char *msg, size_t len) {
((msg[2] | 32) == 'o') & ((msg[3] | 32) == 'd') &
((msg[4] | 32) == 'b') & ((msg[5] | 32) == 'y') &
((msg[6] | 32) == 'e')))) {
fio_write(io, "Goodbye.\n", 9);
fio_close(io);
fio_io_write(io, "Goodbye.\n", 9);
fio_io_close(io);
}
}

FIO_IFUNC int on_data_read(fio_s *io) {
FIO_IFUNC int on_data_read(fio_io_s *io) {
char buf[CHAT_MAX_MESSAGE_LEN];
size_t r = fio_read(io, buf, CHAT_MAX_MESSAGE_LEN);
size_t r = fio_io_read(io, buf, CHAT_MAX_MESSAGE_LEN);
if (!r)
return -1;
client_s *c = fio_udata(io);
client_s *c = fio_io_udata(io);
fio_stream_add(&c->input, fio_stream_pack_data(buf, r, 0, 1, NULL));
return 0;
}

FIO_IFUNC int on_data_process_line(fio_s *io,
void(task)(fio_s *, char *, size_t)) {
FIO_IFUNC int on_data_process_line(fio_io_s *io,
void(task)(fio_io_s *, char *, size_t)) {
char tmp[CHAT_MAX_MESSAGE_LEN];
char *buf = tmp;
size_t len = CHAT_MAX_MESSAGE_LEN;
client_s *c = fio_udata(io);
client_s *c = fio_io_udata(io);
fio_stream_read(&c->input, &buf, &len);
if (!len)
return -1;
Expand All @@ -187,27 +188,27 @@ FIO_IFUNC int on_data_process_line(fio_s *io,
}

/** for the first input line of the Chat protocol. */
FIO_SFUNC void on_data_login(fio_s *io) {
FIO_SFUNC void on_data_login(fio_io_s *io) {
if (on_data_read(io))
return;
if (on_data_process_line(io, on_data_first_line))
return;
fio_protocol_set(io, &CHAT_PROTOCOL_CHAT);
fio_io_protocol_set(io, &CHAT_PROTOCOL_CHAT);
fio_subscribe(.io = io);
on_data_chat(io);
}

/** for each subsequent message / line in the Chat protocol. */
FIO_SFUNC void on_data_chat(fio_s *io) {
FIO_SFUNC void on_data_chat(fio_io_s *io) {
if (on_data_read(io))
return;
while (!on_data_process_line(io, on_data_message_line))
;
}

/** Called when a login process should be performed. */
FIO_SFUNC void on_shutdown(fio_s *io) {
fio_write(io, "Server shutting down, goodbye...\n", 33);
FIO_SFUNC void on_shutdown(fio_io_s *io) {
fio_io_write(io, "Server shutting down, goodbye...\n", 33);
}

/* *****************************************************************************
Expand Down Expand Up @@ -248,14 +249,14 @@ int main(int argc, char const *argv[]) {
fio_subscribe(.on_message = print_chat, .master_only = 1);

/* review CLI connection address (in URL format) */
FIO_ASSERT(fio_srv_listen(.url = fio_cli_unnamed(0),
.protocol = &CHAT_PROTOCOL_LOGIN),
FIO_ASSERT(fio_io_listen(.url = fio_cli_unnamed(0),
.protocol = &CHAT_PROTOCOL_LOGIN),
"Could not open listening socket as requested.");
FIO_LOG_INFO("\n\tStarting plain text Chat server example app."
"\n\tEngine: " FIO_POLL_ENGINE_STR "\n\tWorkers: %d"
"\n\tPress ^C to exit.",
fio_srv_workers(fio_cli_get_i("-w")));
fio_srv_start(fio_cli_get_i("-w"));
fio_io_workers(fio_cli_get_i("-w")));
fio_io_start(fio_cli_get_i("-w"));
FIO_LOG_INFO("Shutdown complete.");
fio_cli_end();
return 0;
Expand Down
2 changes: 1 addition & 1 deletion examples/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ FIO_SFUNC void on_input_closed(void *buf, void *udata) {
}

/* Debug messages for STDIN round-trip */
void debug_subscriber(fio_msg_s *msg) {
FIO_SFUNC void debug_subscriber(fio_msg_s *msg) {
FIO_LOG_DEBUG2("Subscriber received: %.*s",
msg->message.len,
msg->message.buf);
Expand Down
Loading

0 comments on commit 3a1f84a

Please sign in to comment.