From 782d8177d14fe39409a87ec5f9ac725dad2322ac Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sat, 12 Oct 2024 14:31:35 -0700 Subject: [PATCH] UDP: add some statistics --- include/nng/nng.h | 2 + src/sp/transport/udp/udp.c | 111 +++++++++++++++++++++++++++++++++---- 2 files changed, 103 insertions(+), 10 deletions(-) diff --git a/include/nng/nng.h b/include/nng/nng.h index 17965010d..1b3eae207 100644 --- a/include/nng/nng.h +++ b/include/nng/nng.h @@ -818,6 +818,8 @@ NNG_DECL nng_listener nng_pipe_listener(nng_pipe); // low order 16 bits will be set. This is provided in native byte order, // which makes it more convenient than using the NNG_OPT_LOCADDR option. #define NNG_OPT_TCP_BOUND_PORT "tcp-bound-port" +// UDP alias for convenience uses the same value +#define NNG_OPT_UDP_BOUND_PORT NNG_OPT_TCP_BOUND_PORT // IPC options. These will largely vary depending on the platform, // as POSIX systems have very different options than Windows. diff --git a/src/sp/transport/udp/udp.c b/src/sp/transport/udp/udp.c index 8bf5a4d9f..10e403978 100644 --- a/src/sp/transport/udp/udp.c +++ b/src/sp/transport/udp/udp.c @@ -12,6 +12,7 @@ #include "core/message.h" #include "core/nng_impl.h" #include "core/options.h" +#include "core/pipe.h" #include "core/platform.h" #include "nng/nng.h" @@ -219,6 +220,15 @@ struct udp_ep { #ifdef NNG_ENABLE_STATS nni_stat_item st_rcv_max; + nni_stat_item st_rcv_reorder; + nni_stat_item st_rcv_toobig; + nni_stat_item st_rcv_nomatch; + nni_stat_item st_rcv_copy; + nni_stat_item st_rcv_nocopy; + nni_stat_item st_rcv_nobuf; + nni_stat_item st_snd_toobig; + nni_stat_item st_snd_nobuf; + nni_stat_item st_peer_inactive; #endif }; @@ -363,9 +373,12 @@ udp_check_pipe_sequence(udp_pipe *p, uint32_t seq) delta = (int32_t) (seq - p->peer_seq); if (delta < 0) { // out of order delivery + nni_stat_inc(&p->ep->st_rcv_reorder, 1); return (false); } - // TODO: bump a stat for misses if delta > 0. + if (delta > 0) { + nni_stat_inc(&p->ep->st_rcv_reorder, 1); + } p->peer_seq = seq + 1; // expected next sequence number return (true); } @@ -452,7 +465,7 @@ udp_queue_tx(udp_ep *ep, nng_sockaddr *sa, udp_sp_msg *msg, nni_msg *payload) if (ring->count == ring->size || !ep->started) { // ring is full - // TODO: bump a stat + nni_stat_inc(&ep->st_snd_nobuf, 1); if (payload != NULL) { nni_msg_free(payload); } @@ -621,7 +634,7 @@ udp_recv_data(udp_ep *ep, udp_sp_data *dreq, size_t len, nng_sockaddr *sa) // NB: Peer ID endianness does not matter, as long we use it // consistently. if ((p = udp_find_pipe(ep, peer_id, send_id)) == NULL) { - // TODO: Bump a stat... + nni_stat_inc(&ep->st_rcv_nomatch, 1); udp_send_disc_full(ep, sa, send_id, peer_id, 0, DISC_NOTCONN); // Question: how do we store the sockaddr for that? return; @@ -637,6 +650,7 @@ udp_recv_data(udp_ep *ep, udp_sp_data *dreq, size_t len, nng_sockaddr *sa) // Make sure the message wasn't truncated, and that it fits within // our maximum agreed upon payload. if ((dreq->us_length > len) || (dreq->us_length > p->rcvmax)) { + nni_stat_inc(&ep->st_rcv_toobig, 1); udp_send_disc(ep, p, DISC_MSGSIZE); return; } @@ -652,19 +666,21 @@ udp_recv_data(udp_ep *ep, udp_sp_data *dreq, size_t len, nng_sockaddr *sa) if (!udp_check_pipe_sequence(p, dreq->us_sequence)) { // out of order delivery, drop it - // TODO: bump a stat return; } if (nni_lmq_full(&p->rx_mq)) { - // bump a NOBUF stat + nni_stat_inc(&ep->st_rcv_nobuf, 1); return; } // Short message, just alloc and copy if (len <= ep->short_msg) { + nni_stat_inc(&ep->st_rcv_copy, 1); if (nng_msg_alloc(&msg, len) != 0) { - // TODO: bump a stat + if (p->npipe != NULL) { + nni_pipe_bump_error(p->npipe, NNG_ENOMEM); + } return; } nni_msg_set_address(msg, sa); @@ -672,12 +688,15 @@ udp_recv_data(udp_ep *ep, udp_sp_data *dreq, size_t len, nng_sockaddr *sa) nni_msg_append(msg, nni_msg_body(ep->rx_payload), len); nni_lmq_put(&p->rx_mq, msg); } else { + nni_stat_inc(&ep->st_rcv_nocopy, 1); // Message size larger than copy break, do zero copy msg = ep->rx_payload; if (nng_msg_alloc(&ep->rx_payload, ep->rcvmax + sizeof(ep->rx_msg)) != 0) { - // TODO: bump a stat ep->rx_payload = msg; // make sure we put it back + if (p->npipe != NULL) { + nni_pipe_bump_error(p->npipe, NNG_ENOMEM); + } return; } @@ -900,8 +919,6 @@ udp_rx_cb(void *arg) hdr->data.us_length = NNI_GET16LE(&hdr->data.us_length); #endif - // TODO: verify that incoming type matches us! - switch (hdr->data.us_op_code) { case OPCODE_DATA: udp_recv_data(ep, &hdr->data, n, sa); @@ -964,7 +981,7 @@ udp_pipe_send(void *arg, nni_aio *aio) // floor. this is on the sender, so there isn't a compelling // need to disconnect the pipe, since it we're not being // "ill-behaved" to our peer. - // TODO: bump a stat + nni_stat_inc(&ep->st_snd_toobig, 1); nni_msg_free(msg); return; } @@ -1241,6 +1258,7 @@ udp_timer_cb(void *arg) // This will probably not be received by the peer, // since we aren't getting anything from them. But // having it on the wire may help debugging later. + nni_stat_inc(&ep->st_peer_inactive, 1); udp_send_disc(ep, p, DISC_INACTIVE); continue; } @@ -1321,7 +1339,80 @@ udp_ep_init(udp_ep **epp, nng_url *url, nni_sock *sock) .si_unit = NNG_UNIT_BYTES, .si_atomic = true, }; + static const nni_stat_info rcv_reorder_info = { + .si_name = "rcv_reorder", + .si_desc = "messages received out of order", + .si_type = NNG_STAT_COUNTER, + .si_unit = NNG_UNIT_MESSAGES, + .si_atomic = true, + }; + static const nni_stat_info rcv_toobig_info = { + .si_name = "rcv_toobig", + .si_desc = "received messages rejected because too big", + .si_type = NNG_STAT_COUNTER, + .si_unit = NNG_UNIT_MESSAGES, + .si_atomic = true, + }; + static const nni_stat_info rcv_nomatch_info = { + .si_name = "rcv_nomatch", + .si_desc = "received messages without a matching connection", + .si_type = NNG_STAT_COUNTER, + .si_unit = NNG_UNIT_MESSAGES, + .si_atomic = true, + }; + static const nni_stat_info rcv_copy_info = { + .si_name = "rcv_copy", + .si_desc = "received messages copied (small)", + .si_type = NNG_STAT_COUNTER, + .si_unit = NNG_UNIT_MESSAGES, + .si_atomic = true, + }; + static const nni_stat_info rcv_nocopy_info = { + .si_name = "rcv_nocopy", + .si_desc = "received messages zero copy (large)", + .si_type = NNG_STAT_COUNTER, + .si_unit = NNG_UNIT_MESSAGES, + .si_atomic = true, + }; + static const nni_stat_info rcv_nobuf_info = { + .si_name = "rcv_nobuf", + .si_desc = "received messages dropped no buffer", + .si_type = NNG_STAT_COUNTER, + .si_unit = NNG_UNIT_MESSAGES, + .si_atomic = true, + }; + static const nni_stat_info snd_toobig_info = { + .si_name = "snd_toobig", + .si_desc = "sent messages rejected because too big", + .si_type = NNG_STAT_COUNTER, + .si_unit = NNG_UNIT_MESSAGES, + .si_atomic = true, + }; + static const nni_stat_info snd_nobuf_info = { + .si_name = "snd_nobuf", + .si_desc = "sent messages dropped no buffer", + .si_type = NNG_STAT_COUNTER, + .si_unit = NNG_UNIT_MESSAGES, + .si_atomic = true, + }; + static const nni_stat_info peer_inactive_info = { + .si_name = "peer_inactive", + .si_desc = "connections closed due to inactive peer", + .si_type = NNG_STAT_COUNTER, + .si_unit = NNG_UNIT_EVENTS, + .si_atomic = true, + }; + nni_stat_init(&ep->st_rcv_max, &rcv_max_info); + nni_stat_init(&ep->st_rcv_copy, &rcv_copy_info); + nni_stat_init(&ep->st_rcv_nocopy, &rcv_nocopy_info); + nni_stat_init(&ep->st_rcv_reorder, &rcv_reorder_info); + nni_stat_init(&ep->st_rcv_toobig, &rcv_toobig_info); + nni_stat_init(&ep->st_rcv_nomatch, &rcv_nomatch_info); + nni_stat_init(&ep->st_rcv_nobuf, &rcv_nobuf_info); + nni_stat_init(&ep->st_snd_toobig, &snd_toobig_info); + nni_stat_init(&ep->st_snd_nobuf, &snd_nobuf_info); + nni_stat_init(&ep->st_peer_inactive, &peer_inactive_info); #endif // schedule our timer callback - forever for now