diff --git a/docs/root/configuration/http/http_conn_man/stats.rst b/docs/root/configuration/http/http_conn_man/stats.rst index efb77be6f17e..b6730d14ffcb 100644 --- a/docs/root/configuration/http/http_conn_man/stats.rst +++ b/docs/root/configuration/http/http_conn_man/stats.rst @@ -173,7 +173,7 @@ On the upstream side all http2 statistics are rooted at *cluster..http2.* keepalive_timeout, Counter, Total number of connections closed due to :ref:`keepalive timeout ` streams_active, Gauge, Active streams as observed by the codec pending_send_bytes, Gauge, Currently buffered body data in bytes waiting to be written when stream/connection window is opened. - + deferred_stream_close, Gauge, Number of HTTP/2 streams where the stream has been closed but processing of the stream close has been deferred due to network backup. This is expected to be incremented when a downstream stream is backed up and the corresponding upstream stream has received end stream but we defer processing of the upstream stream close due to downstream backup. This is decremented as we finally delete the stream when either the deferred close stream has its buffered data drained or receives a reset. .. attention:: The HTTP/2 `streams_active` gauge may be greater than the HTTP connection manager diff --git a/envoy/http/codec.h b/envoy/http/codec.h index 7445fb20eec5..d32a8690c6c5 100644 --- a/envoy/http/codec.h +++ b/envoy/http/codec.h @@ -329,7 +329,7 @@ class Stream : public StreamResetHandler { * configured. * @return uint32_t the stream's configured buffer limits. */ - virtual uint32_t bufferLimit() PURE; + virtual uint32_t bufferLimit() const PURE; /** * @return string_view optionally return the reason behind codec level errors. diff --git a/source/common/http/http1/codec_impl.cc b/source/common/http/http1/codec_impl.cc index 98e290b99259..7cad7ec80a9e 100644 --- a/source/common/http/http1/codec_impl.cc +++ b/source/common/http/http1/codec_impl.cc @@ -369,7 +369,7 @@ void StreamEncoderImpl::readDisable(bool disable) { connection_.readDisable(disable); } -uint32_t StreamEncoderImpl::bufferLimit() { return connection_.bufferLimit(); } +uint32_t StreamEncoderImpl::bufferLimit() const { return connection_.bufferLimit(); } const Network::Address::InstanceConstSharedPtr& StreamEncoderImpl::connectionLocalAddress() { return connection_.connection().connectionInfoProvider().localAddress(); diff --git a/source/common/http/http1/codec_impl.h b/source/common/http/http1/codec_impl.h index e1e508747fa0..161ef63b1164 100644 --- a/source/common/http/http1/codec_impl.h +++ b/source/common/http/http1/codec_impl.h @@ -60,7 +60,7 @@ class StreamEncoderImpl : public virtual StreamEncoder, // progress may be made with the codec. 
void resetStream(StreamResetReason reason) override; void readDisable(bool disable) override; - uint32_t bufferLimit() override; + uint32_t bufferLimit() const override; absl::string_view responseDetails() override { return details_; } const Network::Address::InstanceConstSharedPtr& connectionLocalAddress() override; void setFlushTimeout(std::chrono::milliseconds) override { diff --git a/source/common/http/http2/codec_impl.cc b/source/common/http/http2/codec_impl.cc index 3fa1c835d56e..c2ffa00fb731 100644 --- a/source/common/http/http2/codec_impl.cc +++ b/source/common/http/http2/codec_impl.cc @@ -158,7 +158,9 @@ ConnectionImpl::StreamImpl::StreamImpl(ConnectionImpl& parent, uint32_t buffer_l local_end_stream_sent_(false), remote_end_stream_(false), data_deferred_(false), received_noninformational_headers_(false), pending_receive_buffer_high_watermark_called_(false), - pending_send_buffer_high_watermark_called_(false), reset_due_to_messaging_error_(false) { + pending_send_buffer_high_watermark_called_(false), reset_due_to_messaging_error_(false), + defer_processing_backedup_streams_( + Runtime::runtimeFeatureEnabled(Runtime::defer_processing_backedup_streams)) { parent_.stats_.streams_active_.inc(); if (buffer_limit > 0) { setWriteBufferWatermarks(buffer_limit); @@ -166,6 +168,9 @@ ConnectionImpl::StreamImpl::StreamImpl(ConnectionImpl& parent, uint32_t buffer_l } void ConnectionImpl::StreamImpl::destroy() { + // Cancel any pending buffered data callback for the stream. + process_buffered_data_callback_.reset(); + MultiplexedStreamImplBase::destroy(); parent_.stats_.streams_active_.dec(); parent_.stats_.pending_send_bytes_.sub(pending_send_data_->length()); @@ -349,6 +354,44 @@ void ConnectionImpl::StreamImpl::encodeMetadata(const MetadataMapVector& metadat } } +void ConnectionImpl::StreamImpl::processBufferedData() { + ENVOY_CONN_LOG(debug, "Stream {} processing buffered data.", parent_.connection_, stream_id_); + + if (stream_manager_.body_buffered_ && continueProcessingBufferedData()) { + decodeData(); + } + + if (stream_manager_.trailers_buffered_ && continueProcessingBufferedData()) { + ASSERT(!stream_manager_.body_buffered_); + decodeTrailers(); + ASSERT(!stream_manager_.trailers_buffered_); + } + + // Reset cases are handled by resetStream and directly invoke onStreamClose, + // which consumes the buffered_on_stream_close_ so we don't invoke + // onStreamClose twice. + if (stream_manager_.buffered_on_stream_close_ && !stream_manager_.hasBufferedBodyOrTrailers()) { + ASSERT(!reset_reason_.has_value()); + ENVOY_CONN_LOG(debug, "invoking onStreamClose for stream: {} via processBufferedData", + parent_.connection_, stream_id_); + // We only buffer the onStreamClose if we had no errors. + parent_.onStreamClose(this, 0); + } +} + +void ConnectionImpl::StreamImpl::grantPeerAdditionalStreamWindow() { + if (parent_.use_new_codec_wrapper_) { + parent_.adapter_->MarkDataConsumedForStream(stream_id_, unconsumed_bytes_); + } else { + nghttp2_session_consume(parent_.session_, stream_id_, unconsumed_bytes_); + } + unconsumed_bytes_ = 0; + if (parent_.sendPendingFramesAndHandleError()) { + // Intended to check through coverage that this error case is tested + return; + } +} + void ConnectionImpl::StreamImpl::readDisable(bool disable) { ENVOY_CONN_LOG(debug, "Stream {} {}, unconsumed_bytes {} read_disable_count {}", parent_.connection_, stream_id_, (disable ? 
"disabled" : "enabled"), @@ -359,32 +402,75 @@ void ConnectionImpl::StreamImpl::readDisable(bool disable) { ASSERT(read_disable_count_ > 0); --read_disable_count_; if (!buffersOverrun()) { - if (parent_.use_new_codec_wrapper_) { - parent_.adapter_->MarkDataConsumedForStream(stream_id_, unconsumed_bytes_); - } else { - nghttp2_session_consume(parent_.session_, stream_id_, unconsumed_bytes_); - } - unconsumed_bytes_ = 0; - if (parent_.sendPendingFramesAndHandleError()) { - // Intended to check through coverage that this error case is tested - return; - } + scheduleProcessingOfBufferedData(); + grantPeerAdditionalStreamWindow(); } } } +void ConnectionImpl::StreamImpl::scheduleProcessingOfBufferedData() { + if (defer_processing_backedup_streams_) { + if (!process_buffered_data_callback_) { + process_buffered_data_callback_ = parent_.connection_.dispatcher().createSchedulableCallback( + [this]() { processBufferedData(); }); + } + + // We schedule processing to occur in another callback to avoid + // reentrant and deep call stacks. + process_buffered_data_callback_->scheduleCallbackCurrentIteration(); + } +} + void ConnectionImpl::StreamImpl::pendingRecvBufferHighWatermark() { - ENVOY_CONN_LOG(debug, "recv buffer over limit ", parent_.connection_); - ASSERT(!pending_receive_buffer_high_watermark_called_); - pending_receive_buffer_high_watermark_called_ = true; - readDisable(true); + // If `defer_processing_backedup_streams_`, read disabling here can become + // dangerous as it can prevent us from processing buffered data. + if (!defer_processing_backedup_streams_) { + ENVOY_CONN_LOG(debug, "recv buffer over limit ", parent_.connection_); + ASSERT(!pending_receive_buffer_high_watermark_called_); + pending_receive_buffer_high_watermark_called_ = true; + readDisable(true); + } } void ConnectionImpl::StreamImpl::pendingRecvBufferLowWatermark() { - ENVOY_CONN_LOG(debug, "recv buffer under limit ", parent_.connection_); - ASSERT(pending_receive_buffer_high_watermark_called_); - pending_receive_buffer_high_watermark_called_ = false; - readDisable(false); + // If `defer_processing_backedup_streams_`, we don't read disable on + // high watermark, so we shouldn't read disable here. + if (defer_processing_backedup_streams_) { + if (shouldAllowPeerAdditionalStreamWindow()) { + // We should grant additional stream window here, in case the + // `pending_recv_buffer_` was blocking flow control updates + // from going to the peer. + grantPeerAdditionalStreamWindow(); + } + } else { + ENVOY_CONN_LOG(debug, "recv buffer under limit ", parent_.connection_); + ASSERT(pending_receive_buffer_high_watermark_called_); + pending_receive_buffer_high_watermark_called_ = false; + readDisable(false); + } +} + +void ConnectionImpl::StreamImpl::decodeData() { + if (defer_processing_backedup_streams_ && buffersOverrun()) { + ENVOY_CONN_LOG(trace, "Stream {} buffering decodeData() call.", parent_.connection_, + stream_id_); + stream_manager_.body_buffered_ = true; + return; + } + + // Any buffered data will be consumed. + stream_manager_.body_buffered_ = false; + + // It's possible that we are waiting to send a deferred reset, so only raise data if local + // is not complete. + if (!deferred_reset_) { + decoder().decodeData(*pending_recv_data_, sendEndStream()); + } + + // TODO(kbaichoo): If dumping buffered data, we should do so in default read + // size chunks rather than dumping the entire buffer, which can have fairness + // issues. 
+ pending_recv_data_->drain(pending_recv_data_->length()); } void ConnectionImpl::ClientStreamImpl::decodeHeaders() { @@ -404,11 +490,35 @@ void ConnectionImpl::ClientStreamImpl::decodeHeaders() { ASSERT(!remote_end_stream_); response_decoder_.decode1xxHeaders(std::move(headers)); } else { - response_decoder_.decodeHeaders(std::move(headers), remote_end_stream_); + response_decoder_.decodeHeaders(std::move(headers), sendEndStream()); } } +bool ConnectionImpl::StreamImpl::maybeDeferDecodeTrailers() { + ASSERT(!deferred_reset_.has_value()); + // Buffer trailers if we're deferring processing and not flushing all data + // through and either + // 1) Buffers are overrun + // 2) There's buffered body which should get processed before these trailers + // to avoid losing data. + if (defer_processing_backedup_streams_ && (buffersOverrun() || stream_manager_.body_buffered_)) { + stream_manager_.trailers_buffered_ = true; + ENVOY_CONN_LOG(trace, "Stream {} buffering decodeTrailers() call.", parent_.connection_, + stream_id_); + return true; + } + + return false; +} + void ConnectionImpl::ClientStreamImpl::decodeTrailers() { + if (maybeDeferDecodeTrailers()) { + return; + } + + // Consume any buffered trailers. + stream_manager_.trailers_buffered_ = false; + response_decoder_.decodeTrailers( std::move(absl::get(headers_or_trailers_))); } @@ -418,10 +528,17 @@ void ConnectionImpl::ServerStreamImpl::decodeHeaders() { if (Http::Utility::isH2UpgradeRequest(*headers)) { Http::Utility::transformUpgradeRequestFromH2toH1(*headers); } - request_decoder_->decodeHeaders(std::move(headers), remote_end_stream_); + request_decoder_->decodeHeaders(std::move(headers), sendEndStream()); } void ConnectionImpl::ServerStreamImpl::decodeTrailers() { + if (maybeDeferDecodeTrailers()) { + return; + } + + // Consume any buffered trailers. + stream_manager_.trailers_buffered_ = false; + request_decoder_->decodeTrailers( std::move(absl::get(headers_or_trailers_))); } @@ -634,9 +751,23 @@ void ConnectionImpl::ServerStreamImpl::resetStream(StreamResetReason reason) { } void ConnectionImpl::StreamImpl::resetStream(StreamResetReason reason) { + reset_reason_ = reason; + // Higher layers expect calling resetStream() to immediately raise reset callbacks. runResetCallbacks(reason); + // If we've bufferedOnStreamClose for this stream, we shouldn't propagate this + // reset as nghttp2 will have forgotten about the stream. + if (stream_manager_.buffered_on_stream_close_) { + ENVOY_CONN_LOG( + trace, "Stopped propagating reset to nghttp2 as we've buffered onStreamClose for stream {}", + parent_.connection_, stream_id_); + // The stream didn't originally have an NGHTTP2 error, since we buffered + // its stream close. + parent_.onStreamClose(this, 0); + return; + } + // If we submit a reset, nghttp2 will cancel outbound frames that have not yet been sent. // We want these frames to go out so we defer the reset until we send all of the frames that // end the local stream. 
However, if we're resetting the stream due to @@ -890,7 +1021,7 @@ int ConnectionImpl::onData(int32_t stream_id, const uint8_t* data, size_t len) { stream->pending_recv_data_->add(data, len); // Update the window to the peer unless some consumer of this stream's data has hit a flow control // limit and disabled reads on this stream - if (!stream->buffersOverrun()) { + if (stream->shouldAllowPeerAdditionalStreamWindow()) { if (use_new_codec_wrapper_) { adapter_->MarkDataConsumedForStream(stream_id, len); } else { @@ -1070,14 +1201,7 @@ Status ConnectionImpl::onFrameReceived(const nghttp2_frame* frame) { } case NGHTTP2_DATA: { stream->remote_end_stream_ = frame->hd.flags & NGHTTP2_FLAG_END_STREAM; - - // It's possible that we are waiting to send a deferred reset, so only raise data if local - // is not complete. - if (!stream->deferred_reset_) { - stream->decoder().decodeData(*stream->pending_recv_data_, stream->remote_end_stream_); - } - - stream->pending_recv_data_->drain(stream->pending_recv_data_->length()); + stream->decodeData(); break; } case NGHTTP2_RST_STREAM: { @@ -1232,10 +1356,18 @@ ssize_t ConnectionImpl::onSend(const uint8_t* data, size_t length) { return length; } -int ConnectionImpl::onStreamClose(int32_t stream_id, uint32_t error_code) { - StreamImpl* stream = getStream(stream_id); +int ConnectionImpl::onStreamClose(StreamImpl* stream, uint32_t error_code) { if (stream) { - ENVOY_CONN_LOG(debug, "stream closed: {}", connection_, error_code); + const int32_t stream_id = stream->stream_id_; + + // Consume buffered on stream_close. + if (stream->stream_manager_.buffered_on_stream_close_) { + stream->stream_manager_.buffered_on_stream_close_ = false; + stats_.deferred_stream_close_.dec(); + } + + ENVOY_CONN_LOG(debug, "stream {} closed: {}", connection_, stream_id, error_code); + if (!stream->remote_end_stream_ || !stream->local_end_stream_) { StreamResetReason reason; if (stream->reset_due_to_messaging_error_) { @@ -1265,6 +1397,16 @@ int ConnectionImpl::onStreamClose(int32_t stream_id, uint32_t error_code) { } stream->runResetCallbacks(reason); + + } else if (stream->defer_processing_backedup_streams_ && !stream->reset_reason_.has_value() && + stream->stream_manager_.hasBufferedBodyOrTrailers()) { + ASSERT(error_code == NGHTTP2_NO_ERROR); + ENVOY_CONN_LOG(debug, "buffered onStreamClose for stream: {}", connection_, stream_id); + // Buffer the call, rely on the stream->process_buffered_data_callback_ + // to end up invoking. 
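// Concretely, the stream will call back into onStreamClose(stream, 0) either from
// processBufferedData(), once its buffered body and trailers have drained, or from
// resetStream() if the stream is reset before that happens.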
+ stream->stream_manager_.buffered_on_stream_close_ = true; + stats_.deferred_stream_close_.inc(); + return 0; } stream->destroy(); @@ -1289,6 +1431,10 @@ int ConnectionImpl::onStreamClose(int32_t stream_id, uint32_t error_code) { return 0; } +int ConnectionImpl::onStreamClose(int32_t stream_id, uint32_t error_code) { + return onStreamClose(getStream(stream_id), error_code); +} + int ConnectionImpl::onMetadataReceived(int32_t stream_id, const uint8_t* data, size_t len) { ENVOY_CONN_LOG(trace, "recv {} bytes METADATA", connection_, len); diff --git a/source/common/http/http2/codec_impl.h b/source/common/http/http2/codec_impl.h index 4f775f2cb7b4..9d5fbda1a3bf 100644 --- a/source/common/http/http2/codec_impl.h +++ b/source/common/http/http2/codec_impl.h @@ -255,7 +255,7 @@ class ConnectionImpl : public virtual Connection, void removeCallbacks(StreamCallbacks& callbacks) override { removeCallbacksHelper(callbacks); } void resetStream(StreamResetReason reason) override; void readDisable(bool disable) override; - uint32_t bufferLimit() override { return pending_recv_data_->highWatermark(); } + uint32_t bufferLimit() const override { return pending_recv_data_->highWatermark(); } const Network::Address::InstanceConstSharedPtr& connectionLocalAddress() override { return parent_.connection_.connectionInfoProvider().localAddress(); } @@ -299,6 +299,9 @@ class ConnectionImpl : public virtual Connection, // to the decoder_. virtual void decodeHeaders() PURE; virtual void decodeTrailers() PURE; + bool maybeDeferDecodeTrailers(); + // Consumes any decoded data, buffering if backed up. + void decodeData(); // Get MetadataEncoder for this stream. MetadataEncoder& getMetadataEncoderOld(); @@ -309,9 +312,14 @@ class ConnectionImpl : public virtual Connection, void onMetadataDecoded(MetadataMapPtr&& metadata_map_ptr); bool buffersOverrun() const { return read_disable_count_ > 0; } + bool shouldAllowPeerAdditionalStreamWindow() const { + return !buffersOverrun() && !pending_recv_data_->highWatermarkTriggered(); + } void encodeDataHelper(Buffer::Instance& data, bool end_stream, bool skip_encoding_empty_trailers); + // Called from either process_buffered_data_callback_. + void processBufferedData(); const StreamInfo::BytesMeterSharedPtr& bytesMeter() override { return bytes_meter_; } ConnectionImpl& parent_; @@ -324,8 +332,11 @@ class ConnectionImpl : public virtual Connection, // Note that in current implementation the watermark callbacks of the pending_recv_data_ are // never called. The watermark value is set to the size of the stream window. As a result this // watermark can never overflow because the peer can never send more bytes than the stream - // window without triggering protocol error and this buffer is drained after each DATA frame was - // dispatched through the filter chain. See source/docs/flow_control.md for more information. + // window without triggering protocol error. This buffer is drained after each DATA frame was + // dispatched through the filter chain unless + // envoy.reloadable_features.defer_processing_backedup_streams is enabled, + // in which case this buffer may accumulate data. + // See source/docs/flow_control.md for more information. Buffer::InstancePtr pending_recv_data_; Buffer::InstancePtr pending_send_data_; HeaderMapPtr pending_trailers_to_encode_; @@ -333,6 +344,9 @@ class ConnectionImpl : public virtual Connection, std::unique_ptr metadata_encoder_; std::unique_ptr metadata_encoder_old_; absl::optional deferred_reset_; + // Holds the reset reason for this stream. 
Useful if we have buffered data + // to determine whether we should continue processing that data. + absl::optional reset_reason_; HeaderString cookies_; bool local_end_stream_sent_ : 1; bool remote_end_stream_ : 1; @@ -341,13 +355,54 @@ class ConnectionImpl : public virtual Connection, bool pending_receive_buffer_high_watermark_called_ : 1; bool pending_send_buffer_high_watermark_called_ : 1; bool reset_due_to_messaging_error_ : 1; + bool defer_processing_backedup_streams_ : 1; absl::string_view details_; + /** + * Tracks buffering that may occur for a stream if it is backed up. + */ + struct BufferedStreamManager { + bool body_buffered_{false}; + bool trailers_buffered_{false}; + + // We received a call to onStreamClose for the stream, but deferred it + // as the stream had pending data to process and the stream was not reset. + bool buffered_on_stream_close_{false}; + + bool hasBufferedBodyOrTrailers() const { return body_buffered_ || trailers_buffered_; } + }; + + BufferedStreamManager stream_manager_; + Event::SchedulableCallbackPtr process_buffered_data_callback_; + protected: // Http::MultiplexedStreamImplBase bool hasPendingData() override { return pending_send_data_->length() > 0 || pending_trailers_to_encode_ != nullptr; } + bool continueProcessingBufferedData() const { + // We should stop processing buffered data if either + // 1) Buffers become overrun + // 2) The stream ends up getting reset + // Both of these can end up changing as a result of processing buffered data. + return !buffersOverrun() && !reset_reason_.has_value(); + } + + // Avoid inversion in the case where we saw trailers, acquiring the + // remote_end_stream_ being set to true, but the trailers ended up being + // buffered. + // All buffered body must be consumed before we send end stream. + bool sendEndStream() const { + return remote_end_stream_ && !stream_manager_.trailers_buffered_ && + !stream_manager_.body_buffered_; + } + + // Schedules a callback to process buffered data. + void scheduleProcessingOfBufferedData(); + + // Marks data consumed by the stream, granting the peer additional stream + // window. + void grantPeerAdditionalStreamWindow(); }; using StreamImplPtr = std::unique_ptr; @@ -593,7 +648,11 @@ class ConnectionImpl : public virtual Connection, int onError(absl::string_view error); virtual int onHeader(const nghttp2_frame* frame, HeaderString&& name, HeaderString&& value) PURE; int onInvalidFrame(int32_t stream_id, int error_code); + // Pass through invoking with the actual stream. int onStreamClose(int32_t stream_id, uint32_t error_code); + // Should be invoked directly in buffered onStreamClose scenarios + // where nghttp2 might have already forgotten about the stream. + int onStreamClose(StreamImpl* stream, uint32_t error_code); int onMetadataReceived(int32_t stream_id, const uint8_t* data, size_t len); int onMetadataFrameComplete(int32_t stream_id, bool end_metadata); // Called iff use_new_codec_wrapper_ is false. diff --git a/source/common/http/http2/codec_stats.h b/source/common/http/http2/codec_stats.h index 3bfcad7cc0b4..27babcb480dc 100644 --- a/source/common/http/http2/codec_stats.h +++ b/source/common/http/http2/codec_stats.h @@ -31,7 +31,8 @@ namespace Http2 { COUNTER(tx_reset) \ COUNTER(keepalive_timeout) \ GAUGE(streams_active, Accumulate) \ - GAUGE(pending_send_bytes, Accumulate) + GAUGE(pending_send_bytes, Accumulate) \ + GAUGE(deferred_stream_close, Accumulate) /** * Wrapper struct for the HTTP/2 codec stats. 
@see stats_macros.h diff --git a/source/common/quic/envoy_quic_stream.h b/source/common/quic/envoy_quic_stream.h index b1e8687b523b..95a6a905ad5f 100644 --- a/source/common/quic/envoy_quic_stream.h +++ b/source/common/quic/envoy_quic_stream.h @@ -77,7 +77,7 @@ class EnvoyQuicStream : public virtual Http::StreamEncoder, void removeCallbacks(Http::StreamCallbacks& callbacks) override { removeCallbacksHelper(callbacks); } - uint32_t bufferLimit() override { return send_buffer_simulation_.highWatermark(); } + uint32_t bufferLimit() const override { return send_buffer_simulation_.highWatermark(); } const Network::Address::InstanceConstSharedPtr& connectionLocalAddress() override { return connection()->connectionInfoProvider().localAddress(); } diff --git a/source/common/runtime/runtime_features.cc b/source/common/runtime/runtime_features.cc index 6ffe3e92d88d..221d9f840eff 100644 --- a/source/common/runtime/runtime_features.cc +++ b/source/common/runtime/runtime_features.cc @@ -67,6 +67,9 @@ FALSE_RUNTIME_GUARD(envoy_reloadable_features_allow_multiple_dns_addresses); FALSE_RUNTIME_GUARD(envoy_reloadable_features_unified_mux); // TODO(alyssar) flip false once issue complete. FALSE_RUNTIME_GUARD(envoy_restart_features_no_runtime_singleton); +// TODO(kbaichoo): Make this enabled by default when fairness and chunking +// are implemented, and we've had more cpu time. +FALSE_RUNTIME_GUARD(envoy_reloadable_features_defer_processing_backedup_streams); // Block of non-boolean flags. These are deprecated. Do not add more. ABSL_FLAG(uint64_t, envoy_headermap_lazy_map_min_size, 3, ""); // NOLINT @@ -140,6 +143,7 @@ constexpr absl::Flag* runtime_features[] = { &FLAGS_envoy_reloadable_features_conn_pool_new_stream_with_early_data_and_http3, &FLAGS_envoy_reloadable_features_correct_scheme_and_xfp, &FLAGS_envoy_reloadable_features_correctly_validate_alpn, + &FLAGS_envoy_reloadable_features_defer_processing_backedup_streams, &FLAGS_envoy_reloadable_features_deprecate_global_ints, &FLAGS_envoy_reloadable_features_disable_tls_inspector_injection, &FLAGS_envoy_reloadable_features_do_not_await_headers_on_upstream_timeout_to_emit_stats, diff --git a/source/common/runtime/runtime_features.h b/source/common/runtime/runtime_features.h index 983c58d295a7..6c0d1f6cbe42 100644 --- a/source/common/runtime/runtime_features.h +++ b/source/common/runtime/runtime_features.h @@ -21,6 +21,8 @@ void maybeSetRuntimeGuard(absl::string_view name, bool value); void maybeSetDeprecatedInts(absl::string_view name, uint32_t value); constexpr absl::string_view conn_pool_new_stream_with_early_data_and_http3 = "envoy.reloadable_features.conn_pool_new_stream_with_early_data_and_http3"; +constexpr absl::string_view defer_processing_backedup_streams = + "envoy.reloadable_features.defer_processing_backedup_streams"; class RuntimeFeatures { public: diff --git a/source/docs/flow_control.md b/source/docs/flow_control.md index e32bc4b2e850..e21b05c8ebba 100644 --- a/source/docs/flow_control.md +++ b/source/docs/flow_control.md @@ -101,6 +101,32 @@ upstream connections, the `readDisable(true)` calls are unwound in ClientConnectionImpl::onMessageComplete() to make sure that as connections are returned to the connection pool they are ready to read. +#### HTTP2 defer processing backed up streams + +This section will be further integrated with the rest of the document when it is +enabled by default (currently off by default). 
At a high level this change does +the following: + +* If an HTTP/2 stream is read disabled anywhere, we begin buffering body and + trailers on the receiving side of the codec to minimize work done on the + stream while it is read disabled. A callback is scheduled to drain this + buffered data when the stream is read enabled; the data can also be drained + earlier if the stream is read enabled and additional data arrives for the stream. +* The codec's receive buffer high watermark is still considered when + granting the peer additional stream window (preserving existing protocol flow + control). The codec's receive buffer high watermark was rarely triggered before, as + we would eagerly dispatch data through the filter chain; an exception is when a + filter in the filter chain injects data, triggering the watermark. + The codec's receive buffer no longer read disables the stream: we could already be + read disabled elsewhere, buffer data in the codec's receive buffer until it hits the + high watermark, and then never switch back to being read enabled. +* One side effect of deferring stream processing is the need to defer processing + stream close. See ``deferred_stream_close`` in + :ref:`config_http_conn_man_stats_per_codec` for additional details. + +For now we push all buffered data through to the other end, but chunking and +fairness will be implemented to avoid one stream starving the others. + ## HTTP/2 codec recv buffer Given the HTTP/2 `Envoy::Http::Http2::ConnectionImpl::StreamImpl::pending_recv_data_` is processed immediately diff --git a/test/common/http/codec_impl_fuzz_test.cc b/test/common/http/codec_impl_fuzz_test.cc index a9ad2219ff59..5083d0f43ff7 100644 --- a/test/common/http/codec_impl_fuzz_test.cc +++ b/test/common/http/codec_impl_fuzz_test.cc @@ -119,6 +119,7 @@ class HttpStream : public LinkedObject { bool local_closed_{false}; bool remote_closed_{false}; uint32_t read_disable_count_{}; + bool created_schedulable_callback_{false}; bool isLocalOpen() const { return !local_closed_; } @@ -144,11 +145,26 @@ class HttpStream : public LinkedObject { } request_, response_; + // Encapsulates configuration and connection information used in the HttpStream.
+ struct ConnectionContext { + MockConnectionManagerConfig* conn_manager_config_; + NiceMock& server_connection_; + NiceMock& client_connection_; + + ConnectionContext(MockConnectionManagerConfig* conn_manager_config, + NiceMock& server_connection, + NiceMock& client_connection) + : conn_manager_config_(conn_manager_config), server_connection_(server_connection), + client_connection_(client_connection) {} + }; + HttpStream(ClientConnection& client, const TestRequestHeaderMapImpl& request_headers, bool end_stream, StreamResetCallbackFn stream_reset_callback, - MockConnectionManagerConfig* config) - : stream_reset_callback_(stream_reset_callback), conn_manager_config_(config) { + ConnectionContext& context) + : http_protocol_(client.protocol()), stream_reset_callback_(stream_reset_callback), + context_(context) { request_.request_encoder_ = &client.newStream(response_.response_decoder_); + ON_CALL(request_.stream_callbacks_, onResetStream(_, _)) .WillByDefault(InvokeWithoutArgs([this] { ENVOY_LOG_MISC(trace, "reset request for stream index {}", stream_index_); @@ -225,8 +241,8 @@ class HttpStream : public LinkedObject { auto headers = fromSanitizedHeaders(directional_action.headers()); ConnectionManagerUtility::mutateResponseHeaders(headers, &request_.request_headers_, - *conn_manager_config_, /*via=*/"", - stream_info_, /*node_id=*/""); + *context_.conn_manager_config_, + /*via=*/"", stream_info_, /*node_id=*/""); if (headers.Status() == nullptr) { headers.setReferenceKey(Headers::get().Status, "200"); } @@ -324,12 +340,35 @@ class HttpStream : public LinkedObject { } else { --state.read_disable_count_; } + StreamEncoder* encoder; + Event::MockDispatcher* dispatcher{nullptr}; + if (response) { encoder = state.response_encoder_; + dispatcher = &context_.server_connection_.dispatcher_; } else { encoder = state.request_encoder_; + dispatcher = &context_.client_connection_.dispatcher_; } + + // With this feature enabled for http2 we end up creating a schedulable + // callback the first time we re-enable reading as it's used to process + // the backed up data. + if (Runtime::runtimeFeatureEnabled(Runtime::defer_processing_backedup_streams)) { + const bool expecting_schedulable_callback_creation = + http_protocol_ == Protocol::Http2 && state.read_disable_count_ == 0 && !disable && + !state.created_schedulable_callback_; + + if (expecting_schedulable_callback_creation) { + ASSERT(dispatcher != nullptr); + state.created_schedulable_callback_ = true; + // The unique pointer of this object will be returned in createSchedulableCallback_ of + // dispatcher, so there is no risk of object leak. 
+ new Event::MockSchedulableCallback(dispatcher); + } + } + encoder->getStream().readDisable(disable); } break; @@ -397,9 +436,10 @@ class HttpStream : public LinkedObject { response_.stream_state_ != StreamState::Closed; } + Protocol http_protocol_; int32_t stream_index_{-1}; StreamResetCallbackFn stream_reset_callback_; - MockConnectionManagerConfig* conn_manager_config_; + ConnectionContext context_; testing::NiceMock stream_info_; }; @@ -495,6 +535,9 @@ void codecFuzz(const test::common::http::CodecImplFuzzTestCase& input, HttpVersi const envoy::config::core::v3::HttpProtocolOptions::HeadersWithUnderscoresAction headers_with_underscores_action = envoy::config::core::v3::HttpProtocolOptions::ALLOW; + HttpStream::ConnectionContext connection_context(&conn_manager_config, server_connection, + client_connection); + Http1::CodecStats::AtomicPtr http1_stats; Http2::CodecStats::AtomicPtr http2_stats; ClientConnectionPtr client; @@ -616,7 +659,7 @@ void codecFuzz(const test::common::http::CodecImplFuzzTestCase& input, HttpVersi should_close_connection = true; } }, - &conn_manager_config); + connection_context); LinkedList::moveIntoListBack(std::move(stream), pending_streams); break; } diff --git a/test/common/http/http2/codec_impl_test.cc b/test/common/http/http2/codec_impl_test.cc index 6f3bc0fe3181..f64c0b4fffc4 100644 --- a/test/common/http/http2/codec_impl_test.cc +++ b/test/common/http/http2/codec_impl_test.cc @@ -49,7 +49,7 @@ namespace Http { namespace Http2 { using Http2SettingsTuple = ::testing::tuple; -using Http2SettingsTestParam = ::testing::tuple; +using Http2SettingsTestParam = ::testing::tuple; namespace CommonUtility = ::Envoy::Http2::Utility; class Http2CodecImplTestFixture { @@ -101,9 +101,10 @@ class Http2CodecImplTestFixture { Http2CodecImplTestFixture() = default; Http2CodecImplTestFixture(Http2SettingsTuple client_settings, Http2SettingsTuple server_settings, - bool enable_new_codec_wrapper) + bool enable_new_codec_wrapper, bool defer_processing_backedup_streams) : client_settings_(client_settings), server_settings_(server_settings), - enable_new_codec_wrapper_(enable_new_codec_wrapper) { + enable_new_codec_wrapper_(enable_new_codec_wrapper), + defer_processing_backedup_streams_(defer_processing_backedup_streams) { // Make sure we explicitly test for stream flush timer creation. EXPECT_CALL(client_connection_.dispatcher_, createTimer_(_)).Times(0); EXPECT_CALL(server_connection_.dispatcher_, createTimer_(_)).Times(0); @@ -138,6 +139,9 @@ class Http2CodecImplTestFixture { Runtime::LoaderSingleton::getExisting()->mergeValues( {{"envoy.reloadable_features.http2_new_codec_wrapper", enable_new_codec_wrapper_ ? "true" : "false"}}); + Runtime::LoaderSingleton::getExisting()->mergeValues( + {{std::string(Runtime::defer_processing_backedup_streams), + defer_processing_backedup_streams_ ? 
"true" : "false"}}); http2OptionsFromTuple(client_http2_options_, client_settings_); http2OptionsFromTuple(server_http2_options_, server_settings_); client_ = std::make_unique( @@ -321,6 +325,7 @@ class Http2CodecImplTestFixture { absl::optional client_settings_; absl::optional server_settings_; bool enable_new_codec_wrapper_ = false; + bool defer_processing_backedup_streams_ = false; bool allow_metadata_ = false; bool stream_error_on_invalid_http_messaging_ = false; Stats::TestUtil::TestStore client_stats_store_; @@ -366,7 +371,7 @@ class Http2CodecImplTest : public ::testing::TestWithParam(GetParam()), ::testing::get<1>(GetParam()), - ::testing::get<2>(GetParam())) {} + ::testing::get<2>(GetParam()), ::testing::get<3>(GetParam())) {} protected: void priorityFlood() { @@ -1581,13 +1586,31 @@ TEST_P(Http2CodecImplFlowControlTest, TestFlowControlInPendingSendData) { // updates to be sent, and the client to flush all queued data. // For bonus corner case coverage, remove callback2 in the middle of runLowWatermarkCallbacks() // and ensure it is not called. + NiceMock* process_buffered_data_callback{nullptr}; + if (defer_processing_backedup_streams_) { + process_buffered_data_callback = + new NiceMock(&server_connection_.dispatcher_); + } + EXPECT_CALL(callbacks, onBelowWriteBufferLowWatermark()).WillOnce(Invoke([&]() -> void { request_encoder_->getStream().removeCallbacks(callbacks2); })); EXPECT_CALL(callbacks2, onBelowWriteBufferLowWatermark()).Times(0); EXPECT_CALL(callbacks3, onBelowWriteBufferLowWatermark()); + + if (defer_processing_backedup_streams_) { + // NOLINTNEXTLINE(clang-analyzer-core.NullDereference) + EXPECT_FALSE(process_buffered_data_callback->enabled_); + } + server_->getStream(1)->readDisable(false); driveToCompletion(); + + if (defer_processing_backedup_streams_) { + EXPECT_TRUE(process_buffered_data_callback->enabled_); + process_buffered_data_callback->invokeCallback(); + } + EXPECT_EQ(0, client_->getStream(1)->pending_send_data_->length()); EXPECT_EQ(0, TestUtility::findGauge(client_stats_store_, "http2.pending_send_bytes")->value()); // The extra 1 byte sent won't trigger another window update, so the final window should be the @@ -1879,9 +1902,6 @@ TEST_P(Http2CodecImplFlowControlTest, WindowUpdateOnReadResumingFlood) { EXPECT_EQ(initial_stream_window, server_->getStream(1)->bufferLimit()); EXPECT_EQ(initial_stream_window, client_->getStream(1)->bufferLimit()); - auto* violation_callback = - new NiceMock(&server_connection_.dispatcher_); - // One large write gets broken into smaller frames. EXPECT_CALL(request_decoder_, decodeData(_, false)).Times(AnyNumber()); Buffer::OwnedImpl long_data(std::string(initial_stream_window / 2, 'a')); @@ -1901,14 +1921,28 @@ TEST_P(Http2CodecImplFlowControlTest, WindowUpdateOnReadResumingFlood) { } driveToCompletion(); + auto* violation_callback = + new NiceMock(&server_connection_.dispatcher_); + EXPECT_FALSE(violation_callback->enabled_); + NiceMock* process_buffered_data_callback{nullptr}; + if (defer_processing_backedup_streams_) { + process_buffered_data_callback = + new NiceMock(&server_connection_.dispatcher_); + + EXPECT_FALSE(process_buffered_data_callback->enabled_); + } + // Now unblock the server's stream. This will cause the bytes to be consumed, 2 flow control // updates to be sent, and overflow outbound frame queue. 
server_->getStream(1)->readDisable(false); driveToCompletion(); EXPECT_TRUE(violation_callback->enabled_); + if (defer_processing_backedup_streams_) { + EXPECT_TRUE(process_buffered_data_callback->enabled_); + } EXPECT_CALL(server_connection_, close(Envoy::Network::ConnectionCloseType::NoFlush)); violation_callback->invokeCallback(); @@ -2091,12 +2125,14 @@ TEST_P(Http2CodecImplStreamLimitTest, LazyDecreaseMaxConcurrentStreamsConsumeErr // Deferred reset tests use only small windows so that we can test certain conditions. INSTANTIATE_TEST_SUITE_P(Http2CodecImplDeferredResetTest, Http2CodecImplDeferredResetTest, ::testing::Combine(HTTP2SETTINGS_SMALL_WINDOW_COMBINE, - HTTP2SETTINGS_SMALL_WINDOW_COMBINE, ::testing::Bool())); + HTTP2SETTINGS_SMALL_WINDOW_COMBINE, ::testing::Bool(), + ::testing::Bool())); // Flow control tests only use only small windows so that we can test certain conditions. INSTANTIATE_TEST_SUITE_P(Http2CodecImplFlowControlTest, Http2CodecImplFlowControlTest, ::testing::Combine(HTTP2SETTINGS_SMALL_WINDOW_COMBINE, - HTTP2SETTINGS_SMALL_WINDOW_COMBINE, ::testing::Bool())); + HTTP2SETTINGS_SMALL_WINDOW_COMBINE, ::testing::Bool(), + ::testing::Bool())); // we separate default/edge cases here to avoid combinatorial explosion #define HTTP2SETTINGS_DEFAULT_COMBINE \ @@ -2110,11 +2146,13 @@ INSTANTIATE_TEST_SUITE_P(Http2CodecImplFlowControlTest, Http2CodecImplFlowContro // edge settings allow for the number of streams needed by the test. INSTANTIATE_TEST_SUITE_P(Http2CodecImplStreamLimitTest, Http2CodecImplStreamLimitTest, ::testing::Combine(HTTP2SETTINGS_DEFAULT_COMBINE, - HTTP2SETTINGS_DEFAULT_COMBINE, ::testing::Bool())); + HTTP2SETTINGS_DEFAULT_COMBINE, ::testing::Bool(), + ::testing::Bool())); INSTANTIATE_TEST_SUITE_P(Http2CodecImplTestDefaultSettings, Http2CodecImplTest, ::testing::Combine(HTTP2SETTINGS_DEFAULT_COMBINE, - HTTP2SETTINGS_DEFAULT_COMBINE, ::testing::Bool())); + HTTP2SETTINGS_DEFAULT_COMBINE, ::testing::Bool(), + ::testing::Bool())); #define HTTP2SETTINGS_EDGE_COMBINE \ ::testing::Combine( \ @@ -2127,17 +2165,18 @@ INSTANTIATE_TEST_SUITE_P(Http2CodecImplTestDefaultSettings, Http2CodecImplTest, ::testing::Values(CommonUtility::OptionsLimits::MIN_INITIAL_CONNECTION_WINDOW_SIZE, \ CommonUtility::OptionsLimits::MAX_INITIAL_CONNECTION_WINDOW_SIZE)) -// Make sure we have coverage for high and low values for various combinations and permutations +// Make sure we have coverage for high and low values for various combinations and permutations // of HTTP settings in at least one test fixture. // Use with caution as any test using this runs 255 times. 
using Http2CodecImplTestAll = Http2CodecImplTest; INSTANTIATE_TEST_SUITE_P(Http2CodecImplTestDefaultSettings, Http2CodecImplTestAll, ::testing::Combine(HTTP2SETTINGS_DEFAULT_COMBINE, - HTTP2SETTINGS_DEFAULT_COMBINE, ::testing::Bool())); + HTTP2SETTINGS_DEFAULT_COMBINE, ::testing::Bool(), + ::testing::Bool())); INSTANTIATE_TEST_SUITE_P(Http2CodecImplTestEdgeSettings, Http2CodecImplTestAll, ::testing::Combine(HTTP2SETTINGS_EDGE_COMBINE, HTTP2SETTINGS_EDGE_COMBINE, - ::testing::Bool())); + ::testing::Bool(), ::testing::Bool())); TEST(Http2CodecUtility, reconstituteCrumbledCookies) { { @@ -2191,8 +2230,9 @@ class Http2CustomSettingsTestBase : public Http2CodecImplTestFixture { Http2CustomSettingsTestBase(Http2SettingsTuple client_settings, Http2SettingsTuple server_settings, bool use_new_codec_wrapper, - bool validate_client) - : Http2CodecImplTestFixture(client_settings, server_settings, use_new_codec_wrapper), + bool defer_processing_backedup_streams, bool validate_client) + : Http2CodecImplTestFixture(client_settings, server_settings, use_new_codec_wrapper, + defer_processing_backedup_streams), validate_client_(validate_client) {} ~Http2CustomSettingsTestBase() override = default; @@ -2236,16 +2276,17 @@ class Http2CustomSettingsTestBase : public Http2CodecImplTestFixture { class Http2CustomSettingsTest : public Http2CustomSettingsTestBase, public ::testing::TestWithParam< - ::testing::tuple> { + ::testing::tuple> { public: Http2CustomSettingsTest() : Http2CustomSettingsTestBase(::testing::get<0>(GetParam()), ::testing::get<1>(GetParam()), - ::testing::get<2>(GetParam()), ::testing::get<3>(GetParam())) {} + ::testing::get<2>(GetParam()), ::testing::get<3>(GetParam()), + ::testing::get<4>(GetParam())) {} }; INSTANTIATE_TEST_SUITE_P(Http2CodecImplTestEdgeSettings, Http2CustomSettingsTest, ::testing::Combine(HTTP2SETTINGS_DEFAULT_COMBINE, HTTP2SETTINGS_DEFAULT_COMBINE, ::testing::Bool(), - ::testing::Bool())); + ::testing::Bool(), ::testing::Bool())); // Validates that custom parameters (those which are not explicitly named in the // envoy::config::core::v3::Http2ProtocolOptions proto) are properly sent and processed by @@ -3223,6 +3264,220 @@ TEST_P(Http2CodecImplTest, ConnectTest) { driveToCompletion(); } +TEST_P(Http2CodecImplTest, ShouldWaitForDeferredBodyToProcessBeforeProcessingTrailers) { + // We must initialize before dtor, otherwise we'll touch uninitialized + // members in dtor. + initialize(); + + // Test only makes sense if we have defer processing enabled. + if (!defer_processing_backedup_streams_) { + return; + } + + TestRequestHeaderMapImpl request_headers; + HttpTestUtility::addDefaultHeaders(request_headers); + EXPECT_CALL(request_decoder_, decodeHeaders_(_, false)); + EXPECT_TRUE(request_encoder_->encodeHeaders(request_headers, false).ok()); + driveToCompletion(); + + // Force the stream to buffer data at the receiving codec. + server_->getStream(1)->readDisable(true); + Buffer::OwnedImpl body(std::string(1024 * 1024, 'a')); + request_encoder_->encodeData(body, false); + driveToCompletion(); + + // Now re-enable the stream, and try dispatching trailers to the server. + // It should buffer those trailers until the buffered data is processed + // from the callback below. 
+ auto* process_buffered_data_callback = + new NiceMock(&server_connection_.dispatcher_); + EXPECT_FALSE(process_buffered_data_callback->enabled_); + + server_->getStream(1)->readDisable(false); + + EXPECT_TRUE(process_buffered_data_callback->enabled_); + + // Trailers should be buffered by the codec since there is unprocessed body. + // Hence we shouldn't invoke decodeTrailers yet. + EXPECT_CALL(request_decoder_, decodeTrailers_(_)).Times(0); + request_encoder_->encodeTrailers(TestRequestTrailerMapImpl{{"trailing", "header"}}); + driveToCompletion(); + + // Now invoke the deferred processing callback. + { + InSequence seq; + EXPECT_CALL(request_decoder_, decodeData(_, false)); + EXPECT_CALL(request_decoder_, decodeTrailers_(_)); + process_buffered_data_callback->invokeCallback(); + } +} + +TEST_P(Http2CodecImplTest, ShouldBufferDeferredBodyNoEndstream) { + // We must initialize before dtor, otherwise we'll touch uninitialized + // members in dtor. + initialize(); + + // Test only makes sense if we have defer processing enabled. + if (!defer_processing_backedup_streams_) { + return; + } + + TestRequestHeaderMapImpl request_headers; + HttpTestUtility::addDefaultHeaders(request_headers); + EXPECT_CALL(request_decoder_, decodeHeaders_(_, false)); + EXPECT_TRUE(request_encoder_->encodeHeaders(request_headers, false).ok()); + driveToCompletion(); + + // Force the stream to buffer data at the receiving codec. + server_->getStream(1)->readDisable(true); + Buffer::OwnedImpl body(std::string(1024 * 1024, 'a')); + request_encoder_->encodeData(body, false); + driveToCompletion(); + + // Now re-enable the stream, we should just flush the buffered data without + // end stream to the upstream. + auto* process_buffered_data_callback = + new NiceMock(&server_connection_.dispatcher_); + EXPECT_FALSE(process_buffered_data_callback->enabled_); + + server_->getStream(1)->readDisable(false); + + EXPECT_TRUE(process_buffered_data_callback->enabled_); + + // Now invoke the deferred processing callback. + { + InSequence seq; + EXPECT_CALL(request_decoder_, decodeData(_, false)); + process_buffered_data_callback->invokeCallback(); + } +} + +TEST_P(Http2CodecImplTest, ShouldBufferDeferredBodyWithEndStream) { + // We must initialize before dtor, otherwise we'll touch uninitialized + // members in dtor. + initialize(); + + // Test only makes sense if we have defer processing enabled. + if (!defer_processing_backedup_streams_) { + return; + } + + TestRequestHeaderMapImpl request_headers; + HttpTestUtility::addDefaultHeaders(request_headers); + EXPECT_CALL(request_decoder_, decodeHeaders_(_, false)); + EXPECT_TRUE(request_encoder_->encodeHeaders(request_headers, false).ok()); + driveToCompletion(); + + // Force the stream to buffer data at the receiving codec. + server_->getStream(1)->readDisable(true); + Buffer::OwnedImpl first_part(std::string(1024, 'a')); + request_encoder_->encodeData(first_part, false); + driveToCompletion(); + + // Finish request in subsequent call. + Buffer::OwnedImpl final_part(std::string(1024, 'a')); + request_encoder_->encodeData(final_part, true); + driveToCompletion(); + + auto* process_buffered_data_callback = + new NiceMock(&server_connection_.dispatcher_); + EXPECT_FALSE(process_buffered_data_callback->enabled_); + + server_->getStream(1)->readDisable(false); + + EXPECT_TRUE(process_buffered_data_callback->enabled_); + + // Now invoke the deferred processing callback. 
+ { + InSequence seq; + EXPECT_CALL(request_decoder_, decodeData(_, true)); + process_buffered_data_callback->invokeCallback(); + } +} + +TEST_P(Http2CodecImplTest, + ShouldGracefullyHandleBufferedDataConsumedByNetworkEventInsteadOfCallback) { + // We must initialize before dtor, otherwise we'll touch uninitialized + // members in dtor. + initialize(); + + // Test only makes sense if we have defer processing enabled. + if (!defer_processing_backedup_streams_) { + return; + } + + TestRequestHeaderMapImpl request_headers; + HttpTestUtility::addDefaultHeaders(request_headers); + EXPECT_CALL(request_decoder_, decodeHeaders_(_, false)); + EXPECT_TRUE(request_encoder_->encodeHeaders(request_headers, false).ok()); + driveToCompletion(); + + // Force the stream to buffer data at the receiving codec. + server_->getStream(1)->readDisable(true); + Buffer::OwnedImpl body(std::string(1024 * 1024, 'a')); + request_encoder_->encodeData(body, false); + driveToCompletion(); + + auto* process_buffered_data_callback = + new NiceMock(&server_connection_.dispatcher_); + EXPECT_FALSE(process_buffered_data_callback->enabled_); + server_->getStream(1)->readDisable(false); + EXPECT_TRUE(process_buffered_data_callback->enabled_); + + // Scoop the buffered data instead by this call to encodeData. + EXPECT_CALL(request_decoder_, decodeData(_, true)); + request_encoder_->encodeData(body, true); + driveToCompletion(); + + // Deferred processing callback should have nothing to consume. + { + InSequence seq; + EXPECT_CALL(request_decoder_, decodeData(_, _)).Times(0); + EXPECT_CALL(request_decoder_, decodeTrailers_(_)).Times(0); + process_buffered_data_callback->invokeCallback(); + } +} + +TEST_P(Http2CodecImplTest, CanHandleMultipleBufferedDataProcessingOnAStream) { + // We must initialize before dtor, otherwise we'll touch uninitialized + // members in dtor. + initialize(); + + // Test only makes sense if we have defer processing enabled. + if (!defer_processing_backedup_streams_) { + return; + } + + TestRequestHeaderMapImpl request_headers; + HttpTestUtility::addDefaultHeaders(request_headers); + EXPECT_CALL(request_decoder_, decodeHeaders_(_, false)); + EXPECT_TRUE(request_encoder_->encodeHeaders(request_headers, false).ok()); + driveToCompletion(); + + auto* process_buffered_data_callback = + new NiceMock(&server_connection_.dispatcher_); + + for (int i = 0; i < 10; ++i) { + // Repeatedly back up, clearing with the deferred processing callback. + const bool end_stream = i == 9; + server_->getStream(1)->readDisable(true); + Buffer::OwnedImpl body(std::string(1024, 'a')); + request_encoder_->encodeData(body, end_stream); + driveToCompletion(); + + EXPECT_FALSE(process_buffered_data_callback->enabled_); + server_->getStream(1)->readDisable(false); + EXPECT_TRUE(process_buffered_data_callback->enabled_); + + { + InSequence seq; + EXPECT_CALL(request_decoder_, decodeData(_, end_stream)); + process_buffered_data_callback->invokeCallback(); + EXPECT_FALSE(process_buffered_data_callback->enabled_); + } + } +} + class TestNghttp2SessionFactory; // Test client for H/2 METADATA frame edge cases. @@ -3347,6 +3602,9 @@ class Http2CodecMetadataTest : public Http2CodecImplTestFixture, public ::testin Runtime::LoaderSingleton::getExisting()->mergeValues( {{"envoy.reloadable_features.http2_new_codec_wrapper", enable_new_codec_wrapper_ ? "true" : "false"}}); + Runtime::LoaderSingleton::getExisting()->mergeValues( + {{std::string(Runtime::defer_processing_backedup_streams), + defer_processing_backedup_streams_ ? 
"true" : "false"}}); allow_metadata_ = true; http2OptionsFromTuple(client_http2_options_, client_settings_); http2OptionsFromTuple(server_http2_options_, server_settings_); diff --git a/test/config/utility.cc b/test/config/utility.cc index d2d9bacc285e..af8131b2dcd3 100644 --- a/test/config/utility.cc +++ b/test/config/utility.cc @@ -841,7 +841,7 @@ void ConfigHelper::configureUpstreamTls( }); } -void ConfigHelper::addRuntimeOverride(const std::string& key, const std::string& value) { +void ConfigHelper::addRuntimeOverride(absl::string_view key, absl::string_view value) { auto* static_layer = bootstrap_.mutable_layered_runtime()->mutable_layers(0)->mutable_static_layer(); (*static_layer->mutable_fields())[std::string(key)] = ValueUtil::stringValue(std::string(value)); diff --git a/test/config/utility.h b/test/config/utility.h index be4d0f1d042f..1aefcc8c62c1 100644 --- a/test/config/utility.h +++ b/test/config/utility.h @@ -322,7 +322,7 @@ class ConfigHelper { void skipPortUsageValidation() { skip_port_usage_validation_ = true; } // Add this key value pair to the static runtime. - void addRuntimeOverride(const std::string& key, const std::string& value); + void addRuntimeOverride(absl::string_view key, absl::string_view value); // Add typed_filter_metadata to the first listener. void addListenerTypedMetadata(absl::string_view key, ProtobufWkt::Any& packed_value); diff --git a/test/extensions/filters/http/fault/BUILD b/test/extensions/filters/http/fault/BUILD index 5c161d7eb673..428463ff15bc 100644 --- a/test/extensions/filters/http/fault/BUILD +++ b/test/extensions/filters/http/fault/BUILD @@ -54,6 +54,7 @@ envoy_extension_cc_test( name = "fault_filter_integration_test", srcs = ["fault_filter_integration_test.cc"], extension_names = ["envoy.filters.http.fault"], + shard_count = 2, deps = [ "//source/extensions/filters/http/fault:config", "//test/integration:http_protocol_integration_lib", diff --git a/test/extensions/filters/http/kill_request/BUILD b/test/extensions/filters/http/kill_request/BUILD index 98398e0bd613..1747fa2540cc 100644 --- a/test/extensions/filters/http/kill_request/BUILD +++ b/test/extensions/filters/http/kill_request/BUILD @@ -44,6 +44,7 @@ envoy_extension_cc_test( name = "kill_request_filter_integration_test", srcs = ["kill_request_filter_integration_test.cc"], extension_names = ["envoy.filters.http.kill_request"], + shard_count = 2, deps = [ "//source/extensions/filters/http/kill_request:kill_request_config", "//test/integration:http_protocol_integration_lib", diff --git a/test/extensions/filters/http/rbac/BUILD b/test/extensions/filters/http/rbac/BUILD index 0e2cf13fc8cc..9a1da74d8218 100644 --- a/test/extensions/filters/http/rbac/BUILD +++ b/test/extensions/filters/http/rbac/BUILD @@ -46,6 +46,7 @@ envoy_extension_cc_test( name = "rbac_filter_integration_test", srcs = ["rbac_filter_integration_test.cc"], extension_names = ["envoy.filters.http.rbac"], + shard_count = 2, deps = [ "//source/extensions/clusters/dynamic_forward_proxy:cluster", "//source/extensions/filters/http/dynamic_forward_proxy:config", diff --git a/test/integration/BUILD b/test/integration/BUILD index 27ad677a7867..801fc8b0470e 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -407,13 +407,14 @@ envoy_cc_test( srcs = [ "buffer_accounting_integration_test.cc", ], - shard_count = 2, + shard_count = 4, deps = [ ":base_overload_integration_test_lib", ":http_integration_lib", ":http_protocol_integration_lib", ":socket_interface_swap_lib", ":tracked_watermark_buffer_lib", + 
"//test/integration/filters:tee_filter_lib", "//test/mocks/http:http_mocks", "//test/test_common:utility_lib", "@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto", @@ -548,7 +549,7 @@ envoy_cc_test( ], # As this test has many H1/H2/v4/v6 tests it takes a while to run. # Shard it enough to bring the run time in line with other integration tests. - shard_count = 10, + shard_count = 16, deps = [ ":protocol_integration_test_lib", ], @@ -569,7 +570,7 @@ envoy_cc_test( srcs = [ "multiplexed_upstream_integration_test.cc", ], - shard_count = 2, + shard_count = 4, deps = [ ":http_protocol_integration_lib", "//source/common/http:header_map_lib", @@ -674,7 +675,7 @@ envoy_cc_test( srcs = ["idle_timeout_integration_test.cc"], # As this test has many pauses for idle timeouts, it takes a while to run. # Shard it enough to bring the run time in line with other integration tests. - shard_count = 2, + shard_count = 4, deps = [ ":http_protocol_integration_lib", "//test/integration/filters:backpressure_filter_config_lib", @@ -1005,6 +1006,7 @@ envoy_cc_test( srcs = [ "redirect_integration_test.cc", ], + shard_count = 2, deps = [ ":http_protocol_integration_lib", "//source/common/http:header_map_lib", @@ -1148,7 +1150,7 @@ envoy_cc_test_library( envoy_cc_test( name = "overload_integration_test", srcs = ["overload_integration_test.cc"], - shard_count = 2, + shard_count = 4, deps = [ ":base_overload_integration_test_lib", ":http_protocol_integration_lib", diff --git a/test/integration/buffer_accounting_integration_test.cc b/test/integration/buffer_accounting_integration_test.cc index d996dbdf601f..85ee8e20bb6d 100644 --- a/test/integration/buffer_accounting_integration_test.cc +++ b/test/integration/buffer_accounting_integration_test.cc @@ -9,6 +9,7 @@ #include "test/integration/autonomous_upstream.h" #include "test/integration/base_overload_integration_test.h" +#include "test/integration/filters/tee_filter.h" #include "test/integration/http_protocol_integration.h" #include "test/integration/tracked_watermark_buffer.h" #include "test/integration/utility.h" @@ -112,6 +113,9 @@ class Http2BufferWatermarksTest std::shared_ptr buffer_factory_; bool streamBufferAccounting() { return std::get<1>(GetParam()); } + bool deferProcessingBackedUpStreams() { + return Runtime::runtimeFeatureEnabled(Runtime::defer_processing_backedup_streams); + } std::string printAccounts() { std::stringstream stream; @@ -670,8 +674,15 @@ TEST_P(Http2OverloadManagerIntegrationTest, CanResetStreamIfEnvoyLevelStreamEnde upstream_request_for_response->encodeData(response_size, true); if (streamBufferAccounting()) { - // Wait for access log to know the Envoy level stream has been deleted. - EXPECT_THAT(waitForAccessLog(access_log_name_), HasSubstr("200")); + if (deferProcessingBackedUpStreams()) { + // Wait for an accumulation of data, as we cannot rely on the access log + // output since we're deferring the processing of the stream data. + EXPECT_TRUE(buffer_factory_->waitUntilTotalBufferedExceeds(10 * 10 * 1024)); + + } else { + // Wait for access log to know the Envoy level stream has been deleted. 
+ EXPECT_THAT(waitForAccessLog(access_log_name_), HasSubstr("200")); + } } // Set the pressure so the overload action kills the response if doing stream @@ -704,4 +715,263 @@ TEST_P(Http2OverloadManagerIntegrationTest, CanResetStreamIfEnvoyLevelStreamEnde } } +class Http2DeferredProcessingIntegrationTest : public Http2BufferWatermarksTest { +public: + Http2DeferredProcessingIntegrationTest() : registered_tee_factory_(tee_filter_factory_) { + config_helper_.prependFilter(R"EOF( + name: stream-tee-filter + )EOF"); + } + +protected: + StreamTeeFilterConfig tee_filter_factory_; + Registry::InjectFactory + registered_tee_factory_; +}; + +// We run with buffer accounting in order to verify the amount of data in the +// system. Buffer accounting isn't necessary for deferring http2 processing. +INSTANTIATE_TEST_SUITE_P( + IpVersions, Http2DeferredProcessingIntegrationTest, + testing::Combine(testing::ValuesIn(HttpProtocolIntegrationTest::getProtocolTestParams( + {Http::CodecType::HTTP2}, {FakeHttpConnection::Type::HTTP2})), + testing::Values(true), testing::Bool()), + protocolTestParamsAndBoolToString); + +TEST_P(Http2DeferredProcessingIntegrationTest, CanBufferInDownstreamCodec) { + config_helper_.setBufferLimits(1000, 1000); + initialize(); + Runtime::LoaderSingleton::getExisting()->mergeValues( + {{std::string(Runtime::defer_processing_backedup_streams), "true"}}); + + // Stop writes to the upstream. + write_matcher_->setDestinationPort(fake_upstreams_[0]->localAddress()->ip()->port()); + write_matcher_->setWriteReturnsEgain(); + + codec_client_ = makeHttpConnection(lookupPort("http")); + + auto [request_encoder, response_decoder] = codec_client_->startRequest(default_request_headers_); + codec_client_->sendData(request_encoder, 1000, false); + // Wait for an upstream request to have our reach its buffer limit and read + // disable. + test_server_->waitForGaugeEq("cluster.cluster_0.upstream_rq_active", 1); + test_server_->waitForCounterEq("cluster.cluster_0.upstream_flow_control_backed_up_total", 1); + test_server_->waitForCounterEq("http.config_test.downstream_flow_control_paused_reading_total", + 1); + + codec_client_->sendData(request_encoder, 1000, true); + + // Verify codec received but is buffered as we're still read disabled. + buffer_factory_->waitUntilTotalBufferedExceeds(2000); + test_server_->waitForCounterEq("http.config_test.downstream_flow_control_resumed_reading_total", + 0); + EXPECT_TRUE(tee_filter_factory_.inspectStreamTee(0, [](const StreamTee& tee) { + absl::MutexLock l{&tee.mutex_}; + EXPECT_EQ(tee.request_body_.length(), 1000); + })); + + // Allow draining to the upstream, and complete the stream. + write_matcher_->setResumeWrites(); + + waitForNextUpstreamRequest(); + FakeStreamPtr upstream_request = std::move(upstream_request_); + upstream_request->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, true); + ASSERT_TRUE(response_decoder->waitForEndStream()); + ASSERT_TRUE(upstream_request->complete()); + test_server_->waitForCounterEq("http.config_test.downstream_flow_control_resumed_reading_total", + 1); +} + +TEST_P(Http2DeferredProcessingIntegrationTest, CanBufferInUpstreamCodec) { + config_helper_.setBufferLimits(1000, 1000); + initialize(); + Runtime::LoaderSingleton::getExisting()->mergeValues( + {{std::string(Runtime::defer_processing_backedup_streams), "true"}}); + + // Stop writes to the downstream. 
+ write_matcher_->setSourcePort(lookupPort("http")); + codec_client_ = makeHttpConnection(lookupPort("http")); + write_matcher_->setWriteReturnsEgain(); + + auto response_decoder = codec_client_->makeRequestWithBody(default_request_headers_, 1); + waitForNextUpstreamRequest(); + FakeStreamPtr upstream_request = std::move(upstream_request_); + upstream_request->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + upstream_request->encodeData(1000, false); + + // Wait for an upstream response to have our reach its buffer limit and read + // disable. + test_server_->waitForGaugeEq("cluster.cluster_0.upstream_rq_active", 1); + test_server_->waitForCounterEq("cluster.cluster_0.upstream_flow_control_paused_reading_total", 1); + + upstream_request->encodeData(500, false); + + // Verify codec received but is buffered as we're still read disabled. + buffer_factory_->waitUntilTotalBufferedExceeds(1500); + test_server_->waitForCounterEq("cluster.cluster_0.upstream_flow_control_resumed_reading_total", + 0); + EXPECT_TRUE(tee_filter_factory_.inspectStreamTee(0, [](const StreamTee& tee) { + absl::MutexLock l{&tee.mutex_}; + EXPECT_EQ(tee.response_body_.length(), 1000); + })); + + // Allow draining to the downstream, and complete the stream. + write_matcher_->setResumeWrites(); + response_decoder->waitForBodyData(1500); + + upstream_request->encodeData(1, true); + ASSERT_TRUE(response_decoder->waitForEndStream()); + ASSERT_TRUE(upstream_request->complete()); + test_server_->waitForCounterEq("cluster.cluster_0.upstream_flow_control_resumed_reading_total", + 1); +} + +TEST_P(Http2DeferredProcessingIntegrationTest, CanDeferOnStreamCloseForUpstream) { + config_helper_.setBufferLimits(1000, 1000); + initialize(); + Runtime::LoaderSingleton::getExisting()->mergeValues( + {{std::string(Runtime::defer_processing_backedup_streams), "true"}}); + + // Stop writes to the downstream. + write_matcher_->setSourcePort(lookupPort("http")); + codec_client_ = makeHttpConnection(lookupPort("http")); + write_matcher_->setWriteReturnsEgain(); + + auto response_decoder = codec_client_->makeRequestWithBody(default_request_headers_, 1); + waitForNextUpstreamRequest(); + FakeStreamPtr upstream_request = std::move(upstream_request_); + upstream_request->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + upstream_request->encodeData(1000, false); + + // Wait for an upstream response to have our reach its buffer limit and read + // disable. + test_server_->waitForGaugeEq("cluster.cluster_0.upstream_rq_active", 1); + test_server_->waitForCounterEq("cluster.cluster_0.upstream_flow_control_paused_reading_total", 1); + upstream_request->encodeData(500, true); + + // Verify codec received and has buffered onStreamClose for upstream as we're still read disabled. + buffer_factory_->waitUntilTotalBufferedExceeds(1500); + test_server_->waitForGaugeEq("cluster.cluster_0.http2.deferred_stream_close", 1); + test_server_->waitForCounterEq("cluster.cluster_0.upstream_flow_control_resumed_reading_total", + 0); + EXPECT_TRUE(tee_filter_factory_.inspectStreamTee(0, [](const StreamTee& tee) { + absl::MutexLock l{&tee.mutex_}; + EXPECT_EQ(tee.response_body_.length(), 1000); + })); + + // Allow draining to the downstream. 
+ write_matcher_->setResumeWrites(); + + ASSERT_TRUE(response_decoder->waitForEndStream()); + ASSERT_TRUE(upstream_request->complete()); + test_server_->waitForGaugeEq("cluster.cluster_0.http2.deferred_stream_close", 0); + test_server_->waitForCounterEq("cluster.cluster_0.upstream_flow_control_resumed_reading_total", + 1); +} + +TEST_P(Http2DeferredProcessingIntegrationTest, + ShouldCloseDeferredUpstreamOnStreamCloseIfLocalReply) { + config_helper_.setBufferLimits(9000, 9000); + initialize(); + Runtime::LoaderSingleton::getExisting()->mergeValues( + {{std::string(Runtime::defer_processing_backedup_streams), "true"}}); + + // Stop writes to the downstream. + write_matcher_->setSourcePort(lookupPort("http")); + codec_client_ = makeHttpConnection(lookupPort("http")); + write_matcher_->setWriteReturnsEgain(); + + auto response_decoder = codec_client_->makeRequestWithBody(default_request_headers_, 1); + waitForNextUpstreamRequest(); + FakeStreamPtr upstream_request = std::move(upstream_request_); + upstream_request->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + upstream_request->encodeData(9000, false); + + // Wait for an upstream response to have our reach its buffer limit and read + // disable. + test_server_->waitForGaugeEq("cluster.cluster_0.upstream_rq_active", 1); + test_server_->waitForCounterEq("cluster.cluster_0.upstream_flow_control_paused_reading_total", 1); + + auto close_if_over_9000 = + [](StreamTee& tee, Http::StreamEncoderFilterCallbacks* encoder_cbs) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(tee.mutex_) -> Http::FilterDataStatus { + if (tee.response_body_.length() > 9000) { + encoder_cbs->sendLocalReply(Http::Code::InternalServerError, "Response size was over 9000!", + nullptr, absl::nullopt, ""); + return Http::FilterDataStatus::StopIterationNoBuffer; + } + return Http::FilterDataStatus::Continue; + }; + + EXPECT_TRUE(tee_filter_factory_.setEncodeDataCallback(0, close_if_over_9000)); + + upstream_request->encodeData(1, true); + + // Verify codec received and has buffered onStreamClose for upstream as we're still read disabled. + buffer_factory_->waitUntilTotalBufferedExceeds(9001); + test_server_->waitForGaugeEq("cluster.cluster_0.http2.deferred_stream_close", 1); + test_server_->waitForCounterEq("cluster.cluster_0.upstream_flow_control_resumed_reading_total", + 0); + EXPECT_TRUE(tee_filter_factory_.inspectStreamTee(0, [](const StreamTee& tee) { + absl::MutexLock l{&tee.mutex_}; + EXPECT_EQ(tee.response_body_.length(), 9000); + })); + + // Allow draining to the downstream, which should trigger a local reply. + write_matcher_->setResumeWrites(); + + ASSERT_TRUE(response_decoder->waitForReset()); + ASSERT_TRUE(upstream_request->complete()); + test_server_->waitForGaugeEq("cluster.cluster_0.http2.deferred_stream_close", 0); + test_server_->waitForCounterEq("cluster.cluster_0.upstream_flow_control_resumed_reading_total", + 1); +} + +TEST_P(Http2DeferredProcessingIntegrationTest, + ShouldCloseDeferredUpstreamOnStreamCloseIfResetByDownstream) { + config_helper_.setBufferLimits(1000, 1000); + initialize(); + Runtime::LoaderSingleton::getExisting()->mergeValues( + {{std::string(Runtime::defer_processing_backedup_streams), "true"}}); + + // Stop writes to the downstream. 
+ write_matcher_->setSourcePort(lookupPort("http")); + codec_client_ = makeHttpConnection(lookupPort("http")); + write_matcher_->setWriteReturnsEgain(); + + auto [request_encoder, response_decoder] = codec_client_->startRequest(default_request_headers_); + codec_client_->sendData(request_encoder, 100, true); + + waitForNextUpstreamRequest(); + FakeStreamPtr upstream_request = std::move(upstream_request_); + + upstream_request->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + upstream_request->encodeData(1000, false); + + // Wait for an upstream response to have our reach its buffer limit and read + // disable. + test_server_->waitForGaugeEq("cluster.cluster_0.upstream_rq_active", 1); + test_server_->waitForCounterEq("cluster.cluster_0.upstream_flow_control_paused_reading_total", 1); + + upstream_request->encodeData(500, true); + ASSERT_TRUE(upstream_request->complete()); + + // Verify codec received and has buffered onStreamClose for upstream as we're still read disabled. + buffer_factory_->waitUntilTotalBufferedExceeds(1500); + test_server_->waitForGaugeEq("cluster.cluster_0.http2.deferred_stream_close", 1); + test_server_->waitForCounterEq("cluster.cluster_0.upstream_flow_control_resumed_reading_total", + 0); + EXPECT_TRUE(tee_filter_factory_.inspectStreamTee(0, [](const StreamTee& tee) { + absl::MutexLock l{&tee.mutex_}; + EXPECT_EQ(tee.response_body_.length(), 1000); + })); + + // Downstream sends a RST, we should clean up the buffered upstream. + codec_client_->sendReset(request_encoder); + test_server_->waitForGaugeEq("cluster.cluster_0.http2.deferred_stream_close", 0); + // Resetting the upstream stream doesn't increment this count. + test_server_->waitForCounterEq("cluster.cluster_0.upstream_flow_control_resumed_reading_total", + 0); +} + } // namespace Envoy diff --git a/test/integration/filters/BUILD b/test/integration/filters/BUILD index e29fd05bd6e5..f495efca2a44 100644 --- a/test/integration/filters/BUILD +++ b/test/integration/filters/BUILD @@ -45,6 +45,27 @@ envoy_cc_test_library( ], ) +envoy_cc_test_library( + name = "tee_filter_lib", + srcs = [ + "tee_filter.cc", + ], + hdrs = [ + "tee_filter.h", + ], + external_deps = ["abseil_synchronization"], + deps = [ + "//envoy/http:filter_interface", + "//envoy/registry", + "//envoy/server:filter_config_interface", + "//source/common/buffer:buffer_lib", + "//source/common/common:minimal_logger_lib", + "//source/common/http:header_map_lib", + "//source/extensions/filters/http/common:pass_through_filter_lib", + "//test/extensions/filters/http/common:empty_http_filter_config_lib", + ], +) + envoy_cc_test_library( name = "local_reply_during_encoding_data_filter_lib", srcs = [ diff --git a/test/integration/filters/tee_filter.cc b/test/integration/filters/tee_filter.cc new file mode 100644 index 000000000000..8c6e5cf2d90c --- /dev/null +++ b/test/integration/filters/tee_filter.cc @@ -0,0 +1,85 @@ +#include "test/integration/filters/tee_filter.h" + +#include "envoy/registry/registry.h" + +#include "source/common/common/logger.h" +#include "source/common/http/header_map_impl.h" + +namespace Envoy { + +// A test filter that essentially tees the data flow through it. 
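To make the tee filter's role concrete, here is a minimal, illustrative inspection sketch mirroring the assertions used in the integration tests above (the 1000-byte figure and stream index 0 are example values; the factory currently tracks only the most recent stream). The filter implementation itself follows.

EXPECT_TRUE(tee_filter_factory_.inspectStreamTee(0, [](const StreamTee& tee) {
  absl::MutexLock l{&tee.mutex_};  // StreamTee members are guarded by mutex_.
  EXPECT_EQ(tee.request_body_.length(), 1000);
}));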
+class StreamTeeFilter : public Http::PassThroughFilter, public StreamTee { +public: + // Http::PassThroughFilter + Http::FilterDataStatus decodeData(Buffer::Instance& buffer, bool end_stream) override { + ENVOY_LOG_MISC(trace, "StreamTee decodeData {}", buffer.length()); + absl::MutexLock l{&mutex_}; + request_body_.add(buffer); + decode_end_stream_ = end_stream; + return Http::FilterDataStatus::Continue; + } + + Http::FilterTrailersStatus decodeTrailers(Http::RequestTrailerMap& request_trailers) override { + absl::MutexLock l{&mutex_}; + request_trailers_ = Http::createHeaderMap(request_trailers); + decode_end_stream_ = true; + return Http::FilterTrailersStatus::Continue; + } + + Http::FilterDataStatus encodeData(Buffer::Instance& buffer, bool end_stream) override { + ENVOY_LOG_MISC(trace, "StreamTee encodeData {}", buffer.length()); + absl::MutexLock l{&mutex_}; + response_body_.add(buffer); + encode_end_stream_ = end_stream; + if (on_encode_data_) { + return on_encode_data_(*this, encoder_callbacks_); + } + return Http::FilterDataStatus::Continue; + } + + Http::FilterTrailersStatus encodeTrailers(Http::ResponseTrailerMap& response_trailers) override { + absl::MutexLock l{&mutex_}; + response_trailers_ = Http::createHeaderMap(response_trailers); + encode_end_stream_ = true; + return Http::FilterTrailersStatus::Continue; + } +}; + +Http::FilterFactoryCb StreamTeeFilterConfig::createFilter(const std::string&, + Server::Configuration::FactoryContext&) { + return [this](Http::FilterChainFactoryCallbacks& callbacks) -> void { + auto filter = std::make_shared(); + // TODO(kbaichoo): support multiple streams. + current_tee_ = filter; + callbacks.addStreamFilter(std::move(filter)); + }; +} + +bool StreamTeeFilterConfig::inspectStreamTee(int /*stream_number*/, + std::function inspector) { + if (!current_tee_) { + ENVOY_LOG_MISC(warn, "No current stream_tee!"); + return false; + } + + // TODO(kbaichoo): support multiple streams. + inspector(*current_tee_); + return true; +} + +bool StreamTeeFilterConfig::setEncodeDataCallback( + int /*stream_number*/, + std::function + cb) { + if (!current_tee_) { + ENVOY_LOG_MISC(warn, "No current stream_tee!"); + return false; + } + + absl::MutexLock l{¤t_tee_->mutex_}; + current_tee_->on_encode_data_ = cb; + return true; +} + +} // namespace Envoy diff --git a/test/integration/filters/tee_filter.h b/test/integration/filters/tee_filter.h new file mode 100644 index 000000000000..3c1d8b697308 --- /dev/null +++ b/test/integration/filters/tee_filter.h @@ -0,0 +1,55 @@ +#pragma once +#include +#include + +#include "envoy/http/filter.h" +#include "envoy/server/filter_config.h" + +#include "source/common/buffer/buffer_impl.h" +#include "source/extensions/filters/http/common/pass_through_filter.h" + +#include "test/extensions/filters/http/common/empty_http_filter_config.h" + +#include "absl/synchronization/mutex.h" + +namespace Envoy { + +// Tees stream data. 
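The encode-data hook is how tests inject behavior mid-stream; below is a hedged wiring sketch modeled on the local-reply test earlier in this patch (the 9000-byte threshold and reply text are example values). The StreamTee state the hook operates on is defined just below.

tee_filter_factory_.setEncodeDataCallback(
    0, [](StreamTee& tee, Http::StreamEncoderFilterCallbacks* encoder_cbs)
           ABSL_EXCLUSIVE_LOCKS_REQUIRED(tee.mutex_) -> Http::FilterDataStatus {
      if (tee.response_body_.length() > 9000) {
        // Short-circuit the response once the teed body exceeds the threshold.
        encoder_cbs->sendLocalReply(Http::Code::InternalServerError, "Response too large", nullptr,
                                    absl::nullopt, "");
        return Http::FilterDataStatus::StopIterationNoBuffer;
      }
      return Http::FilterDataStatus::Continue;
    });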
+struct StreamTee { + virtual ~StreamTee() = default; + mutable absl::Mutex mutex_; + Buffer::OwnedImpl request_body_ ABSL_GUARDED_BY(mutex_){}; + Buffer::OwnedImpl response_body_ ABSL_GUARDED_BY(mutex_){}; + bool decode_end_stream_ ABSL_GUARDED_BY(mutex_){false}; + bool encode_end_stream_ ABSL_GUARDED_BY(mutex_){false}; + Http::RequestHeaderMapPtr request_headers_ ABSL_GUARDED_BY(mutex_){nullptr}; + Http::RequestTrailerMapPtr request_trailers_ ABSL_GUARDED_BY(mutex_){nullptr}; + Http::ResponseHeaderMapPtr response_headers_ ABSL_GUARDED_BY(mutex_){nullptr}; + Http::ResponseTrailerMapPtr response_trailers_ ABSL_GUARDED_BY(mutex_){nullptr}; + + std::function + on_encode_data_ ABSL_GUARDED_BY(mutex_){nullptr}; +}; + +using StreamTeeSharedPtr = std::shared_ptr; + +// Inject a specific instance of this factory in order to leverage the same +// instance used by Envoy to inspect internally. +class StreamTeeFilterConfig : public Extensions::HttpFilters::Common::EmptyHttpFilterConfig { +public: + StreamTeeFilterConfig() : EmptyHttpFilterConfig("stream-tee-filter") {} + + Http::FilterFactoryCb createFilter(const std::string&, + Server::Configuration::FactoryContext&) override; + bool inspectStreamTee(int /*stream_number*/, std::function inspector); + bool setEncodeDataCallback(int /*stream_number*/, + std::function + cb); + +private: + // TODO(kbaichoo): support multiple streams. + StreamTeeSharedPtr current_tee_; +}; + +} // namespace Envoy diff --git a/test/integration/http2_flood_integration_test.cc b/test/integration/http2_flood_integration_test.cc index d2ba01e1fdff..89553858d1f1 100644 --- a/test/integration/http2_flood_integration_test.cc +++ b/test/integration/http2_flood_integration_test.cc @@ -30,14 +30,20 @@ namespace Envoy { namespace { const uint32_t ControlFrameFloodLimit = 100; const uint32_t AllFrameFloodLimit = 1000; + +bool deferredProcessing(std::tuple params) { + return std::get<2>(params); +} + } // namespace std::string testParamsToString( - const ::testing::TestParamInfo> params) { + const ::testing::TestParamInfo> params) { const bool is_v4 = (std::get<0>(params.param) == Network::Address::IpVersion::v4); const bool http2_new_codec_wrapper = std::get<1>(params.param); - return absl::StrCat(is_v4 ? "IPv4" : "IPv6", - http2_new_codec_wrapper ? "WrappedHttp2" : "BareHttp2"); + return absl::StrCat( + is_v4 ? "IPv4" : "IPv6", http2_new_codec_wrapper ? "WrappedHttp2" : "BareHttp2", + deferredProcessing(params.param) ? "WithDeferredProcessing" : "NoDeferredProcessing"); } // It is important that the new socket interface is installed before any I/O activity starts and @@ -47,7 +53,7 @@ std::string testParamsToString( // Http2FrameIntegrationTest destructor completes. class Http2FloodMitigationTest : public SocketInterfaceSwap, - public testing::TestWithParam>, + public testing::TestWithParam>, public Http2RawFrameIntegrationTest { public: Http2FloodMitigationTest() : Http2RawFrameIntegrationTest(std::get<0>(GetParam())) { @@ -58,6 +64,8 @@ class Http2FloodMitigationTest const bool enable_new_wrapper = std::get<1>(GetParam()); config_helper_.addRuntimeOverride("envoy.reloadable_features.http2_new_codec_wrapper", enable_new_wrapper ? "true" : "false"); + config_helper_.addRuntimeOverride(Runtime::defer_processing_backedup_streams, + deferredProcessing(GetParam()) ? 
"true" : "false"); } protected: @@ -78,8 +86,8 @@ class Http2FloodMitigationTest INSTANTIATE_TEST_SUITE_P( IpVersions, Http2FloodMitigationTest, - testing::Combine(testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), - testing::ValuesIn({false, true})), + testing::Combine(testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), ::testing::Bool(), + ::testing::Bool()), testParamsToString); void Http2FloodMitigationTest::initializeUpstreamFloodTest() { @@ -595,6 +603,15 @@ TEST_P(Http2FloodMitigationTest, Trailers) { // Verify flood detection by the WINDOW_UPDATE frame when a decoder filter is resuming reading from // the downstream via DecoderFilterBelowWriteBufferLowWatermark. TEST_P(Http2FloodMitigationTest, WindowUpdateOnLowWatermarkFlood) { + // This test depends on data flowing through a backed up stream eagerly (e.g. the + // backpressure-filter triggers above watermark when it receives headers from + // the downstream, and only goes below watermark if the response body has + // passed through the filter.). With defer processing of backed up streams however + // the data won't be eagerly processed as the stream is backed up. + // TODO(kbaichoo): Remove this test when removing this feature tag. + if (deferredProcessing(GetParam())) { + return; + } config_helper_.prependFilter(R"EOF( name: backpressure-filter )EOF"); diff --git a/test/integration/http_protocol_integration.cc b/test/integration/http_protocol_integration.cc index 2eff0470845d..73266ae93fe2 100644 --- a/test/integration/http_protocol_integration.cc +++ b/test/integration/http_protocol_integration.cc @@ -8,28 +8,39 @@ std::vector HttpProtocolIntegrationTest::getProtocolTest const std::vector& upstream_protocols) { std::vector ret; + const auto addHttp2TestParametersWithNewCodecWrapperOrDeferredProcessing = + [&ret](Network::Address::IpVersion ip_version, Http::CodecType downstream_protocol, + Http::CodecType upstream_protocol) { + ret.push_back(HttpProtocolTestParams{ip_version, downstream_protocol, upstream_protocol, + true, false}); + ret.push_back( + HttpProtocolTestParams{ip_version, downstream_protocol, upstream_protocol, true, true}); + ret.push_back(HttpProtocolTestParams{ip_version, downstream_protocol, upstream_protocol, + false, true}); + }; + for (auto ip_version : TestEnvironment::getIpVersionsForTest()) { for (auto downstream_protocol : downstream_protocols) { for (auto upstream_protocol : upstream_protocols) { #ifdef ENVOY_ENABLE_QUIC - ret.push_back( - HttpProtocolTestParams{ip_version, downstream_protocol, upstream_protocol, false}); + ret.push_back(HttpProtocolTestParams{ip_version, downstream_protocol, upstream_protocol, + false, false}); if (downstream_protocol == Http::CodecType::HTTP2 || upstream_protocol == Http::CodecType::HTTP2) { - ret.push_back( - HttpProtocolTestParams{ip_version, downstream_protocol, upstream_protocol, true}); + addHttp2TestParametersWithNewCodecWrapperOrDeferredProcessing( + ip_version, downstream_protocol, upstream_protocol); } #else if (downstream_protocol == Http::CodecType::HTTP3 || upstream_protocol == Http::CodecType::HTTP3) { ENVOY_LOG_MISC(warn, "Skipping HTTP/3 as support is compiled out"); } else { - ret.push_back( - HttpProtocolTestParams{ip_version, downstream_protocol, upstream_protocol, false}); + ret.push_back(HttpProtocolTestParams{ip_version, downstream_protocol, upstream_protocol, + false, false}); if (downstream_protocol == Http::CodecType::HTTP2 || upstream_protocol == Http::CodecType::HTTP2) { - ret.push_back( - HttpProtocolTestParams{ip_version, 
downstream_protocol, upstream_protocol, true}); + addHttp2TestParametersWithNewCodecWrapperOrDeferredProcessing( + ip_version, downstream_protocol, upstream_protocol); } } #endif @@ -68,7 +79,9 @@ std::string HttpProtocolIntegrationTest::protocolTestParamsToString( return absl::StrCat((params.param.version == Network::Address::IpVersion::v4 ? "IPv4_" : "IPv6_"), downstreamToString(params.param.downstream_protocol), upstreamToString(params.param.upstream_protocol), - params.param.http2_new_codec_wrapper ? "WrappedHttp2" : "BareHttp2"); + params.param.http2_new_codec_wrapper ? "WrappedHttp2" : "BareHttp2", + params.param.defer_processing_backedup_streams ? "WithDeferredProcessing" + : "NoDeferredProcessing"); } void HttpProtocolIntegrationTest::expectUpstreamBytesSentAndReceived( diff --git a/test/integration/http_protocol_integration.h b/test/integration/http_protocol_integration.h index d5bc1607ab6b..d288babeb2d9 100644 --- a/test/integration/http_protocol_integration.h +++ b/test/integration/http_protocol_integration.h @@ -11,6 +11,7 @@ struct HttpProtocolTestParams { Http::CodecType downstream_protocol; Http::CodecType upstream_protocol; bool http2_new_codec_wrapper; + bool defer_processing_backedup_streams; }; // Allows easy testing of Envoy code for HTTP/HTTP2 upstream/downstream. @@ -56,6 +57,9 @@ class HttpProtocolIntegrationTest : public testing::TestWithParambody().size()); } -// Very similar set-up to testRetry but with a 16k request the request will not +// Very similar set-up to testRetry but with a 65k request the request will not // be buffered and the 503 will be returned to the user. TEST_P(ProtocolIntegrationTest, RetryHittingBufferLimit) { config_helper_.setBufferLimits(1024, 1024); // Set buffer limits upstream and downstream. diff --git a/test/mocks/http/stream.h b/test/mocks/http/stream.h index a9d27e58ddef..2d0356abf812 100644 --- a/test/mocks/http/stream.h +++ b/test/mocks/http/stream.h @@ -18,7 +18,7 @@ class MockStream : public Stream { MOCK_METHOD(void, resetStream, (StreamResetReason reason)); MOCK_METHOD(void, readDisable, (bool disable)); MOCK_METHOD(void, setWriteBufferWatermarks, (uint32_t)); - MOCK_METHOD(uint32_t, bufferLimit, ()); + MOCK_METHOD(uint32_t, bufferLimit, (), (const)); MOCK_METHOD(const Network::Address::InstanceConstSharedPtr&, connectionLocalAddress, ()); MOCK_METHOD(void, setFlushTimeout, (std::chrono::milliseconds timeout)); MOCK_METHOD(void, setAccount, (Buffer::BufferMemoryAccountSharedPtr));
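The const-qualification of bufferLimit() on the mock is source-compatible for existing tests: gmock sets expectations on const methods the same way as on non-const ones. A minimal sketch, assuming the mock is used from the Http namespace as elsewhere in the test tree:

Http::MockStream stream;
// Expectations are written exactly as before the const change.
EXPECT_CALL(stream, bufferLimit()).WillRepeatedly(testing::Return(1024));
EXPECT_EQ(1024, stream.bufferLimit());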