Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement(http-sink): Add automatic bearer token acquisition for http-sink #20995

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,26 @@ pub enum Auth {
password: SensitiveString,
},

/// Lorem ipsum dolor sit amet.
///
/// Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia.
OAuth2 {
/// Temporibus autem quibusdam et aut officiis debitis.
#[configurable(metadata(docs::examples = "${TOKEN_ENDPOINT}"))]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i suggest prefix the env var names with OAUTH_ or OAUTH2_

#[configurable(metadata(docs::examples = "token_endpoint"))]
token_endpoint: String,

/// Nam libero tempore, cum soluta nobis.
#[configurable(metadata(docs::examples = "${CLIENT_ID}"))]
#[configurable(metadata(docs::examples = "client_id"))]
client_id: String,

/// At vero eos et accusamus et iusto odio dignissimos ducimus qui blanditiis praesentium.
#[configurable(metadata(docs::examples = "${CLIENT_SECRET}"))]
#[configurable(metadata(docs::examples = "client_secret"))]
client_secret: SensitiveString,
},

/// Bearer authentication.
///
/// The bearer token value (OAuth2, JWT, etc.) is passed as-is.
Expand Down Expand Up @@ -338,6 +358,9 @@ impl Auth {
Ok(auth) => map.typed_insert(auth),
Err(error) => error!(message = "Invalid bearer token.", token = %token, %error),
},
Auth::OAuth2 { ..} => {
panic!("Operation not supporter for this type of authorization.")
},
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/sinks/databend/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ impl SinkConfig for DatabendConfig {
Some(Auth::Bearer { .. }) => {
return Err("Bearer authentication is not supported currently".into());
}
Some(Auth::OAuth2 { .. }) => {
return Err("OAuth2 authentication is not supported currently".into());
}
None => {}
}
if let Some(database) = &self.database {
Expand Down
3 changes: 3 additions & 0 deletions src/sinks/prometheus/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,9 @@ fn authorized<T: HttpBody>(req: &Request<T>, auth: &Option<Auth>) -> bool {
),
Auth::Bearer { token } => {
HeaderValue::from_str(format!("Bearer {}", token.inner()).as_str())
},
Auth::OAuth2 { .. } => {
panic!("Operation not supporter for this type of authorization.")
}
};

Expand Down
251 changes: 174 additions & 77 deletions src/sources/util/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,18 @@
//! - Call call() supplying the generic inputs for calling and the source-specific
//! context.

use std::sync::{Arc, Mutex};
use bytes::Bytes;
use futures_util::{stream, FutureExt, StreamExt, TryFutureExt};
use http::{response::Parts, Uri};
use hyper::{Body, Request};
use std::time::Duration;
use serde::Deserialize;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::{collections::HashMap, future::ready};
use tokio_stream::wrappers::IntervalStream;
use vector_lib::json_size::JsonSize;
use vector_lib::sensitive_string::SensitiveString;
use bytes::Buf;

use crate::{
http::{Auth, HttpClient},
Expand Down Expand Up @@ -119,12 +123,88 @@
}
}

#[derive(Clone)]
struct BearerTokenState {
token: Arc<Mutex<Option<ExpirableToken>>>
}

#[derive(Debug, Deserialize)]
#[allow(dead_code)]
struct Token {
access_token: String,
expires_in: u32
}

#[derive(Debug)]
struct ExpirableToken {
access_token: String,
expires_after_ms: u128
}

impl BearerTokenState {

fn new() -> BearerTokenState {
let empty_token = Arc::new(Mutex::new(None));
BearerTokenState { token: empty_token }
}

async fn get_token(&mut self, client: HttpClient, token_endpoint: String, client_id: String, client_secret: SensitiveString) -> String {

let now = SystemTime::now();
let since_the_epoch = now
.duration_since(UNIX_EPOCH)
.expect("Time went backwards");

//first lets try to return already acquired token
{
let maybe_token = self.token.lock().unwrap();
match &*maybe_token {
Some(token) => {
if since_the_epoch.as_millis() < token.expires_after_ms {
//we have token, token is valud for at least 1min, we can use it.

Check failure on line 164 in src/sources/util/http_client.rs

View workflow job for this annotation

GitHub Actions / Check Spelling

`valud` is not a recognized word. (unrecognized-spelling)
return token.access_token.clone();
}
},
None => {},
}
}

//we need to acquire new
let request_body = format!("client_secret={}&grant_type=client_credentials&response_type=token&client_id={}", client_secret.inner(), client_id);
println!("{}", request_body);

let builder = Request::post(&token_endpoint);
let builder = builder.header("Content-Type", "application/x-www-form-urlencoded");
let request = builder.body(Body::from(request_body)).expect("error creating request");
let response = client.send(request).await.unwrap();

println!("BODY {:?}", response);

let body = hyper::body::aggregate(response).await.unwrap();
let token: Token = serde_json::from_reader(body.reader()).unwrap();
let token_to_return = token.access_token.clone();

//expires_in means, in seconds, for how long it will be valid, lets say 5min,
//to not cause some random 4xx, because token expired in the meantime, we will make some
//room for token refreshing, this room is 1min (60seconds)
let token_is_valid_for_ms = (token.expires_in - 60) * 1000;
let token_will_expire_after_ms = since_the_epoch.as_millis() + (token_is_valid_for_ms as u128);


{
let _ = self.token.lock().unwrap().replace(ExpirableToken{access_token:token.access_token, expires_after_ms: token_will_expire_after_ms});
}

token_to_return
}
}

/// Calls one or more urls at an interval.
/// - The HTTP request is built per the options in provided generic inputs.
/// - The HTTP response is decoded/parsed into events by the specific context.
/// - The events are then sent to the output stream.
pub(crate) async fn call<
B: HttpClientBuilder<Context = C> + Send + Clone,
B: HttpClientBuilder<Context = C> + Send + Clone + std::marker::Sync,
C: HttpClientContext + Send,
>(
inputs: GenericHttpClientInputs,
Expand All @@ -136,11 +216,15 @@
// proxy and tls settings.
let client =
HttpClient::new(inputs.tls.clone(), &inputs.proxy).expect("Building HTTP client failed");

//oauth2 beaing used or no, needs to be initialized.

Check failure on line 220 in src/sources/util/http_client.rs

View workflow job for this annotation

GitHub Actions / Check Spelling

`beaing` is not a recognized word. (unrecognized-spelling)
let bearer_state = BearerTokenState::new();

let mut stream = IntervalStream::new(tokio::time::interval(inputs.interval))
.take_until(inputs.shutdown)
.map(move |_| stream::iter(inputs.urls.clone()))
.flatten()
.map(move |url| {
.then(move |url| {
let client = client.clone();
let endpoint = url.to_string();

Expand Down Expand Up @@ -170,84 +254,97 @@

// building an empty request should be infallible
let mut request = builder.body(Body::empty()).expect("error creating request");

if let Some(auth) = &inputs.auth {
auth.apply(&mut request);
}

tokio::time::timeout(inputs.timeout, client.send(request))
.then(move |result| async move {
match result {
Ok(Ok(response)) => Ok(response),
Ok(Err(error)) => Err(error.into()),
Err(_) => Err(format!(
"Timeout error: request exceeded {}s",
inputs.timeout.as_secs_f64()
)
.into()),

let auth = inputs.auth.clone();
let mut bearer_state = bearer_state.clone();
async move {

if let Some(auth) = auth{
match auth {
Auth::Basic { .. } => auth.apply(&mut request),
Auth::Bearer { .. } => auth.apply(&mut request),
Auth::OAuth2 { token_endpoint, client_id, client_secret } => {
let token = bearer_state.get_token(client.clone(), token_endpoint, client_id, client_secret).await;
let temp_auth = Auth::Bearer{ token: SensitiveString::from(token)};
temp_auth.apply(&mut request);
},
}
})
.and_then(|response| async move {
let (header, body) = response.into_parts();
let body = hyper::body::to_bytes(body).await?;
emit!(EndpointBytesReceived {
byte_size: body.len(),
protocol: "http",
endpoint: endpoint.as_str(),
});
Ok((header, body))
})
.into_stream()
.filter_map(move |response| {
ready(match response {
Ok((header, body)) if header.status == hyper::StatusCode::OK => {
context.on_response(&url, &header, &body).map(|mut events| {
let byte_size = if events.is_empty() {
// We need to explicitly set the byte size
// to 0 since
// `estimated_json_encoded_size_of` returns
// at least 1 for an empty collection. For
// the purposes of the
// HttpClientEventsReceived event, we should
// emit 0 when there aren't any usable
// metrics.
JsonSize::zero()
} else {
events.estimated_json_encoded_size_of()
};

emit!(HttpClientEventsReceived {
byte_size,
count: events.len(),
url: url.to_string()
});

// We'll enrich after receiving the events so
// that the byte sizes are accurate.
context.enrich_events(&mut events);
}

stream::iter(events)
})
}
Ok((header, _)) => {
context.on_http_response_error(&url, &header);
emit!(HttpClientHttpResponseError {
code: header.status,
url: url.to_string(),
});
None
}
Err(error) => {
emit!(HttpClientHttpError {
error,
url: url.to_string()
});
None
tokio::time::timeout(inputs.timeout, client.send(request))
.then(move |result| async move {
match result {
Ok(Ok(response)) => Ok(response),
Ok(Err(error)) => Err(error.into()),
Err(_) => Err(format!(
"Timeout error: request exceeded {}s",
inputs.timeout.as_secs_f64()
)
.into()),
}
})
})
.flatten()
.boxed()
.and_then(|response| async move {
let (header, body) = response.into_parts();
let body = hyper::body::to_bytes(body).await?;
emit!(EndpointBytesReceived {
byte_size: body.len(),
protocol: "http",
endpoint: endpoint.as_str(),
});
Ok((header, body))
})
.into_stream()
.filter_map(move |response| {
ready(match response {
Ok((header, body)) if header.status == hyper::StatusCode::OK => {
context.on_response(&url, &header, &body).map(|mut events| {
let byte_size = if events.is_empty() {
// We need to explicitly set the byte size
// to 0 since
// `estimated_json_encoded_size_of` returns
// at least 1 for an empty collection. For
// the purposes of the
// HttpClientEventsReceived event, we should
// emit 0 when there aren't any usable
// metrics.
JsonSize::zero()
} else {
events.estimated_json_encoded_size_of()
};

emit!(HttpClientEventsReceived {
byte_size,
count: events.len(),
url: url.to_string()
});

// We'll enrich after receiving the events so
// that the byte sizes are accurate.
context.enrich_events(&mut events);

stream::iter(events)
})
}
Ok((header, _)) => {
context.on_http_response_error(&url, &header);
emit!(HttpClientHttpResponseError {
code: header.status,
url: url.to_string(),
});
None
}
Err(error) => {
emit!(HttpClientHttpError {
error,
url: url.to_string()
});
None
}
})
})
.flatten()
.boxed()
}
})
.flatten_unordered(None)
.boxed();
Expand Down
Loading