@@ -94,6 +94,10 @@ struct qc_stream_desc *qc_stream_desc_new(uint64_t id, enum qcs_type type, void
94
94
void qc_stream_desc_release (struct qc_stream_desc * stream ,
95
95
uint64_t final_size , void * new_ctx )
96
96
{
97
+ struct eb64_node * frm_node ;
98
+ struct qf_stream * strm_frm ;
99
+ uint64_t diff ;
100
+
97
101
if (!stream )
98
102
return ;
99
103
@@ -116,9 +120,18 @@ void qc_stream_desc_release(struct qc_stream_desc *stream,
116
120
if (final_size < tail_offset )
117
121
b_sub (buf , tail_offset - final_size );
118
122
119
- if (stream -> notify_room && b_room ( buf ) ) {
123
+ if (stream -> notify_room ) {
120
124
/* notify MUX about available buffers. */
121
- stream -> notify_room (stream , b_room (buf ));
125
+ diff = b_room (& stream -> buf -> buf );
126
+ frm_node = eb64_first (& stream -> buf -> acked_frms );
127
+ while (frm_node ) {
128
+ strm_frm = eb64_entry (frm_node , struct qf_stream , offset );
129
+ diff += strm_frm -> len ;
130
+ frm_node = eb64_next (frm_node );
131
+ }
132
+
133
+ if (diff )
134
+ stream -> notify_room (stream , diff );
122
135
}
123
136
124
137
if (!b_data (buf ))
@@ -175,7 +188,27 @@ int qc_stream_desc_ack(struct qc_stream_desc *stream, size_t offset, size_t len,
175
188
}
176
189
else if (stream -> buf != stream_buf ) {
177
190
/* inactive buffer, notify immediately about newly acknowledged data. */
178
- stream -> notify_room (stream , diff );
191
+ struct qf_stream * strm_frm ;
192
+ struct eb64_node * frm_node ;
193
+ uint64_t diff_buffered = 0 ;
194
+
195
+ frm_node = eb64_lookup_le (& stream_buf -> acked_frms , offset );
196
+ if (frm_node ) {
197
+ strm_frm = eb64_entry (frm_node , struct qf_stream , offset );
198
+ if (strm_frm -> offset .key + strm_frm -> len > offset ) {
199
+ if (strm_frm -> offset .key + strm_frm -> len >= offset + len ) {
200
+ diff_buffered = diff ;
201
+ }
202
+ else {
203
+ diff_buffered = strm_frm -> offset .key + strm_frm -> len - offset ;
204
+ }
205
+ }
206
+ }
207
+
208
+ if (diff_buffered < diff ) {
209
+ }
210
+
211
+ stream -> notify_room (stream , diff - diff_buffered );
179
212
}
180
213
}
181
214
@@ -333,17 +366,28 @@ struct buffer *qc_stream_buf_realloc(struct qc_stream_desc *stream)
333
366
*/
334
367
void qc_stream_buf_release (struct qc_stream_desc * stream )
335
368
{
336
- struct buffer * buf ;
369
+ struct qc_stream_buf * stream_buf = stream -> buf ;
370
+ struct eb64_node * frm_node ;
371
+ struct qf_stream * strm_frm ;
372
+ size_t diff ;
337
373
338
374
/* current buffer already released */
339
- BUG_ON (!stream -> buf );
375
+ BUG_ON (!stream_buf );
376
+
377
+ diff = b_room (& stream_buf -> buf );
378
+ frm_node = eb64_first (& stream_buf -> acked_frms );
379
+ while (frm_node ) {
380
+ strm_frm = eb64_entry (frm_node , struct qf_stream , offset );
381
+ BUG_ON (strm_frm -> offset .key < stream -> ack_offset );
382
+ diff += strm_frm -> len ;
383
+ frm_node = eb64_next (frm_node );
384
+ }
340
385
341
- buf = & stream -> buf -> buf ;
342
386
stream -> buf = NULL ;
343
387
stream -> buf_offset = 0 ;
344
388
345
- if (stream -> notify_room && b_room ( buf ) )
346
- stream -> notify_room (stream , b_room ( buf ) );
389
+ if (stream -> notify_room && diff )
390
+ stream -> notify_room (stream , diff );
347
391
}
348
392
349
393
static int create_sbuf_pool (void )
0 commit comments