Skip to content

Commit

Permalink
net: Add bytes sent/received metrics
Browse files Browse the repository at this point in the history
Adds two basic metrics that track bytes sent and received from the
networking stack.

This can give better per-application insight than host-wide metrics
(for example via node exporter) can.

Further, it can serve as a comparison point with host-based metrics for
possible write amplification (though this is less likely on the network
side).

The patch follows a pattern similar to how the memory- and disk-based
metrics work.

Note one might be inclined to introduce some grouping into the metrics
in relation to either the interface or source IP similar to how we have
mountpoints and/or groups on the disk side.

While this sounds useful at first, in practice it would be less so.
Often, for the major cloud providers and similarly in self-hosted
environments, there is only a single interface/source IP, and routing
happens at a later point (switches, routers etc.). Further, adding this
separation would make the implementation more expensive in either
compute or memory space.

We link the metrics to the cpu scheduling group which allows for getting
a more detailed picture of where the network usage is coming from.

Further, it sets the stage for future network scheduling at the group level.
  • Loading branch information
StephanDollberg committed Jan 31, 2025
1 parent 808766d commit ed98fa8
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 2 deletions.
18 changes: 18 additions & 0 deletions include/seastar/net/api.hh
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,19 @@ public:
bool supports_ipv6() const;
};

/// Byte counters reported by a network stack for one scheduling group.
/// Both counters start at zero.
struct statistics {
    /// Number of bytes sent.
    uint64_t bytes_sent{0};
    /// Number of bytes received.
    uint64_t bytes_received{0};
};

// Forward declarations only: they let the function declaration that follows
// name these metrics types by reference without their full definitions.
namespace metrics {
class label_instance;
class metric_groups;
}

/// Registers the per-scheduling-group network counters ("bytes_sent" and
/// "bytes_received", in the "network" metric group) on \c m.
///
/// \param m      metric groups object the counters are added to
/// \param sg_id  index of the scheduling group whose traffic the counters report
/// \param name   label attached to both counters
void register_net_metrics_for_scheduling_group(
metrics::metric_groups& m, unsigned sg_id, const metrics::label_instance& name);

class network_stack {
public:
virtual ~network_stack() {}
Expand All @@ -468,6 +481,11 @@ public:
return false;
}

// Returns the network statistics (bytes sent/received etc.) that this stack
// has recorded for the scheduling group with index `scheduling_group_id`.
// Pure virtual: every concrete stack must provide it.
virtual statistics stats(unsigned scheduling_group_id) = 0;
// Clears the statistics this stack has recorded for the scheduling group
// with index `scheduling_group_id`.
// Pure virtual: every concrete stack must provide it.
virtual void clear_stats(unsigned scheduling_group_id) = 0;

/**
* Returns available network interfaces. This represents a
* snapshot of interfaces available at call time, hence the
Expand Down
2 changes: 2 additions & 0 deletions include/seastar/net/posix-stack.hh
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ public:
virtual bool has_per_core_namespace() override { return _reuseport; };
bool supports_ipv6() const override;
std::vector<network_interface> network_interfaces() override;
// Returns the bytes sent/received counters recorded by the POSIX stack for
// the scheduling group with index `scheduling_group_id`.
virtual statistics stats(unsigned scheduling_group_id) override;
// Resets to zero the bytes sent/received counters recorded by the POSIX stack
// for the scheduling group with index `scheduling_group_id`.
virtual void clear_stats(unsigned scheduling_group_id) override;
};

class posix_ap_network_stack : public posix_network_stack {
Expand Down
4 changes: 3 additions & 1 deletion src/core/reactor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -960,6 +960,9 @@ reactor::task_queue::register_stats() {
}, sm::description("Total amount in milliseconds we were in violation of the task quota"),
{group_label}),
});

register_net_metrics_for_scheduling_group(new_metrics, _id, group_label);

_metrics = std::exchange(new_metrics, {});
}

Expand Down Expand Up @@ -2560,7 +2563,6 @@ void reactor::register_metrics() {
sm::make_counter("abandoned_failed_futures", _abandoned_failed_futures, sm::description("Total number of abandoned failed futures, futures destroyed while still containing an exception")),
});

namespace sm = seastar::metrics;
_metric_groups.add_group("reactor", {
sm::make_counter("fstream_reads", _io_stats.fstream_reads,
sm::description(
Expand Down
15 changes: 15 additions & 0 deletions src/net/native-stack-impl.hh
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,17 @@ namespace seastar {

extern logger seastar_logger;

namespace internal {

// Per-thread byte counters for the native network stack, indexed by
// scheduling group index.
//
// These are declared `inline`, not `static inline`: `static` would give the
// variables internal linkage, so every translation unit including this header
// would get its own private copy. Counters incremented from code compiled in
// one translation unit would then be invisible to code reading them from
// another. Plain `inline` yields a single shared entity per thread.
namespace native_stack_net_stats {

inline thread_local std::array<uint64_t, max_scheduling_groups()> bytes_sent = {};
inline thread_local std::array<uint64_t, max_scheduling_groups()> bytes_received = {};

}

}

namespace net {

using namespace seastar;
Expand Down Expand Up @@ -172,6 +183,8 @@ public:
}
return _conn->wait_for_data().then([this] {
_buf = _conn->read();
auto sg_id = internal::scheduling_group_index(current_scheduling_group());
internal::native_stack_net_stats::bytes_received[sg_id] += _buf.len();
_cur_frag = 0;
_eof = !_buf.len();
return get();
Expand All @@ -193,6 +206,8 @@ public:
: _conn(std::move(conn)) {}
using data_sink_impl::put;
// Charges the packet's length to the sent-bytes counter of the scheduling
// group that is current at call time, then hands the packet to the connection.
virtual future<> put(packet p) override {
    const auto group_index = internal::scheduling_group_index(current_scheduling_group());
    auto& sent_counter = internal::native_stack_net_stats::bytes_sent[group_index];
    // Read the length before the packet is moved into send().
    sent_counter += p.len();
    return _conn->send(std::move(p));
}
virtual future<> close() override {
Expand Down
14 changes: 13 additions & 1 deletion src/net/native-stack.cc
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,18 @@ class native_network_stack : public network_stack {
friend class native_network_interface;

std::vector<network_interface> network_interfaces() override;

// Returns a copy of the native stack's sent/received byte counters for the
// scheduling group with index `scheduling_group_id` (current thread only,
// since the counters are thread_local).
virtual statistics stats(unsigned scheduling_group_id) override {
    namespace counters = internal::native_stack_net_stats;
    statistics snapshot;
    snapshot.bytes_sent = counters::bytes_sent[scheduling_group_id];
    snapshot.bytes_received = counters::bytes_received[scheduling_group_id];
    return snapshot;
}

virtual void clear_stats(unsigned scheduling_group_id) override {
internal::native_stack_net_stats::bytes_sent[scheduling_group_id] = 0;
internal::native_stack_net_stats::bytes_received[scheduling_group_id] = 0;
}
};

thread_local promise<std::unique_ptr<network_stack>> native_network_stack::ready_promise;
Expand Down Expand Up @@ -427,6 +439,6 @@ std::vector<network_interface> native_network_stack::network_interfaces() {
return res;
}

}
} // namespace net

}
27 changes: 27 additions & 0 deletions src/net/posix-stack.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ copy_reinterpret_cast(const void* ptr) {
return tmp;
}

// Per-thread byte counters for the POSIX stack, one slot per scheduling group
// index. All slots start at zero.
thread_local std::array<uint64_t, seastar::max_scheduling_groups()> bytes_sent{};
thread_local std::array<uint64_t, seastar::max_scheduling_groups()> bytes_received{};

}

namespace seastar {
Expand Down Expand Up @@ -637,6 +640,8 @@ posix_data_source_impl::get() {
_config.buffer_size /= 2;
_config.buffer_size = std::max(_config.buffer_size, _config.min_buffer_size);
}
auto sg_id = internal::scheduling_group_index(current_scheduling_group());
bytes_received[sg_id] += b.size();
return b;
});
}
Expand Down Expand Up @@ -671,12 +676,16 @@ std::vector<iovec> to_iovec(std::vector<temporary_buffer<char>>& buf_vec) {

// Writes `buf` to the socket. The buffer size is charged to the current
// scheduling group's sent-bytes counter before the write is issued.
future<>
posix_data_sink_impl::put(temporary_buffer<char> buf) {
    const auto group_index = internal::scheduling_group_index(current_scheduling_group());
    auto& sent_counter = bytes_sent[group_index];
    sent_counter += buf.size();
    // The continuation only holds what buf.release() returned until the write completes.
    return _fd.write_all(buf.get(), buf.size()).then([d = buf.release()] {});
}

// Writes packet `p` to the socket. The packet is parked in `_p` for the
// duration of the write and its length is charged to the current scheduling
// group's sent-bytes counter before the write is issued.
future<>
posix_data_sink_impl::put(packet p) {
    const auto group_index = internal::scheduling_group_index(current_scheduling_group());
    _p = std::move(p);
    bytes_sent[group_index] += _p.len();
    return _fd.write_all(_p).then([this] {
        _p.reset();
    });
}

Expand Down Expand Up @@ -876,13 +885,17 @@ future<> posix_datagram_channel::send(const socket_address& dst, const char *mes
auto len = strlen(message);
auto a = dst;
resolve_outgoing_address(a);
auto sg_id = internal::scheduling_group_index(current_scheduling_group());
bytes_sent[sg_id] += len;
return _fd.sendto(a, message, len)
.then([len] (size_t size) { assert(size == len); });
}

// Sends packet `p` to `dst` through sendmsg().
// The packet length is added to the current scheduling group's sent-bytes
// counter before sendmsg() is issued, so it is counted whatever the outcome.
future<> posix_datagram_channel::send(const socket_address& dst, packet p) {
// Capture the length first: `p` is moved from on the next line.
auto len = p.len();
_send.prepare(dst, std::move(p));
auto sg_id = internal::scheduling_group_index(current_scheduling_group());
bytes_sent[sg_id] += len;
// The continuation asserts that the whole packet went out in this one call.
return _fd.sendmsg(&_send._hdr)
.then([len] (size_t size) { assert(size == len); });
}
Expand Down Expand Up @@ -954,6 +967,8 @@ posix_datagram_channel::receive() {
break;
}
}
auto sg_id = internal::scheduling_group_index(current_scheduling_group());
bytes_received[sg_id] += size;
return make_ready_future<datagram>(datagram(std::make_unique<posix_datagram>(
_recv._src_addr, dst ? *dst : _address, packet(fragment{_recv._buffer, size}, make_deleter([buf = _recv._buffer] { delete[] buf; })))));
}).handle_exception([p = _recv._buffer](auto ep) {
Expand Down Expand Up @@ -1199,6 +1214,18 @@ std::vector<network_interface> posix_network_stack::network_interfaces() {
return std::vector<network_interface>(thread_local_interfaces.begin(), thread_local_interfaces.end());
}

// Returns a copy of the POSIX stack's sent/received byte counters for the
// scheduling group with index `scheduling_group_id` (current thread only,
// since the counters are thread_local).
statistics posix_network_stack::stats(unsigned scheduling_group_id) {
    statistics snapshot;
    snapshot.bytes_sent = bytes_sent[scheduling_group_id];
    snapshot.bytes_received = bytes_received[scheduling_group_id];
    return snapshot;
}

void posix_network_stack::clear_stats(unsigned scheduling_group_id) {
bytes_sent[scheduling_group_id] = 0;
bytes_received[scheduling_group_id] = 0;
}

}

}
19 changes: 19 additions & 0 deletions src/net/stack.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ module;
#ifdef SEASTAR_MODULE
module seastar;
#else
#include <seastar/core/metrics_api.hh>
#include <seastar/core/reactor.hh>
#include <seastar/net/stack.hh>
#include <seastar/net/inet_address.hh>
#endif
Expand Down Expand Up @@ -286,4 +288,21 @@ std::vector<network_interface> network_stack::network_interfaces() {
return {};
}

// Adds the "network" metric group for scheduling group `sg_id` to `metrics`.
//
// Two counters are registered, both labelled with `name`:
//   - bytes_sent:     engine().net().stats(sg_id).bytes_sent
//   - bytes_received: engine().net().stats(sg_id).bytes_received
// Each counter queries the engine's network stack whenever it is read.
void register_net_metrics_for_scheduling_group(
        metrics::metric_groups &metrics, unsigned sg_id, const metrics::label_instance& name) {
    namespace sm = seastar::metrics;

    metrics.add_group("network", {
        sm::make_counter(
            "bytes_sent",
            [sg_id] { return engine().net().stats(sg_id).bytes_sent; },
            sm::description("Counts the number of bytes written to network sockets."),
            {name}),
        sm::make_counter(
            "bytes_received",
            [sg_id] { return engine().net().stats(sg_id).bytes_received; },
            sm::description("Counts the number of bytes received from network sockets."),
            {name}),
    });

    // Skip the reset while the reactor is still starting up.
    if (!engine_is_ready()) {
        return;
    }

    // A scheduling group may have been recreated under a previously used id;
    // wipe whatever the old group left behind.
    engine().net().clear_stats(sg_id);
}

}

0 comments on commit ed98fa8

Please sign in to comment.