From bcbbb62d86cc8e80066d576688b20e5fbb0ecf7d Mon Sep 17 00:00:00 2001 From: Sainath Singineedi Date: Sun, 19 Jan 2025 01:13:23 +0530 Subject: [PATCH] enhancement(http): unifying http query parameters --- src/http.rs | 54 +++++++++++++++++++ src/sinks/elasticsearch/common.rs | 32 +++++++---- src/sinks/elasticsearch/config.rs | 4 +- src/sinks/elasticsearch/integration_tests.rs | 7 ++- src/sources/http_client/client.rs | 3 +- src/sources/http_client/tests.rs | 8 ++- src/sources/prometheus/scrape.rs | 11 ++-- src/sources/util/http_client.rs | 3 +- .../components/sources/base/http_client.cue | 2 +- .../sources/base/prometheus_scrape.cue | 2 +- 10 files changed, 103 insertions(+), 23 deletions(-) diff --git a/src/http.rs b/src/http.rs index 9faaee12ed86c..169cfc9f9a81a 100644 --- a/src/http.rs +++ b/src/http.rs @@ -16,6 +16,7 @@ use rand::Rng; use serde_with::serde_as; use snafu::{ResultExt, Snafu}; use std::{ + collections::HashMap, fmt, net::SocketAddr, task::{Context, Poll}, @@ -553,6 +554,59 @@ where } } +/// Configuration of the query parameter value for HTTP requests. +#[configurable_component] +#[derive(Clone, Debug, Eq, PartialEq)] +#[serde(untagged)] +#[configurable(metadata(docs::enum_tag_description = "Query parameter value"))] +pub enum QueryParameterValue { + /// Query parameter with single value + SingleParam(String), + /// Query parameter with multiple values + MultiParams(Vec), +} + +impl QueryParameterValue { + /// Returns an iterator over string slices of the parameter values + pub fn iter(&self) -> std::iter::Map, fn(&String) -> &str> { + match self { + QueryParameterValue::SingleParam(param) => std::slice::from_ref(param).iter(), + QueryParameterValue::MultiParams(params) => params.iter(), + } + .map(String::as_str) + } + + /// Convert to Vec for owned iteration + fn into_vec(self) -> Vec { + match self { + QueryParameterValue::SingleParam(param) => vec![param], + QueryParameterValue::MultiParams(params) => params, + } + } +} + +// Implement IntoIterator for &QueryParameterValue +impl<'a> IntoIterator for &'a QueryParameterValue { + type Item = &'a str; + type IntoIter = std::iter::Map, fn(&String) -> &str>; + + fn into_iter(self) -> Self::IntoIter { + self.iter() + } +} + +// Implement IntoIterator for owned QueryParameterValue +impl IntoIterator for QueryParameterValue { + type Item = String; + type IntoIter = std::vec::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.into_vec().into_iter() + } +} + +pub type QueryParameters = HashMap; + #[cfg(test)] mod tests { use std::convert::Infallible; diff --git a/src/sinks/elasticsearch/common.rs b/src/sinks/elasticsearch/common.rs index e2cdec7a1adde..1060ee5c7a22a 100644 --- a/src/sinks/elasticsearch/common.rs +++ b/src/sinks/elasticsearch/common.rs @@ -1,5 +1,3 @@ -use std::collections::HashMap; - use bytes::{Buf, Bytes}; use http::{Response, StatusCode, Uri}; use hyper::{body, Body}; @@ -13,14 +11,13 @@ use super::{ InvalidHostSnafu, Request, VersionType, }; use crate::{ - http::{HttpClient, MaybeAuth}, + http::{HttpClient, MaybeAuth, QueryParameterValue, QueryParameters}, sinks::{ elasticsearch::{ ElasticsearchAuthConfig, ElasticsearchCommonMode, ElasticsearchConfig, OpenSearchServiceType, ParseError, }, - util::auth::Auth, - util::{http::RequestConfig, UriSerde}, + util::{auth::Auth, http::RequestConfig, UriSerde}, HealthcheckError, }, tls::TlsSettings, @@ -37,7 +34,7 @@ pub struct ElasticsearchCommon { pub request_builder: ElasticsearchRequestBuilder, pub tls_settings: TlsSettings, pub request: RequestConfig, - pub query_params: HashMap, + pub query_params: QueryParameters, pub metric_to_log: MetricToLog, } @@ -123,19 +120,34 @@ impl ElasticsearchCommon { let mut query_params = config.query.clone().unwrap_or_default(); query_params.insert( "timeout".into(), - format!("{}s", tower_request.timeout.as_secs()), + QueryParameterValue::SingleParam(format!("{}s", tower_request.timeout.as_secs())), ); if let Some(pipeline) = &config.pipeline { if !pipeline.is_empty() { - query_params.insert("pipeline".into(), pipeline.into()); + query_params.insert( + "pipeline".into(), + QueryParameterValue::SingleParam(pipeline.into()), + ); } } let bulk_url = { let mut query = url::form_urlencoded::Serializer::new(String::new()); - for (p, v) in &query_params { - query.append_pair(&p[..], &v[..]); + // Iterate through the HashMap + for (param_name, param_value) in &query_params { + match param_value { + QueryParameterValue::SingleParam(param) => { + // For single parameter, just append one pair + query.append_pair(param_name, param); + } + QueryParameterValue::MultiParams(params) => { + // For multiple parameters, append the same key multiple times + for value in params { + query.append_pair(param_name, value); + } + } + } } format!("{}/_bulk?{}", base_url, query.finish()) }; diff --git a/src/sinks/elasticsearch/config.rs b/src/sinks/elasticsearch/config.rs index 6ae622583d73d..ba639c6ec794b 100644 --- a/src/sinks/elasticsearch/config.rs +++ b/src/sinks/elasticsearch/config.rs @@ -10,7 +10,7 @@ use crate::{ codecs::Transformer, config::{AcknowledgementsConfig, DataType, Input, SinkConfig, SinkContext}, event::{EventRef, LogEvent, Value}, - http::HttpClient, + http::{HttpClient, QueryParameters}, internal_events::TemplateRenderingError, sinks::{ elasticsearch::{ @@ -175,7 +175,7 @@ pub struct ElasticsearchConfig { #[configurable(metadata(docs::advanced))] #[configurable(metadata(docs::additional_props_description = "A query string parameter."))] #[configurable(metadata(docs::examples = "query_examples()"))] - pub query: Option>, + pub query: Option, #[serde(default)] #[configurable(derived)] diff --git a/src/sinks/elasticsearch/integration_tests.rs b/src/sinks/elasticsearch/integration_tests.rs index 5a5a486ef446e..9c58b12a6694f 100644 --- a/src/sinks/elasticsearch/integration_tests.rs +++ b/src/sinks/elasticsearch/integration_tests.rs @@ -16,7 +16,7 @@ use super::{config::DATA_STREAM_TIMESTAMP_KEY, *}; use crate::{ aws::{ImdsAuthentication, RegionOrEndpoint}, config::{ProxyConfig, SinkConfig, SinkContext}, - http::HttpClient, + http::{HttpClient, QueryParameterValue}, sinks::{ util::{auth::Auth, BatchConfig, Compression, SinkBatchSettings}, HealthcheckError, @@ -133,7 +133,10 @@ async fn ensure_pipeline_in_params() { .await .expect("Config error"); - assert_eq!(common.query_params["pipeline"], pipeline); + assert_eq!( + common.query_params["pipeline"], + QueryParameterValue::SingleParam(pipeline) + ); } #[tokio::test] diff --git a/src/sources/http_client/client.rs b/src/sources/http_client/client.rs index 64749a3d651a0..06a801a91634c 100644 --- a/src/sources/http_client/client.rs +++ b/src/sources/http_client/client.rs @@ -10,6 +10,7 @@ use snafu::ResultExt; use std::{collections::HashMap, time::Duration}; use tokio_util::codec::Decoder as _; +use crate::http::QueryParameters; use crate::sources::util::http_client; use crate::{ codecs::{Decoder, DecodingConfig}, @@ -78,7 +79,7 @@ pub struct HttpClientConfig { docs::additional_props_description = "A query string parameter and it's value(s)." ))] #[configurable(metadata(docs::examples = "query_examples()"))] - pub query: HashMap>, + pub query: QueryParameters, /// Decoder to use on the HTTP responses. #[configurable(derived)] diff --git a/src/sources/http_client/tests.rs b/src/sources/http_client/tests.rs index 69309321b07f5..8ce4f87f3de29 100644 --- a/src/sources/http_client/tests.rs +++ b/src/sources/http_client/tests.rs @@ -5,6 +5,7 @@ use vector_lib::config::LogNamespace; use warp::{http::HeaderMap, Filter}; use crate::components::validation::prelude::*; +use crate::http::QueryParameterValue; use crate::sources::util::http::HttpMethod; use crate::{serde::default_decoding, serde::default_framing_message_based}; use vector_lib::codecs::decoding::{ @@ -180,10 +181,13 @@ async fn request_query_applied() { interval: INTERVAL, timeout: TIMEOUT, query: HashMap::from([ - ("key1".to_string(), vec!["val2".to_string()]), + ( + "key1".to_string(), + QueryParameterValue::MultiParams(vec!["val2".to_string()]), + ), ( "key2".to_string(), - vec!["val1".to_string(), "val2".to_string()], + QueryParameterValue::MultiParams(vec!["val1".to_string(), "val2".to_string()]), ), ]), decoding: DeserializerConfig::Json(Default::default()), diff --git a/src/sources/prometheus/scrape.rs b/src/sources/prometheus/scrape.rs index 28128e2a81a75..ac32e9dbfc47b 100644 --- a/src/sources/prometheus/scrape.rs +++ b/src/sources/prometheus/scrape.rs @@ -10,6 +10,7 @@ use vector_lib::configurable::configurable_component; use vector_lib::{config::LogNamespace, event::Event}; use super::parser; +use crate::http::QueryParameters; use crate::sources::util::http::HttpMethod; use crate::sources::util::http_client::{default_timeout, warn_if_interval_too_low}; use crate::{ @@ -94,7 +95,7 @@ pub struct PrometheusScrapeConfig { #[serde(default)] #[configurable(metadata(docs::additional_props_description = "A query string parameter."))] #[configurable(metadata(docs::examples = "query_example()"))] - query: HashMap>, + query: QueryParameters, #[configurable(derived)] tls: Option, @@ -330,6 +331,7 @@ mod test { use super::*; use crate::{ config, + http::QueryParameterValue, sinks::prometheus::exporter::PrometheusExporterConfig, test_util::{ components::{run_and_assert_source_compliance, HTTP_PULL_SOURCE_TAGS}, @@ -572,10 +574,13 @@ mod test { endpoint_tag: Some("endpoint".to_string()), honor_labels: false, query: HashMap::from([ - ("key1".to_string(), vec!["val2".to_string()]), + ( + "key1".to_string(), + QueryParameterValue::MultiParams(vec!["val2".to_string()]), + ), ( "key2".to_string(), - vec!["val1".to_string(), "val2".to_string()], + QueryParameterValue::MultiParams(vec!["val1".to_string(), "val2".to_string()]), ), ]), auth: None, diff --git a/src/sources/util/http_client.rs b/src/sources/util/http_client.rs index fd5ffb1b03260..5433a7001a417 100644 --- a/src/sources/util/http_client.rs +++ b/src/sources/util/http_client.rs @@ -17,6 +17,7 @@ use std::{collections::HashMap, future::ready}; use tokio_stream::wrappers::IntervalStream; use vector_lib::json_size::JsonSize; +use crate::http::QueryParameters; use crate::{ http::{Auth, HttpClient}, internal_events::{ @@ -82,7 +83,7 @@ pub(crate) trait HttpClientContext { } /// Builds a url for the HTTP requests. -pub(crate) fn build_url(uri: &Uri, query: &HashMap>) -> Uri { +pub(crate) fn build_url(uri: &Uri, query: &QueryParameters) -> Uri { let mut serializer = url::form_urlencoded::Serializer::new(String::new()); if let Some(query) = uri.query() { serializer.extend_pairs(url::form_urlencoded::parse(query.as_bytes())); diff --git a/website/cue/reference/components/sources/base/http_client.cue b/website/cue/reference/components/sources/base/http_client.cue index 609949072fba6..53414e3b7db06 100644 --- a/website/cue/reference/components/sources/base/http_client.cue +++ b/website/cue/reference/components/sources/base/http_client.cue @@ -515,7 +515,7 @@ base: components: sources: http_client: configuration: { options: "*": { description: "A query string parameter and it's value(s)." required: true - type: array: items: type: string: {} + type: string: {} } } } diff --git a/website/cue/reference/components/sources/base/prometheus_scrape.cue b/website/cue/reference/components/sources/base/prometheus_scrape.cue index 41eb855cc768d..4e0c68d88585b 100644 --- a/website/cue/reference/components/sources/base/prometheus_scrape.cue +++ b/website/cue/reference/components/sources/base/prometheus_scrape.cue @@ -99,7 +99,7 @@ base: components: sources: prometheus_scrape: configuration: { options: "*": { description: "A query string parameter." required: true - type: array: items: type: string: {} + type: string: {} } } }