Skip to content

Commit

Permalink
docs + fix IO timer scheduling
Browse files Browse the repository at this point in the history
  • Loading branch information
boazsegev committed Nov 23, 2024
1 parent 507eacd commit 768c3e3
Show file tree
Hide file tree
Showing 9 changed files with 124 additions and 30 deletions.
31 changes: 16 additions & 15 deletions fio-stl.h
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ Sleep / Thread Scheduling Macros
#define FIO_THREAD_YIELD() __asm__ __volatile__("yield" ::: "memory")
#elif defined(_MSC_VER)
#define FIO_THREAD_YIELD() YieldProcessor()
#else FIO_OS_POSIX
#else /* FIO_OS_POSIX */
/** Yields the thread, hinting to the processor about spinlock loop. */
#define FIO_THREAD_YIELD() sched_yield()
#endif
Expand Down Expand Up @@ -6150,7 +6150,7 @@ Big Numbers

FIO_IFUNC void fio___uXXX_hex_read(uint64_t *t, char **p, size_t l) {
char *start = *p;
start += ((unsigned)(start[0] == '0' & start[1] == 'x') << 1);
start += (((unsigned)(start[0] == '0') & (start[1] == 'x')) << 1);
char *pos = start;
while (fio_i2c((uint8_t)*pos) < 16)
++pos;
Expand Down Expand Up @@ -20108,7 +20108,7 @@ SFUNC int fio_string_write_base32enc(fio_str_info_s *dest,
fio_string_realloc_fn reallocate,
const void *raw,
size_t raw_len) {
const static uint8_t base32ecncode[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567";
static const uint8_t base32ecncode[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567";
int r = 0;
size_t expected = ((raw_len * 8) / 5) + 1;
if (fio_string___write_validate_len(dest, reallocate, &expected)) {
Expand Down Expand Up @@ -35378,6 +35378,7 @@ FIO_IFUNC void fio___io_defer_no_wakeup(void (*task)(void *, void *),
void fio_io_run_every___(void);
/** Schedules a timer bound task, see `fio_timer_schedule`. */
SFUNC void fio_io_run_every FIO_NOOP(fio_timer_schedule_args_s args) {
args.start_at = FIO___IO.tick;
fio_timer_schedule FIO_NOOP(&FIO___IO.timer, args);
}

Expand Down Expand Up @@ -35895,7 +35896,7 @@ static void fio___io_poll_on_ready(void *io_, void *ignr_) {
size_t total = 0;
FIO___IO_FLAG_UNSET(io,
(FIO___IO_FLAG_POLLOUT_SET | FIO___IO_FLAG_WRITE_SCHD));
FIO_LOG_DDEBUG2("(%d) on_read callback for fd %d",
FIO_LOG_DDEBUG2("(%d) poll_on_ready callback for fd %d",
fio_io_pid(),
fio_io_fd(io));
if (!(io->flags & FIO___IO_FLAG_OPEN))
Expand Down Expand Up @@ -35930,11 +35931,7 @@ static void fio___io_poll_on_ready(void *io_, void *ignr_) {
io->total_sent += total;
#endif
}
if ((io->flags & FIO___IO_FLAG_CLOSE)) {
io->pr->io_functions.finish(io->fd, io->tls);
fio_io_close_now(io);
} else if (fio_stream_any(&io->out) ||
io->pr->io_functions.flush(io->fd, io->tls)) {
if (fio_stream_any(&io->out) || io->pr->io_functions.flush(io->fd, io->tls)) {
if (fio_stream_length(&io->out) >= FIO_IO_THROTTLE_LIMIT) {
if (!(io->flags & FIO___IO_FLAG_THROTTLED))
FIO_LOG_DDEBUG2("(%d), throttled IO %p (fd %d)",
Expand All @@ -35944,6 +35941,9 @@ static void fio___io_poll_on_ready(void *io_, void *ignr_) {
FIO___IO_FLAG_SET(io, FIO___IO_FLAG_THROTTLED);
}
fio___io_monitor_out(io);
} else if ((io->flags & FIO___IO_FLAG_CLOSE)) {
io->pr->io_functions.finish(io->fd, io->tls);
fio_io_close_now(io);
} else {
if ((io->flags & FIO___IO_FLAG_THROTTLED)) {
FIO___IO_FLAG_UNSET(io, FIO___IO_FLAG_THROTTLED);
Expand Down Expand Up @@ -36678,11 +36678,12 @@ static void fio___io_signal_handle(int sig, void *flg) {

FIO_SFUNC void fio___io_tick(int timeout) {
static size_t performed_idle = 0;
if (fio_poll_review(&FIO___IO.poll, timeout) > 0) {
performed_idle = 0;
} else if (timeout) {
if (!performed_idle && !FIO___IO.stop)
fio_state_callback_force(FIO_CALL_ON_IDLE);
size_t idle_round = (fio_poll_review(&FIO___IO.poll, timeout) == 0);
performed_idle &= idle_round;
idle_round &= (timeout > 0);
idle_round ^= performed_idle;
if ((idle_round & !FIO___IO.stop)) {
fio_state_callback_force(FIO_CALL_ON_IDLE);
performed_idle = 1;
}
FIO___IO.tick = FIO___IO_GET_TIME_MILLI();
Expand Down Expand Up @@ -38609,7 +38610,6 @@ FIO_SFUNC void fio___pubsub_protocol_on_data_worker(fio_io_s *io);
FIO_SFUNC void fio___pubsub_protocol_on_data_remote(fio_io_s *io);
FIO_SFUNC void fio___pubsub_protocol_on_close(void *buffer, void *udata);
FIO_SFUNC void fio___pubsub_protocol_on_timeout(fio_io_s *io);
FIO_SFUNC void fio___pubsub_protocol_on_iomem_free(void *p_);

static struct FIO___PUBSUB_POSTOFFICE {
fio_u128 uuid;
Expand Down Expand Up @@ -39568,6 +39568,7 @@ void fio_publish FIO_NOOP(fio_publish_args_s args) {
m = fio___pubsub_message_author(args);
m->data.is_json = ((!!args.is_json) | ((uint8_t)(uintptr_t)args.engine));

FIO_LOG_DDEBUG2("publishing pub/sub message (scheduling)");
fio_io_defer(fio___publish_message_task, m, NULL);
return;

Expand Down
46 changes: 46 additions & 0 deletions fio-stl.md
Original file line number Diff line number Diff line change
Expand Up @@ -9784,6 +9784,52 @@ Although many IO API calls are thread safe (they actually schedule events on the

**Note**: this will automatically include a large amount of the facil.io STL modules, which you may prefer to manually include beforehand in order to choose the appropriate memory allocator per module.

### Time Server Example

The following example uses the `FIO_PUBSUB` module together with the `FIO_IO` module to author a very simplistic time server (with no micro-second accuracy).

the `FIO_PUBSUB` module could have been replaced with a `fio_protocol_each` approach, assuming a single threaded implementation. But this approach is both simpler and (usually) more powerful.

```c
#define FIO_LOG
#define FIO_IO
#define FIO_PUBSUB
#define FIO_TIME
#include "fio-stl/include.h"

/** Called when an IO is attached to a protocol. */
FIO_SFUNC void time_protocol_on_attach(fio_io_s *io) {
/* .on_message is unnecessary, by default the message is sent to the IO. */
fio_subscribe(.io = io, .channel = FIO_BUF_INFO1("time"));
}

fio_io_protocol_s TIME_PROTOCOL = {
.on_attach = time_protocol_on_attach, /* subscribe after connection */
.on_timeout = fio_io_touch, /* never times out */
.on_pubsub = FIO_ON_MESSAGE_SEND_MESSAGE, /* write messages to IO */
};

/* timer callback for publishing time */
static int publish_time(void *ignore1_, void *ignore2_) {
char buf[32];
size_t len = fio_time2iso(buf, fio_time_real().tv_sec);
buf[len++] = '\r';
buf[len++] = '\n';
fio_publish(.channel = FIO_BUF_INFO1("time"),
.message = FIO_BUF_INFO2(buf, len));
return 0;
(void)ignore1_, (void)ignore2_;
}

int main(void) {
fio_io_run_every(.fn = publish_time, .every = 1000, .repetitions = -1);
FIO_ASSERT(fio_io_listen(.protocol = &TIME_PROTOCOL), "");
printf("* Time service starting up.\n");
printf(" Press ^C to stop server and exit.\n");
fio_io_start(0);
}
```

### IO Compiler Settings

The following macros control the IO reactor's behavior during compile-time.
Expand Down
2 changes: 1 addition & 1 deletion fio-stl/000 core.h
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ Sleep / Thread Scheduling Macros
#define FIO_THREAD_YIELD() __asm__ __volatile__("yield" ::: "memory")
#elif defined(_MSC_VER)
#define FIO_THREAD_YIELD() YieldProcessor()
#else FIO_OS_POSIX
#else /* FIO_OS_POSIX */
/** Yields the thread, hinting to the processor about spinlock loop. */
#define FIO_THREAD_YIELD() sched_yield()
#endif
Expand Down
2 changes: 1 addition & 1 deletion fio-stl/002 atol.h
Original file line number Diff line number Diff line change
Expand Up @@ -1210,7 +1210,7 @@ Big Numbers

FIO_IFUNC void fio___uXXX_hex_read(uint64_t *t, char **p, size_t l) {
char *start = *p;
start += ((unsigned)(start[0] == '0' & start[1] == 'x') << 1);
start += (((unsigned)(start[0] == '0') & (start[1] == 'x')) << 1);
char *pos = start;
while (fio_i2c((uint8_t)*pos) < 16)
++pos;
Expand Down
2 changes: 1 addition & 1 deletion fio-stl/102 string core.h
Original file line number Diff line number Diff line change
Expand Up @@ -2021,7 +2021,7 @@ SFUNC int fio_string_write_base32enc(fio_str_info_s *dest,
fio_string_realloc_fn reallocate,
const void *raw,
size_t raw_len) {
const static uint8_t base32ecncode[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567";
static const uint8_t base32ecncode[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567";
int r = 0;
size_t expected = ((raw_len * 8) / 5) + 1;
if (fio_string___write_validate_len(dest, reallocate, &expected)) {
Expand Down
46 changes: 46 additions & 0 deletions fio-stl/400 io.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,52 @@ Although many IO API calls are thread safe (they actually schedule events on the

**Note**: this will automatically include a large amount of the facil.io STL modules, which you may prefer to manually include beforehand in order to choose the appropriate memory allocator per module.

### Time Server Example

The following example uses the `FIO_PUBSUB` module together with the `FIO_IO` module to author a very simplistic time server (with no micro-second accuracy).

the `FIO_PUBSUB` module could have been replaced with a `fio_protocol_each` approach, assuming a single threaded implementation. But this approach is both simpler and (usually) more powerful.

```c
#define FIO_LOG
#define FIO_IO
#define FIO_PUBSUB
#define FIO_TIME
#include "fio-stl/include.h"

/** Called when an IO is attached to a protocol. */
FIO_SFUNC void time_protocol_on_attach(fio_io_s *io) {
/* .on_message is unnecessary, by default the message is sent to the IO. */
fio_subscribe(.io = io, .channel = FIO_BUF_INFO1("time"));
}

fio_io_protocol_s TIME_PROTOCOL = {
.on_attach = time_protocol_on_attach, /* subscribe after connection */
.on_timeout = fio_io_touch, /* never times out */
.on_pubsub = FIO_ON_MESSAGE_SEND_MESSAGE, /* write messages to IO */
};

/* timer callback for publishing time */
static int publish_time(void *ignore1_, void *ignore2_) {
char buf[32];
size_t len = fio_time2iso(buf, fio_time_real().tv_sec);
buf[len++] = '\r';
buf[len++] = '\n';
fio_publish(.channel = FIO_BUF_INFO1("time"),
.message = FIO_BUF_INFO2(buf, len));
return 0;
(void)ignore1_, (void)ignore2_;
}

int main(void) {
fio_io_run_every(.fn = publish_time, .every = 1000, .repetitions = -1);
FIO_ASSERT(fio_io_listen(.protocol = &TIME_PROTOCOL), "");
printf("* Time service starting up.\n");
printf(" Press ^C to stop server and exit.\n");
fio_io_start(0);
}
```
### IO Compiler Settings
The following macros control the IO reactor's behavior during compile-time.
Expand Down
12 changes: 6 additions & 6 deletions fio-stl/401 io types.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ FIO_IFUNC void fio___io_defer_no_wakeup(void (*task)(void *, void *),
void fio_io_run_every___(void);
/** Schedules a timer bound task, see `fio_timer_schedule`. */
SFUNC void fio_io_run_every FIO_NOOP(fio_timer_schedule_args_s args) {
args.start_at = FIO___IO.tick;
fio_timer_schedule FIO_NOOP(&FIO___IO.timer, args);
}

Expand Down Expand Up @@ -845,7 +846,7 @@ static void fio___io_poll_on_ready(void *io_, void *ignr_) {
size_t total = 0;
FIO___IO_FLAG_UNSET(io,
(FIO___IO_FLAG_POLLOUT_SET | FIO___IO_FLAG_WRITE_SCHD));
FIO_LOG_DDEBUG2("(%d) on_read callback for fd %d",
FIO_LOG_DDEBUG2("(%d) poll_on_ready callback for fd %d",
fio_io_pid(),
fio_io_fd(io));
if (!(io->flags & FIO___IO_FLAG_OPEN))
Expand Down Expand Up @@ -880,11 +881,7 @@ static void fio___io_poll_on_ready(void *io_, void *ignr_) {
io->total_sent += total;
#endif
}
if ((io->flags & FIO___IO_FLAG_CLOSE)) {
io->pr->io_functions.finish(io->fd, io->tls);
fio_io_close_now(io);
} else if (fio_stream_any(&io->out) ||
io->pr->io_functions.flush(io->fd, io->tls)) {
if (fio_stream_any(&io->out) || io->pr->io_functions.flush(io->fd, io->tls)) {
if (fio_stream_length(&io->out) >= FIO_IO_THROTTLE_LIMIT) {
if (!(io->flags & FIO___IO_FLAG_THROTTLED))
FIO_LOG_DDEBUG2("(%d), throttled IO %p (fd %d)",
Expand All @@ -894,6 +891,9 @@ static void fio___io_poll_on_ready(void *io_, void *ignr_) {
FIO___IO_FLAG_SET(io, FIO___IO_FLAG_THROTTLED);
}
fio___io_monitor_out(io);
} else if ((io->flags & FIO___IO_FLAG_CLOSE)) {
io->pr->io_functions.finish(io->fd, io->tls);
fio_io_close_now(io);
} else {
if ((io->flags & FIO___IO_FLAG_THROTTLED)) {
FIO___IO_FLAG_UNSET(io, FIO___IO_FLAG_THROTTLED);
Expand Down
11 changes: 6 additions & 5 deletions fio-stl/402 io reactor.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ static void fio___io_signal_handle(int sig, void *flg) {

FIO_SFUNC void fio___io_tick(int timeout) {
static size_t performed_idle = 0;
if (fio_poll_review(&FIO___IO.poll, timeout) > 0) {
performed_idle = 0;
} else if (timeout) {
if (!performed_idle && !FIO___IO.stop)
fio_state_callback_force(FIO_CALL_ON_IDLE);
size_t idle_round = (fio_poll_review(&FIO___IO.poll, timeout) == 0);
performed_idle &= idle_round;
idle_round &= (timeout > 0);
idle_round ^= performed_idle;
if ((idle_round & !FIO___IO.stop)) {
fio_state_callback_force(FIO_CALL_ON_IDLE);
performed_idle = 1;
}
FIO___IO.tick = FIO___IO_GET_TIME_MILLI();
Expand Down
2 changes: 1 addition & 1 deletion fio-stl/420 pubsub.h
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,6 @@ FIO_SFUNC void fio___pubsub_protocol_on_data_worker(fio_io_s *io);
FIO_SFUNC void fio___pubsub_protocol_on_data_remote(fio_io_s *io);
FIO_SFUNC void fio___pubsub_protocol_on_close(void *buffer, void *udata);
FIO_SFUNC void fio___pubsub_protocol_on_timeout(fio_io_s *io);
FIO_SFUNC void fio___pubsub_protocol_on_iomem_free(void *p_);

static struct FIO___PUBSUB_POSTOFFICE {
fio_u128 uuid;
Expand Down Expand Up @@ -1724,6 +1723,7 @@ void fio_publish FIO_NOOP(fio_publish_args_s args) {
m = fio___pubsub_message_author(args);
m->data.is_json = ((!!args.is_json) | ((uint8_t)(uintptr_t)args.engine));

FIO_LOG_DDEBUG2("publishing pub/sub message (scheduling)");
fio_io_defer(fio___publish_message_task, m, NULL);
return;

Expand Down

0 comments on commit 768c3e3

Please sign in to comment.