Skip to content

Commit

Permalink
upstream: Initialize upstream network filters (envoyproxy#25792)
Browse files Browse the repository at this point in the history
Initialize upstream network filters at onPoolReady() by calling the
underlying connection's initializeReadFilters(). For downstream network
filters this is done when the connection is created, as the incoming
connection is already connected. For upstream network filters
initializeReadFilters() was never called, causing each filter's
onNewConnection() to be called only when first data was received.

This change makes upstream network filter behavior more symmetric to
downstream network filters. This ensures that filters may depend on their
onNewConnection() being called after connection establishment also if
there is no data being received on the connection for some time.

It should be noted that upstream network filters may still have their
onWrite() called before onNewConnection(), if data is written before the
connection is established. onNewConnection() is always called
before onData() is called the first time.

Signed-off-by: Jarno Rajahalme <[email protected]>
  • Loading branch information
jrajahalme authored Mar 17, 2023
1 parent fa5f548 commit 51f0558
Show file tree
Hide file tree
Showing 13 changed files with 222 additions and 21 deletions.
3 changes: 3 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ bug_fixes:
- area: ext_proc
change: |
Let onData always raise StopIterationAndWatermark when waiting for headers response, to avoid http errors (413 on request path, and 500 on response path) when data size goes above high watermark.
- area: upstream
change: |
Initialize upstream network read filters via their ``onNewConnection()`` callback once the upstream connection has been established even if there is no data available for reading on the new upstream connection. This behavior change can be reverted by setting ``envoy.reloadable_features.initialize_upstream_filters`` to ``false``.
removed_config_or_runtime:
# *Normally occurs at the end of the* :ref:`deprecation period <deprecated>`
Expand Down
5 changes: 4 additions & 1 deletion source/common/conn_pool/conn_pool_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,10 @@ void ConnPoolImplBase::onConnectionEvent(ActiveClient& client, absl::string_view
[&client]() { client.onConnectionDurationTimeout(); });
client.connection_duration_timer_->enableTimer(max_connection_duration.value());
}

// Initialize client read filters
if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.initialize_upstream_filters")) {
client.initializeReadFilters();
}
// At this point, for the mixed ALPN pool, the client may be deleted. Do not
// refer to client after this point.
onConnected(client);
Expand Down
3 changes: 3 additions & 0 deletions source/common/conn_pool/conn_pool_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ class ActiveClient : public LinkedObject<ActiveClient>,
return std::min<int64_t>(remaining_streams_, remaining_concurrent_streams);
}

// Initialize upstream read filters. Called when connected.
virtual void initializeReadFilters() PURE;

// Closes the underlying connection.
virtual void close() PURE;
// Returns the ID of the underlying connection.
Expand Down
6 changes: 6 additions & 0 deletions source/common/http/codec_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ class CodecClient : protected Logger::Loggable<Logger::Id::client>,
*/
bool isHalfCloseEnabled() { return connection_->isHalfCloseEnabled(); }

/**
* Initialize all of the installed read filters on the underlying connection.
* This effectively calls onNewConnection() on each of them.
*/
void initializeReadFilters() { connection_->initializeReadFilters(); }

/**
* Close the underlying network connection. This is immediate and will not attempt to flush any
* pending write data.
Expand Down
1 change: 1 addition & 0 deletions source/common/http/conn_pool_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ class ActiveClient : public Envoy::ConnectionPool::ActiveClient {
&traffic_stats.bind_errors_, nullptr});
}

void initializeReadFilters() override { codec_client_->initializeReadFilters(); }
absl::optional<Http::Protocol> protocol() const override { return codec_client_->protocol(); }
void close() override { codec_client_->close(); }
virtual Http::RequestEncoder& newStreamEncoder(Http::ResponseDecoder& response_decoder) PURE;
Expand Down
1 change: 1 addition & 0 deletions source/common/network/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ envoy_cc_library(
"//envoy/network:filter_interface",
"//source/common/common:assert_lib",
"//source/common/common:linked_object",
"//source/common/runtime:runtime_lib",
],
)

Expand Down
19 changes: 18 additions & 1 deletion source/common/network/filter_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "envoy/network/connection.h"

#include "source/common/common/assert.h"
#include "source/common/runtime/runtime_features.h"

namespace Envoy {
namespace Network {
Expand Down Expand Up @@ -41,7 +42,23 @@ bool FilterManagerImpl::initializeReadFilters() {
if (upstream_filters_.empty()) {
return false;
}
onContinueReading(nullptr, connection_);
if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.initialize_upstream_filters")) {
// Initialize read filters without calling onData() afterwards.
// This is called just after an connection has been established and nothing may have been read
// yet. onData() will be called separately as data is read from the connection.
for (auto& entry : upstream_filters_) {
if (entry->filter_ && !entry->initialized_) {
entry->initialized_ = true;
FilterStatus status = entry->filter_->onNewConnection();
if (status == FilterStatus::StopIteration ||
connection_.state() != Connection::State::Open) {
break;
}
}
}
} else {
onContinueReading(nullptr, connection_);
}
return true;
}

Expand Down
1 change: 1 addition & 0 deletions source/common/runtime/runtime_features.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ RUNTIME_GUARD(envoy_reloadable_features_http_filter_avoid_reentrant_local_reply)
RUNTIME_GUARD(envoy_reloadable_features_http_reject_path_with_fragment);
RUNTIME_GUARD(envoy_reloadable_features_http_response_half_close);
RUNTIME_GUARD(envoy_reloadable_features_http_strip_fragment_from_path_unsafe_if_disabled);
RUNTIME_GUARD(envoy_reloadable_features_initialize_upstream_filters);
RUNTIME_GUARD(envoy_reloadable_features_no_extension_lookup_by_name);
RUNTIME_GUARD(envoy_reloadable_features_no_full_scan_certs_on_sni_mismatch);
RUNTIME_GUARD(envoy_reloadable_features_oauth_header_passthrough_fix);
Expand Down
2 changes: 2 additions & 0 deletions source/common/tcp/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "source/common/common/logger.h"
#include "source/common/http/conn_pool_base.h"
#include "source/common/network/filter_impl.h"
#include "source/common/runtime/runtime_features.h"

namespace Envoy {
namespace Tcp {
Expand Down Expand Up @@ -111,6 +112,7 @@ class ActiveTcpClient : public Envoy::ConnectionPool::ActiveClient {
}
}

void initializeReadFilters() override { connection_->initializeReadFilters(); }
absl::optional<Http::Protocol> protocol() const override { return {}; }
void close() override;
uint32_t numActiveStreams() const override { return callbacks_ ? 1 : 0; }
Expand Down
1 change: 1 addition & 0 deletions test/common/conn_pool/conn_pool_base_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class TestActiveClient : public ActiveClient {
: ActiveClient(parent, lifetime_stream_limit, concurrent_stream_limit),
supports_early_data_(supports_early_data) {}

void initializeReadFilters() override {}
void close() override { onEvent(Network::ConnectionEvent::LocalClose); }
uint64_t id() const override { return 1; }
bool closingWithIncompleteStream() const override { return false; }
Expand Down
1 change: 1 addition & 0 deletions test/common/tcp/conn_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ class TcpConnPoolImplDestructorTest : public Event::TestUsingSimulatedTime, publ
EXPECT_CALL(*connection_, streamInfo());
EXPECT_CALL(*connection_, id()).Times(AnyNumber());
EXPECT_CALL(*connection_, readDisable(_)).Times(AnyNumber());
EXPECT_CALL(*connection_, initializeReadFilters());

connect_timer_ = new NiceMock<Event::MockTimer>(&dispatcher_);
EXPECT_CALL(dispatcher_, createClientConnection_(_, _, _, _)).WillOnce(Return(connection_));
Expand Down
2 changes: 2 additions & 0 deletions test/integration/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -223,11 +223,13 @@ envoy_cc_test(
name = "cluster_filter_integration_test",
srcs = ["cluster_filter_integration_test.cc"],
deps = [
":http_integration_lib",
":integration_lib",
"//envoy/network:filter_interface",
"//source/extensions/filters/network/tcp_proxy:config",
"//test/config:utility_lib",
"//test/test_common:registry_lib",
"//test/test_common:test_runtime_lib",
"@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto",
],
)
Expand Down
Loading

0 comments on commit 51f0558

Please sign in to comment.