Skip to content

Commit eba8cfe

Browse files
author
Leonardo Parente
authored
Flow Metrics Enrichment (#464)
1 parent 1244d92 commit eba8cfe

File tree

4 files changed

+217
-21
lines changed

4 files changed

+217
-21
lines changed

golang/pkg/client/types.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,6 @@ type FlowPayload struct {
188188
Ipv4 int64 `mapstructure:"ipv4"`
189189
Ipv6 int64 `mapstructure:"ipv6"`
190190
OtherL4 int64 `mapstructure:"other_l4"`
191-
PayloadSize Quantiles `mapstructure:"payload_size"`
192191
TCP int64 `mapstructure:"tcp"`
193192
UDP int64 `mapstructure:"udp"`
194193
TopGeoLocBytes []NameCount `mapstructure:"top_geoLoc_bytes"`
@@ -201,10 +200,10 @@ type FlowPayload struct {
201200
TopDstIpsPackets []NameCount `mapstructure:"top_dst_ips_packets"`
202201
TopDstPortsBytes []NameCount `mapstructure:"top_dst_ports_bytes"`
203202
TopDstPortsPackets []NameCount `mapstructure:"top_dst_ports_packets"`
204-
TopInIfIndexBytes []NameCount `mapstructure:"top_in_if_index_bytes"`
205-
TopInIfIndexPackets []NameCount `mapstructure:"top_in_if_index_packets"`
206-
TopOutIfIndexBytes []NameCount `mapstructure:"top_out_if_index_bytes"`
207-
TopOutIfIndexPackets []NameCount `mapstructure:"top_out_if_index_packets"`
203+
TopInInterfacesBytes []NameCount `mapstructure:"top_in_interfaces_bytes"`
204+
TopInInterfacesPackets []NameCount `mapstructure:"top_in_interfaces_packets"`
205+
TopOutInterfacesBytes []NameCount `mapstructure:"top_out_interfaces_bytes"`
206+
TopOutInterfacesPackets []NameCount `mapstructure:"top_out_interfaces_packets"`
208207
TopSrcIpsAndPortBytes []NameCount `mapstructure:"top_src_ips_and_port_bytes"`
209208
TopSrcIpsAndPortPackets []NameCount `mapstructure:"top_src_ips_and_port_packets"`
210209
TopConversationsBytes []NameCount `mapstructure:"top_conversations_bytes"`

src/handlers/flow/FlowStreamHandler.cpp

+129-10
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,24 @@
1919

2020
namespace visor::handler::flow {
2121

22+
template <typename Out>
23+
static void split(const std::string &s, char delim, Out result)
24+
{
25+
std::stringstream ss;
26+
ss.str(s);
27+
std::string item;
28+
while (std::getline(ss, item, delim)) {
29+
*(result++) = item;
30+
}
31+
}
32+
33+
static std::vector<std::string> split(const std::string &s, char delim)
34+
{
35+
std::vector<std::string> elems;
36+
split(s, delim, std::back_inserter(elems));
37+
return elems;
38+
}
39+
2240
FlowStreamHandler::FlowStreamHandler(const std::string &name, InputEventProxy *proxy, const Configurable *window_config)
2341
: visor::StreamMetricsHandler<FlowMetricsManager>(name, window_config)
2442
, _sample_rate_scaling(true)
@@ -47,6 +65,54 @@ void FlowStreamHandler::start()
4765

4866
process_groups(_group_defs);
4967

68+
// Setup Configs
69+
if (config_exists("recorded_stream")) {
70+
_metrics->set_recorded_stream();
71+
}
72+
73+
EnrichMap enrich_data;
74+
if (config_exists("device_map")) {
75+
for (const auto &device_info : config_get<StringList>("device_map")) {
76+
std::vector<std::string> data = split(device_info, ',');
77+
if (data.size() < 2) {
78+
// should at least contain device name and ip
79+
continue;
80+
}
81+
DeviceEnrich *device{nullptr};
82+
if (auto it = enrich_data.find(data[1]); it != enrich_data.end()) {
83+
device = &it->second;
84+
} else {
85+
enrich_data[data[1]] = DeviceEnrich{data[0], {}};
86+
device = &enrich_data[data[1]];
87+
}
88+
if (data.size() < 4) {
89+
// should have interface information
90+
continue;
91+
}
92+
auto if_index = static_cast<uint32_t>(std::stol(data[3]));
93+
if (auto it = device->interfaces.find(if_index); it == device->interfaces.end()) {
94+
if (data.size() > 4) {
95+
device->interfaces[if_index] = InterfaceEnrich{data[2], data[4]};
96+
} else {
97+
device->interfaces[if_index] = InterfaceEnrich{data[2], std::string()};
98+
}
99+
}
100+
}
101+
}
102+
103+
std::unordered_map<std::string, std::string> concat_if;
104+
if (config_exists("first_filter_if_as_label") && config_get<bool>("first_filter_if_as_label") && config_exists("only_interfaces")) {
105+
concat_if["default"] = config_get<StringList>("only_interfaces")[0];
106+
auto interface = static_cast<uint32_t>(std::stoul(config_get<StringList>("only_interfaces")[0]));
107+
for (const auto &data : enrich_data) {
108+
auto it = data.second.interfaces.find(interface);
109+
if (it != data.second.interfaces.end()) {
110+
concat_if[data.first] = it->second.name;
111+
}
112+
}
113+
}
114+
_metrics->set_enrich_data(std::move(concat_if), std::move(enrich_data));
115+
50116
// Setup Filters
51117
if (config_exists("only_ips")) {
52118
_parse_host_specs(config_get<StringList>("only_ips"));
@@ -80,10 +146,6 @@ void FlowStreamHandler::start()
80146
_sample_rate_scaling = false;
81147
}
82148

83-
if (config_exists("recorded_stream")) {
84-
_metrics->set_recorded_stream();
85-
}
86-
87149
if (_flow_proxy) {
88150
_sflow_connection = _flow_proxy->sflow_signal.connect(&FlowStreamHandler::process_sflow_cb, this);
89151
_netflow_connection = _flow_proxy->netflow_signal.connect(&FlowStreamHandler::process_netflow_cb, this);
@@ -471,7 +533,6 @@ void FlowMetricsBucket::specialized_merge(const AbstractMetricsBucket &o, Metric
471533

472534
for (const auto &device : other._devices_metrics) {
473535
const auto &deviceId = device.first;
474-
const auto &device_data = device.second;
475536

476537
if (group_enabled(group::FlowMetrics::Counters)) {
477538
_devices_metrics[deviceId]->counters.UDP += device.second->counters.UDP;
@@ -542,7 +603,22 @@ void FlowMetricsBucket::to_prometheus(std::stringstream &out, Metric::LabelMap a
542603

543604
for (const auto &device : _devices_metrics) {
544605
auto device_labels = add_labels;
545-
device_labels["device"] = device.first;
606+
auto deviceId = device.first;
607+
DeviceEnrich *dev{nullptr};
608+
if (_enrich_data) {
609+
if (auto it = _enrich_data->find(deviceId); it != _enrich_data->end()) {
610+
dev = &it->second;
611+
deviceId = it->second.name;
612+
}
613+
}
614+
device_labels["device"] = deviceId;
615+
if (_concat_if) {
616+
if (auto it = _concat_if->find(device.first); (it != _concat_if->end()) && !it->second.empty()) {
617+
device_labels["device_interface"] = deviceId + "|" + it->second;
618+
} else {
619+
device_labels["device_interface"] = deviceId + "|" + _concat_if->at("default");
620+
}
621+
}
546622

547623
if (group_enabled(group::FlowMetrics::Counters)) {
548624
device.second->counters.UDP.to_prometheus(out, device_labels);
@@ -574,8 +650,22 @@ void FlowMetricsBucket::to_prometheus(std::stringstream &out, Metric::LabelMap a
574650
if (group_enabled(group::FlowMetrics::Conversations)) {
575651
device.second->topByBytes.topConversations.to_prometheus(out, device_labels);
576652
}
577-
device.second->topByBytes.topInIfIndex.to_prometheus(out, device_labels, [](const uint32_t &val) { return std::to_string(val); });
578-
device.second->topByBytes.topOutIfIndex.to_prometheus(out, device_labels, [](const uint32_t &val) { return std::to_string(val); });
653+
device.second->topByBytes.topInIfIndex.to_prometheus(out, device_labels, [dev](const uint32_t &val) {
654+
if (dev) {
655+
if (auto it = dev->interfaces.find(val); it != dev->interfaces.end()) {
656+
return it->second.name;
657+
}
658+
}
659+
return std::to_string(val);
660+
});
661+
device.second->topByBytes.topOutIfIndex.to_prometheus(out, device_labels, [dev](const uint32_t &val) {
662+
if (dev) {
663+
if (auto it = dev->interfaces.find(val); it != dev->interfaces.end()) {
664+
return it->second.name;
665+
}
666+
}
667+
return std::to_string(val);
668+
});
579669
if (group_enabled(group::FlowMetrics::TopGeo)) {
580670
device.second->topByBytes.topGeoLoc.to_prometheus(out, device_labels);
581671
device.second->topByBytes.topASN.to_prometheus(out, device_labels);
@@ -613,6 +703,21 @@ void FlowMetricsBucket::to_json(json &j) const
613703

614704
for (const auto &device : _devices_metrics) {
615705
auto deviceId = device.first;
706+
DeviceEnrich *dev{nullptr};
707+
if (_enrich_data) {
708+
auto it = _enrich_data->find(deviceId);
709+
if (it != _enrich_data->end()) {
710+
dev = &it->second;
711+
deviceId = it->second.name;
712+
}
713+
}
714+
if (_concat_if) {
715+
if (auto it = _concat_if->find(device.first); (it != _concat_if->end()) && !it->second.empty()) {
716+
deviceId += "|" + it->second;
717+
} else {
718+
deviceId += "|" + _concat_if->at("default");
719+
}
720+
}
616721

617722
if (group_enabled(group::FlowMetrics::Counters)) {
618723
device.second->counters.UDP.to_json(j["devices"][deviceId]);
@@ -644,8 +749,22 @@ void FlowMetricsBucket::to_json(json &j) const
644749
if (group_enabled(group::FlowMetrics::Conversations)) {
645750
device.second->topByBytes.topConversations.to_json(j["devices"][deviceId]);
646751
}
647-
device.second->topByBytes.topInIfIndex.to_json(j["devices"][deviceId], [](const uint32_t &val) { return std::to_string(val); });
648-
device.second->topByBytes.topOutIfIndex.to_json(j["devices"][deviceId], [](const uint32_t &val) { return std::to_string(val); });
752+
device.second->topByBytes.topInIfIndex.to_json(j["devices"][deviceId], [dev](const uint32_t &val) {
753+
if (dev) {
754+
if (auto it = dev->interfaces.find(val); it != dev->interfaces.end()) {
755+
return it->second.name;
756+
}
757+
}
758+
return std::to_string(val);
759+
});
760+
device.second->topByBytes.topOutIfIndex.to_json(j["devices"][deviceId], [dev](const uint32_t &val) {
761+
if (dev) {
762+
if (auto it = dev->interfaces.find(val); it != dev->interfaces.end()) {
763+
return it->second.name;
764+
}
765+
}
766+
return std::to_string(val);
767+
});
649768
if (group_enabled(group::FlowMetrics::TopGeo)) {
650769
device.second->topByBytes.topGeoLoc.to_json(j["devices"][deviceId]);
651770
device.second->topByBytes.topASN.to_json(j["devices"][deviceId]);

src/handlers/flow/FlowStreamHandler.h

+49-6
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,18 @@ enum FlowMetrics : visor::MetricGroupIntType {
3434
};
3535
}
3636

37+
struct InterfaceEnrich {
38+
std::string name;
39+
std::string descr;
40+
};
41+
42+
struct DeviceEnrich {
43+
std::string name;
44+
std::unordered_map<uint32_t, InterfaceEnrich> interfaces;
45+
};
46+
47+
typedef std::unordered_map<std::string, DeviceEnrich> EnrichMap;
48+
3749
struct FlowData {
3850
bool is_ipv6;
3951
IP_PROTOCOL l4;
@@ -83,8 +95,8 @@ struct FlowTopN {
8395
, topSrcIPandPort(FLOW_SCHEMA, "ip_port", {"top_src_ips_and_port_" + metric}, "Top source IP addresses and port by " + metric)
8496
, topDstIPandPort(FLOW_SCHEMA, "ip_port", {"top_dst_ips_and_port_" + metric}, "Top destination IP addresses and port by " + metric)
8597
, topConversations(FLOW_SCHEMA, "conversations", {"top_conversations_" + metric}, "Top source IP addresses and port by " + metric)
86-
, topInIfIndex(FLOW_SCHEMA, "index", {"top_in_if_index_" + metric}, "Top input interface indexes by " + metric)
87-
, topOutIfIndex(FLOW_SCHEMA, "index", {"top_out_if_index_" + metric}, "Top output interface indexes by " + metric)
98+
, topInIfIndex(FLOW_SCHEMA, "interface", {"top_in_interfaces_" + metric}, "Top input interfaces by " + metric)
99+
, topOutIfIndex(FLOW_SCHEMA, "interface", {"top_out_interfaces_" + metric}, "Top output interfaces by " + metric)
88100
, topGeoLoc(FLOW_SCHEMA, "geo_loc", {"top_geoLoc_" + metric}, "Top GeoIP locations by " + metric)
89101
, topASN(FLOW_SCHEMA, "asn", {"top_ASN_" + metric}, "Top ASNs by IP by " + metric)
90102
{
@@ -157,10 +169,10 @@ struct FlowDevice {
157169

158170
class FlowMetricsBucket final : public visor::AbstractMetricsBucket
159171
{
160-
161172
protected:
162173
mutable std::shared_mutex _mutex;
163-
174+
EnrichMap *_enrich_data{nullptr};
175+
std::unordered_map<std::string, std::string> *_concat_if{nullptr};
164176
struct counters {
165177
Counter filtered;
166178
Counter total;
@@ -173,8 +185,6 @@ class FlowMetricsBucket final : public visor::AbstractMetricsBucket
173185
counters _counters;
174186
size_t _topn_count{10};
175187
uint64_t _topn_percentile_threshold{0};
176-
177-
using InterfacePair = std::pair<uint32_t, uint32_t>;
178188
// <DeviceId, FlowDevice>
179189
std::map<std::string, std::unique_ptr<FlowDevice>> _devices_metrics;
180190

@@ -203,6 +213,12 @@ class FlowMetricsBucket final : public visor::AbstractMetricsBucket
203213
_topn_percentile_threshold = percentile_threshold;
204214
}
205215

216+
inline void set_enrich_data(std::unordered_map<std::string, std::string> *concat, EnrichMap *enrich_data)
217+
{
218+
_concat_if = concat;
219+
_enrich_data = enrich_data;
220+
}
221+
206222
inline void process_filtered(uint64_t filtered)
207223
{
208224
std::unique_lock lock(_mutex);
@@ -213,19 +229,46 @@ class FlowMetricsBucket final : public visor::AbstractMetricsBucket
213229

214230
class FlowMetricsManager final : public visor::AbstractMetricsManager<FlowMetricsBucket>
215231
{
232+
EnrichMap _enrich_data;
233+
std::unordered_map<std::string, std::string> _concat_if;
234+
216235
public:
217236
FlowMetricsManager(const Configurable *window_config)
218237
: visor::AbstractMetricsManager<FlowMetricsBucket>(window_config)
219238
{
220239
}
221240

241+
inline void set_enrich_data(std::unordered_map<std::string, std::string> concat, EnrichMap enrich_data)
242+
{
243+
_concat_if = concat;
244+
_enrich_data = enrich_data;
245+
if (!_concat_if.empty() && !_enrich_data.empty()) {
246+
live_bucket()->set_enrich_data(&_concat_if, &_enrich_data);
247+
} else if (!_concat_if.empty()) {
248+
live_bucket()->set_enrich_data(&_concat_if, nullptr);
249+
} else if (!_concat_if.empty()) {
250+
live_bucket()->set_enrich_data(nullptr, &_enrich_data);
251+
}
252+
}
253+
222254
inline void process_filtered(timespec stamp, uint64_t filtered)
223255
{
224256
// base event, no sample
225257
new_event(stamp, false);
226258
live_bucket()->process_filtered(filtered);
227259
}
228260
void process_flow(const FlowPacket &payload);
261+
262+
void on_period_shift([[maybe_unused]] timespec stamp, [[maybe_unused]] const FlowMetricsBucket *maybe_expiring_bucket) override
263+
{
264+
if (!_concat_if.empty() && !_enrich_data.empty()) {
265+
live_bucket()->set_enrich_data(&_concat_if, &_enrich_data);
266+
} else if (!_concat_if.empty()) {
267+
live_bucket()->set_enrich_data(&_concat_if, nullptr);
268+
} else if (!_concat_if.empty()) {
269+
live_bucket()->set_enrich_data(nullptr, &_enrich_data);
270+
}
271+
}
229272
};
230273

231274
class FlowStreamHandler final : public visor::StreamMetricsHandler<FlowMetricsManager>

src/handlers/flow/tests/test_flows.cpp

+35
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,41 @@ TEST_CASE("Parse sflow stream", "[sflow][flow]")
4949
CHECK(j["devices"]["192.168.0.13"]["top_src_ips_and_port_bytes"][0]["name"] == "10.4.1.2:57420");
5050
}
5151

52+
TEST_CASE("Parse sflow with enrichment", "[sflow][flow]")
53+
{
54+
FlowInputStream stream{"sflow-test"};
55+
stream.config_set("flow_type", "sflow");
56+
stream.config_set("pcap_file", "tests/fixtures/ecmp.pcap");
57+
58+
visor::Config c;
59+
auto stream_proxy = stream.add_event_proxy(c);
60+
c.config_set<uint64_t>("num_periods", 1);
61+
FlowStreamHandler flow_handler{"flow-test", stream_proxy, &c};
62+
flow_handler.config_set<visor::Configurable::StringList>("device_map", {"route1,192.168.0.11,eth0,37,provide Y", "route2,192.168.0.12,eth3,4"});
63+
flow_handler.config_set<visor::Configurable::StringList>("only_interfaces", {"37", "4", "52"});
64+
flow_handler.config_set<bool>("first_filter_if_as_label", true);
65+
66+
flow_handler.start();
67+
stream.start();
68+
stream.stop();
69+
flow_handler.stop();
70+
71+
auto counters = flow_handler.metrics()->bucket(0)->counters();
72+
auto event_data = flow_handler.metrics()->bucket(0)->event_data_locked();
73+
74+
// confirmed with wireshark
75+
CHECK(event_data.num_events->value() == 9279);
76+
CHECK(event_data.num_samples->value() == 9279);
77+
CHECK(counters.filtered.value() == 8573);
78+
CHECK(counters.total.value() == 44212);
79+
80+
nlohmann::json j;
81+
flow_handler.metrics()->bucket(0)->to_json(j);
82+
CHECK(j["devices"]["route1|eth0"]["top_in_interfaces_bytes"][0]["name"] == "eth0");
83+
CHECK(j["devices"]["route2|37"]["top_in_interfaces_bytes"][0]["name"] == "eth3");
84+
CHECK(j["devices"]["192.168.0.13|37"]["top_in_interfaces_bytes"][0]["name"] == "52");
85+
}
86+
5287
TEST_CASE("Parse sflow stream without sampling", "[sflow][flow]")
5388
{
5489

0 commit comments

Comments
 (0)