Skip to content

Commit

Permalink
fix(kubernetes_log source): Set user-agent for k8s apiserver requests.
Browse files Browse the repository at this point in the history
  • Loading branch information
oganel committed Nov 27, 2024
1 parent 68c2a19 commit b0cc62f
Show file tree
Hide file tree
Showing 8 changed files with 224 additions and 126 deletions.
246 changes: 149 additions & 97 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ hashbrown = { version = "0.14.5", default-features = false, optional = true, fea
headers = { version = "0.3.9", default-features = false }
hostname = { version = "0.4.0", default-features = false }
http = { version = "0.2.9", default-features = false }
http-1 = { package = "http", version = "1.0", default-features = false }
http-serde = "1.1.3"
http-body = { version = "0.4.5", default-features = false }
hyper = { version = "0.14.28", default-features = false, features = ["client", "runtime", "http1", "http2", "server", "stream"] }
Expand All @@ -317,8 +318,8 @@ indoc = { version = "2.0.5", default-features = false }
inventory = { version = "0.3.15", default-features = false }
ipnet = { version = "2", default-features = false, optional = true, features = ["serde", "std"] }
itertools = { version = "0.13.0", default-features = false, optional = false, features = ["use_alloc"] }
k8s-openapi = { version = "0.18.0", default-features = false, features = ["api", "v1_26"], optional = true }
kube = { version = "0.82.0", default-features = false, features = ["client", "openssl-tls", "runtime"], optional = true }
k8s-openapi = { version = "0.22.0", default-features = false, features = ["v1_26"], optional = true }
kube = { version = "0.93.0", default-features = false, features = ["client", "openssl-tls", "runtime"], optional = true }
listenfd = { version = "1.0.1", default-features = false, optional = true }
logfmt = { version = "0.0.2", default-features = false, optional = true }
lru = { version = "0.12.5", default-features = false, optional = true }
Expand Down
10 changes: 5 additions & 5 deletions LICENSE-3rdparty.csv
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ arc-swap,https://github.com/vorner/arc-swap,MIT OR Apache-2.0,Michal 'vorner' Va
arr_macro,https://github.com/JoshMcguigan/arr_macro,MIT OR Apache-2.0,Josh Mcguigan
arrayvec,https://github.com/bluss/arrayvec,MIT OR Apache-2.0,bluss
ascii,https://github.com/tomprogrammer/rust-ascii,Apache-2.0 OR MIT,"Thomas Bahn <[email protected]>, Torbjørn Birch Moltu <[email protected]>, Simon Sapin <[email protected]>"
async-broadcast,https://github.com/smol-rs/async-broadcast,MIT OR Apache-2.0,"Stjepan Glavina <[email protected]>, Yoshua Wuyts <[email protected]>, Zeeshan Ali Khan <[email protected]>"
async-channel,https://github.com/smol-rs/async-channel,Apache-2.0 OR MIT,Stjepan Glavina <[email protected]>
async-compression,https://github.com/Nullus157/async-compression,MIT OR Apache-2.0,"Wim Looman <[email protected]>, Allen Bui <[email protected]>"
async-executor,https://github.com/smol-rs/async-executor,Apache-2.0 OR MIT,Stjepan Glavina <[email protected]>
Expand Down Expand Up @@ -224,6 +225,7 @@ finl_unicode,https://github.com/dahosek/finl_unicode,MIT OR Apache-2.0,The finl_
flagset,https://github.com/enarx/flagset,Apache-2.0,Nathaniel McCallum <[email protected]>
flate2,https://github.com/rust-lang/flate2-rs,MIT OR Apache-2.0,"Alex Crichton <[email protected]>, Josh Triplett <[email protected]>"
float_eq,https://github.com/jtempest/float_eq-rs,MIT OR Apache-2.0,jtempest
fluent-uri,https://github.com/yescallop/fluent-uri-rs,MIT,Scallop Ye <[email protected]>
flume,https://github.com/zesterer/flume,Apache-2.0 OR MIT,Joshua Barretto <[email protected]>
fnv,https://github.com/servo/rust-fnv,Apache-2.0 OR MIT,Alex Crichton <[email protected]>
foldhash,https://github.com/orlp/foldhash,Zlib,Orson Peters <[email protected]>
Expand Down Expand Up @@ -325,15 +327,14 @@ jni,https://github.com/jni-rs/jni-rs,MIT OR Apache-2.0,Josh Chase <josh@prevoty.
jni-sys,https://github.com/sfackler/rust-jni-sys,MIT OR Apache-2.0,Steven Fackler <[email protected]>
js-sys,https://github.com/rustwasm/wasm-bindgen/tree/master/crates/js-sys,MIT OR Apache-2.0,The wasm-bindgen Developers
json-patch,https://github.com/idubrov/json-patch,MIT OR Apache-2.0,Ivan Dubrov <[email protected]>
jsonpath_lib,https://github.com/freestrings/jsonpath,MIT,Changseok Han <[email protected]>
k8s-openapi,https://github.com/Arnavion/k8s-openapi,Apache-2.0,Arnavion <[email protected]>
jsonpath-rust,https://github.com/besok/jsonpath-rust,MIT,BorisZhguchev <[email protected]>
jsonptr,https://github.com/chanced/jsonptr,MIT OR Apache-2.0,chance dinkins
k8s-openapi,https://github.com/Arnavion/k8s-openapi,Apache-2.0,Arnav Singh <[email protected]>
keccak,https://github.com/RustCrypto/sponges/tree/master/keccak,Apache-2.0 OR MIT,RustCrypto Developers
kqueue,https://gitlab.com/rust-kqueue/rust-kqueue,MIT,William Orr <[email protected]>
kqueue-sys,https://gitlab.com/rust-kqueue/rust-kqueue-sys,MIT,"William Orr <[email protected]>, Daniel (dmilith) Dettlaff <[email protected]>"
krb5-src,https://github.com/MaterializeInc/rust-krb5-src,Apache-2.0,"Materialize, Inc."
kube,https://github.com/kube-rs/kube,Apache-2.0,"clux <[email protected]>, Natalie Klestrup Röijezon <[email protected]>, kazk <[email protected]>"
kube-core,https://github.com/kube-rs/kube,Apache-2.0,"clux <[email protected]>, kazk <[email protected]>"
kube-runtime,https://github.com/kube-rs/kube,Apache-2.0,"Natalie Klestrup Röijezon <[email protected]>, clux <[email protected]>"
lalrpop-util,https://github.com/lalrpop/lalrpop,Apache-2.0 OR MIT,Niko Matsakis <[email protected]>
lapin,https://github.com/amqp-rs/lapin,MIT,"Geoffroy Couprie <[email protected]>, Marc-Antoine Perennou <[email protected]>"
lazy_static,https://github.com/rust-lang-nursery/lazy-static.rs,MIT OR Apache-2.0,Marvin Löbel <[email protected]>
Expand Down Expand Up @@ -632,7 +633,6 @@ tracing-log,https://github.com/tokio-rs/tracing,MIT,Tokio Contributors <team@tok
tracing-serde,https://github.com/tokio-rs/tracing,MIT,Tokio Contributors <[email protected]>
tracing-subscriber,https://github.com/tokio-rs/tracing,MIT,"Eliza Weisman <[email protected]>, David Barsky <[email protected]>, Tokio Contributors <[email protected]>"
tracing-tower,https://github.com/tokio-rs/tracing,MIT,Eliza Weisman <[email protected]>
treediff,https://github.com/Byron/treediff-rs,MIT OR Apache-2.0,Sebastian Thiel <[email protected]>
triomphe,https://github.com/Manishearth/triomphe,MIT OR Apache-2.0,"Manish Goregaokar <[email protected]>, The Servo Project Developers"
trust-dns-proto,https://github.com/bluejekyll/trust-dns,MIT OR Apache-2.0,Benjamin Fry <[email protected]>
trust-dns-resolver,https://github.com/bluejekyll/trust-dns,MIT OR Apache-2.0,Benjamin Fry <[email protected]>
Expand Down
3 changes: 3 additions & 0 deletions changelog.d/21864_kubernetes_logs_user_agent.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `kubernetes_log` source now sets a user-agent when querying k8s apiserver.

authors: ganelo
3 changes: 3 additions & 0 deletions license-tool.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
"openssl-macros" = { origin = "https://github.com/sfackler/rust-openssl" }
"serde_nanos" = { origin = "https://github.com/caspervonb/serde_nanos" }

# rust-license-tool can't find the license for jsonpath-rust 0.5.1
"jsonpath-rust" = { license = "MIT", origin = "https://github.com/besok/jsonpath-rust" }

# `ring` has a custom license that is mostly "ISC-style" but parts of it also fall under OpenSSL licensing.
"ring-0.16.20" = { license = "ISC AND Custom" }
"ring-0.17.5" = { license = "ISC AND Custom" }
Expand Down
2 changes: 2 additions & 0 deletions src/aws/auth.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Authentication settings for AWS components.
use std::time::Duration;

#[allow(deprecated)]
use aws_config::{
default_provider::credentials::DefaultCredentialsChain,
identity::IdentityCache,
Expand Down Expand Up @@ -275,6 +276,7 @@ impl AwsAuthentication {
} => {
let connector = super::connector(proxy, tls_options)?;

#[allow(deprecated)]
// The SDK uses the default profile out of the box, but doesn't provide an optional
// type in the builder. We can just hardcode it so that everything works.
let profile_files = ProfileFiles::builder()
Expand Down
67 changes: 50 additions & 17 deletions src/kubernetes/reflector.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Intercept [`watcher::Event`]'s.
use std::{hash::Hash, time::Duration};
use std::{hash::Hash, sync::Arc, time::Duration};

use futures::StreamExt;
use futures_util::Stream;
Expand All @@ -26,31 +26,64 @@ pub async fn custom_reflector<K, W>(
{
pin!(stream);
let mut delay_queue = DelayQueue::default();
let mut init_buffer_meta = Vec::new();
loop {
tokio::select! {
result = stream.next() => {
match result {
Some(Ok(event)) => {
match event {
// Immediately reconcile `Applied` event
watcher::Event::Applied(ref obj) => {
trace!(message = "Processing Applied event.", ?event);
// Immediately reconcile `Apply` event
watcher::Event::Apply(ref obj) => {
trace!(message = "Processing Apply event.", event_type = std::any::type_name::<K>(), event = ?event);
store.apply_watcher_event(&event);
let meta_descr = MetaDescribe::from_meta(obj.meta());
meta_cache.store(meta_descr);
}
// Delay reconciling any `Deleted` events
watcher::Event::Deleted(ref obj) => {
// Delay reconciling any `Delete` events
watcher::Event::Delete(ref obj) => {
trace!(message = "Delaying processing Delete event.", event_type = std::any::type_name::<K>(), event = ?event);
delay_queue.insert(event.to_owned(), delay_deletion);
let meta_descr = MetaDescribe::from_meta(obj.meta());
meta_cache.delete(&meta_descr);
}
// Clear all delayed events on `Restarted` events
watcher::Event::Restarted(_) => {
trace!(message = "Processing Restarted event.", ?event);
// Clear all delayed events on `Init` event
watcher::Event::Init => {
trace!(message = "Processing Init event.", event_type = std::any::type_name::<K>(), event = ?event);
delay_queue.clear();
store.apply_watcher_event(&event);
meta_cache.clear();
init_buffer_meta.clear();
}
// Immediately reconcile `InitApply` event (but buffer the obj ref so we can handle implied deletions on InitDone)
watcher::Event::InitApply(ref obj) => {
trace!(message = "Processing InitApply event.", event_type = std::any::type_name::<K>(), event = ?event);
store.apply_watcher_event(&event);
let meta_descr = MetaDescribe::from_meta(obj.meta());
meta_cache.store(meta_descr.clone());
init_buffer_meta.push(meta_descr.clone());
}
// Reconcile `InitApply` events and implied deletions
watcher::Event::InitDone => {
trace!(message = "Processing InitDone event.", event_type = std::any::type_name::<K>(), event = ?event);
store.apply_watcher_event(&event);


store.as_reader().state().into_iter()
// delay deleting objs that were added before but not during InitApply
.for_each(|obj| {
if let Some(inner) = Arc::into_inner(obj) {
let meta_descr = MetaDescribe::from_meta(inner.meta());
if !init_buffer_meta.contains(&meta_descr) {
let implied_deletion_event = watcher::Event::Delete(inner);
trace!(message = "Delaying processing implied deletion.", event_type = std::any::type_name::<K>(), event = ?implied_deletion_event);
delay_queue.insert(implied_deletion_event, delay_deletion);
meta_cache.delete(&meta_descr);
}
}
});

init_buffer_meta.clear();
}
}
},
Expand All @@ -69,10 +102,10 @@ pub async fn custom_reflector<K, W>(
Some(event) => {
let event = event.into_inner();
match event {
watcher::Event::Deleted(ref obj) => {
watcher::Event::Delete(ref obj) => {
let meta_desc = MetaDescribe::from_meta(obj.meta());
if !meta_cache.contains(&meta_desc) {
trace!(message = "Processing Deleted event.", ?event);
trace!(message = "Processing Delete event.", event_type = std::any::type_name::<K>(), event = ?event);
store.apply_watcher_event(&event);
}
},
Expand Down Expand Up @@ -118,7 +151,7 @@ mod tests {
..ConfigMap::default()
};
let (mut tx, rx) = mpsc::channel::<_>(5);
tx.send(Ok(watcher::Event::Applied(cm.clone())))
tx.send(Ok(watcher::Event::Apply(cm.clone())))
.await
.unwrap();
let meta_cache = MetaCache::new();
Expand All @@ -144,10 +177,10 @@ mod tests {
..ConfigMap::default()
};
let (mut tx, rx) = mpsc::channel::<_>(5);
tx.send(Ok(watcher::Event::Applied(cm.clone())))
tx.send(Ok(watcher::Event::Apply(cm.clone())))
.await
.unwrap();
tx.send(Ok(watcher::Event::Deleted(cm.clone())))
tx.send(Ok(watcher::Event::Delete(cm.clone())))
.await
.unwrap();
let meta_cache = MetaCache::new();
Expand Down Expand Up @@ -178,13 +211,13 @@ mod tests {
..ConfigMap::default()
};
let (mut tx, rx) = mpsc::channel::<_>(5);
tx.send(Ok(watcher::Event::Applied(cm.clone())))
tx.send(Ok(watcher::Event::Apply(cm.clone())))
.await
.unwrap();
tx.send(Ok(watcher::Event::Deleted(cm.clone())))
tx.send(Ok(watcher::Event::Delete(cm.clone())))
.await
.unwrap();
tx.send(Ok(watcher::Event::Applied(cm.clone())))
tx.send(Ok(watcher::Event::Apply(cm.clone())))
.await
.unwrap();
let meta_cache = MetaCache::new();
Expand Down
14 changes: 9 additions & 5 deletions src/sources/kubernetes_logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use bytes::Bytes;
use chrono::Utc;
use futures::{future::FutureExt, stream::StreamExt};
use futures_util::Stream;
use http_1::{HeaderName, HeaderValue};
use k8s_openapi::api::core::v1::{Namespace, Node, Pod};
use k8s_paths_provider::K8sPathsProvider;
use kube::{
Expand All @@ -34,7 +35,7 @@ use vector_lib::{
};
use vrl::value::{kind::Collection, Kind};

use crate::sources::kubernetes_logs::partial_events_merger::merge_partial_events;
use crate::{built_info::{PKG_NAME, PKG_VERSION}, sources::kubernetes_logs::partial_events_merger::merge_partial_events};
use crate::{
config::{
log_schema, ComponentKey, DataType, GenerateConfig, GlobalOptions, SourceConfig,
Expand Down Expand Up @@ -586,7 +587,7 @@ impl Source {
// If the user passed a custom Kubeconfig use it, otherwise
// we attempt to load the local kubeconfig, followed by the
// in-cluster environment variables
let client_config = match &config.kube_config_file {
let mut client_config = match &config.kube_config_file {
Some(kc) => {
ClientConfig::from_custom_kubeconfig(
config::Kubeconfig::read_from(kc)?,
Expand All @@ -596,6 +597,9 @@ impl Source {
}
None => ClientConfig::infer().await?,
};
if let Ok(user_agent) = HeaderValue::from_str(&format!("{}/{}", PKG_NAME, PKG_VERSION)) {
client_config.headers.push((HeaderName::from_static("user-agent"), user_agent));
}
let client = Client::try_from(client_config)?;

let data_dir = globals.resolve_and_make_data_subdir(config.data_dir.as_ref(), key.id())?;
Expand Down Expand Up @@ -695,7 +699,7 @@ impl Source {
..Default::default()
},
)
.backoff(watcher::default_backoff());
.backoff(watcher::DefaultBackoff::default());
let pod_store_w = reflector::store::Writer::default();
let pod_state = pod_store_w.as_reader();
let pod_cacher = MetaCache::new();
Expand All @@ -718,7 +722,7 @@ impl Source {
..Default::default()
},
)
.backoff(watcher::default_backoff());
.backoff(watcher::DefaultBackoff::default());
let ns_store_w = reflector::store::Writer::default();
let ns_state = ns_store_w.as_reader();
let ns_cacher = MetaCache::new();
Expand All @@ -741,7 +745,7 @@ impl Source {
..Default::default()
},
)
.backoff(watcher::default_backoff());
.backoff(watcher::DefaultBackoff::default());
let node_store_w = reflector::store::Writer::default();
let node_state = node_store_w.as_reader();
let node_cacher = MetaCache::new();
Expand Down

0 comments on commit b0cc62f

Please sign in to comment.