Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UDP: burst testing to improve coverage #1892

Merged
merged 4 commits into from
Oct 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ include(CheckSanitizer)
CheckSanitizer()
if (NOT NNG_SANITIZER STREQUAL "none")
set(NNG_SANITIZER_FLAGS "-fsanitize=${NNG_SANITIZER}")
add_definitions(-DNNG_SANITIZER)
endif ()

if (NNG_ENABLE_COVERAGE)
Expand All @@ -195,6 +196,7 @@ if (NNG_ENABLE_COVERAGE)
else ()
message(FATAL_ERROR "Unable to enable coverage for your compiler.")
endif ()
add_definitions(-DNNG_CONVERAGE)
endif ()

set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${NNG_WARN_FLAGS} ${NNG_COVERAGE_C_FLAGS} ${NNG_SANITIZER_FLAGS}")
Expand Down
46 changes: 35 additions & 11 deletions src/sp/transport/udp/udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -969,6 +969,7 @@ udp_pipe_send(void *arg, nni_aio *aio)
udp_ep *ep;
udp_sp_data dreq;
nng_msg *msg;
size_t count = 0;

if (nni_aio_begin(aio) != 0) {
// No way to give the message back to the protocol,
Expand All @@ -981,6 +982,10 @@ udp_pipe_send(void *arg, nni_aio *aio)
msg = nni_aio_get_msg(aio);
ep = p->ep;

if (msg != NULL) {
count = nni_msg_len(msg) + nni_msg_header_len(msg);
}

nni_mtx_lock(&ep->mtx);
if ((nni_msg_len(msg) + nni_msg_header_len(msg)) > p->sndmax) {
nni_mtx_unlock(&ep->mtx);
Expand All @@ -999,14 +1004,13 @@ udp_pipe_send(void *arg, nni_aio *aio)
dreq.us_sender_id = p->self_id;
dreq.us_peer_id = p->peer_id;
dreq.us_sequence = p->self_seq++;
dreq.us_length =
msg != NULL ? nni_msg_len(msg) + nni_msg_header_len(msg) : 0;
dreq.us_length = (uint16_t) count;

// Just queue it, or fail it.
udp_queue_tx(ep, &p->peer_addr, (void *) &dreq, msg);
nni_mtx_unlock(&ep->mtx);
nni_aio_finish(
aio, 0, msg ? nni_msg_len(msg) + nni_msg_header_len(msg) : 0);

nni_aio_finish(aio, 0, count);
}

static void
Expand Down Expand Up @@ -1289,7 +1293,8 @@ udp_timer_cb(void *arg)
}

static int
udp_ep_init(udp_ep **epp, nng_url *url, nni_sock *sock)
udp_ep_init(udp_ep **epp, nng_url *url, nni_sock *sock, nni_dialer *dialer,
nni_listener *listener)
{
udp_ep *ep;
int rv;
Expand Down Expand Up @@ -1432,7 +1437,29 @@ udp_ep_init(udp_ep **epp, nng_url *url, nni_sock *sock)
nni_stat_init(&ep->st_snd_nobuf, &snd_nobuf_info);
nni_stat_init(&ep->st_peer_inactive, &peer_inactive_info);

nni_stat_set_value(&ep->st_rcv_max, ep->rcvmax);
if (listener) {
nni_listener_add_stat(listener, &ep->st_rcv_max);
nni_listener_add_stat(listener, &ep->st_copy_max);
nni_listener_add_stat(listener, &ep->st_rcv_copy);
nni_listener_add_stat(listener, &ep->st_rcv_nocopy);
nni_listener_add_stat(listener, &ep->st_rcv_reorder);
nni_listener_add_stat(listener, &ep->st_rcv_toobig);
nni_listener_add_stat(listener, &ep->st_rcv_nomatch);
nni_listener_add_stat(listener, &ep->st_rcv_nobuf);
nni_listener_add_stat(listener, &ep->st_snd_toobig);
nni_listener_add_stat(listener, &ep->st_snd_nobuf);
} else {
nni_dialer_add_stat(dialer, &ep->st_rcv_max);
nni_dialer_add_stat(dialer, &ep->st_copy_max);
nni_dialer_add_stat(dialer, &ep->st_rcv_copy);
nni_dialer_add_stat(dialer, &ep->st_rcv_nocopy);
nni_dialer_add_stat(dialer, &ep->st_rcv_reorder);
nni_dialer_add_stat(dialer, &ep->st_rcv_toobig);
nni_dialer_add_stat(dialer, &ep->st_rcv_nomatch);
nni_dialer_add_stat(dialer, &ep->st_rcv_nobuf);
nni_dialer_add_stat(dialer, &ep->st_snd_toobig);
nni_dialer_add_stat(dialer, &ep->st_snd_nobuf);
}

// schedule our timer callback - forever for now
// adjusted automatically as we add pipes or other
Expand Down Expand Up @@ -1474,11 +1501,10 @@ udp_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer)
return (rv);
}

if ((rv = udp_ep_init(&ep, url, sock)) != 0) {
if ((rv = udp_ep_init(&ep, url, sock, ndialer, NULL)) != 0) {
return (rv);
}

nni_dialer_add_stat(ndialer, &ep->st_rcv_max);
*dp = ep;
return (0);
}
Expand All @@ -1497,13 +1523,11 @@ udp_listener_init(void **lp, nng_url *url, nni_listener *nlistener)
return (rv);
}

if ((rv = udp_ep_init(&ep, url, sock)) != 0) {
if ((rv = udp_ep_init(&ep, url, sock, NULL, nlistener)) != 0) {
return (rv);
}
ep->self_sa = sa;

nni_listener_add_stat(nlistener, &ep->st_rcv_max);

*lp = ep;
return (0);
}
Expand Down
122 changes: 119 additions & 3 deletions src/sp/transport/udp/udp_tran_test.c
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
//
// Copyright 2024 Staysail Systems, Inc. <[email protected]>
// Copyright 2018 Capitar IT Group BV <[email protected]>
// Copyright 2018 Devolutions <[email protected]>
// Copyright 2018 Cody Piersall <[email protected]>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
Expand Down Expand Up @@ -195,6 +192,123 @@ test_udp_recv_copy(void)
NUTS_CLOSE(s1);
}

void
test_udp_multi_send_recv(void)
{
char msg[256];
char buf[256];
nng_socket s0;
nng_socket s1;
nng_listener l;
nng_dialer d;
size_t sz;
char *addr;

NUTS_ADDR(addr, "udp");

NUTS_OPEN(s0);
NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_RECVTIMEO, 100));
NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_SENDTIMEO, 100));
NUTS_PASS(nng_listener_create(&l, s0, addr));
NUTS_PASS(nng_listener_set_size(l, NNG_OPT_UDP_COPY_MAX, 100));
NUTS_PASS(nng_listener_get_size(l, NNG_OPT_UDP_COPY_MAX, &sz));
NUTS_TRUE(sz == 100);
NUTS_PASS(nng_listener_start(l, 0));

NUTS_OPEN(s1);
NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_RECVTIMEO, 100));
NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_SENDTIMEO, 100));
NUTS_PASS(nng_dialer_create(&d, s1, addr));
NUTS_PASS(nng_dialer_set_size(d, NNG_OPT_UDP_COPY_MAX, 100));
NUTS_PASS(nng_dialer_get_size(d, NNG_OPT_UDP_COPY_MAX, &sz));
NUTS_PASS(nng_dialer_start(d, 0));
nng_msleep(100);

for (int i = 0; i < 1000; i++) {
NUTS_PASS(nng_send(s1, msg, 95, 0));
NUTS_PASS(nng_recv(s0, buf, &sz, 0));
NUTS_TRUE(sz == 95);
NUTS_PASS(nng_send(s0, msg, 95, 0));
NUTS_PASS(nng_recv(s1, buf, &sz, 0));
NUTS_TRUE(sz == 95);
}
NUTS_CLOSE(s0);
NUTS_CLOSE(s1);
}

void
test_udp_multi_small_burst(void)
{
char msg[256];
char buf[256];
nng_socket s0;
nng_socket s1;
nng_listener l;
nng_dialer d;
size_t sz;
char *addr;

NUTS_ADDR(addr, "udp");

NUTS_OPEN(s0);
NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_RECVTIMEO, 10));
NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_SENDTIMEO, 1000));
NUTS_PASS(nng_listener_create(&l, s0, addr));
NUTS_PASS(nng_listener_set_size(l, NNG_OPT_UDP_COPY_MAX, 100));
NUTS_PASS(nng_listener_get_size(l, NNG_OPT_UDP_COPY_MAX, &sz));
NUTS_TRUE(sz == 100);
NUTS_PASS(nng_listener_start(l, 0));

NUTS_OPEN(s1);
NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_RECVTIMEO, 10));
NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_SENDTIMEO, 1000));
NUTS_PASS(nng_dialer_create(&d, s1, addr));
NUTS_PASS(nng_dialer_set_size(d, NNG_OPT_UDP_COPY_MAX, 100));
NUTS_PASS(nng_dialer_get_size(d, NNG_OPT_UDP_COPY_MAX, &sz));
NUTS_PASS(nng_dialer_start(d, 0));
nng_msleep(100);

float actual = 0;
float expect = 0;
float require = 0.50;
int burst = 4;
int count = 20;

#if defined(NNG_SANITIZER) || defined(NNG_COVERAGE)
// sanitizers may drop a lot, so can coverage
require = 0.0;
#elif defined(NNG_PLATFORM_WINDOWS)
// Windows seems to drop a lot - maybe because of virtualization
burst = 2;
count = 10;
require = 0.10;
#endif

// Experimentally at least on Darwin, we see some packet losses
// even for loopback. Loss rates appear depressingly high.
for (int i = 0; i < count; i++) {
for (int j = 0; j < burst; j++) {
NUTS_PASS(nng_send(s1, msg, 95, 0));
expect++;
}
for (int j = 0; j < burst; j++) {
if (nng_recv(s0, buf, &sz, 0) == 0) {
NUTS_TRUE(sz == 95);
actual++;
}
}
NUTS_PASS(nng_send(s0, msg, 95, 0));
NUTS_PASS(nng_recv(s1, buf, &sz, 0));
NUTS_TRUE(sz == 95);
}
NUTS_TRUE(actual <= expect);
NUTS_TRUE(actual / expect > require);
NUTS_MSG("Packet loss: %.02f (got %.f of %.f)", 1.0 - actual / expect,
actual, expect);
NUTS_CLOSE(s0);
NUTS_CLOSE(s1);
}

NUTS_TESTS = {

{ "udp wild card connect fail", test_udp_wild_card_connect_fail },
Expand All @@ -205,5 +319,7 @@ NUTS_TESTS = {
{ "udp malformed address", test_udp_malformed_address },
{ "udp recv max", test_udp_recv_max },
{ "udp recv copy", test_udp_recv_copy },
{ "udp multi send recv", test_udp_multi_send_recv },
{ "udp multi small burst", test_udp_multi_small_burst },
{ NULL, NULL },
};
Loading