Skip to content

Commit

Permalink
Merge pull request #15681 from nvartolomei/nv/client-pool-metastable
Browse files Browse the repository at this point in the history
cloud_storage_clients: stricter but simpler invariants for the pool
  • Loading branch information
nvartolomei authored Dec 19, 2023
2 parents 5f34eed + a6111c0 commit 037380b
Show file tree
Hide file tree
Showing 6 changed files with 312 additions and 50 deletions.
104 changes: 66 additions & 38 deletions src/v/cloud_storage_clients/client_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@
#include "cloud_storage_clients/abs_client.h"
#include "cloud_storage_clients/logger.h"
#include "cloud_storage_clients/s3_client.h"
#include "model/timeout_clock.h"
#include "ssx/future-util.h"

#include <seastar/core/smp.hh>

#include <algorithm>
#include <optional>
#include <random>
#include <utility>

namespace {
constexpr auto self_configure_attempts = 3;
Expand Down Expand Up @@ -169,6 +172,8 @@ ss::future<> client_pool::stop() {
_cvar.broken();
_self_config_barrier.broken();
_credentials_var.broken();
// Wait for all background operations to complete.
co_await _bg_gate.close();
// Wait until all leased objects are returned
co_await _gate.close();

Expand Down Expand Up @@ -238,7 +243,10 @@ std::tuple<unsigned int, unsigned int> pick_two_random_shards() {
ss::future<client_pool::client_lease>
client_pool::acquire(ss::abort_source& as) {
auto guard = _gate.hold();

std::optional<unsigned int> source_sid;
std::optional<http_client_ptr> client;

try {
// If credentials have not yet been acquired, wait for them. It is
// possible that credentials are not initialized right after remote
Expand All @@ -251,29 +259,34 @@ client_pool::acquire(ss::abort_source& as) {
u = co_await ss::get_units(_self_config_barrier, 1);
}

while (unlikely(
_pool.empty() && !_gate.is_closed() && !_as.abort_requested())) {
if (
while (!client.has_value() && !_gate.is_closed()
&& !_as.abort_requested()) {
if (likely(!_pool.empty())) {
client = _pool.back();
_pool.pop_back();
} else if (
ss::smp::count == 1
|| _policy == client_pool_overdraft_policy::wait_if_empty
|| _leased.size() >= _capacity * 2) {
// If borrowing is disabled or this shard borrowed '_capacity'
// client connections then wait until one of the clients is
// freed.
co_await _cvar.wait();
co_await ssx::with_timeout_abortable(
_cvar.wait(), model::no_timeout, as);

vlog(
pool_log.debug,
"cvar triggered, pool size: {}",
_pool.size());
} else {
// Try borrowing from peer shard.
auto clients_in_use = [](client_pool& other) {
return std::clamp(
other._capacity - other._pool.size(),
0UL,
other._capacity);
};
// Borrow from random shard. Use 2-random approach. Pick 2
// random shards
// Use 2-random approach. Pick 2 random shards
auto [sid1, sid2] = pick_two_random_shards();
auto cnt1 = co_await container().invoke_on(
sid1, clients_in_use);
Expand All @@ -299,15 +312,16 @@ client_pool::acquire(ss::abort_source& as) {
}
// Depending on the result either wait or create new connection
if (success) {
vlog(pool_log.debug, "successfuly borrowed from {}", sid);
vlog(pool_log.debug, "successfully borrowed from {}", sid);
if (_probe) {
_probe->register_borrow();
}
source_sid = sid;
_pool.emplace_back(make_client());
client = make_client();
} else {
vlog(pool_log.debug, "can't borrow connection, waiting");
co_await _cvar.wait();
co_await ssx::with_timeout_abortable(
_cvar.wait(), model::no_timeout, as);
vlog(
pool_log.debug,
"cvar triggered, pool size: {}",
Expand All @@ -320,9 +334,7 @@ client_pool::acquire(ss::abort_source& as) {
if (_gate.is_closed() || _as.abort_requested()) {
throw ss::gate_closed_exception();
}
vassert(!_pool.empty(), "'acquire' invariant is broken");
auto client = _pool.back();
_pool.pop_back();
vassert(client.has_value(), "'acquire' invariant is broken");

update_usage_stats();
vlog(
Expand All @@ -337,27 +349,41 @@ client_pool::acquire(ss::abort_source& as) {
}

client_lease lease(
client,
client.value(),
as,
ss::make_deleter([pool = weak_from_this(),
client,
client = client.value(),
g = std::move(guard),
source_sid]() mutable {
if (pool) {
if (source_sid.has_value()) {
vlog(
pool_log.debug, "disposing the borrowed client connection");
// Since the client was borrowed we can't just add it back to
// the pool. This will lead to a situation when the connection
// simultaneously exists on two different shards.
// If all clients from the local pool are in-use we will
// shutdown the borrowed one and return the "accounting unit"
// to the source shard.
// Otherwise, we replace the oldest client in the
// pool to improve connection reuse.
if (!pool->_pool.empty()) {
vlog(
pool_log.debug,
"disposing the the oldest client connection and "
"replacing it with the borrowed one");
pool->_pool.push_back(std::move(client));
client = std::move(pool->_pool.front());
pool->_pool.pop_front();
} else {
vlog(
pool_log.debug,
"disposing the borrowed client connection");
}

client->shutdown();
ssx::spawn_with_gate(pool->_gate, [client] {
ssx::spawn_with_gate(pool->_bg_gate, [client] {
return client->stop().finally([client] {});
});
// In the background return the client to the connection pool
// of the source shard. The lifetime is guaranteed by the gate
// guard.
ssx::spawn_with_gate(pool->_gate, [&pool, source_sid] {
ssx::spawn_with_gate(pool->_bg_gate, [&pool, source_sid] {
return pool->container().invoke_on(
source_sid.value(),
[my_sid = ss::this_shard_id()](client_pool& other) {
Expand Down Expand Up @@ -408,39 +434,40 @@ bool client_pool::borrow_one(unsigned other) {
_pool.pop_back();
update_usage_stats();
c->shutdown();
ssx::spawn_with_gate(_gate, [c] { return c->stop().finally([c] {}); });
ssx::spawn_with_gate(_bg_gate, [c] { return c->stop().finally([c] {}); });
return true;
}

void client_pool::return_one(unsigned other) {
vlog(pool_log.debug, "shard {} returns a client", other);
if (_pool.size() + _leased.size() < _capacity) {
// The _pool has fewer elements than it should have because it was
// borrowed from previously.
_pool.emplace_back(make_client());
update_usage_stats();
vlog(
pool_log.debug,
"creating new client, current usage is {}/{}",
normalized_num_clients_in_use(),
_capacity);
_cvar.signal();
}
vassert(
_pool.size() < _capacity,
"tried to return a borrowed client but the pool is full");
// Cold clients are at the front. Hot clients are at the back.
_pool.emplace_front(make_client());
update_usage_stats();
vlog(
pool_log.debug,
"creating new client, current usage is {}/{}",
normalized_num_clients_in_use(),
_capacity);
_cvar.signal();
}

/// Returns the number of clients currently held in `_pool`.
size_t client_pool::size() const noexcept { return _pool.size(); }

/// Returns `_capacity`, the capacity value this pool was set up with.
size_t client_pool::max_size() const noexcept { return _capacity; }

/// Fill `_pool` with `_capacity` newly made clients, then signal `_cvar`.
void client_pool::populate_client_pool() {
    _pool.reserve(_capacity);
    // Count down from the capacity, appending one fresh client per step.
    for (auto remaining = _capacity; remaining > 0; --remaining) {
        _pool.emplace_back(make_client());
    }

    _cvar.signal();
}

client_pool::http_client_ptr client_pool::make_client() const {
client_pool::http_client_ptr client_pool::make_client() const noexcept {
return std::visit(
[this](const auto& cfg) -> http_client_ptr {
using cfg_type = std::decay_t<decltype(cfg)>;
Expand All @@ -461,9 +488,10 @@ void client_pool::release(http_client_ptr leased) {
"releasing a client, pool size: {}, capacity: {}",
_pool.size(),
_capacity);
if (_pool.size() < _capacity) {
_pool.emplace_back(std::move(leased));
}
vassert(
_pool.size() < _capacity,
"tried to release a client but the pool is at capacity");
_pool.emplace_back(std::move(leased));
_cvar.signal();
}

Expand Down
15 changes: 13 additions & 2 deletions src/v/cloud_storage_clients/client_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "utils/intrusive_list_helpers.h"
#include "utils/stop_signal.h"

#include <seastar/core/circular_buffer.hh>
#include <seastar/core/condition-variable.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/shared_ptr.hh>
Expand Down Expand Up @@ -132,6 +133,12 @@ class client_pool

size_t max_size() const noexcept;

/// Returns true when `_bg_gate` reports a non-zero count, i.e. at least one
/// holder of the background gate has not yet released it.
bool has_background_operations() const noexcept {
return _bg_gate.get_count() > 0;
}

/// Returns whether `_cvar` currently reports any waiters.
bool has_waiters() const noexcept { return _cvar.has_waiters(); }

private:
ss::future<>
client_self_configure(std::optional<std::reference_wrapper<stop_signal>>
Expand All @@ -143,7 +150,7 @@ class client_pool
std::optional<client_self_configuration_output> result);

void populate_client_pool();
http_client_ptr make_client() const;
http_client_ptr make_client() const noexcept;
void release(http_client_ptr leased);

/// Return the number of clients which weren't utilized
Expand All @@ -162,12 +169,16 @@ class client_pool
client_configuration _config;
ss::shared_ptr<client_probe> _probe;
client_pool_overdraft_policy _policy;
std::vector<http_client_ptr> _pool;
ss::circular_buffer<http_client_ptr> _pool;
// List of all connections currently used by clients
intrusive_list<client_lease, &client_lease::_hook> _leased;
ss::condition_variable _cvar;
ss::abort_source _as;
ss::gate _gate;
// A gate for background operations. Most useful in testing where we want
// to wait for all async housekeeping to complete before asserting state
// invariants.
ss::gate _bg_gate;

/// Holds and applies the credentials for requests to S3. Shared pointer to
/// enable rotating credentials to all clients.
Expand Down
1 change: 1 addition & 0 deletions src/v/cloud_storage_clients/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ rp_test(
BINARY_NAME cloud_storage_clients_single_thread
SOURCES
backend_detection_test.cc
client_pool_test.cc
s3_client_test.cc
xml_sax_parser_test.cc
exception_test.cc
Expand Down
Loading

0 comments on commit 037380b

Please sign in to comment.