diff --git a/src/v/cloud_storage_clients/client_pool.cc b/src/v/cloud_storage_clients/client_pool.cc index 7ee886ec19579..d1e2f9f896958 100644 --- a/src/v/cloud_storage_clients/client_pool.cc +++ b/src/v/cloud_storage_clients/client_pool.cc @@ -13,12 +13,15 @@ #include "cloud_storage_clients/abs_client.h" #include "cloud_storage_clients/logger.h" #include "cloud_storage_clients/s3_client.h" +#include "model/timeout_clock.h" #include "ssx/future-util.h" #include #include +#include #include +#include namespace { constexpr auto self_configure_attempts = 3; @@ -169,6 +172,8 @@ ss::future<> client_pool::stop() { _cvar.broken(); _self_config_barrier.broken(); _credentials_var.broken(); + // Wait for all background operations to complete. + co_await _bg_gate.close(); // Wait until all leased objects are returned co_await _gate.close(); @@ -238,7 +243,10 @@ std::tuple pick_two_random_shards() { ss::future client_pool::acquire(ss::abort_source& as) { auto guard = _gate.hold(); + std::optional source_sid; + std::optional client; + try { // If credentials have not yet been acquired, wait for them. It is // possible that credentials are not initialized right after remote @@ -251,29 +259,34 @@ client_pool::acquire(ss::abort_source& as) { u = co_await ss::get_units(_self_config_barrier, 1); } - while (unlikely( - _pool.empty() && !_gate.is_closed() && !_as.abort_requested())) { - if ( + while (!client.has_value() && !_gate.is_closed() + && !_as.abort_requested()) { + if (likely(!_pool.empty())) { + client = _pool.back(); + _pool.pop_back(); + } else if ( ss::smp::count == 1 || _policy == client_pool_overdraft_policy::wait_if_empty || _leased.size() >= _capacity * 2) { // If borrowing is disabled or this shard borrowed '_capacity' // client connections then wait util one of the clients is // freed. - co_await _cvar.wait(); + co_await ssx::with_timeout_abortable( + _cvar.wait(), model::no_timeout, as); + vlog( pool_log.debug, "cvar triggered, pool size: {}", _pool.size()); } else { + // Try borrowing from peer shard. auto clients_in_use = [](client_pool& other) { return std::clamp( other._capacity - other._pool.size(), 0UL, other._capacity); }; - // Borrow from random shard. Use 2-random approach. Pick 2 - // random shards + // Use 2-random approach. Pick 2 random shards auto [sid1, sid2] = pick_two_random_shards(); auto cnt1 = co_await container().invoke_on( sid1, clients_in_use); @@ -299,15 +312,16 @@ client_pool::acquire(ss::abort_source& as) { } // Depending on the result either wait or create new connection if (success) { - vlog(pool_log.debug, "successfuly borrowed from {}", sid); + vlog(pool_log.debug, "successfully borrowed from {}", sid); if (_probe) { _probe->register_borrow(); } source_sid = sid; - _pool.emplace_back(make_client()); + client = make_client(); } else { vlog(pool_log.debug, "can't borrow connection, waiting"); - co_await _cvar.wait(); + co_await ssx::with_timeout_abortable( + _cvar.wait(), model::no_timeout, as); vlog( pool_log.debug, "cvar triggered, pool size: {}", @@ -320,9 +334,7 @@ client_pool::acquire(ss::abort_source& as) { if (_gate.is_closed() || _as.abort_requested()) { throw ss::gate_closed_exception(); } - vassert(!_pool.empty(), "'acquire' invariant is broken"); - auto client = _pool.back(); - _pool.pop_back(); + vassert(client.has_value(), "'acquire' invariant is broken"); update_usage_stats(); vlog( @@ -337,27 +349,41 @@ client_pool::acquire(ss::abort_source& as) { } client_lease lease( - client, + client.value(), as, ss::make_deleter([pool = weak_from_this(), - client, + client = client.value(), g = std::move(guard), source_sid]() mutable { if (pool) { if (source_sid.has_value()) { - vlog( - pool_log.debug, "disposing the borrowed client connection"); - // Since the client was borrowed we can't just add it back to - // the pool. This will lead to a situation when the connection - // simultaneously exists on two different shards. + // If all clients from the local pool are in-use we will + // shutdown the borrowed one and return the "accounting unit" + // to the source shard. + // Otherwise, we replace the oldest client in the + // pool to improve connection reuse. + if (!pool->_pool.empty()) { + vlog( + pool_log.debug, + "disposing the the oldest client connection and " + "replacing it with the borrowed one"); + pool->_pool.push_back(std::move(client)); + client = std::move(pool->_pool.front()); + pool->_pool.pop_front(); + } else { + vlog( + pool_log.debug, + "disposing the borrowed client connection"); + } + client->shutdown(); - ssx::spawn_with_gate(pool->_gate, [client] { + ssx::spawn_with_gate(pool->_bg_gate, [client] { return client->stop().finally([client] {}); }); // In the background return the client to the connection pool // of the source shard. The lifetime is guaranteed by the gate // guard. - ssx::spawn_with_gate(pool->_gate, [&pool, source_sid] { + ssx::spawn_with_gate(pool->_bg_gate, [&pool, source_sid] { return pool->container().invoke_on( source_sid.value(), [my_sid = ss::this_shard_id()](client_pool& other) { @@ -408,24 +434,24 @@ bool client_pool::borrow_one(unsigned other) { _pool.pop_back(); update_usage_stats(); c->shutdown(); - ssx::spawn_with_gate(_gate, [c] { return c->stop().finally([c] {}); }); + ssx::spawn_with_gate(_bg_gate, [c] { return c->stop().finally([c] {}); }); return true; } void client_pool::return_one(unsigned other) { vlog(pool_log.debug, "shard {} returns a client", other); - if (_pool.size() + _leased.size() < _capacity) { - // The _pool has fewer elements than it should have because it was - // borrowed from previously. - _pool.emplace_back(make_client()); - update_usage_stats(); - vlog( - pool_log.debug, - "creating new client, current usage is {}/{}", - normalized_num_clients_in_use(), - _capacity); - _cvar.signal(); - } + vassert( + _pool.size() < _capacity, + "tried to return a borrowed client but the pool is full"); + // Cold clients are at the front. Hot clients are at the back. + _pool.emplace_front(make_client()); + update_usage_stats(); + vlog( + pool_log.debug, + "creating new client, current usage is {}/{}", + normalized_num_clients_in_use(), + _capacity); + _cvar.signal(); } size_t client_pool::size() const noexcept { return _pool.size(); } @@ -433,6 +459,7 @@ size_t client_pool::size() const noexcept { return _pool.size(); } size_t client_pool::max_size() const noexcept { return _capacity; } void client_pool::populate_client_pool() { + _pool.reserve(_capacity); for (size_t i = 0; i < _capacity; i++) { _pool.emplace_back(make_client()); } @@ -440,7 +467,7 @@ void client_pool::populate_client_pool() { _cvar.signal(); } -client_pool::http_client_ptr client_pool::make_client() const { +client_pool::http_client_ptr client_pool::make_client() const noexcept { return std::visit( [this](const auto& cfg) -> http_client_ptr { using cfg_type = std::decay_t; @@ -461,9 +488,10 @@ void client_pool::release(http_client_ptr leased) { "releasing a client, pool size: {}, capacity: {}", _pool.size(), _capacity); - if (_pool.size() < _capacity) { - _pool.emplace_back(std::move(leased)); - } + vassert( + _pool.size() < _capacity, + "tried to release a client but the pool is at capacity"); + _pool.emplace_back(std::move(leased)); _cvar.signal(); } diff --git a/src/v/cloud_storage_clients/client_pool.h b/src/v/cloud_storage_clients/client_pool.h index a18be70bdcbf3..3ab2cd2700548 100644 --- a/src/v/cloud_storage_clients/client_pool.h +++ b/src/v/cloud_storage_clients/client_pool.h @@ -16,6 +16,7 @@ #include "utils/intrusive_list_helpers.h" #include "utils/stop_signal.h" +#include #include #include #include @@ -132,6 +133,12 @@ class client_pool size_t max_size() const noexcept; + bool has_background_operations() const noexcept { + return _bg_gate.get_count() > 0; + } + + bool has_waiters() const noexcept { return _cvar.has_waiters(); } + private: ss::future<> client_self_configure(std::optional> @@ -143,7 +150,7 @@ class client_pool std::optional result); void populate_client_pool(); - http_client_ptr make_client() const; + http_client_ptr make_client() const noexcept; void release(http_client_ptr leased); /// Return number of clients which wasn't utilized @@ -162,12 +169,16 @@ class client_pool client_configuration _config; ss::shared_ptr _probe; client_pool_overdraft_policy _policy; - std::vector _pool; + ss::circular_buffer _pool; // List of all connections currently used by clients intrusive_list _leased; ss::condition_variable _cvar; ss::abort_source _as; ss::gate _gate; + // A gate for background operations. Most useful in testing where we want + // to wait all async housekeeping to complete before asserting state + // invariants. + ss::gate _bg_gate; /// Holds and applies the credentials for requests to S3. Shared pointer to /// enable rotating credentials to all clients. diff --git a/src/v/cloud_storage_clients/tests/CMakeLists.txt b/src/v/cloud_storage_clients/tests/CMakeLists.txt index 2394fd789e0d2..0173be4e6f48a 100644 --- a/src/v/cloud_storage_clients/tests/CMakeLists.txt +++ b/src/v/cloud_storage_clients/tests/CMakeLists.txt @@ -4,6 +4,7 @@ rp_test( BINARY_NAME cloud_storage_clients_single_thread SOURCES backend_detection_test.cc + client_pool_test.cc s3_client_test.cc xml_sax_parser_test.cc exception_test.cc diff --git a/src/v/cloud_storage_clients/tests/client_pool_mt_test.cc b/src/v/cloud_storage_clients/tests/client_pool_mt_test.cc index 68710e5bb8ecb..f705092cbbe7c 100644 --- a/src/v/cloud_storage_clients/tests/client_pool_mt_test.cc +++ b/src/v/cloud_storage_clients/tests/client_pool_mt_test.cc @@ -10,27 +10,25 @@ #include "cloud_storage_clients/client_pool.h" #include "cloud_storage_clients/s3_client.h" -#include "net/dns.h" #include "seastarx.h" #include +#include #include #include #include #include #include -#include -#include -#include -#include -#include #include #include +#include +#include #include #include #include +#include #include using namespace std::chrono_literals; @@ -39,7 +37,7 @@ ss::logger test_log("test-log"); static const uint16_t httpd_port_number = 4434; static constexpr const char* httpd_host_name = "127.0.0.1"; -cloud_storage_clients::s3_configuration transport_configuration() { +static cloud_storage_clients::s3_configuration transport_configuration() { net::unresolved_address server_addr(httpd_host_name, httpd_port_number); cloud_storage_clients::s3_configuration conf; conf.uri = cloud_storage_clients::access_point_uri(httpd_host_name); @@ -109,11 +107,29 @@ SEASTAR_THREAD_TEST_CASE(test_client_pool_acquire_blocked_on_another_shard) { }); }); - ss::sleep(1ms).get(); + // Wait for the above future to get scheduled and block. + pool + .invoke_on_others([](cloud_storage_clients::client_pool& pool) { + while (!pool.has_waiters()) { + return ss::yield(); + } + return ss::now(); + }) + .get(); vlog(test_log.debug, "return lease to the current shard"); leases.pop_front(); - ss::sleep(1ms).get(); + + pool + .invoke_on_all([](cloud_storage_clients::client_pool& pool) { + while (pool.has_background_operations()) { + return ss::yield(); + } + return ss::now(); + }) + .get(); + ; + // Since we returned to the current shard the future that // await for the 'acquire' method to be completed on another shard // souldn't become available. @@ -198,3 +214,118 @@ SEASTAR_THREAD_TEST_CASE(test_client_pool_acquire_blocked_on_this_shard) { BOOST_FAIL("Timed out"); } } + +SEASTAR_THREAD_TEST_CASE(test_client_pool_acquire_after_leasing_all) { + BOOST_REQUIRE(ss::smp::count == 2); + auto sconf = ss::sharded_parameter([] { + auto conf = transport_configuration(); + return conf; + }); + auto conf = transport_configuration(); + + ss::sharded pool; + size_t num_connections_per_shard = 4; + pool + .start( + num_connections_per_shard, + sconf, + cloud_storage_clients::client_pool_overdraft_policy::borrow_if_empty) + .get(); + + pool + .invoke_on_all([](cloud_storage_clients::client_pool& p) { + auto tcfg = transport_configuration(); + auto cred = cloud_roles::aws_credentials{ + tcfg.access_key.value(), + tcfg.secret_key.value(), + std::nullopt, + tcfg.region}; + p.load_credentials(cred); + }) + .get(); + auto pool_stop = ss::defer([&pool] { pool.stop().get(); }); + auto pool_no_bg_ops = [&pool] { + return pool.invoke_on_all([](cloud_storage_clients::client_pool& pool) { + while (pool.has_background_operations()) { + return ss::yield(); + } + return ss::now(); + }); + }; + + ss::abort_source as; + + struct shard_leases { + ss::abort_source as; + std::deque leases; + }; + ss::sharded leases; + leases.start().get(); + auto leases_stop = ss::defer([&leases] { leases.stop().get(); }); + + // Lease all connections from all the shards. + for (size_t i = 0; i < ss::smp::count * num_connections_per_shard; i++) { + leases.local().leases.push_back( + pool.local().acquire(leases.local().as).get()); + } + + vlog(test_log.debug, "connections depleted"); + + // Release clients from local shard. They are the first in the queue. + for (size_t i = 0; i < num_connections_per_shard; i++) { + leases.local().leases.pop_front(); + } + + vlog(test_log.debug, "done returning local leases"); + + // Lease local connections from the remote shard. + leases + .invoke_on( + 1, + [&pool, num_connections_per_shard](shard_leases& sl) { + return ss::async([&sl, &pool, num_connections_per_shard] { + for (size_t i = 0; i < num_connections_per_shard; i++) { + sl.leases.push_back(pool.local().acquire(sl.as).get()); + } + }); + }) + .get(); + + vlog(test_log.debug, "done borrowing leases"); + + auto pending_acquire = pool.local().acquire(as); + ss::yield().get(); + + if (pending_acquire.available()) { + BOOST_FAIL("Expecting to get blocked on acquire"); + } + + // Return all connections from the remote shard. + leases + .invoke_on( + 1, + [&pool_no_bg_ops, num_connections_per_shard](shard_leases& sl) { + return ss::async([&sl, &pool_no_bg_ops, num_connections_per_shard] { + for (size_t i = 0; i < num_connections_per_shard; i++) { + sl.leases.pop_back(); + pool_no_bg_ops().get(); + } + }); + }) + .get(); + + // Return connections from the local shard. + for (size_t i = 0; i < num_connections_per_shard; i++) { + leases.local().leases.pop_front(); + pool_no_bg_ops().get(); + } + vlog(test_log.debug, "done returning everything, will try to acquire now"); + + try { + ss::with_timeout( + ss::lowres_clock::now() + 1s, std::move(pending_acquire)) + .get(); + } catch (const ss::timed_out_error&) { + BOOST_FAIL("Timed out"); + } +} diff --git a/src/v/cloud_storage_clients/tests/client_pool_test.cc b/src/v/cloud_storage_clients/tests/client_pool_test.cc new file mode 100644 index 0000000000000..a3bc2e2381ae0 --- /dev/null +++ b/src/v/cloud_storage_clients/tests/client_pool_test.cc @@ -0,0 +1,91 @@ +/* + * Copyright 2023 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#include "cloud_storage_clients/client_pool.h" +#include "seastarx.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include + +using namespace std::chrono_literals; + +ss::logger test_log("test-log"); +static const uint16_t httpd_port_number = 4434; +static constexpr const char* httpd_host_name = "127.0.0.1"; + +static cloud_storage_clients::s3_configuration transport_configuration() { + net::unresolved_address server_addr(httpd_host_name, httpd_port_number); + cloud_storage_clients::s3_configuration conf; + conf.uri = cloud_storage_clients::access_point_uri(httpd_host_name); + conf.access_key = cloud_roles::public_key_str("acess-key"); + conf.secret_key = cloud_roles::private_key_str("secret-key"); + conf.region = cloud_roles::aws_region_name("us-east-1"); + conf.server_addr = server_addr; + conf._probe = ss::make_shared( + net::metrics_disabled::yes, + net::public_metrics_disabled::yes, + cloud_roles::aws_region_name{"region"}, + cloud_storage_clients::endpoint_url{"endpoint"}); + return conf; +} + +SEASTAR_THREAD_TEST_CASE(test_client_pool_acquire_abortable) { + auto sconf = ss::sharded_parameter([] { + auto conf = transport_configuration(); + return conf; + }); + auto conf = transport_configuration(); + + ss::sharded pool; + size_t num_connections_per_shard = 0; + pool + .start( + num_connections_per_shard, + sconf, + cloud_storage_clients::client_pool_overdraft_policy::borrow_if_empty) + .get(); + + pool + .invoke_on_all([&conf](cloud_storage_clients::client_pool& p) { + auto cred = cloud_roles::aws_credentials{ + conf.access_key.value(), + conf.secret_key.value(), + std::nullopt, + conf.region}; + p.load_credentials(cred); + }) + .get(); + auto pool_stop = ss::defer([&pool] { pool.stop().get(); }); + + ss::abort_source as; + + auto f = pool.local().acquire(as); + while (!pool.local().has_waiters()) { + ss::yield().get(); + } + + BOOST_TEST_REQUIRE( + !f.available(), "acquire should be blocked as pool is empty"); + + as.request_abort(); + + BOOST_REQUIRE_THROW(f.get(), ss::abort_requested_exception); +} diff --git a/src/v/cloud_storage_clients/tests/s3_client_test.cc b/src/v/cloud_storage_clients/tests/s3_client_test.cc index f8bd6a4c98ca0..101c13653ae86 100644 --- a/src/v/cloud_storage_clients/tests/s3_client_test.cc +++ b/src/v/cloud_storage_clients/tests/s3_client_test.cc @@ -299,7 +299,7 @@ struct configured_test_pair { ss::shared_ptr client; }; -cloud_storage_clients::s3_configuration transport_configuration() { +static cloud_storage_clients::s3_configuration transport_configuration() { net::unresolved_address server_addr(httpd_host_name, httpd_port_number); cloud_storage_clients::s3_configuration conf; conf.uri = cloud_storage_clients::access_point_uri(httpd_host_name);