Skip to content

Commit

Permalink
Msg leaks somewhere (#742)
Browse files Browse the repository at this point in the history
* machinarium/channels: add get_size funcs

Can be used in some debuggin workarounds

Signed-off-by: rkhapov <[email protected]>

* machinarium/wait_list*

This patch adds mm_wait_list_t type
which will be used as condition variable everywhere it needed.

Ex.: wait_bus in router creates it's own wait_list, but with channels
and therefore it has memory leaks because of inconsistency of it usages.

Signed-off-by: rkhapov <[email protected]>

* sources/route.h: fix wait_bus inconsistency

route->wait_bus was implemented with channels.
This leads to memory leaks because of inconsystency of queued
clients notifications.

This patch replaces channel with wait_list, which will not lead
to memory leaks.

Signed-off-by: rkhapov <[email protected]>

* sources/config_reader.c: fix small memleak

just to silence asan

Signed-off-by: rkhapov <[email protected]>

---------

Signed-off-by: rkhapov <[email protected]>
Co-authored-by: rkhapov <[email protected]>
  • Loading branch information
rkhapov and rkhapov authored Dec 16, 2024
1 parent 87fa7d1 commit 806856e
Show file tree
Hide file tree
Showing 18 changed files with 653 additions and 13 deletions.
4 changes: 4 additions & 0 deletions sources/config_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -1785,6 +1785,10 @@ static int od_config_reader_address(od_config_reader_t *reader,

od_address_range_t address_range;
address_range = od_address_range_create_default();
// created with strdup inside
if (address_range.string_value != NULL) {
free(address_range.string_value);
}
address_range.string_value = NULL;
address_range.string_value_len = 0;
address_range.is_default = 0;
Expand Down
20 changes: 7 additions & 13 deletions sources/route.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ struct od_route {
kiwi_params_lock_t params;
int64_t tcp_connections;
int last_heartbeat;
machine_channel_t *wait_bus;
machine_wait_list_t *wait_bus;
pthread_mutex_t lock;

od_error_logger_t *err_logger;
Expand Down Expand Up @@ -68,7 +68,7 @@ static inline void od_route_free(od_route_t *route)

kiwi_params_lock_free(&route->params);
if (route->wait_bus)
machine_channel_free(route->wait_bus);
machine_wait_list_destroy(route->wait_bus);
if (route->stats.enable_quantiles) {
od_stat_free(&route->stats);
}
Expand All @@ -88,7 +88,7 @@ static inline od_route_t *od_route_allocate()
if (route == NULL)
return NULL;
od_route_init(route, true);
route->wait_bus = machine_channel_create();
route->wait_bus = machine_wait_list_create();
if (route->wait_bus == NULL) {
od_route_free(route);
return NULL;
Expand Down Expand Up @@ -194,23 +194,17 @@ static inline void od_route_reload_pool(od_route_t *route)

static inline int od_route_wait(od_route_t *route, uint32_t time_ms)
{
machine_msg_t *msg;
msg = machine_channel_read(route->wait_bus, time_ms);
if (msg) {
machine_msg_free(msg);
int rc;
rc = machine_wait_list_wait(route->wait_bus, time_ms);
if (rc == 0) {
return 0;
}
return -1;
}

static inline int od_route_signal(od_route_t *route)
{
machine_msg_t *msg;
msg = machine_msg_create(0);
if (msg == NULL) {
return -1;
}
machine_channel_write(route->wait_bus, msg);
machine_wait_list_notify(route->wait_bus);
return 0;
}

Expand Down
4 changes: 4 additions & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ set(od_test_src
machinarium/test_accept_timeout.c
machinarium/test_accept_cancel.c
machinarium/test_advice_keepalive_usr_timeout.c
machinarium/test_wait_list_without_notify.c
machinarium/test_wait_list_notify_after_wait.c
machinarium/test_wait_list_one_producer_multiple_consumers.c
machinarium/test_wait_list_one_producer_multiple_consumers_threads.c
machinarium/test_getaddrinfo0.c
machinarium/test_getaddrinfo1.c
machinarium/test_getaddrinfo2.c
Expand Down
67 changes: 67 additions & 0 deletions test/machinarium/test_wait_list_notify_after_wait.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#include <machinarium.h>
#include <odyssey_test.h>

static inline void producer_coroutine(void *arg)
{
machine_wait_list_t *wl = arg;

machine_sleep(500);
machine_wait_list_notify(wl);
}

static inline void consumer_coroutine(void *arg)
{
machine_wait_list_t *wl = arg;

uint64_t start, end, total_time;
int rc;

start = machine_time_ms();
rc = machine_wait_list_wait(wl, 1000);
end = machine_time_ms();
test(rc == 0);
total_time = end - start;
test(total_time > 400 && total_time < 1000);
}

static inline void test_notify_after_wait(void *arg)
{
(void)arg;

machine_wait_list_t *wl = machine_wait_list_create();

int consumer_id;
consumer_id = machine_coroutine_create(consumer_coroutine, wl);
test(consumer_id != -1);

int producer_id;
producer_id = machine_coroutine_create(producer_coroutine, wl);
test(producer_id != -1);

machine_sleep(0);

int rc;
rc = machine_join(producer_id);
test(rc == 0);

rc = machine_join(consumer_id);
test(rc == 0);

machine_wait_list_destroy(wl);
}

void machinarium_test_wait_list_notify_after_wait()
{
machinarium_init();

int id;
id = machine_create("test_wait_list_notify_after_wait",
test_notify_after_wait, NULL);
test(id != -1);

int rc;
rc = machine_wait(id);
test(rc != -1);

machinarium_free();
}
108 changes: 108 additions & 0 deletions test/machinarium/test_wait_list_one_producer_multiple_consumers.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#include <machinarium.h>
#include <odyssey_test.h>

static inline void producer_coroutine(void *arg)
{
machine_wait_list_t *wl = arg;

uint64_t start, current;
start = machine_time_ms();
current = start;

while ((current - start) < 4000) {
machine_wait_list_notify(wl);
machine_sleep(300);
current = machine_time_ms();
}
}

typedef struct {
machine_wait_list_t *wl;
int count;
int64_t id;
} consumer_arg_t;

static inline void consumer_thread(void *arg)
{
consumer_arg_t *ca = arg;
machine_wait_list_t *wl = ca->wl;
int *count = &ca->count;

uint64_t start, current;
int rc;
start = machine_time_ms();
current = start;

while ((current - start) < 3000) {
rc = machine_wait_list_wait(wl, 1000);
test(rc == 0);
++(*count);
current = machine_time_ms();
}
}

static inline void test_multiple_consumers(void *arg)
{
(void)arg;

machine_wait_list_t *wl = machine_wait_list_create();

int producer_id;
producer_id = machine_coroutine_create(producer_coroutine, wl);
test(producer_id != -1);

consumer_arg_t a1, a2, a3;
int c1, c2, c3;

a1.count = a2.count = a3.count = 0;
a1.wl = a2.wl = a3.wl = wl;

c1 = machine_create("consumer1", consumer_thread, &a1);
test(c1 != -1);
a1.id = c1;

c2 = machine_create("consumer2", consumer_thread, &a2);
test(c2 != -1);
a2.id = c2;

c3 = machine_create("consumer3", consumer_thread, &a3);
test(c3 != -1);
a3.id = c3;

machine_sleep(0);

int rc;
rc = machine_join(producer_id);
test(rc == 0);

rc = machine_wait(c1);
test(rc == 0);

rc = machine_wait(c2);
test(rc == 0);

rc = machine_wait(c3);
test(rc == 0);

test(a1.count >= 3);
test(a2.count >= 3);
test(a3.count >= 3);

machine_wait_list_destroy(wl);
}

void machinarium_test_wait_list_one_producer_multiple_consumers()
{
machinarium_init();

int id;
id = machine_create("test_wait_list_one_producer_multiple_consumers",
test_multiple_consumers, NULL);
test(id != -1);

int rc;
rc = machine_wait(id);
test(rc != -1);

machinarium_free();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#include <machinarium.h>
#include <odyssey_test.h>

static inline void producer_coroutine(void *arg)
{
machine_wait_list_t *wl = arg;

uint64_t start, current;
start = machine_time_ms();
current = start;

while ((current - start) < 4000) {
machine_wait_list_notify(wl);
machine_sleep(300);
current = machine_time_ms();
}
}

typedef struct {
machine_wait_list_t *wl;
int count;
int64_t id;
} consumer_arg_t;

static inline void consumer_coroutine(void *arg)
{
consumer_arg_t *ca = arg;
machine_wait_list_t *wl = ca->wl;
int *count = &ca->count;

uint64_t start, current;
int rc;
start = machine_time_ms();
current = start;

while ((current - start) < 3000) {
rc = machine_wait_list_wait(wl, 1000);
test(rc == 0);
++(*count);
current = machine_time_ms();
}
}

static inline void test_multiple_consumers(void *arg)
{
(void)arg;

machine_wait_list_t *wl = machine_wait_list_create();

int producer_id;
producer_id = machine_coroutine_create(producer_coroutine, wl);
test(producer_id != -1);

consumer_arg_t a1, a2, a3;
int c1, c2, c3;

a1.count = a2.count = a3.count = 0;
a1.wl = a2.wl = a3.wl = wl;

c1 = machine_coroutine_create(consumer_coroutine, &a1);
test(c1 != -1);
a1.id = c1;

c2 = machine_coroutine_create(consumer_coroutine, &a2);
test(c2 != -1);
a2.id = c2;

c3 = machine_coroutine_create(consumer_coroutine, &a3);
test(c3 != -1);
a3.id = c3;

machine_sleep(0);

int rc;
rc = machine_join(producer_id);
test(rc == 0);

test(a1.count >= 3);
test(a2.count >= 3);
test(a3.count >= 3);

machine_wait_list_destroy(wl);
}

void machinarium_test_wait_list_one_producer_multiple_consumers_threads()
{
machinarium_init();

int id;
id = machine_create(
"test_wait_list_one_producer_multiple_consumers_threads",
test_multiple_consumers, NULL);
test(id != -1);

int rc;
rc = machine_wait(id);
test(rc != -1);

machinarium_free();
}
Loading

0 comments on commit 806856e

Please sign in to comment.