HTTP2: Implement buffering of H2 stream data and trailers. (envoyproxy#19447)

* Implement buffering of H2 stream data and trailers.

Signed-off-by: Kevin Baichoo <[email protected]>
KBaichoo authored Mar 7, 2022
1 parent 360f789 commit 1e9eaf3
Showing 29 changed files with 1,111 additions and 93 deletions.
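The change defers processing of HTTP/2 stream data, trailers, and stream close for streams whose consumer is backed up, and replays them from a scheduled callback once the backpressure clears; the behavior is gated by the Runtime::defer_processing_backedup_streams feature flag. Below is a minimal, self-contained C++ sketch of that idea with hypothetical names; it is not Envoy's implementation, and the real change also buffers trailers and the deferred onStreamClose event.

#include <functional>
#include <iostream>
#include <queue>
#include <string>

// Stand-in for Envoy's schedulable callback: queued work runs on a later
// iteration instead of inline (hypothetical, greatly simplified).
class FakeDispatcher {
public:
  void schedule(std::function<void()> cb) { queue_.push(std::move(cb)); }
  void runScheduled() {
    while (!queue_.empty()) {
      auto cb = std::move(queue_.front());
      queue_.pop();
      cb();
    }
  }

private:
  std::queue<std::function<void()>> queue_;
};

// Minimal stream that buffers body while its consumer is backed up and
// replays it from a scheduled callback once the backpressure clears.
class DeferringStream {
public:
  explicit DeferringStream(FakeDispatcher& dispatcher) : dispatcher_(dispatcher) {}

  void setBackedUp(bool backed_up) { backed_up_ = backed_up; }

  void onData(const std::string& data, bool end_stream) {
    if (backed_up_) {
      // Defer: keep the bytes (and the end-of-stream marker) for later.
      buffered_body_ += data;
      end_stream_buffered_ = end_stream;
      body_buffered_ = true;
      return;
    }
    deliver(data, end_stream);
  }

  void onBackpressureCleared() {
    backed_up_ = false;
    // Replay buffered data on a fresh callback to avoid reentrant call stacks.
    dispatcher_.schedule([this]() {
      if (body_buffered_) {
        body_buffered_ = false;
        deliver(buffered_body_, end_stream_buffered_);
        buffered_body_.clear();
      }
    });
  }

private:
  void deliver(const std::string& data, bool end_stream) {
    std::cout << "delivered " << data.size() << " bytes, end_stream=" << end_stream << "\n";
  }

  FakeDispatcher& dispatcher_;
  std::string buffered_body_;
  bool backed_up_ = false;
  bool body_buffered_ = false;
  bool end_stream_buffered_ = false;
};

int main() {
  FakeDispatcher dispatcher;
  DeferringStream stream(dispatcher);
  stream.setBackedUp(true);
  stream.onData("hello", /*end_stream=*/true); // buffered, nothing delivered yet
  stream.onBackpressureCleared();              // schedules deferred processing
  dispatcher.runScheduled();                   // buffered data is delivered now
  return 0;
}

Draining from a scheduled callback rather than inline mirrors the commit's use of a schedulable callback in scheduleProcessingOfBufferedData(), which avoids reentrant and deep call stacks.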
docs/root/configuration/http/http_conn_man/stats.rst (2 changes: 1 addition & 1 deletion)
@@ -173,7 +173,7 @@ On the upstream side all http2 statistics are rooted at *cluster.<name>.http2.*
keepalive_timeout, Counter, Total number of connections closed due to :ref:`keepalive timeout <envoy_v3_api_field_config.core.v3.KeepaliveSettings.timeout>`
streams_active, Gauge, Active streams as observed by the codec
pending_send_bytes, Gauge, Currently buffered body data in bytes waiting to be written when stream/connection window is opened.

deferred_stream_close, Gauge, Number of HTTP/2 streams where the stream has been closed but processing of the stream close has been deferred due to network backup. This is expected to be incremented when a downstream stream is backed up and the corresponding upstream stream has received end stream but we defer processing of the upstream stream close due to downstream backup. This is decremented as we finally delete the stream when either the deferred close stream has its buffered data drained or receives a reset.
.. attention::

The HTTP/2 `streams_active` gauge may be greater than the HTTP connection manager
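The deferred_stream_close gauge documented above is incremented when a stream's close is buffered due to downstream backup and decremented when the deferred close is finally consumed, either because the buffered data drained or because the stream was reset. A small sketch of that accounting with hypothetical stand-in types (not Envoy's Stats::Gauge):

#include <cassert>

// Stand-in for a stats gauge; hypothetical, not Envoy's Stats::Gauge.
struct Gauge {
  long value = 0;
  void inc() { ++value; }
  void dec() { --value; }
};

struct StreamState {
  bool buffered_on_stream_close = false;
};

// Upstream stream saw end-of-stream, but the downstream is backed up:
// defer the close and bump the gauge.
void deferStreamClose(StreamState& s, Gauge& deferred_stream_close) {
  s.buffered_on_stream_close = true;
  deferred_stream_close.inc();
}

// Buffered data drained (or the stream was reset): consume the deferred
// close exactly once and drop the gauge back down.
void consumeDeferredClose(StreamState& s, Gauge& deferred_stream_close) {
  if (s.buffered_on_stream_close) {
    s.buffered_on_stream_close = false;
    deferred_stream_close.dec();
  }
}

int main() {
  Gauge deferred_stream_close;
  StreamState stream;
  deferStreamClose(stream, deferred_stream_close);
  assert(deferred_stream_close.value == 1);
  consumeDeferredClose(stream, deferred_stream_close);
  consumeDeferredClose(stream, deferred_stream_close); // no double-decrement
  assert(deferred_stream_close.value == 0);
  return 0;
}

In the commit itself this accounting lives in ConnectionImpl::onStreamClose() and StreamImpl::resetStream(), which consume the buffered close so it is only handled once.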
envoy/http/codec.h (2 changes: 1 addition & 1 deletion)
@@ -329,7 +329,7 @@ class Stream : public StreamResetHandler {
* configured.
* @return uint32_t the stream's configured buffer limits.
*/
virtual uint32_t bufferLimit() PURE;
virtual uint32_t bufferLimit() const PURE;

/**
* @return string_view optionally return the reason behind codec level errors.
source/common/http/http1/codec_impl.cc (2 changes: 1 addition & 1 deletion)
@@ -369,7 +369,7 @@ void StreamEncoderImpl::readDisable(bool disable) {
connection_.readDisable(disable);
}

uint32_t StreamEncoderImpl::bufferLimit() { return connection_.bufferLimit(); }
uint32_t StreamEncoderImpl::bufferLimit() const { return connection_.bufferLimit(); }

const Network::Address::InstanceConstSharedPtr& StreamEncoderImpl::connectionLocalAddress() {
return connection_.connection().connectionInfoProvider().localAddress();
source/common/http/http1/codec_impl.h (2 changes: 1 addition & 1 deletion)
@@ -60,7 +60,7 @@ class StreamEncoderImpl : public virtual StreamEncoder,
// progress may be made with the codec.
void resetStream(StreamResetReason reason) override;
void readDisable(bool disable) override;
uint32_t bufferLimit() override;
uint32_t bufferLimit() const override;
absl::string_view responseDetails() override { return details_; }
const Network::Address::InstanceConstSharedPtr& connectionLocalAddress() override;
void setFlushTimeout(std::chrono::milliseconds) override {
source/common/http/http2/codec_impl.cc (212 changes: 179 additions & 33 deletions)
@@ -158,14 +158,19 @@ ConnectionImpl::StreamImpl::StreamImpl(ConnectionImpl& parent, uint32_t buffer_l
local_end_stream_sent_(false), remote_end_stream_(false), data_deferred_(false),
received_noninformational_headers_(false),
pending_receive_buffer_high_watermark_called_(false),
pending_send_buffer_high_watermark_called_(false), reset_due_to_messaging_error_(false) {
pending_send_buffer_high_watermark_called_(false), reset_due_to_messaging_error_(false),
defer_processing_backedup_streams_(
Runtime::runtimeFeatureEnabled(Runtime::defer_processing_backedup_streams)) {
parent_.stats_.streams_active_.inc();
if (buffer_limit > 0) {
setWriteBufferWatermarks(buffer_limit);
}
}

void ConnectionImpl::StreamImpl::destroy() {
// Cancel any pending buffered data callback for the stream.
process_buffered_data_callback_.reset();

MultiplexedStreamImplBase::destroy();
parent_.stats_.streams_active_.dec();
parent_.stats_.pending_send_bytes_.sub(pending_send_data_->length());
@@ -349,6 +354,44 @@ void ConnectionImpl::StreamImpl::encodeMetadata(const MetadataMapVector& metadat
}
}

void ConnectionImpl::StreamImpl::processBufferedData() {
ENVOY_CONN_LOG(debug, "Stream {} processing buffered data.", parent_.connection_, stream_id_);

if (stream_manager_.body_buffered_ && continueProcessingBufferedData()) {
decodeData();
}

if (stream_manager_.trailers_buffered_ && continueProcessingBufferedData()) {
ASSERT(!stream_manager_.body_buffered_);
decodeTrailers();
ASSERT(!stream_manager_.trailers_buffered_);
}

// Reset cases are handled by resetStream and directly invoke onStreamClose,
// which consumes the buffered_on_stream_close_ so we don't invoke
// onStreamClose twice.
if (stream_manager_.buffered_on_stream_close_ && !stream_manager_.hasBufferedBodyOrTrailers()) {
ASSERT(!reset_reason_.has_value());
ENVOY_CONN_LOG(debug, "invoking onStreamClose for stream: {} via processBufferedData",
parent_.connection_, stream_id_);
// We only buffer the onStreamClose if we had no errors.
parent_.onStreamClose(this, 0);
}
}

void ConnectionImpl::StreamImpl::grantPeerAdditionalStreamWindow() {
if (parent_.use_new_codec_wrapper_) {
parent_.adapter_->MarkDataConsumedForStream(stream_id_, unconsumed_bytes_);
} else {
nghttp2_session_consume(parent_.session_, stream_id_, unconsumed_bytes_);
}
unconsumed_bytes_ = 0;
if (parent_.sendPendingFramesAndHandleError()) {
// Intended to check through coverage that this error case is tested
return;
}
}

void ConnectionImpl::StreamImpl::readDisable(bool disable) {
ENVOY_CONN_LOG(debug, "Stream {} {}, unconsumed_bytes {} read_disable_count {}",
parent_.connection_, stream_id_, (disable ? "disabled" : "enabled"),
@@ -359,32 +402,75 @@ void ConnectionImpl::StreamImpl::readDisable(bool disable) {
ASSERT(read_disable_count_ > 0);
--read_disable_count_;
if (!buffersOverrun()) {
if (parent_.use_new_codec_wrapper_) {
parent_.adapter_->MarkDataConsumedForStream(stream_id_, unconsumed_bytes_);
} else {
nghttp2_session_consume(parent_.session_, stream_id_, unconsumed_bytes_);
}
unconsumed_bytes_ = 0;
if (parent_.sendPendingFramesAndHandleError()) {
// Intended to check through coverage that this error case is tested
return;
}
scheduleProcessingOfBufferedData();
grantPeerAdditionalStreamWindow();
}
}
}

void ConnectionImpl::StreamImpl::scheduleProcessingOfBufferedData() {
if (defer_processing_backedup_streams_) {
if (!process_buffered_data_callback_) {
process_buffered_data_callback_ = parent_.connection_.dispatcher().createSchedulableCallback(
[this]() { processBufferedData(); });
}

// We schedule processing to occur in another callback to avoid
// reentrant and deep call stacks.
process_buffered_data_callback_->scheduleCallbackCurrentIteration();
}
}

void ConnectionImpl::StreamImpl::pendingRecvBufferHighWatermark() {
ENVOY_CONN_LOG(debug, "recv buffer over limit ", parent_.connection_);
ASSERT(!pending_receive_buffer_high_watermark_called_);
pending_receive_buffer_high_watermark_called_ = true;
readDisable(true);
// If `defer_processing_backedup_streams_`, read disabling here can become
// dangerous as it can prevent us from processing buffered data.
if (!defer_processing_backedup_streams_) {
ENVOY_CONN_LOG(debug, "recv buffer over limit ", parent_.connection_);
ASSERT(!pending_receive_buffer_high_watermark_called_);
pending_receive_buffer_high_watermark_called_ = true;
readDisable(true);
}
}

void ConnectionImpl::StreamImpl::pendingRecvBufferLowWatermark() {
ENVOY_CONN_LOG(debug, "recv buffer under limit ", parent_.connection_);
ASSERT(pending_receive_buffer_high_watermark_called_);
pending_receive_buffer_high_watermark_called_ = false;
readDisable(false);
// If `defer_processing_backedup_streams_`, we don't read disable on
// high watermark, so we shouldn't read disable here.
if (defer_processing_backedup_streams_) {
if (shouldAllowPeerAdditionalStreamWindow()) {
// We should grant additional stream window here, in case the
// `pending_recv_buffer_` was blocking flow control updates
// from going to the peer.
grantPeerAdditionalStreamWindow();
}
} else {
ENVOY_CONN_LOG(debug, "recv buffer under limit ", parent_.connection_);
ASSERT(pending_receive_buffer_high_watermark_called_);
pending_receive_buffer_high_watermark_called_ = false;
readDisable(false);
}
}

void ConnectionImpl::StreamImpl::decodeData() {
if (defer_processing_backedup_streams_ && buffersOverrun()) {
ENVOY_CONN_LOG(trace, "Stream {} buffering decodeData() call.", parent_.connection_,
stream_id_);
stream_manager_.body_buffered_ = true;
return;
}

// Any buffered data will be consumed.
stream_manager_.body_buffered_ = false;

// It's possible that we are waiting to send a deferred reset, so only raise data if local
// is not complete.
if (!deferred_reset_) {
decoder().decodeData(*pending_recv_data_, sendEndStream());
}

// TODO(kbaichoo): If dumping buffered data, we should do so in default read
// size chunks rather than dumping the entire buffer, which can have fairness
// issues.
pending_recv_data_->drain(pending_recv_data_->length());
}

void ConnectionImpl::ClientStreamImpl::decodeHeaders() {
@@ -404,11 +490,35 @@ void ConnectionImpl::ClientStreamImpl::decodeHeaders() {
ASSERT(!remote_end_stream_);
response_decoder_.decode1xxHeaders(std::move(headers));
} else {
response_decoder_.decodeHeaders(std::move(headers), remote_end_stream_);
response_decoder_.decodeHeaders(std::move(headers), sendEndStream());
}
}

bool ConnectionImpl::StreamImpl::maybeDeferDecodeTrailers() {
ASSERT(!deferred_reset_.has_value());
// Buffer trailers if we're deferring processing and not flushing all data
// through and either
// 1) Buffers are overrun
// 2) There's buffered body which should get processed before these trailers
// to avoid losing data.
if (defer_processing_backedup_streams_ && (buffersOverrun() || stream_manager_.body_buffered_)) {
stream_manager_.trailers_buffered_ = true;
ENVOY_CONN_LOG(trace, "Stream {} buffering decodeTrailers() call.", parent_.connection_,
stream_id_);
return true;
}

return false;
}

void ConnectionImpl::ClientStreamImpl::decodeTrailers() {
if (maybeDeferDecodeTrailers()) {
return;
}

// Consume any buffered trailers.
stream_manager_.trailers_buffered_ = false;

response_decoder_.decodeTrailers(
std::move(absl::get<ResponseTrailerMapPtr>(headers_or_trailers_)));
}
@@ -418,10 +528,17 @@ void ConnectionImpl::ServerStreamImpl::decodeHeaders() {
if (Http::Utility::isH2UpgradeRequest(*headers)) {
Http::Utility::transformUpgradeRequestFromH2toH1(*headers);
}
request_decoder_->decodeHeaders(std::move(headers), remote_end_stream_);
request_decoder_->decodeHeaders(std::move(headers), sendEndStream());
}

void ConnectionImpl::ServerStreamImpl::decodeTrailers() {
if (maybeDeferDecodeTrailers()) {
return;
}

// Consume any buffered trailers.
stream_manager_.trailers_buffered_ = false;

request_decoder_->decodeTrailers(
std::move(absl::get<RequestTrailerMapPtr>(headers_or_trailers_)));
}
@@ -634,9 +751,23 @@ void ConnectionImpl::ServerStreamImpl::resetStream(StreamResetReason reason) {
}

void ConnectionImpl::StreamImpl::resetStream(StreamResetReason reason) {
reset_reason_ = reason;

// Higher layers expect calling resetStream() to immediately raise reset callbacks.
runResetCallbacks(reason);

// If we've bufferedOnStreamClose for this stream, we shouldn't propagate this
// reset as nghttp2 will have forgotten about the stream.
if (stream_manager_.buffered_on_stream_close_) {
ENVOY_CONN_LOG(
trace, "Stopped propagating reset to nghttp2 as we've buffered onStreamClose for stream {}",
parent_.connection_, stream_id_);
// The stream didn't originally have an NGHTTP2 error, since we buffered
// its stream close.
parent_.onStreamClose(this, 0);
return;
}

// If we submit a reset, nghttp2 will cancel outbound frames that have not yet been sent.
// We want these frames to go out so we defer the reset until we send all of the frames that
// end the local stream. However, if we're resetting the stream due to
@@ -890,7 +1021,7 @@ int ConnectionImpl::onData(int32_t stream_id, const uint8_t* data, size_t len) {
stream->pending_recv_data_->add(data, len);
// Update the window to the peer unless some consumer of this stream's data has hit a flow control
// limit and disabled reads on this stream
if (!stream->buffersOverrun()) {
if (stream->shouldAllowPeerAdditionalStreamWindow()) {
if (use_new_codec_wrapper_) {
adapter_->MarkDataConsumedForStream(stream_id, len);
} else {
@@ -1070,14 +1201,7 @@ Status ConnectionImpl::onFrameReceived(const nghttp2_frame* frame) {
}
case NGHTTP2_DATA: {
stream->remote_end_stream_ = frame->hd.flags & NGHTTP2_FLAG_END_STREAM;

// It's possible that we are waiting to send a deferred reset, so only raise data if local
// is not complete.
if (!stream->deferred_reset_) {
stream->decoder().decodeData(*stream->pending_recv_data_, stream->remote_end_stream_);
}

stream->pending_recv_data_->drain(stream->pending_recv_data_->length());
stream->decodeData();
break;
}
case NGHTTP2_RST_STREAM: {
@@ -1232,10 +1356,18 @@ ssize_t ConnectionImpl::onSend(const uint8_t* data, size_t length) {
return length;
}

int ConnectionImpl::onStreamClose(int32_t stream_id, uint32_t error_code) {
StreamImpl* stream = getStream(stream_id);
int ConnectionImpl::onStreamClose(StreamImpl* stream, uint32_t error_code) {
if (stream) {
ENVOY_CONN_LOG(debug, "stream closed: {}", connection_, error_code);
const int32_t stream_id = stream->stream_id_;

// Consume buffered on stream_close.
if (stream->stream_manager_.buffered_on_stream_close_) {
stream->stream_manager_.buffered_on_stream_close_ = false;
stats_.deferred_stream_close_.dec();
}

ENVOY_CONN_LOG(debug, "stream {} closed: {}", connection_, stream_id, error_code);

if (!stream->remote_end_stream_ || !stream->local_end_stream_) {
StreamResetReason reason;
if (stream->reset_due_to_messaging_error_) {
@@ -1265,6 +1397,16 @@ int ConnectionImpl::onStreamClose(int32_t stream_id, uint32_t error_code) {
}

stream->runResetCallbacks(reason);

} else if (stream->defer_processing_backedup_streams_ && !stream->reset_reason_.has_value() &&
stream->stream_manager_.hasBufferedBodyOrTrailers()) {
ASSERT(error_code == NGHTTP2_NO_ERROR);
ENVOY_CONN_LOG(debug, "buffered onStreamClose for stream: {}", connection_, stream_id);
// Buffer the call, rely on the stream->process_buffered_data_callback_
// to end up invoking.
stream->stream_manager_.buffered_on_stream_close_ = true;
stats_.deferred_stream_close_.inc();
return 0;
}

stream->destroy();
@@ -1289,6 +1431,10 @@ int ConnectionImpl::onStreamClose(int32_t stream_id, uint32_t error_code) {
return 0;
}

int ConnectionImpl::onStreamClose(int32_t stream_id, uint32_t error_code) {
return onStreamClose(getStream(stream_id), error_code);
}

int ConnectionImpl::onMetadataReceived(int32_t stream_id, const uint8_t* data, size_t len) {
ENVOY_CONN_LOG(trace, "recv {} bytes METADATA", connection_, len);

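The codec_impl.cc changes above also factor the per-stream flow-control grant into shouldAllowPeerAdditionalStreamWindow() and grantPeerAdditionalStreamWindow(): consumed bytes are only acknowledged to the peer when no buffer is backed up, so a backed-up stream stops pulling in new DATA frames. A rough sketch of that rule with hypothetical names, where `consume` stands in for nghttp2_session_consume / MarkDataConsumedForStream:

#include <cstdint>
#include <iostream>

struct StreamFlowControl {
  uint32_t unconsumed_bytes = 0;
  bool buffers_overrun = false; // watermark buffers above their limit

  bool shouldAllowPeerAdditionalStreamWindow() const { return !buffers_overrun; }

  // Called when data arrives or when backpressure clears; acknowledging the
  // consumed bytes replenishes the peer's stream window by that amount.
  template <typename ConsumeFn> void maybeGrantWindow(ConsumeFn&& consume) {
    if (shouldAllowPeerAdditionalStreamWindow() && unconsumed_bytes > 0) {
      consume(unconsumed_bytes);
      unconsumed_bytes = 0;
    }
  }
};

int main() {
  StreamFlowControl fc;
  fc.unconsumed_bytes = 4096;
  fc.buffers_overrun = true;  // backed up: do not replenish the peer's window
  fc.maybeGrantWindow([](uint32_t n) { std::cout << "granted " << n << " bytes\n"; });
  fc.buffers_overrun = false; // backpressure cleared
  fc.maybeGrantWindow([](uint32_t n) { std::cout << "granted " << n << " bytes\n"; });
  return 0;
}

In the diff this shows up as onData() checking shouldAllowPeerAdditionalStreamWindow() before marking data consumed, and pendingRecvBufferLowWatermark() granting additional window once the receive buffer drains.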