Skip to content

Commit

Permalink
enhancement(http): unifying http query parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
sainad2222 committed Jan 18, 2025
1 parent 681f08d commit bcbbb62
Show file tree
Hide file tree
Showing 10 changed files with 103 additions and 23 deletions.
54 changes: 54 additions & 0 deletions src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use rand::Rng;
use serde_with::serde_as;
use snafu::{ResultExt, Snafu};
use std::{
collections::HashMap,
fmt,
net::SocketAddr,
task::{Context, Poll},
Expand Down Expand Up @@ -553,6 +554,59 @@ where
}
}

/// Configuration of the query parameter value for HTTP requests.
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq)]
#[serde(untagged)]
#[configurable(metadata(docs::enum_tag_description = "Query parameter value"))]
pub enum QueryParameterValue {
/// Query parameter with single value
SingleParam(String),
/// Query parameter with multiple values
MultiParams(Vec<String>),
}

impl QueryParameterValue {
/// Returns an iterator over string slices of the parameter values
pub fn iter(&self) -> std::iter::Map<std::slice::Iter<'_, String>, fn(&String) -> &str> {
match self {
QueryParameterValue::SingleParam(param) => std::slice::from_ref(param).iter(),
QueryParameterValue::MultiParams(params) => params.iter(),
}
.map(String::as_str)
}

/// Convert to Vec<String> for owned iteration
fn into_vec(self) -> Vec<String> {
match self {
QueryParameterValue::SingleParam(param) => vec![param],
QueryParameterValue::MultiParams(params) => params,
}
}
}

// Implement IntoIterator for &QueryParameterValue
impl<'a> IntoIterator for &'a QueryParameterValue {
type Item = &'a str;
type IntoIter = std::iter::Map<std::slice::Iter<'a, String>, fn(&String) -> &str>;

fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}

// Implement IntoIterator for owned QueryParameterValue
impl IntoIterator for QueryParameterValue {
type Item = String;
type IntoIter = std::vec::IntoIter<String>;

fn into_iter(self) -> Self::IntoIter {
self.into_vec().into_iter()
}
}

pub type QueryParameters = HashMap<String, QueryParameterValue>;

#[cfg(test)]
mod tests {
use std::convert::Infallible;
Expand Down
32 changes: 22 additions & 10 deletions src/sinks/elasticsearch/common.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::collections::HashMap;

use bytes::{Buf, Bytes};
use http::{Response, StatusCode, Uri};
use hyper::{body, Body};
Expand All @@ -13,14 +11,13 @@ use super::{
InvalidHostSnafu, Request, VersionType,
};
use crate::{
http::{HttpClient, MaybeAuth},
http::{HttpClient, MaybeAuth, QueryParameterValue, QueryParameters},
sinks::{
elasticsearch::{
ElasticsearchAuthConfig, ElasticsearchCommonMode, ElasticsearchConfig,
OpenSearchServiceType, ParseError,
},
util::auth::Auth,
util::{http::RequestConfig, UriSerde},
util::{auth::Auth, http::RequestConfig, UriSerde},
HealthcheckError,
},
tls::TlsSettings,
Expand All @@ -37,7 +34,7 @@ pub struct ElasticsearchCommon {
pub request_builder: ElasticsearchRequestBuilder,
pub tls_settings: TlsSettings,
pub request: RequestConfig,
pub query_params: HashMap<String, String>,
pub query_params: QueryParameters,
pub metric_to_log: MetricToLog,
}

Expand Down Expand Up @@ -123,19 +120,34 @@ impl ElasticsearchCommon {
let mut query_params = config.query.clone().unwrap_or_default();
query_params.insert(
"timeout".into(),
format!("{}s", tower_request.timeout.as_secs()),
QueryParameterValue::SingleParam(format!("{}s", tower_request.timeout.as_secs())),
);

if let Some(pipeline) = &config.pipeline {
if !pipeline.is_empty() {
query_params.insert("pipeline".into(), pipeline.into());
query_params.insert(
"pipeline".into(),
QueryParameterValue::SingleParam(pipeline.into()),
);
}
}

let bulk_url = {
let mut query = url::form_urlencoded::Serializer::new(String::new());
for (p, v) in &query_params {
query.append_pair(&p[..], &v[..]);
// Iterate through the HashMap
for (param_name, param_value) in &query_params {
match param_value {
QueryParameterValue::SingleParam(param) => {
// For single parameter, just append one pair
query.append_pair(param_name, param);
}
QueryParameterValue::MultiParams(params) => {
// For multiple parameters, append the same key multiple times
for value in params {
query.append_pair(param_name, value);
}
}
}
}
format!("{}/_bulk?{}", base_url, query.finish())
};
Expand Down
4 changes: 2 additions & 2 deletions src/sinks/elasticsearch/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
codecs::Transformer,
config::{AcknowledgementsConfig, DataType, Input, SinkConfig, SinkContext},
event::{EventRef, LogEvent, Value},
http::HttpClient,
http::{HttpClient, QueryParameters},
internal_events::TemplateRenderingError,
sinks::{
elasticsearch::{
Expand Down Expand Up @@ -175,7 +175,7 @@ pub struct ElasticsearchConfig {
#[configurable(metadata(docs::advanced))]
#[configurable(metadata(docs::additional_props_description = "A query string parameter."))]
#[configurable(metadata(docs::examples = "query_examples()"))]
pub query: Option<HashMap<String, String>>,
pub query: Option<QueryParameters>,

#[serde(default)]
#[configurable(derived)]
Expand Down
7 changes: 5 additions & 2 deletions src/sinks/elasticsearch/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use super::{config::DATA_STREAM_TIMESTAMP_KEY, *};
use crate::{
aws::{ImdsAuthentication, RegionOrEndpoint},
config::{ProxyConfig, SinkConfig, SinkContext},
http::HttpClient,
http::{HttpClient, QueryParameterValue},
sinks::{
util::{auth::Auth, BatchConfig, Compression, SinkBatchSettings},
HealthcheckError,
Expand Down Expand Up @@ -133,7 +133,10 @@ async fn ensure_pipeline_in_params() {
.await
.expect("Config error");

assert_eq!(common.query_params["pipeline"], pipeline);
assert_eq!(
common.query_params["pipeline"],
QueryParameterValue::SingleParam(pipeline)
);
}

#[tokio::test]
Expand Down
3 changes: 2 additions & 1 deletion src/sources/http_client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use snafu::ResultExt;
use std::{collections::HashMap, time::Duration};
use tokio_util::codec::Decoder as _;

use crate::http::QueryParameters;
use crate::sources::util::http_client;
use crate::{
codecs::{Decoder, DecodingConfig},
Expand Down Expand Up @@ -78,7 +79,7 @@ pub struct HttpClientConfig {
docs::additional_props_description = "A query string parameter and it's value(s)."
))]
#[configurable(metadata(docs::examples = "query_examples()"))]
pub query: HashMap<String, Vec<String>>,
pub query: QueryParameters,

/// Decoder to use on the HTTP responses.
#[configurable(derived)]
Expand Down
8 changes: 6 additions & 2 deletions src/sources/http_client/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use vector_lib::config::LogNamespace;
use warp::{http::HeaderMap, Filter};

use crate::components::validation::prelude::*;
use crate::http::QueryParameterValue;
use crate::sources::util::http::HttpMethod;
use crate::{serde::default_decoding, serde::default_framing_message_based};
use vector_lib::codecs::decoding::{
Expand Down Expand Up @@ -180,10 +181,13 @@ async fn request_query_applied() {
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::from([
("key1".to_string(), vec!["val2".to_string()]),
(
"key1".to_string(),
QueryParameterValue::MultiParams(vec!["val2".to_string()]),
),
(
"key2".to_string(),
vec!["val1".to_string(), "val2".to_string()],
QueryParameterValue::MultiParams(vec!["val1".to_string(), "val2".to_string()]),
),
]),
decoding: DeserializerConfig::Json(Default::default()),
Expand Down
11 changes: 8 additions & 3 deletions src/sources/prometheus/scrape.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use vector_lib::configurable::configurable_component;
use vector_lib::{config::LogNamespace, event::Event};

use super::parser;
use crate::http::QueryParameters;
use crate::sources::util::http::HttpMethod;
use crate::sources::util::http_client::{default_timeout, warn_if_interval_too_low};
use crate::{
Expand Down Expand Up @@ -94,7 +95,7 @@ pub struct PrometheusScrapeConfig {
#[serde(default)]
#[configurable(metadata(docs::additional_props_description = "A query string parameter."))]
#[configurable(metadata(docs::examples = "query_example()"))]
query: HashMap<String, Vec<String>>,
query: QueryParameters,

#[configurable(derived)]
tls: Option<TlsConfig>,
Expand Down Expand Up @@ -330,6 +331,7 @@ mod test {
use super::*;
use crate::{
config,
http::QueryParameterValue,
sinks::prometheus::exporter::PrometheusExporterConfig,
test_util::{
components::{run_and_assert_source_compliance, HTTP_PULL_SOURCE_TAGS},
Expand Down Expand Up @@ -572,10 +574,13 @@ mod test {
endpoint_tag: Some("endpoint".to_string()),
honor_labels: false,
query: HashMap::from([
("key1".to_string(), vec!["val2".to_string()]),
(
"key1".to_string(),
QueryParameterValue::MultiParams(vec!["val2".to_string()]),
),
(
"key2".to_string(),
vec!["val1".to_string(), "val2".to_string()],
QueryParameterValue::MultiParams(vec!["val1".to_string(), "val2".to_string()]),
),
]),
auth: None,
Expand Down
3 changes: 2 additions & 1 deletion src/sources/util/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::{collections::HashMap, future::ready};
use tokio_stream::wrappers::IntervalStream;
use vector_lib::json_size::JsonSize;

use crate::http::QueryParameters;
use crate::{
http::{Auth, HttpClient},
internal_events::{
Expand Down Expand Up @@ -82,7 +83,7 @@ pub(crate) trait HttpClientContext {
}

/// Builds a url for the HTTP requests.
pub(crate) fn build_url(uri: &Uri, query: &HashMap<String, Vec<String>>) -> Uri {
pub(crate) fn build_url(uri: &Uri, query: &QueryParameters) -> Uri {
let mut serializer = url::form_urlencoded::Serializer::new(String::new());
if let Some(query) = uri.query() {
serializer.extend_pairs(url::form_urlencoded::parse(query.as_bytes()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ base: components: sources: http_client: configuration: {
options: "*": {
description: "A query string parameter and it's value(s)."
required: true
type: array: items: type: string: {}
type: string: {}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ base: components: sources: prometheus_scrape: configuration: {
options: "*": {
description: "A query string parameter."
required: true
type: array: items: type: string: {}
type: string: {}
}
}
}
Expand Down

0 comments on commit bcbbb62

Please sign in to comment.