From 8d68717a4148329fd30c0015423581291836d97d Mon Sep 17 00:00:00 2001 From: Amaury Denoyelle Date: Mon, 30 Sep 2024 09:48:29 +0200 Subject: [PATCH] MEDIUM: quic: refactor buffered STREAM ACK consuming For the moment, streamdesc layer can only deal with in-order ACK at the stream level. Received out-of-order ACKs are buffered in a tree attached to a streambuf instance. Previously, caller of qc_stream_desc_ack() was responsible to implement consumption of these buffered ACKs. Refactor this by implementing it directly at the streamdesc layer within qc_stream_desc_ack(). This simplifies quic_rx ACK handling and ensure buffered ACKs are consumed as soon as possible. --- src/quic_rx.c | 70 +------------------------ src/quic_stream.c | 128 ++++++++++++++++++++++++++++++++-------------- 2 files changed, 91 insertions(+), 107 deletions(-) diff --git a/src/quic_rx.c b/src/quic_rx.c index 5537d63f3..08e73b4ec 100644 --- a/src/quic_rx.c +++ b/src/quic_rx.c @@ -201,71 +201,6 @@ static int qc_pkt_decrypt(struct quic_conn *qc, struct quic_enc_level *qel, return ret; } -/* Remove from the acknowledged frames. - * - * Returns 1 if at least one frame was removed else 0. - */ -static int quic_stream_try_to_consume(struct quic_conn *qc, - struct qc_stream_desc *stream) -{ - int ret; - struct eb64_node *frm_node; - struct qc_stream_buf *stream_buf; - struct eb64_node *buf_node; - - TRACE_ENTER(QUIC_EV_CONN_ACKSTRM, qc); - - ret = 0; - buf_node = eb64_first(&stream->buf_tree); - if (buf_node) { - stream_buf = eb64_entry(buf_node, struct qc_stream_buf, - offset_node); - - frm_node = eb64_first(&stream_buf->acked_frms); - while (frm_node) { - struct qf_stream *strm_frm; - struct quic_frame *frm; - size_t offset; - - strm_frm = eb64_entry(frm_node, struct qf_stream, offset); - frm = container_of(strm_frm, struct quic_frame, stream); - offset = strm_frm->offset.key; - - if (offset > stream->ack_offset) - break; - - /* First delete frm from tree. This prevents BUG_ON() if - * stream_buf instance is removed via qc_stream_desc_ack(). - */ - eb64_delete(frm_node); - - if (qc_stream_desc_ack(stream, frm)) { - TRACE_DEVEL("stream consumed", QUIC_EV_CONN_ACKSTRM, - qc, strm_frm, stream); - ret = 1; - } - qc_release_frm(qc, frm); - - /* Always retrieve first buffer as the previously used - * instance could have been removed during qc_stream_desc_ack(). - */ - buf_node = eb64_first(&stream->buf_tree); - if (buf_node) { - stream_buf = eb64_entry(buf_node, struct qc_stream_buf, - offset_node); - frm_node = eb64_first(&stream_buf->acked_frms); - BUG_ON(!frm_node && !b_data(&stream_buf->buf)); - } - else { - frm_node = NULL; - } - } - } - - TRACE_LEAVE(QUIC_EV_CONN_ACKSTRM, qc); - return ret; -} - /* Handle frame whose packet it is attached to has just been acknowledged. The memory allocated * for this frame will be at least released in every cases. * Never fail. @@ -298,13 +233,10 @@ static void qc_handle_newly_acked_frm(struct quic_conn *qc, struct quic_frame *f } stream = eb64_entry(node, struct qc_stream_desc, by_id); - TRACE_DEVEL("acked stream", QUIC_EV_CONN_ACKSTRM, qc, strm_frm, stream); if (!qc_stream_desc_ack(stream, frm)) { - TRACE_DEVEL("stream consumed", QUIC_EV_CONN_ACKSTRM, + TRACE_DEVEL("stream consumed on ACK received", QUIC_EV_CONN_ACKSTRM, qc, strm_frm, stream); - quic_stream_try_to_consume(qc, stream); - if (qc_stream_desc_done(stream)) { /* no need to continue if stream freed. */ TRACE_DEVEL("stream released and freed", QUIC_EV_CONN_ACKSTRM, qc); diff --git a/src/quic_stream.c b/src/quic_stream.c index 47546b579..9d0b3ed8f 100644 --- a/src/quic_stream.c +++ b/src/quic_stream.c @@ -134,6 +134,79 @@ void qc_stream_desc_release(struct qc_stream_desc *stream, } } +/* Acknowledges data for buffer attached to instance. This covers + * the range strating at and of length , with sets for the + * last stream frame. + * + * Returns if there is still data to acknowledge or buffered ACK to + * consume after completing the operation. Else, the next buffer instance of + * stream is returned if it exists or NULL in the contrary case. + */ +static struct qc_stream_buf *qc_stream_buf_ack(struct qc_stream_buf *buf, + struct qc_stream_desc *stream, + uint64_t offset, uint64_t len, int fin) +{ + /* This function does not deal with out-of-order ACK. */ + BUG_ON(offset > stream->ack_offset); + + if (offset + len > stream->ack_offset) { + const uint64_t diff = offset + len - stream->ack_offset; + b_del(&buf->buf, diff); + stream->ack_offset += diff; + } + + if (fin) { + /* Mark FIN as acknowledged. */ + stream->flags &= ~QC_SD_FL_WAIT_FOR_FIN; + } + + if (!b_data(&buf->buf) && eb_is_empty(&buf->acked_frms)) { + qc_stream_buf_free(stream, &buf); + /* Retrieve next buffer instance. */ + buf = !eb_is_empty(&stream->buf_tree) ? + eb64_entry(eb64_first(&stream->buf_tree), struct qc_stream_buf, offset_node) : + NULL; + } + + return buf; +} + +/* Consume buffered ACK starting at . If all buffer data is + * removed, is freed and consume will be conducted for following + * streambufs from if present. + */ +static void qc_stream_buf_consume(struct qc_stream_buf *stream_buf, + struct qc_stream_desc *stream) +{ + struct eb64_node *frm_node; + struct qf_stream *strm_frm; + struct quic_frame *frm; + uint64_t offset, len; + int fin; + + frm_node = eb64_first(&stream_buf->acked_frms); + while (frm_node) { + strm_frm = eb64_entry(frm_node, struct qf_stream, offset); + frm = container_of(strm_frm, struct quic_frame, stream); + + offset = strm_frm->offset.key; + len = strm_frm->len; + fin = frm->type & QUIC_STREAM_FRAME_TYPE_FIN_BIT; + + if (offset > stream->ack_offset) + break; + + /* Delete frame before acknowledged it. This prevents BUG_ON() + * on non-empty acked_frms tree when stream_buf is empty and removed. + */ + eb64_delete(frm_node); + stream_buf = qc_stream_buf_ack(stream_buf, stream, offset, len, fin); + qc_release_frm(NULL, frm); + + frm_node = stream_buf ? eb64_first(&stream_buf->acked_frms) : NULL; + } +} + /* Acknowledge STREAM frame whose content is managed by * descriptor. * @@ -150,63 +223,42 @@ int qc_stream_desc_ack(struct qc_stream_desc *stream, struct quic_frame *frm) struct qc_stream_buf *stream_buf = NULL; struct eb64_node *buf_node; - struct buffer *buf = NULL; - size_t diff; + int ret = 0; /* Cannot advertise FIN for an inferior data range. */ BUG_ON(fin && offset + len < stream->ack_offset); - if (offset + len < stream->ack_offset) { - return 0; + /* Do nothing for offset + len < stream->ack_offset as data were + * already acknowledged and removed. + */ + + if (!len) { + BUG_ON(!fin); /* An empty STREAM frame is only needed for a late FIN reporting. */ + + /* Empty STREAM frame with FIN can be acknowledged out-of-order. */ + stream->flags &= ~QC_SD_FL_WAIT_FOR_FIN; } else if (offset > stream->ack_offset) { buf_node = eb64_lookup_le(&stream->buf_tree, offset); BUG_ON(!buf_node); /* Cannot acknowledged a STREAM frame for a non existing buffer. */ - stream_buf = eb64_entry(buf_node, struct qc_stream_buf, offset_node); eb64_insert(&stream_buf->acked_frms, &strm_frm->offset); - return 1; + ret = 1; } - - diff = offset + len - stream->ack_offset; - if (diff) { + else if (offset + len > stream->ack_offset) { /* Buf list cannot be empty if there is still unacked data. */ BUG_ON(eb_is_empty(&stream->buf_tree)); /* get oldest buffer from buf tree */ stream_buf = eb64_entry(eb64_first(&stream->buf_tree), struct qc_stream_buf, offset_node); - buf = &stream_buf->buf; + stream_buf = qc_stream_buf_ack(stream_buf, stream, offset, len, fin); - stream->ack_offset += diff; - b_del(buf, diff); - - /* Free oldest buffer if all data acknowledged. */ - if (!b_data(buf)) { - /* Remove buffered ACK before deleting buffer instance. */ - while (!eb_is_empty(&stream_buf->acked_frms)) { - struct quic_conn *qc = stream->qc; - struct eb64_node *frm_node; - struct qf_stream *strm_frm; - struct quic_frame *frm; - - frm_node = eb64_first(&stream_buf->acked_frms); - eb64_delete(frm_node); - - strm_frm = eb64_entry(frm_node, struct qf_stream, offset); - frm = container_of(strm_frm, struct quic_frame, stream); - qc_release_frm(qc, frm); - } - qc_stream_buf_free(stream, &stream_buf); - buf = NULL; - } - } - - if (fin) { - /* Mark FIN as acknowledged. */ - stream->flags &= ~QC_SD_FL_WAIT_FOR_FIN; + /* some data were acknowledged, try to consume buffered ACKs */ + if (stream_buf) + qc_stream_buf_consume(stream_buf, stream); } - return 0; + return ret; } /* Free the stream descriptor content. This function should be used