Skip to content

Commit 3d844b3

Browse files
feat: optional P_TLS_SKIP_VERIFY to allow self-signed certificates for intra cluster calls (#1309)
allow self signed certificates for intra cluster communication. To be used only for testing purposes. --------- Signed-off-by: Nikhil Sinha <[email protected]> Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
1 parent a3d52b7 commit 3d844b3

File tree

7 files changed

+51
-19
lines changed

7 files changed

+51
-19
lines changed

src/analytics.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use crate::{
4343
option::Mode,
4444
parseable::PARSEABLE,
4545
stats::{self, Stats},
46-
storage, HTTP_CLIENT,
46+
storage, HTTP_CLIENT, INTRA_CLUSTER_CLIENT,
4747
};
4848

4949
const ANALYTICS_SERVER_URL: &str = "https://analytics.parseable.io:80";
@@ -280,7 +280,7 @@ async fn fetch_ingestors_metrics(
280280
))
281281
.expect("Should be a valid URL");
282282

283-
let resp = HTTP_CLIENT
283+
let resp = INTRA_CLUSTER_CLIENT
284284
.get(uri)
285285
.header(header::AUTHORIZATION, im.token.clone())
286286
.header(header::CONTENT_TYPE, "application/json")

src/cli.rs

+12
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,18 @@ pub struct Options {
207207
)]
208208
pub trusted_ca_certs_path: Option<PathBuf>,
209209

210+
/// Allows invalid TLS certificates for intra-cluster communication.
211+
/// This is needed when nodes connect to each other via IP addresses
212+
/// which don't match the domain names in their certificates.
213+
/// SECURITY NOTE: Only enable this for trusted internal networks.
214+
#[arg(
215+
long,
216+
env = "P_TLS_SKIP_VERIFY",
217+
value_name = "bool",
218+
default_value = "false"
219+
)]
220+
pub tls_skip_verify: bool,
221+
210222
// Storage configuration
211223
#[arg(
212224
long,

src/handlers/http/cluster/mod.rs

+11-11
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ use crate::storage::{
5050
ObjectStorage, ObjectStorageError, ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY,
5151
STREAM_ROOT_DIRECTORY,
5252
};
53-
use crate::HTTP_CLIENT;
53+
use crate::INTRA_CLUSTER_CLIENT;
5454

5555
use super::base_path_without_preceding_slash;
5656
use super::ingest::PostError;
@@ -128,7 +128,7 @@ pub async fn sync_streams_with_ingestors(
128128
let headers = reqwest_headers_clone.clone();
129129
let body = body_clone.clone();
130130
async move {
131-
let res = HTTP_CLIENT
131+
let res = INTRA_CLUSTER_CLIENT
132132
.put(url)
133133
.headers(headers)
134134
.header(header::AUTHORIZATION, &ingestor.token)
@@ -179,7 +179,7 @@ pub async fn sync_users_with_roles_with_ingestors(
179179
let role_data = role_data.clone();
180180

181181
async move {
182-
let res = HTTP_CLIENT
182+
let res = INTRA_CLUSTER_CLIENT
183183
.put(url)
184184
.header(header::AUTHORIZATION, &ingestor.token)
185185
.header(header::CONTENT_TYPE, "application/json")
@@ -221,7 +221,7 @@ pub async fn sync_user_deletion_with_ingestors(username: &str) -> Result<(), RBA
221221
);
222222

223223
async move {
224-
let res = HTTP_CLIENT
224+
let res = INTRA_CLUSTER_CLIENT
225225
.delete(url)
226226
.header(header::AUTHORIZATION, &ingestor.token)
227227
.send()
@@ -278,7 +278,7 @@ pub async fn sync_user_creation_with_ingestors(
278278
let user_data = user_data.clone();
279279

280280
async move {
281-
let res = HTTP_CLIENT
281+
let res = INTRA_CLUSTER_CLIENT
282282
.post(url)
283283
.header(header::AUTHORIZATION, &ingestor.token)
284284
.header(header::CONTENT_TYPE, "application/json")
@@ -320,7 +320,7 @@ pub async fn sync_password_reset_with_ingestors(username: &str) -> Result<(), RB
320320
);
321321

322322
async move {
323-
let res = HTTP_CLIENT
323+
let res = INTRA_CLUSTER_CLIENT
324324
.post(url)
325325
.header(header::AUTHORIZATION, &ingestor.token)
326326
.header(header::CONTENT_TYPE, "application/json")
@@ -364,7 +364,7 @@ pub async fn sync_role_update_with_ingestors(
364364
let privileges = privileges.clone();
365365

366366
async move {
367-
let res = HTTP_CLIENT
367+
let res = INTRA_CLUSTER_CLIENT
368368
.put(url)
369369
.header(header::AUTHORIZATION, &ingestor.token)
370370
.header(header::CONTENT_TYPE, "application/json")
@@ -491,7 +491,7 @@ pub async fn send_stream_delete_request(
491491
if !utils::check_liveness(&ingestor.domain_name).await {
492492
return Ok(());
493493
}
494-
let resp = HTTP_CLIENT
494+
let resp = INTRA_CLUSTER_CLIENT
495495
.delete(url)
496496
.header(header::CONTENT_TYPE, "application/json")
497497
.header(header::AUTHORIZATION, ingestor.token)
@@ -529,7 +529,7 @@ pub async fn send_retention_cleanup_request(
529529
if !utils::check_liveness(&ingestor.domain_name).await {
530530
return Ok(first_event_at);
531531
}
532-
let resp = HTTP_CLIENT
532+
let resp = INTRA_CLUSTER_CLIENT
533533
.post(url)
534534
.header(header::CONTENT_TYPE, "application/json")
535535
.header(header::AUTHORIZATION, ingestor.token)
@@ -636,7 +636,7 @@ async fn fetch_node_info<T: Metadata>(node: &T) -> Result<utils::ClusterInfo, St
636636
))
637637
.expect("should always be a valid url");
638638

639-
let resp = HTTP_CLIENT
639+
let resp = INTRA_CLUSTER_CLIENT
640640
.get(uri)
641641
.header(header::AUTHORIZATION, node.token().to_owned())
642642
.header(header::CONTENT_TYPE, "application/json")
@@ -855,7 +855,7 @@ where
855855
}
856856

857857
// Fetch metrics
858-
let res = HTTP_CLIENT
858+
let res = INTRA_CLUSTER_CLIENT
859859
.get(uri)
860860
.header(header::AUTHORIZATION, node.token())
861861
.header(header::CONTENT_TYPE, "application/json")

src/handlers/http/cluster/utils.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
use crate::{
2020
handlers::http::{base_path_without_preceding_slash, modal::NodeType},
21-
HTTP_CLIENT,
21+
INTRA_CLUSTER_CLIENT,
2222
};
2323
use actix_web::http::header;
2424
use chrono::{DateTime, Utc};
@@ -188,7 +188,7 @@ pub async fn check_liveness(domain_name: &str) -> bool {
188188
}
189189
};
190190

191-
let req = HTTP_CLIENT
191+
let req = INTRA_CLUSTER_CLIENT
192192
.get(uri)
193193
.header(header::CONTENT_TYPE, "application/json")
194194
.send()

src/handlers/http/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use itertools::Itertools;
2525
use modal::{NodeMetadata, NodeType};
2626
use serde_json::Value;
2727

28-
use crate::{parseable::PARSEABLE, storage::STREAM_ROOT_DIRECTORY, HTTP_CLIENT};
28+
use crate::{parseable::PARSEABLE, storage::STREAM_ROOT_DIRECTORY, INTRA_CLUSTER_CLIENT};
2929

3030
use self::query::Query;
3131

@@ -119,7 +119,7 @@ pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result<Vec
119119
base_path_without_preceding_slash(),
120120
"query"
121121
);
122-
let reqw = HTTP_CLIENT
122+
let reqw = INTRA_CLUSTER_CLIENT
123123
.post(uri)
124124
.json(query)
125125
.header(http::header::AUTHORIZATION, im.token.clone())

src/lib.rs

+20
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ pub use handlers::http::modal::{
5656
ingest_server::IngestServer, query_server::QueryServer, server::Server, ParseableServer,
5757
};
5858
use once_cell::sync::Lazy;
59+
use parseable::PARSEABLE;
5960
use reqwest::{Client, ClientBuilder};
6061

6162
// It is very unlikely that panic will occur when dealing with locks.
@@ -85,3 +86,22 @@ pub static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {
8586
.build()
8687
.expect("Construction of client shouldn't fail")
8788
});
89+
90+
//separate client is created for intra cluster communication
91+
//allow invalid certificates for connecting other nodes in the cluster
92+
//required when querier/prism server tries to connect to other nodes via IP address directly
93+
//but the certificate is valid for a specific domain name
94+
pub static INTRA_CLUSTER_CLIENT: Lazy<Client> = Lazy::new(|| {
95+
ClientBuilder::new()
96+
.connect_timeout(Duration::from_secs(3)) // set a timeout of 3s for each connection setup
97+
.timeout(Duration::from_secs(30)) // set a timeout of 30s for each request
98+
.pool_idle_timeout(Duration::from_secs(90)) // set a timeout of 90s for each idle connection
99+
.pool_max_idle_per_host(32) // max 32 idle connections per host
100+
.gzip(true) // gzip compress for all requests
101+
.brotli(true) // brotli compress for all requests
102+
.use_rustls_tls() // use only the rustls backend
103+
.http1_only() // use only http/1.1
104+
.danger_accept_invalid_certs(PARSEABLE.options.tls_skip_verify)
105+
.build()
106+
.expect("Construction of client shouldn't fail")
107+
});

src/metrics/prom_utils.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use crate::handlers::http::ingest::PostError;
2121
use crate::handlers::http::modal::Metadata;
2222
use crate::option::Mode;
2323
use crate::parseable::PARSEABLE;
24-
use crate::HTTP_CLIENT;
24+
use crate::INTRA_CLUSTER_CLIENT;
2525
use actix_web::http::header;
2626
use chrono::NaiveDateTime;
2727
use chrono::Utc;
@@ -237,7 +237,7 @@ impl Metrics {
237237
.map_err(|err| {
238238
PostError::Invalid(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err))
239239
})?;
240-
let res = HTTP_CLIENT
240+
let res = INTRA_CLUSTER_CLIENT
241241
.get(uri)
242242
.header(header::CONTENT_TYPE, "application/json")
243243
.header(header::AUTHORIZATION, metadata.token())

0 commit comments

Comments
 (0)