Skip to content

Commit

Permalink
MINOR: quic: refactor buffered STREAM ACK consuming
Browse files Browse the repository at this point in the history
  • Loading branch information
a-denoyelle committed Oct 1, 2024
1 parent 72a9174 commit 773290c
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 121 deletions.
74 changes: 2 additions & 72 deletions src/quic_rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -201,71 +201,6 @@ static int qc_pkt_decrypt(struct quic_conn *qc, struct quic_enc_level *qel,
return ret;
}

/* Remove from <stream> the acknowledged frames.
*
* Returns 1 if at least one frame was removed else 0.
*/
static int quic_stream_try_to_consume(struct quic_conn *qc,
struct qc_stream_desc *stream)
{
int ret;
struct eb64_node *frm_node;
struct qc_stream_buf *stream_buf;
struct eb64_node *buf_node;

TRACE_ENTER(QUIC_EV_CONN_ACKSTRM, qc);

ret = 0;
buf_node = eb64_first(&stream->buf_tree);
if (buf_node) {
stream_buf = eb64_entry(buf_node, struct qc_stream_buf,
offset_node);

frm_node = eb64_first(&stream_buf->acked_frms);
while (frm_node) {
struct qf_stream *strm_frm;
struct quic_frame *frm;
size_t offset;

strm_frm = eb64_entry(frm_node, struct qf_stream, offset);
frm = container_of(strm_frm, struct quic_frame, stream);
offset = strm_frm->offset.key;

if (offset > stream->ack_offset)
break;

/* First delete frm from tree. This prevents BUG_ON() if
* stream_buf instance is removed via qc_stream_desc_ack().
*/
eb64_delete(frm_node);

if (qc_stream_desc_ack(stream, frm)) {
TRACE_DEVEL("stream consumed", QUIC_EV_CONN_ACKSTRM,
qc, strm_frm, stream);
ret = 1;
}
qc_release_frm(qc, frm);

/* Always retrieve first buffer as the previously used
* instance could have been removed during qc_stream_desc_ack().
*/
buf_node = eb64_first(&stream->buf_tree);
if (buf_node) {
stream_buf = eb64_entry(buf_node, struct qc_stream_buf,
offset_node);
frm_node = eb64_first(&stream_buf->acked_frms);
BUG_ON(!frm_node && !b_data(&stream_buf->buf));
}
else {
frm_node = NULL;
}
}
}

TRACE_LEAVE(QUIC_EV_CONN_ACKSTRM, qc);
return ret;
}

/* Handle <frm> frame whose packet it is attached to has just been acknowledged. The memory allocated
* for this frame will be at least released in every cases.
* Never fail.
Expand All @@ -281,7 +216,6 @@ static void qc_handle_newly_acked_frm(struct quic_conn *qc, struct quic_frame *f
struct qf_stream *strm_frm = &frm->stream;
struct eb64_node *node = NULL;
struct qc_stream_desc *stream = NULL;
int ack;

/* do not use strm_frm->stream as the qc_stream_desc instance
* might be freed at this stage. Use the id to do a proper
Expand All @@ -299,14 +233,10 @@ static void qc_handle_newly_acked_frm(struct quic_conn *qc, struct quic_frame *f
}
stream = eb64_entry(node, struct qc_stream_desc, by_id);

TRACE_DEVEL("acked stream", QUIC_EV_CONN_ACKSTRM, qc, strm_frm, stream);
ack = qc_stream_desc_ack(stream, frm);
if (!ack) {
TRACE_DEVEL("stream consumed", QUIC_EV_CONN_ACKSTRM,
if (!qc_stream_desc_ack(stream, frm)) {
TRACE_DEVEL("stream consumed on ACK received", QUIC_EV_CONN_ACKSTRM,
qc, strm_frm, stream);

quic_stream_try_to_consume(qc, stream);

if (qc_stream_desc_done(stream)) {
/* no need to continue if stream freed. */
TRACE_DEVEL("stream released and freed", QUIC_EV_CONN_ACKSTRM, qc);
Expand Down
123 changes: 74 additions & 49 deletions src/quic_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -134,83 +134,108 @@ void qc_stream_desc_release(struct qc_stream_desc *stream,
}
}

/* Acknowledged data for buffer <buf> attached to <stream> instance. The
* acknowledged STREAM starts at <offset> and is of length <len> with <fin>
* sets for the last frame of the stream.
*
* Returns <buf> if there is still data to acknowledged after completing the
* operation. Else, the next buffer instance of stream is returned if it exists
* or NULL in the latter case.
*/
static struct qc_stream_buf *qc_stream_buf_ack(struct qc_stream_buf *buf,
struct qc_stream_desc *stream,
uint64_t offset, uint64_t len, int fin)
{
if (offset + len > stream->ack_offset) {
const uint64_t diff = offset + len - stream->ack_offset;
b_del(&buf->buf, diff);
stream->ack_offset += diff;
}

if (!b_data(&buf->buf) && eb_is_empty(&buf->acked_frms)) {
qc_stream_buf_free(stream, &buf);
buf = NULL;
}

if (fin) {
/* Mark FIN as acknowledged. */
stream->flags &= ~QC_SD_FL_WAIT_FOR_FIN;
}

if (!buf && !eb_is_empty(&stream->buf_tree))
buf = eb64_entry(eb64_first(&stream->buf_tree), struct qc_stream_buf, offset_node);
return buf;
}

/* Acknowledge <frm> STREAM frame whose content is managed by <stream>
* descriptor.
*
* Returns 0 if the frame has been handled and can be removed.
* Returns a positive value if the frame cannot be acknowledged and has been
* buffered.
* Returns a positive value if acknowledgement is out-of-order and
* corresponding STREAM frame has been buffered.
*/
int qc_stream_desc_ack(struct qc_stream_desc *stream, struct quic_frame *frm)
{
struct qf_stream *strm_frm = &frm->stream;
uint64_t offset = strm_frm->offset.key;
uint64_t len = strm_frm->len;
size_t offset = strm_frm->offset.key;
size_t len = strm_frm->len;
int fin = frm->type & QUIC_STREAM_FRAME_TYPE_FIN_BIT;

struct qc_stream_buf *stream_buf = NULL;
struct eb64_node *buf_node;
struct buffer *buf = NULL;
size_t diff;
struct eb64_node *frm_node;
int ret = 0;

/* Cannot advertise FIN for an inferior data range. */
BUG_ON(fin && offset + len < stream->ack_offset);

if (offset + len < stream->ack_offset) {
return 0;
if (!len) {
BUG_ON(!fin); /* An empty STREAM frame can only be used to advertise FIN */
/* An empty FIN STREAM cannot be inferior to last ack offset. */
BUG_ON(offset < stream->ack_offset);

/* Empty STREAM frame with FIN can be acknowledged immediately. */
stream->flags &= ~QC_SD_FL_WAIT_FOR_FIN;
}
else if (offset > stream->ack_offset) {
buf_node = eb64_lookup_le(&stream->buf_tree, offset);
if (buf_node) {
stream_buf = eb64_entry(buf_node, struct qc_stream_buf, offset_node);
eb64_insert(&stream_buf->acked_frms, &strm_frm->offset);
return 1;
}
else {
ABORT_NOW();
return 0;
}
BUG_ON(!buf_node); /* Cannot acknowledged a STREAM frame for a non existing buffer. */
stream_buf = eb64_entry(buf_node, struct qc_stream_buf, offset_node);
eb64_insert(&stream_buf->acked_frms, &strm_frm->offset);
ret = 1;
}

diff = offset + len - stream->ack_offset;
if (diff) {
else if (offset + len > stream->ack_offset) {
/* Buf list cannot be empty if there is still unacked data. */
BUG_ON(eb_is_empty(&stream->buf_tree));

/* get oldest buffer from buf tree */
stream_buf = eb64_entry(eb64_first(&stream->buf_tree), struct qc_stream_buf, offset_node);
buf = &stream_buf->buf;
stream_buf = qc_stream_buf_ack(stream_buf, stream, offset, len, fin);

stream->ack_offset += diff;
b_del(buf, diff);

/* Free oldest buffer if all data acknowledged. */
if (!b_data(buf)) {
/* Remove buffered ACK before deleting buffer instance. */
while (!eb_is_empty(&stream_buf->acked_frms)) {
struct quic_conn *qc = stream->qc;
struct eb64_node *frm_node;
struct qf_stream *strm_frm;
struct quic_frame *frm;

frm_node = eb64_first(&stream_buf->acked_frms);
eb64_delete(frm_node);

strm_frm = eb64_entry(frm_node, struct qf_stream, offset);
frm = container_of(strm_frm, struct quic_frame, stream);
qc_release_frm(qc, frm);
}
qc_stream_buf_free(stream, &stream_buf);
buf = NULL;
}
}
/* some data were acknowledged, try to consume any remaining buffered ACK. */
frm_node = stream_buf ? eb64_first(&stream_buf->acked_frms) : NULL;
while (frm_node) {
strm_frm = eb64_entry(frm_node, struct qf_stream, offset);
frm = container_of(strm_frm, struct quic_frame, stream);

if (fin) {
/* Mark FIN as acknowledged. */
stream->flags &= ~QC_SD_FL_WAIT_FOR_FIN;
offset = strm_frm->offset.key;
len = strm_frm->len;
fin = frm->type & QUIC_STREAM_FRAME_TYPE_FIN_BIT;

if (offset > stream->ack_offset)
break;

/* Delete frame before acknowledged it. This prevents BUG_ON()
* on non-empty acked_frms tree when stream_buf is empty and removed.
*/
eb64_delete(frm_node);
stream_buf = qc_stream_buf_ack(stream_buf, stream, offset, len, fin);
qc_release_frm(NULL, frm);

frm_node = stream_buf ? eb64_first(&stream_buf->acked_frms) : NULL;
}
}

return 0;
return ret;
}

/* Free the stream descriptor <stream> content. This function should be used
Expand Down

0 comments on commit 773290c

Please sign in to comment.