diff --git a/sources/config_reader.c b/sources/config_reader.c index e92de11b..356f64df 100644 --- a/sources/config_reader.c +++ b/sources/config_reader.c @@ -1785,6 +1785,10 @@ static int od_config_reader_address(od_config_reader_t *reader, od_address_range_t address_range; address_range = od_address_range_create_default(); + // created with strdup inside + if (address_range.string_value != NULL) { + free(address_range.string_value); + } address_range.string_value = NULL; address_range.string_value_len = 0; address_range.is_default = 0; diff --git a/sources/route.h b/sources/route.h index 46d7b0fd..876fb664 100644 --- a/sources/route.h +++ b/sources/route.h @@ -23,7 +23,7 @@ struct od_route { kiwi_params_lock_t params; int64_t tcp_connections; int last_heartbeat; - machine_channel_t *wait_bus; + machine_wait_list_t *wait_bus; pthread_mutex_t lock; od_error_logger_t *err_logger; @@ -68,7 +68,7 @@ static inline void od_route_free(od_route_t *route) kiwi_params_lock_free(&route->params); if (route->wait_bus) - machine_channel_free(route->wait_bus); + machine_wait_list_destroy(route->wait_bus); if (route->stats.enable_quantiles) { od_stat_free(&route->stats); } @@ -88,7 +88,7 @@ static inline od_route_t *od_route_allocate() if (route == NULL) return NULL; od_route_init(route, true); - route->wait_bus = machine_channel_create(); + route->wait_bus = machine_wait_list_create(); if (route->wait_bus == NULL) { od_route_free(route); return NULL; @@ -194,10 +194,9 @@ static inline void od_route_reload_pool(od_route_t *route) static inline int od_route_wait(od_route_t *route, uint32_t time_ms) { - machine_msg_t *msg; - msg = machine_channel_read(route->wait_bus, time_ms); - if (msg) { - machine_msg_free(msg); + int rc; + rc = machine_wait_list_wait(route->wait_bus, time_ms); + if (rc == 0) { return 0; } return -1; @@ -205,12 +204,7 @@ static inline int od_route_wait(od_route_t *route, uint32_t time_ms) static inline int od_route_signal(od_route_t *route) { - machine_msg_t *msg; - msg = machine_msg_create(0); - if (msg == NULL) { - return -1; - } - machine_channel_write(route->wait_bus, msg); + machine_wait_list_notify(route->wait_bus); return 0; } diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 6d09b84c..49996505 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -41,6 +41,10 @@ set(od_test_src machinarium/test_accept_timeout.c machinarium/test_accept_cancel.c machinarium/test_advice_keepalive_usr_timeout.c + machinarium/test_wait_list_without_notify.c + machinarium/test_wait_list_notify_after_wait.c + machinarium/test_wait_list_one_producer_multiple_consumers.c + machinarium/test_wait_list_one_producer_multiple_consumers_threads.c machinarium/test_getaddrinfo0.c machinarium/test_getaddrinfo1.c machinarium/test_getaddrinfo2.c diff --git a/test/machinarium/test_wait_list_notify_after_wait.c b/test/machinarium/test_wait_list_notify_after_wait.c new file mode 100644 index 00000000..168b6572 --- /dev/null +++ b/test/machinarium/test_wait_list_notify_after_wait.c @@ -0,0 +1,67 @@ +#include +#include + +static inline void producer_coroutine(void *arg) +{ + machine_wait_list_t *wl = arg; + + machine_sleep(500); + machine_wait_list_notify(wl); +} + +static inline void consumer_coroutine(void *arg) +{ + machine_wait_list_t *wl = arg; + + uint64_t start, end, total_time; + int rc; + + start = machine_time_ms(); + rc = machine_wait_list_wait(wl, 1000); + end = machine_time_ms(); + test(rc == 0); + total_time = end - start; + test(total_time > 400 && total_time < 1000); +} + +static inline void test_notify_after_wait(void *arg) +{ + (void)arg; + + machine_wait_list_t *wl = machine_wait_list_create(); + + int consumer_id; + consumer_id = machine_coroutine_create(consumer_coroutine, wl); + test(consumer_id != -1); + + int producer_id; + producer_id = machine_coroutine_create(producer_coroutine, wl); + test(producer_id != -1); + + machine_sleep(0); + + int rc; + rc = machine_join(producer_id); + test(rc == 0); + + rc = machine_join(consumer_id); + test(rc == 0); + + machine_wait_list_destroy(wl); +} + +void machinarium_test_wait_list_notify_after_wait() +{ + machinarium_init(); + + int id; + id = machine_create("test_wait_list_notify_after_wait", + test_notify_after_wait, NULL); + test(id != -1); + + int rc; + rc = machine_wait(id); + test(rc != -1); + + machinarium_free(); +} \ No newline at end of file diff --git a/test/machinarium/test_wait_list_one_producer_multiple_consumers.c b/test/machinarium/test_wait_list_one_producer_multiple_consumers.c new file mode 100644 index 00000000..c7e740c0 --- /dev/null +++ b/test/machinarium/test_wait_list_one_producer_multiple_consumers.c @@ -0,0 +1,108 @@ +#include +#include + +static inline void producer_coroutine(void *arg) +{ + machine_wait_list_t *wl = arg; + + uint64_t start, current; + start = machine_time_ms(); + current = start; + + while ((current - start) < 4000) { + machine_wait_list_notify(wl); + machine_sleep(300); + current = machine_time_ms(); + } +} + +typedef struct { + machine_wait_list_t *wl; + int count; + int64_t id; +} consumer_arg_t; + +static inline void consumer_thread(void *arg) +{ + consumer_arg_t *ca = arg; + machine_wait_list_t *wl = ca->wl; + int *count = &ca->count; + + uint64_t start, current; + int rc; + start = machine_time_ms(); + current = start; + + while ((current - start) < 3000) { + rc = machine_wait_list_wait(wl, 1000); + test(rc == 0); + ++(*count); + current = machine_time_ms(); + } +} + +static inline void test_multiple_consumers(void *arg) +{ + (void)arg; + + machine_wait_list_t *wl = machine_wait_list_create(); + + int producer_id; + producer_id = machine_coroutine_create(producer_coroutine, wl); + test(producer_id != -1); + + consumer_arg_t a1, a2, a3; + int c1, c2, c3; + + a1.count = a2.count = a3.count = 0; + a1.wl = a2.wl = a3.wl = wl; + + c1 = machine_create("consumer1", consumer_thread, &a1); + test(c1 != -1); + a1.id = c1; + + c2 = machine_create("consumer2", consumer_thread, &a2); + test(c2 != -1); + a2.id = c2; + + c3 = machine_create("consumer3", consumer_thread, &a3); + test(c3 != -1); + a3.id = c3; + + machine_sleep(0); + + int rc; + rc = machine_join(producer_id); + test(rc == 0); + + rc = machine_wait(c1); + test(rc == 0); + + rc = machine_wait(c2); + test(rc == 0); + + rc = machine_wait(c3); + test(rc == 0); + + test(a1.count >= 3); + test(a2.count >= 3); + test(a3.count >= 3); + + machine_wait_list_destroy(wl); +} + +void machinarium_test_wait_list_one_producer_multiple_consumers() +{ + machinarium_init(); + + int id; + id = machine_create("test_wait_list_one_producer_multiple_consumers", + test_multiple_consumers, NULL); + test(id != -1); + + int rc; + rc = machine_wait(id); + test(rc != -1); + + machinarium_free(); +} \ No newline at end of file diff --git a/test/machinarium/test_wait_list_one_producer_multiple_consumers_threads.c b/test/machinarium/test_wait_list_one_producer_multiple_consumers_threads.c new file mode 100644 index 00000000..65c9a092 --- /dev/null +++ b/test/machinarium/test_wait_list_one_producer_multiple_consumers_threads.c @@ -0,0 +1,100 @@ +#include +#include + +static inline void producer_coroutine(void *arg) +{ + machine_wait_list_t *wl = arg; + + uint64_t start, current; + start = machine_time_ms(); + current = start; + + while ((current - start) < 4000) { + machine_wait_list_notify(wl); + machine_sleep(300); + current = machine_time_ms(); + } +} + +typedef struct { + machine_wait_list_t *wl; + int count; + int64_t id; +} consumer_arg_t; + +static inline void consumer_coroutine(void *arg) +{ + consumer_arg_t *ca = arg; + machine_wait_list_t *wl = ca->wl; + int *count = &ca->count; + + uint64_t start, current; + int rc; + start = machine_time_ms(); + current = start; + + while ((current - start) < 3000) { + rc = machine_wait_list_wait(wl, 1000); + test(rc == 0); + ++(*count); + current = machine_time_ms(); + } +} + +static inline void test_multiple_consumers(void *arg) +{ + (void)arg; + + machine_wait_list_t *wl = machine_wait_list_create(); + + int producer_id; + producer_id = machine_coroutine_create(producer_coroutine, wl); + test(producer_id != -1); + + consumer_arg_t a1, a2, a3; + int c1, c2, c3; + + a1.count = a2.count = a3.count = 0; + a1.wl = a2.wl = a3.wl = wl; + + c1 = machine_coroutine_create(consumer_coroutine, &a1); + test(c1 != -1); + a1.id = c1; + + c2 = machine_coroutine_create(consumer_coroutine, &a2); + test(c2 != -1); + a2.id = c2; + + c3 = machine_coroutine_create(consumer_coroutine, &a3); + test(c3 != -1); + a3.id = c3; + + machine_sleep(0); + + int rc; + rc = machine_join(producer_id); + test(rc == 0); + + test(a1.count >= 3); + test(a2.count >= 3); + test(a3.count >= 3); + + machine_wait_list_destroy(wl); +} + +void machinarium_test_wait_list_one_producer_multiple_consumers_threads() +{ + machinarium_init(); + + int id; + id = machine_create( + "test_wait_list_one_producer_multiple_consumers_threads", + test_multiple_consumers, NULL); + test(id != -1); + + int rc; + rc = machine_wait(id); + test(rc != -1); + + machinarium_free(); +} \ No newline at end of file diff --git a/test/machinarium/test_wait_list_without_notify.c b/test/machinarium/test_wait_list_without_notify.c new file mode 100644 index 00000000..a06e4585 --- /dev/null +++ b/test/machinarium/test_wait_list_without_notify.c @@ -0,0 +1,56 @@ +#include +#include + +static inline void test_wait_without_notify_coroutine(void *arg) +{ + (void)arg; + + machine_wait_list_t *wait_list = machine_wait_list_create(); + + uint64_t start, end; + start = machine_time_ms(); + + int rc; + rc = machine_wait_list_wait(wait_list, 1000); + end = machine_time_ms(); + test(rc == 1); + test(end - start >= 1000); + + // notify without waiters should be ignored + machine_wait_list_notify(wait_list); + rc = machine_wait_list_wait(wait_list, 1000); + test(rc == 1); + + machine_wait_list_destroy(wait_list); +} + +static inline void test_wait_without_notify(void *arg) +{ + (void)arg; + + int id; + id = machine_coroutine_create(test_wait_without_notify_coroutine, NULL); + test(id != -1); + + machine_sleep(0); + + int rc; + rc = machine_join(id); + test(rc == 0); +} + +void machinarium_test_wait_list_without_notify() +{ + machinarium_init(); + + int id; + id = machine_create("test_wait_list_wait_without_notify", + test_wait_without_notify, NULL); + test(id != -1); + + int rc; + rc = machine_wait(id); + test(rc != -1); + + machinarium_free(); +} \ No newline at end of file diff --git a/test/odyssey_test.c b/test/odyssey_test.c index 003e8ac4..c70f59b2 100644 --- a/test/odyssey_test.c +++ b/test/odyssey_test.c @@ -78,6 +78,11 @@ extern void machinarium_test_tls_read_10mb1(void); extern void machinarium_test_tls_read_10mb2(void); extern void machinarium_test_tls_read_multithread(void); extern void machinarium_test_tls_read_var(void); +extern void machinarium_test_wait_list_without_notify(void); +extern void machinarium_test_wait_list_notify_after_wait(void); +extern void machinarium_test_wait_list_one_producer_multiple_consumers(void); +extern void +machinarium_test_wait_list_one_producer_multiple_consumers_threads(void); extern void odyssey_test_tdigest(void); extern void odyssey_test_attribute(void); @@ -158,6 +163,12 @@ int main(int argc, char *argv[]) odyssey_test(machinarium_test_tls_read_10mb2); odyssey_test(machinarium_test_tls_read_multithread); odyssey_test(machinarium_test_tls_read_var); + odyssey_test(machinarium_test_wait_list_without_notify); + odyssey_test(machinarium_test_wait_list_notify_after_wait); + odyssey_test( + machinarium_test_wait_list_one_producer_multiple_consumers); + odyssey_test( + machinarium_test_wait_list_one_producer_multiple_consumers_threads); odyssey_test(odyssey_test_tdigest); odyssey_test(odyssey_test_attribute); odyssey_test(odyssey_test_util); diff --git a/third_party/machinarium/sources/CMakeLists.txt b/third_party/machinarium/sources/CMakeLists.txt index d08c5672..38d41971 100644 --- a/third_party/machinarium/sources/CMakeLists.txt +++ b/third_party/machinarium/sources/CMakeLists.txt @@ -40,6 +40,7 @@ set(machine_src accept.c shutdown.c dns.c + wait_list.c zpq_stream.c compression.c cert_hash.c) diff --git a/third_party/machinarium/sources/atomic.h b/third_party/machinarium/sources/atomic.h new file mode 100644 index 00000000..2977539d --- /dev/null +++ b/third_party/machinarium/sources/atomic.h @@ -0,0 +1,47 @@ +#ifndef MM_ATOMIC_H +#define MM_ATOMIC_H + +static inline uint64_t mm_atomic_u64_inc(uint64_t *ptr) +{ + return __sync_fetch_and_add(ptr, 1ULL); +} + +static inline uint64_t mm_atomic_u64_dec(uint64_t *ptr) +{ + return __sync_fetch_and_sub(ptr, 1ULL); +} + +static inline uint64_t mm_atomic_u64_value(uint64_t *ptr) +{ + return __sync_fetch_and_add(ptr, 0ULL); +} + +static inline int mm_atomic_u64_cas(uint64_t *ptr, uint64_t old, uint64_t new) +{ + return __sync_bool_compare_and_swap(ptr, old, new); +} + +static inline void mm_atomic_u64_set(uint64_t *ptr, uint64_t value) +{ + for (;;) { + uint64_t v = mm_atomic_u64_value(ptr); + + if (mm_atomic_u64_cas(ptr, v, value)) { + break; + } + } +} + +#define mm_atomic_u64_once(ptr, ...) \ + for (;;) { \ + uint64_t v = mm_atomic_u64_value((ptr)); \ + if (v) { \ + break; \ + } \ + if (mm_atomic_u64_cas((ptr), 0ULL, 1ULL)) { \ + __VA_ARGS__ \ + break; \ + } \ + } + +#endif diff --git a/third_party/machinarium/sources/channel.h b/third_party/machinarium/sources/channel.h index d77883d2..667ece0d 100644 --- a/third_party/machinarium/sources/channel.h +++ b/third_party/machinarium/sources/channel.h @@ -34,4 +34,9 @@ mm_retcode_t mm_channel_write(mm_channel_t *, mm_msg_t *); mm_msg_t *mm_channel_read(mm_channel_t *, uint32_t); mm_msg_t *mm_channel_read_back(mm_channel_t *, uint32_t); +static inline int mm_channel_get_size(mm_channel_t *chan) +{ + return chan->msg_list_count; +} + #endif /* MM_CHANNEL_H */ diff --git a/third_party/machinarium/sources/channel_api.c b/third_party/machinarium/sources/channel_api.c index 73bae877..e8450b0e 100644 --- a/third_party/machinarium/sources/channel_api.c +++ b/third_party/machinarium/sources/channel_api.c @@ -112,3 +112,14 @@ MACHINE_API machine_msg_t *machine_channel_read_back(machine_channel_t *obj, msg = mm_channelfast_read(channel, time_ms); return (machine_msg_t *)msg; } + +MACHINE_API size_t machine_channel_get_size(machine_channel_t *chan) +{ + mm_channeltype_t *type; + type = mm_cast(mm_channeltype_t *, chan); + if (type->is_shared) { + return mm_channel_get_size(mm_cast(mm_channel_t *, chan)); + } + + return mm_channelfast_get_size(mm_cast(mm_channelfast_t *, chan)); +} diff --git a/third_party/machinarium/sources/channel_fast.h b/third_party/machinarium/sources/channel_fast.h index fe662c85..040acb4b 100644 --- a/third_party/machinarium/sources/channel_fast.h +++ b/third_party/machinarium/sources/channel_fast.h @@ -35,4 +35,9 @@ mm_retcode_t mm_channelfast_write(mm_channelfast_t *, mm_msg_t *); mm_msg_t *mm_channelfast_read(mm_channelfast_t *, uint32_t); +static inline int mm_channelfast_get_size(mm_channelfast_t *chan) +{ + return chan->incoming_count; +} + #endif /* MM_CHANNEL_FAST_H */ diff --git a/third_party/machinarium/sources/list.h b/third_party/machinarium/sources/list.h index de13712f..3e40e570 100644 --- a/third_party/machinarium/sources/list.h +++ b/third_party/machinarium/sources/list.h @@ -59,4 +59,6 @@ static inline mm_list_t *mm_list_pop_back(mm_list_t *list) #define mm_list_foreach_safe(H, I, N) \ for (I = (H)->next; I != H && (N = I->next); I = N) +#define mm_list_peek(H, type) mm_container_of((H).next, type, link) + #endif /* MM_LIST_H */ diff --git a/third_party/machinarium/sources/machinarium.h b/third_party/machinarium/sources/machinarium.h index a79c1292..39f270ab 100644 --- a/third_party/machinarium/sources/machinarium.h +++ b/third_party/machinarium/sources/machinarium.h @@ -41,6 +41,7 @@ typedef struct machine_channel_private machine_channel_t; typedef struct machine_tls_private machine_tls_t; typedef struct machine_iov_private machine_iov_t; typedef struct machine_io_private machine_io_t; +typedef struct machine_wait_list machine_wait_list_t; /* configuration */ @@ -161,6 +162,8 @@ MACHINE_API machine_msg_t *machine_channel_read(machine_channel_t *, MACHINE_API machine_msg_t *machine_channel_read_back(machine_channel_t *, uint32_t time_ms); +MACHINE_API size_t machine_channel_get_size(machine_channel_t *chan); + /* tls */ MACHINE_API machine_tls_t *machine_tls_create(void); @@ -302,6 +305,13 @@ machine_compression_choose_alg(char *client_compression_algorithms); MACHINE_API const char *machine_get_backtrace_string(); MACHINE_API int machine_get_backtrace(void **entries, int max); +/* wait list */ +MACHINE_API machine_wait_list_t *machine_wait_list_create(); +MACHINE_API void machine_wait_list_destroy(machine_wait_list_t *wait_list); +MACHINE_API int machine_wait_list_wait(machine_wait_list_t *wait_list, + uint32_t timeout_ms); +MACHINE_API void machine_wait_list_notify(machine_wait_list_t *wait_list); + #ifdef __cplusplus } #endif diff --git a/third_party/machinarium/sources/machinarium_private.h b/third_party/machinarium/sources/machinarium_private.h index 8820f7e5..302411cc 100644 --- a/third_party/machinarium/sources/machinarium_private.h +++ b/third_party/machinarium/sources/machinarium_private.h @@ -49,6 +49,8 @@ #include #include +#include "atomic.h" + #include "build.h" #include "macro.h" #include "util.h" @@ -94,6 +96,8 @@ #include "machine_mgr.h" #include "mm.h" +#include "wait_list.h" + #include "iov.h" #include "io.h" #include "tls.h" diff --git a/third_party/machinarium/sources/wait_list.c b/third_party/machinarium/sources/wait_list.c new file mode 100644 index 00000000..824292d2 --- /dev/null +++ b/third_party/machinarium/sources/wait_list.c @@ -0,0 +1,177 @@ +/* + * machinarium. + * + * cooperative multitasking engine. + */ + +#include +#include + +static inline void release_sleepy(mm_wait_list_t *wait_list, + mm_sleepy_t *sleepy) +{ + mm_atomic_u64_once(&sleepy->released, { + mm_list_unlink(&sleepy->link); + mm_atomic_u64_dec(&wait_list->sleepies_count); + }); +} + +static inline void init_sleepy(mm_sleepy_t *sleepy) +{ + if (mm_self != NULL && mm_self->scheduler.current != NULL) { + sleepy->coro_id = mm_self->scheduler.current->id; + } else { + sleepy->coro_id = MM_SLEEPY_NO_CORO_ID; + } + + mm_atomic_u64_set(&sleepy->released, 0ULL); + + mm_list_init(&sleepy->link); + mm_eventmgr_add(&mm_self->event_mgr, &sleepy->event); +} + +static inline void release_sleepy_with_lock(mm_wait_list_t *wait_list, + mm_sleepy_t *sleepy) +{ + // in case sleepy wasn't released from list due to timeout + + if (mm_atomic_u64_value(&sleepy->released) == 0) { + mm_sleeplock_lock(&wait_list->lock); + + // need to check once again, in case some notify has released + // this sleepy between first check and locking + // once-again checking will be performed inside release_sleepy + release_sleepy(wait_list, sleepy); + + mm_sleeplock_unlock(&wait_list->lock); + } +} + +mm_wait_list_t *mm_wait_list_create() +{ + mm_wait_list_t *wait_list = malloc(sizeof(mm_wait_list_t)); + if (wait_list == NULL) { + return NULL; + } + memset(wait_list, 0, sizeof(mm_wait_list_t)); + + mm_sleeplock_init(&wait_list->lock); + + mm_list_init(&wait_list->sleepies); + mm_atomic_u64_set(&wait_list->sleepies_count, 0ULL); + + return wait_list; +} + +void mm_wait_list_destroy(mm_wait_list_t *wait_list) +{ + mm_sleeplock_lock(&wait_list->lock); + + mm_list_t *i, *n; + mm_list_foreach_safe(&wait_list->sleepies, i, n) + { + mm_sleepy_t *sleepy = mm_container_of(i, mm_sleepy_t, link); + mm_call_cancel(&sleepy->event.call, NULL); + + release_sleepy(wait_list, sleepy); + } + + mm_sleeplock_unlock(&wait_list->lock); + + free(wait_list); +} + +static inline int wait_sleepy(mm_wait_list_t *wait_list, mm_sleepy_t *sleepy, + uint32_t timeout_ms) +{ + mm_eventmgr_wait(&mm_self->event_mgr, &sleepy->event, timeout_ms); + + release_sleepy_with_lock(wait_list, sleepy); + + // timeout or cancel + if (sleepy->event.call.status != 0) { + return 1; + } + + return 0; +} + +int mm_wait_list_wait(mm_wait_list_t *wait_list, uint32_t timeout_ms) +{ + mm_sleepy_t this; + init_sleepy(&this); + + mm_sleeplock_lock(&wait_list->lock); + + mm_list_append(&wait_list->sleepies, &this.link); + mm_atomic_u64_inc(&wait_list->sleepies_count); + + mm_sleeplock_unlock(&wait_list->lock); + + int rc; + rc = wait_sleepy(wait_list, &this, timeout_ms); + + return rc; +} + +void mm_wait_list_notify(mm_wait_list_t *wait_list) +{ + mm_sleeplock_lock(&wait_list->lock); + + if (mm_atomic_u64_value(&wait_list->sleepies_count) == 0ULL) { + mm_sleeplock_unlock(&wait_list->lock); + return; + } + + mm_sleepy_t *sleepy; + sleepy = mm_list_peek(wait_list->sleepies, mm_sleepy_t); + + release_sleepy(wait_list, sleepy); + + mm_sleeplock_unlock(&wait_list->lock); + + int event_mgr_fd; + event_mgr_fd = mm_eventmgr_signal(&sleepy->event); + if (event_mgr_fd > 0) { + mm_eventmgr_wakeup(event_mgr_fd); + } +} + +MACHINE_API machine_wait_list_t *machine_wait_list_create() +{ + mm_wait_list_t *wl; + wl = mm_wait_list_create(); + if (wl == NULL) { + return NULL; + } + + return mm_cast(machine_wait_list_t *, wl); +} + +MACHINE_API void machine_wait_list_destroy(machine_wait_list_t *wait_list) +{ + mm_wait_list_t *wl; + wl = mm_cast(mm_wait_list_t *, wait_list); + + mm_wait_list_destroy(wl); +} + +MACHINE_API int machine_wait_list_wait(machine_wait_list_t *wait_list, + uint32_t timeout_ms) +{ + mm_wait_list_t *wl; + wl = mm_cast(mm_wait_list_t *, wait_list); + + int rc; + rc = mm_wait_list_wait(wl, timeout_ms); + + return rc; +} + +MACHINE_API void machine_wait_list_notify(machine_wait_list_t *wait_list) +{ + mm_wait_list_t *wl; + wl = mm_cast(mm_wait_list_t *, wait_list); + + mm_wait_list_notify(wl); +} diff --git a/third_party/machinarium/sources/wait_list.h b/third_party/machinarium/sources/wait_list.h new file mode 100644 index 00000000..99ac47f1 --- /dev/null +++ b/third_party/machinarium/sources/wait_list.h @@ -0,0 +1,34 @@ +#ifndef MM_WAIT_LIST_H +#define MM_WAIT_LIST_H + +/* + * machinarium. + * + * cooperative multitasking engine. + */ + +#define MM_SLEEPY_NO_CORO_ID (~0ULL) + +typedef struct mm_sleepy { + mm_event_t event; + mm_list_t link; + + // we can store coroutine id and we will, in case of some debugging + uint64_t coro_id; + + uint64_t released; +} mm_sleepy_t; + +typedef struct mm_wait_list { + mm_sleeplock_t lock; + mm_list_t sleepies; + uint64_t sleepies_count; +} mm_wait_list_t; + +mm_wait_list_t *mm_wait_list_create(); +void mm_wait_list_destroy(mm_wait_list_t *wait_list); + +int mm_wait_list_wait(mm_wait_list_t *wait_list, uint32_t timeout_ms); +void mm_wait_list_notify(mm_wait_list_t *wait_list); + +#endif /* MM_WAIT_LIST_H */