Skip to content

Commit

Permalink
MEDIUM: quic: refactor buffered STREAM ACK consuming
Browse files Browse the repository at this point in the history
For the moment, streamdesc layer can only deal with in-order ACK at the
stream level. Received out-of-order ACKs are buffered in a tree attached
to a streambuf instance.

Previously, caller of qc_stream_desc_ack() was responsible to implement
consumption of these buffered ACKs. Refactor this by implementing it
directly at the streamdesc layer within qc_stream_desc_ack(). This
simplifies quic_rx ACK handling and ensure buffered ACKs are consumed as
soon as possible.
  • Loading branch information
a-denoyelle committed Oct 1, 2024
1 parent cc4384a commit 8d68717
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 107 deletions.
70 changes: 1 addition & 69 deletions src/quic_rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -201,71 +201,6 @@ static int qc_pkt_decrypt(struct quic_conn *qc, struct quic_enc_level *qel,
return ret;
}

/* Remove from <stream> the acknowledged frames.
*
* Returns 1 if at least one frame was removed else 0.
*/
static int quic_stream_try_to_consume(struct quic_conn *qc,
struct qc_stream_desc *stream)
{
int ret;
struct eb64_node *frm_node;
struct qc_stream_buf *stream_buf;
struct eb64_node *buf_node;

TRACE_ENTER(QUIC_EV_CONN_ACKSTRM, qc);

ret = 0;
buf_node = eb64_first(&stream->buf_tree);
if (buf_node) {
stream_buf = eb64_entry(buf_node, struct qc_stream_buf,
offset_node);

frm_node = eb64_first(&stream_buf->acked_frms);
while (frm_node) {
struct qf_stream *strm_frm;
struct quic_frame *frm;
size_t offset;

strm_frm = eb64_entry(frm_node, struct qf_stream, offset);
frm = container_of(strm_frm, struct quic_frame, stream);
offset = strm_frm->offset.key;

if (offset > stream->ack_offset)
break;

/* First delete frm from tree. This prevents BUG_ON() if
* stream_buf instance is removed via qc_stream_desc_ack().
*/
eb64_delete(frm_node);

if (qc_stream_desc_ack(stream, frm)) {
TRACE_DEVEL("stream consumed", QUIC_EV_CONN_ACKSTRM,
qc, strm_frm, stream);
ret = 1;
}
qc_release_frm(qc, frm);

/* Always retrieve first buffer as the previously used
* instance could have been removed during qc_stream_desc_ack().
*/
buf_node = eb64_first(&stream->buf_tree);
if (buf_node) {
stream_buf = eb64_entry(buf_node, struct qc_stream_buf,
offset_node);
frm_node = eb64_first(&stream_buf->acked_frms);
BUG_ON(!frm_node && !b_data(&stream_buf->buf));
}
else {
frm_node = NULL;
}
}
}

TRACE_LEAVE(QUIC_EV_CONN_ACKSTRM, qc);
return ret;
}

/* Handle <frm> frame whose packet it is attached to has just been acknowledged. The memory allocated
* for this frame will be at least released in every cases.
* Never fail.
Expand Down Expand Up @@ -298,13 +233,10 @@ static void qc_handle_newly_acked_frm(struct quic_conn *qc, struct quic_frame *f
}
stream = eb64_entry(node, struct qc_stream_desc, by_id);

TRACE_DEVEL("acked stream", QUIC_EV_CONN_ACKSTRM, qc, strm_frm, stream);
if (!qc_stream_desc_ack(stream, frm)) {
TRACE_DEVEL("stream consumed", QUIC_EV_CONN_ACKSTRM,
TRACE_DEVEL("stream consumed on ACK received", QUIC_EV_CONN_ACKSTRM,
qc, strm_frm, stream);

quic_stream_try_to_consume(qc, stream);

if (qc_stream_desc_done(stream)) {
/* no need to continue if stream freed. */
TRACE_DEVEL("stream released and freed", QUIC_EV_CONN_ACKSTRM, qc);
Expand Down
128 changes: 90 additions & 38 deletions src/quic_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,79 @@ void qc_stream_desc_release(struct qc_stream_desc *stream,
}
}

/* Acknowledges data for buffer <buf> attached to <stream> instance. This covers
* the range strating at <offset> and of length <len>, with <fin> sets for the
* last stream frame.
*
* Returns <buf> if there is still data to acknowledge or buffered ACK to
* consume after completing the operation. Else, the next buffer instance of
* stream is returned if it exists or NULL in the contrary case.
*/
static struct qc_stream_buf *qc_stream_buf_ack(struct qc_stream_buf *buf,
struct qc_stream_desc *stream,
uint64_t offset, uint64_t len, int fin)
{
/* This function does not deal with out-of-order ACK. */
BUG_ON(offset > stream->ack_offset);

if (offset + len > stream->ack_offset) {
const uint64_t diff = offset + len - stream->ack_offset;
b_del(&buf->buf, diff);
stream->ack_offset += diff;
}

if (fin) {
/* Mark FIN as acknowledged. */
stream->flags &= ~QC_SD_FL_WAIT_FOR_FIN;
}

if (!b_data(&buf->buf) && eb_is_empty(&buf->acked_frms)) {
qc_stream_buf_free(stream, &buf);
/* Retrieve next buffer instance. */
buf = !eb_is_empty(&stream->buf_tree) ?
eb64_entry(eb64_first(&stream->buf_tree), struct qc_stream_buf, offset_node) :
NULL;
}

return buf;
}

/* Consume buffered ACK starting at <stream_buf>. If all buffer data is
* removed, <stream_buf> is freed and consume will be conducted for following
* streambufs from <stream> if present.
*/
static void qc_stream_buf_consume(struct qc_stream_buf *stream_buf,
struct qc_stream_desc *stream)
{
struct eb64_node *frm_node;
struct qf_stream *strm_frm;
struct quic_frame *frm;
uint64_t offset, len;
int fin;

frm_node = eb64_first(&stream_buf->acked_frms);
while (frm_node) {
strm_frm = eb64_entry(frm_node, struct qf_stream, offset);
frm = container_of(strm_frm, struct quic_frame, stream);

offset = strm_frm->offset.key;
len = strm_frm->len;
fin = frm->type & QUIC_STREAM_FRAME_TYPE_FIN_BIT;

if (offset > stream->ack_offset)
break;

/* Delete frame before acknowledged it. This prevents BUG_ON()
* on non-empty acked_frms tree when stream_buf is empty and removed.
*/
eb64_delete(frm_node);
stream_buf = qc_stream_buf_ack(stream_buf, stream, offset, len, fin);
qc_release_frm(NULL, frm);

frm_node = stream_buf ? eb64_first(&stream_buf->acked_frms) : NULL;
}
}

/* Acknowledge <frm> STREAM frame whose content is managed by <stream>
* descriptor.
*
Expand All @@ -150,63 +223,42 @@ int qc_stream_desc_ack(struct qc_stream_desc *stream, struct quic_frame *frm)

struct qc_stream_buf *stream_buf = NULL;
struct eb64_node *buf_node;
struct buffer *buf = NULL;
size_t diff;
int ret = 0;

/* Cannot advertise FIN for an inferior data range. */
BUG_ON(fin && offset + len < stream->ack_offset);

if (offset + len < stream->ack_offset) {
return 0;
/* Do nothing for offset + len < stream->ack_offset as data were
* already acknowledged and removed.
*/

if (!len) {
BUG_ON(!fin); /* An empty STREAM frame is only needed for a late FIN reporting. */

/* Empty STREAM frame with FIN can be acknowledged out-of-order. */
stream->flags &= ~QC_SD_FL_WAIT_FOR_FIN;
}
else if (offset > stream->ack_offset) {
buf_node = eb64_lookup_le(&stream->buf_tree, offset);
BUG_ON(!buf_node); /* Cannot acknowledged a STREAM frame for a non existing buffer. */

stream_buf = eb64_entry(buf_node, struct qc_stream_buf, offset_node);
eb64_insert(&stream_buf->acked_frms, &strm_frm->offset);
return 1;
ret = 1;
}

diff = offset + len - stream->ack_offset;
if (diff) {
else if (offset + len > stream->ack_offset) {
/* Buf list cannot be empty if there is still unacked data. */
BUG_ON(eb_is_empty(&stream->buf_tree));

/* get oldest buffer from buf tree */
stream_buf = eb64_entry(eb64_first(&stream->buf_tree), struct qc_stream_buf, offset_node);
buf = &stream_buf->buf;
stream_buf = qc_stream_buf_ack(stream_buf, stream, offset, len, fin);

stream->ack_offset += diff;
b_del(buf, diff);

/* Free oldest buffer if all data acknowledged. */
if (!b_data(buf)) {
/* Remove buffered ACK before deleting buffer instance. */
while (!eb_is_empty(&stream_buf->acked_frms)) {
struct quic_conn *qc = stream->qc;
struct eb64_node *frm_node;
struct qf_stream *strm_frm;
struct quic_frame *frm;

frm_node = eb64_first(&stream_buf->acked_frms);
eb64_delete(frm_node);

strm_frm = eb64_entry(frm_node, struct qf_stream, offset);
frm = container_of(strm_frm, struct quic_frame, stream);
qc_release_frm(qc, frm);
}
qc_stream_buf_free(stream, &stream_buf);
buf = NULL;
}
}

if (fin) {
/* Mark FIN as acknowledged. */
stream->flags &= ~QC_SD_FL_WAIT_FOR_FIN;
/* some data were acknowledged, try to consume buffered ACKs */
if (stream_buf)
qc_stream_buf_consume(stream_buf, stream);
}

return 0;
return ret;
}

/* Free the stream descriptor <stream> content. This function should be used
Expand Down

0 comments on commit 8d68717

Please sign in to comment.