Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(kubernetes_logs source): Set user-agent for k8s apiserver requests. #21905

Merged
merged 14 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
247 changes: 150 additions & 97 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ aws-types = { version = "1.3.3", default-features = false, optional = true }
aws-sigv4 = { version = "1.2.5", default-features = false, features = ["sign-http"], optional = true }
aws-config = { version = "1.0.1", default-features = false, features = ["behavior-version-latest", "credentials-process", "sso", "rt-tokio"], optional = true }
aws-credential-types = { version = "1.2.1", default-features = false, features = ["hardcoded-credentials"], optional = true }
aws-runtime = { version = "1.4.3", optional = true }
aws-smithy-http = { version = "0.60", default-features = false, features = ["event-stream", "rt-tokio"], optional = true }
aws-smithy-types = { version = "1.2.9", default-features = false, features = ["rt-tokio"], optional = true }
aws-smithy-runtime-api = { version = "1.7.3", default-features = false, optional = true }
Expand Down Expand Up @@ -319,6 +320,7 @@ hashbrown = { version = "0.14.5", default-features = false, optional = true, fea
headers = { version = "0.3.9", default-features = false }
hostname = { version = "0.4.0", default-features = false }
http = { version = "0.2.9", default-features = false }
http-1 = { package = "http", version = "1.0", default-features = false }
http-serde = "1.1.3"
http-body = { version = "0.4.5", default-features = false }
hyper = { version = "0.14.28", default-features = false, features = ["client", "runtime", "http1", "http2", "server", "stream"] }
Expand All @@ -329,8 +331,8 @@ indoc = { version = "2.0.5", default-features = false }
inventory = { version = "0.3.15", default-features = false }
ipnet = { version = "2", default-features = false, optional = true, features = ["serde", "std"] }
itertools = { version = "0.13.0", default-features = false, optional = false, features = ["use_alloc"] }
k8s-openapi = { version = "0.18.0", default-features = false, features = ["api", "v1_26"], optional = true }
kube = { version = "0.82.0", default-features = false, features = ["client", "openssl-tls", "runtime"], optional = true }
k8s-openapi = { version = "0.22.0", default-features = false, features = ["v1_26"], optional = true }
kube = { version = "0.93.0", default-features = false, features = ["client", "openssl-tls", "runtime"], optional = true }
listenfd = { version = "1.0.1", default-features = false, optional = true }
logfmt = { version = "0.0.2", default-features = false, optional = true }
lru = { version = "0.12.5", default-features = false, optional = true }
Expand Down Expand Up @@ -498,6 +500,7 @@ api-client = [

aws-core = [
"aws-config",
"aws-runtime",
"dep:aws-credential-types",
"dep:aws-sigv4",
"dep:aws-types",
Expand Down
10 changes: 5 additions & 5 deletions LICENSE-3rdparty.csv
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ arc-swap,https://github.com/vorner/arc-swap,MIT OR Apache-2.0,Michal 'vorner' Va
arr_macro,https://github.com/JoshMcguigan/arr_macro,MIT OR Apache-2.0,Josh Mcguigan
arrayvec,https://github.com/bluss/arrayvec,MIT OR Apache-2.0,bluss
ascii,https://github.com/tomprogrammer/rust-ascii,Apache-2.0 OR MIT,"Thomas Bahn <[email protected]>, Torbjørn Birch Moltu <[email protected]>, Simon Sapin <[email protected]>"
async-broadcast,https://github.com/smol-rs/async-broadcast,MIT OR Apache-2.0,"Stjepan Glavina <[email protected]>, Yoshua Wuyts <[email protected]>, Zeeshan Ali Khan <[email protected]>"
async-channel,https://github.com/smol-rs/async-channel,Apache-2.0 OR MIT,Stjepan Glavina <[email protected]>
async-compression,https://github.com/Nullus157/async-compression,MIT OR Apache-2.0,"Wim Looman <[email protected]>, Allen Bui <[email protected]>"
async-executor,https://github.com/smol-rs/async-executor,Apache-2.0 OR MIT,Stjepan Glavina <[email protected]>
Expand Down Expand Up @@ -225,6 +226,7 @@ finl_unicode,https://github.com/dahosek/finl_unicode,MIT OR Apache-2.0,The finl_
flagset,https://github.com/enarx/flagset,Apache-2.0,Nathaniel McCallum <[email protected]>
flate2,https://github.com/rust-lang/flate2-rs,MIT OR Apache-2.0,"Alex Crichton <[email protected]>, Josh Triplett <[email protected]>"
float_eq,https://github.com/jtempest/float_eq-rs,MIT OR Apache-2.0,jtempest
fluent-uri,https://github.com/yescallop/fluent-uri-rs,MIT,Scallop Ye <[email protected]>
flume,https://github.com/zesterer/flume,Apache-2.0 OR MIT,Joshua Barretto <[email protected]>
fnv,https://github.com/servo/rust-fnv,Apache-2.0 OR MIT,Alex Crichton <[email protected]>
foldhash,https://github.com/orlp/foldhash,Zlib,Orson Peters <[email protected]>
Expand Down Expand Up @@ -327,15 +329,14 @@ jni,https://github.com/jni-rs/jni-rs,MIT OR Apache-2.0,Josh Chase <josh@prevoty.
jni-sys,https://github.com/sfackler/rust-jni-sys,MIT OR Apache-2.0,Steven Fackler <[email protected]>
js-sys,https://github.com/rustwasm/wasm-bindgen/tree/master/crates/js-sys,MIT OR Apache-2.0,The wasm-bindgen Developers
json-patch,https://github.com/idubrov/json-patch,MIT OR Apache-2.0,Ivan Dubrov <[email protected]>
jsonpath_lib,https://github.com/freestrings/jsonpath,MIT,Changseok Han <[email protected]>
k8s-openapi,https://github.com/Arnavion/k8s-openapi,Apache-2.0,Arnavion <[email protected]>
jsonpath-rust,https://github.com/besok/jsonpath-rust,MIT,BorisZhguchev <[email protected]>
jsonptr,https://github.com/chanced/jsonptr,MIT OR Apache-2.0,chance dinkins
k8s-openapi,https://github.com/Arnavion/k8s-openapi,Apache-2.0,Arnav Singh <[email protected]>
keccak,https://github.com/RustCrypto/sponges/tree/master/keccak,Apache-2.0 OR MIT,RustCrypto Developers
kqueue,https://gitlab.com/rust-kqueue/rust-kqueue,MIT,William Orr <[email protected]>
kqueue-sys,https://gitlab.com/rust-kqueue/rust-kqueue-sys,MIT,"William Orr <[email protected]>, Daniel (dmilith) Dettlaff <[email protected]>"
krb5-src,https://github.com/MaterializeInc/rust-krb5-src,Apache-2.0,"Materialize, Inc."
kube,https://github.com/kube-rs/kube,Apache-2.0,"clux <[email protected]>, Natalie Klestrup Röijezon <[email protected]>, kazk <[email protected]>"
kube-core,https://github.com/kube-rs/kube,Apache-2.0,"clux <[email protected]>, kazk <[email protected]>"
kube-runtime,https://github.com/kube-rs/kube,Apache-2.0,"Natalie Klestrup Röijezon <[email protected]>, clux <[email protected]>"
lalrpop-util,https://github.com/lalrpop/lalrpop,Apache-2.0 OR MIT,Niko Matsakis <[email protected]>
lapin,https://github.com/amqp-rs/lapin,MIT,"Geoffroy Couprie <[email protected]>, Marc-Antoine Perennou <[email protected]>"
lazy_static,https://github.com/rust-lang-nursery/lazy-static.rs,MIT OR Apache-2.0,Marvin Löbel <[email protected]>
Expand Down Expand Up @@ -639,7 +640,6 @@ tracing-log,https://github.com/tokio-rs/tracing,MIT,Tokio Contributors <team@tok
tracing-serde,https://github.com/tokio-rs/tracing,MIT,Tokio Contributors <[email protected]>
tracing-subscriber,https://github.com/tokio-rs/tracing,MIT,"Eliza Weisman <[email protected]>, David Barsky <[email protected]>, Tokio Contributors <[email protected]>"
tracing-tower,https://github.com/tokio-rs/tracing,MIT,Eliza Weisman <[email protected]>
treediff,https://github.com/Byron/treediff-rs,MIT OR Apache-2.0,Sebastian Thiel <[email protected]>
triomphe,https://github.com/Manishearth/triomphe,MIT OR Apache-2.0,"Manish Goregaokar <[email protected]>, The Servo Project Developers"
trust-dns-proto,https://github.com/bluejekyll/trust-dns,MIT OR Apache-2.0,Benjamin Fry <[email protected]>
trust-dns-resolver,https://github.com/bluejekyll/trust-dns,MIT OR Apache-2.0,Benjamin Fry <[email protected]>
Expand Down
3 changes: 3 additions & 0 deletions changelog.d/21864_kubernetes_logs_user_agent.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `kubernetes_log` source now sets a `user-agent` header when querying [k8s apiserver](https://kubernetes.io/docs/reference/command-line-tools-reference/kube-apiserver/).

authors: ganelo
3 changes: 3 additions & 0 deletions license-tool.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
"openssl-macros" = { origin = "https://github.com/sfackler/rust-openssl" }
"serde_nanos" = { origin = "https://github.com/caspervonb/serde_nanos" }

# rust-license-tool can't find the license for jsonpath-rust 0.5.1
"jsonpath-rust" = { license = "MIT", origin = "https://github.com/besok/jsonpath-rust" }

# `ring` has a custom license that is mostly "ISC-style" but parts of it also fall under OpenSSL licensing.
"ring-0.16.20" = { license = "ISC AND Custom" }
"ring-0.17.5" = { license = "ISC AND Custom" }
Expand Down
10 changes: 4 additions & 6 deletions src/aws/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@ use aws_config::{
default_provider::credentials::DefaultCredentialsChain,
identity::IdentityCache,
imds,
profile::{
profile_file::{ProfileFileKind, ProfileFiles},
ProfileFileCredentialsProvider,
},
profile::ProfileFileCredentialsProvider,
provider_config::ProviderConfig,
sts::AssumeRoleProviderBuilder,
};
use aws_credential_types::{provider::SharedCredentialsProvider, Credentials};
use aws_runtime::env_config::file::{EnvConfigFileKind, EnvConfigFiles};
use aws_smithy_async::time::SystemTimeSource;
use aws_smithy_runtime_api::client::identity::SharedIdentityCache;
use aws_types::{region::Region, SdkConfig};
Expand Down Expand Up @@ -277,8 +275,8 @@ impl AwsAuthentication {

// The SDK uses the default profile out of the box, but doesn't provide an optional
// type in the builder. We can just hardcode it so that everything works.
let profile_files = ProfileFiles::builder()
.with_file(ProfileFileKind::Credentials, credentials_file)
let profile_files = EnvConfigFiles::builder()
.with_file(EnvConfigFileKind::Credentials, credentials_file)
.build();

let provider_config = ProviderConfig::empty().with_http_client(connector);
Expand Down
67 changes: 50 additions & 17 deletions src/kubernetes/reflector.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Intercept [`watcher::Event`]'s.

use std::{hash::Hash, time::Duration};
use std::{hash::Hash, sync::Arc, time::Duration};

use futures::StreamExt;
use futures_util::Stream;
Expand All @@ -26,31 +26,64 @@ pub async fn custom_reflector<K, W>(
{
pin!(stream);
let mut delay_queue = DelayQueue::default();
let mut init_buffer_meta = Vec::new();
loop {
tokio::select! {
result = stream.next() => {
match result {
Some(Ok(event)) => {
match event {
// Immediately reconcile `Applied` event
watcher::Event::Applied(ref obj) => {
trace!(message = "Processing Applied event.", ?event);
// Immediately reconcile `Apply` event
watcher::Event::Apply(ref obj) => {
trace!(message = "Processing Apply event.", event_type = std::any::type_name::<K>(), event = ?event);
store.apply_watcher_event(&event);
let meta_descr = MetaDescribe::from_meta(obj.meta());
meta_cache.store(meta_descr);
}
// Delay reconciling any `Deleted` events
watcher::Event::Deleted(ref obj) => {
// Delay reconciling any `Delete` events
watcher::Event::Delete(ref obj) => {
trace!(message = "Delaying processing Delete event.", event_type = std::any::type_name::<K>(), event = ?event);
delay_queue.insert(event.to_owned(), delay_deletion);
let meta_descr = MetaDescribe::from_meta(obj.meta());
meta_cache.delete(&meta_descr);
}
// Clear all delayed events on `Restarted` events
watcher::Event::Restarted(_) => {
trace!(message = "Processing Restarted event.", ?event);
// Clear all delayed events on `Init` event
watcher::Event::Init => {
trace!(message = "Processing Init event.", event_type = std::any::type_name::<K>(), event = ?event);
delay_queue.clear();
store.apply_watcher_event(&event);
meta_cache.clear();
init_buffer_meta.clear();
}
// Immediately reconcile `InitApply` event (but buffer the obj ref so we can handle implied deletions on InitDone)
watcher::Event::InitApply(ref obj) => {
trace!(message = "Processing InitApply event.", event_type = std::any::type_name::<K>(), event = ?event);
store.apply_watcher_event(&event);
let meta_descr = MetaDescribe::from_meta(obj.meta());
meta_cache.store(meta_descr.clone());
init_buffer_meta.push(meta_descr.clone());
}
// Reconcile `InitApply` events and implied deletions
watcher::Event::InitDone => {
trace!(message = "Processing InitDone event.", event_type = std::any::type_name::<K>(), event = ?event);
store.apply_watcher_event(&event);


store.as_reader().state().into_iter()
// delay deleting objs that were added before but not during InitApply
.for_each(|obj| {
if let Some(inner) = Arc::into_inner(obj) {
let meta_descr = MetaDescribe::from_meta(inner.meta());
if !init_buffer_meta.contains(&meta_descr) {
let implied_deletion_event = watcher::Event::Delete(inner);
trace!(message = "Delaying processing implied deletion.", event_type = std::any::type_name::<K>(), event = ?implied_deletion_event);
delay_queue.insert(implied_deletion_event, delay_deletion);
meta_cache.delete(&meta_descr);
}
}
});

init_buffer_meta.clear();
}
}
},
Expand All @@ -69,10 +102,10 @@ pub async fn custom_reflector<K, W>(
Some(event) => {
let event = event.into_inner();
match event {
watcher::Event::Deleted(ref obj) => {
watcher::Event::Delete(ref obj) => {
let meta_desc = MetaDescribe::from_meta(obj.meta());
if !meta_cache.contains(&meta_desc) {
trace!(message = "Processing Deleted event.", ?event);
trace!(message = "Processing Delete event.", event_type = std::any::type_name::<K>(), event = ?event);
store.apply_watcher_event(&event);
}
},
Expand Down Expand Up @@ -118,7 +151,7 @@ mod tests {
..ConfigMap::default()
};
let (mut tx, rx) = mpsc::channel::<_>(5);
tx.send(Ok(watcher::Event::Applied(cm.clone())))
tx.send(Ok(watcher::Event::Apply(cm.clone())))
.await
.unwrap();
let meta_cache = MetaCache::new();
Expand All @@ -144,10 +177,10 @@ mod tests {
..ConfigMap::default()
};
let (mut tx, rx) = mpsc::channel::<_>(5);
tx.send(Ok(watcher::Event::Applied(cm.clone())))
tx.send(Ok(watcher::Event::Apply(cm.clone())))
.await
.unwrap();
tx.send(Ok(watcher::Event::Deleted(cm.clone())))
tx.send(Ok(watcher::Event::Delete(cm.clone())))
.await
.unwrap();
let meta_cache = MetaCache::new();
Expand Down Expand Up @@ -178,13 +211,13 @@ mod tests {
..ConfigMap::default()
};
let (mut tx, rx) = mpsc::channel::<_>(5);
tx.send(Ok(watcher::Event::Applied(cm.clone())))
tx.send(Ok(watcher::Event::Apply(cm.clone())))
.await
.unwrap();
tx.send(Ok(watcher::Event::Deleted(cm.clone())))
tx.send(Ok(watcher::Event::Delete(cm.clone())))
.await
.unwrap();
tx.send(Ok(watcher::Event::Applied(cm.clone())))
tx.send(Ok(watcher::Event::Apply(cm.clone())))
.await
.unwrap();
let meta_cache = MetaCache::new();
Expand Down
19 changes: 14 additions & 5 deletions src/sources/kubernetes_logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use bytes::Bytes;
use chrono::Utc;
use futures::{future::FutureExt, stream::StreamExt};
use futures_util::Stream;
use http_1::{HeaderName, HeaderValue};
use k8s_openapi::api::core::v1::{Namespace, Node, Pod};
use k8s_paths_provider::K8sPathsProvider;
use kube::{
Expand All @@ -34,7 +35,10 @@ use vector_lib::{
};
use vrl::value::{kind::Collection, Kind};

use crate::sources::kubernetes_logs::partial_events_merger::merge_partial_events;
use crate::{
built_info::{PKG_NAME, PKG_VERSION},
sources::kubernetes_logs::partial_events_merger::merge_partial_events,
};
use crate::{
config::{
log_schema, ComponentKey, DataType, GenerateConfig, GlobalOptions, SourceConfig,
Expand Down Expand Up @@ -586,7 +590,7 @@ impl Source {
// If the user passed a custom Kubeconfig use it, otherwise
// we attempt to load the local kubeconfig, followed by the
// in-cluster environment variables
let client_config = match &config.kube_config_file {
let mut client_config = match &config.kube_config_file {
Some(kc) => {
ClientConfig::from_custom_kubeconfig(
config::Kubeconfig::read_from(kc)?,
Expand All @@ -596,6 +600,11 @@ impl Source {
}
None => ClientConfig::infer().await?,
};
if let Ok(user_agent) = HeaderValue::from_str(&format!("{}/{}", PKG_NAME, PKG_VERSION)) {
client_config
.headers
.push((HeaderName::from_static("user-agent"), user_agent));
}
ganelo marked this conversation as resolved.
Show resolved Hide resolved
let client = Client::try_from(client_config)?;

let data_dir = globals.resolve_and_make_data_subdir(config.data_dir.as_ref(), key.id())?;
Expand Down Expand Up @@ -695,7 +704,7 @@ impl Source {
..Default::default()
},
)
.backoff(watcher::default_backoff());
.backoff(watcher::DefaultBackoff::default());
let pod_store_w = reflector::store::Writer::default();
let pod_state = pod_store_w.as_reader();
let pod_cacher = MetaCache::new();
Expand All @@ -718,7 +727,7 @@ impl Source {
..Default::default()
},
)
.backoff(watcher::default_backoff());
.backoff(watcher::DefaultBackoff::default());
let ns_store_w = reflector::store::Writer::default();
let ns_state = ns_store_w.as_reader();
let ns_cacher = MetaCache::new();
Expand All @@ -741,7 +750,7 @@ impl Source {
..Default::default()
},
)
.backoff(watcher::default_backoff());
.backoff(watcher::DefaultBackoff::default());
let node_store_w = reflector::store::Writer::default();
let node_state = node_store_w.as_reader();
let node_cacher = MetaCache::new();
Expand Down
Loading