Skip to content

Commit 5d7cd68

Browse files
chore: separate handlers for different modes (#937)
Co-authored-by: Nikhil Sinha <[email protected]>
1 parent 32412bf commit 5d7cd68

24 files changed

+1844
-1085
lines changed

server/src/handlers/http/cluster/mod.rs

Lines changed: 6 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -59,46 +59,6 @@ pub const INTERNAL_STREAM_NAME: &str = "pmeta";
5959

6060
const CLUSTER_METRICS_INTERVAL_SECONDS: Interval = clokwerk::Interval::Minutes(1);
6161

62-
pub async fn sync_cache_with_ingestors(
63-
url: &str,
64-
ingestor: IngestorMetadata,
65-
body: bool,
66-
) -> Result<(), StreamError> {
67-
if !utils::check_liveness(&ingestor.domain_name).await {
68-
return Ok(());
69-
}
70-
let request_body: Bytes = Bytes::from(body.to_string());
71-
let client = reqwest::Client::new();
72-
let resp = client
73-
.put(url)
74-
.header(header::CONTENT_TYPE, "application/json")
75-
.header(header::AUTHORIZATION, ingestor.token)
76-
.body(request_body)
77-
.send()
78-
.await
79-
.map_err(|err| {
80-
// log the error and return a custom error
81-
log::error!(
82-
"Fatal: failed to set cache: {}\n Error: {:?}",
83-
ingestor.domain_name,
84-
err
85-
);
86-
StreamError::Network(err)
87-
})?;
88-
89-
// if the response is not successful, log the error and return a custom error
90-
// this could be a bit too much, but we need to be sure it covers all cases
91-
if !resp.status().is_success() {
92-
log::error!(
93-
"failed to set cache: {}\nResponse Returned: {:?}",
94-
ingestor.domain_name,
95-
resp.text().await
96-
);
97-
}
98-
99-
Ok(())
100-
}
101-
10262
// forward the create/update stream request to all ingestors to keep them in sync
10363
pub async fn sync_streams_with_ingestors(
10464
headers: HeaderMap,
@@ -122,7 +82,7 @@ pub async fn sync_streams_with_ingestors(
12282
continue;
12383
}
12484
let url = format!(
125-
"{}{}/logstream/{}",
85+
"{}{}/logstream/{}/sync",
12686
ingestor.domain_name,
12787
base_path_without_preceding_slash(),
12888
stream_name
@@ -176,7 +136,7 @@ pub async fn sync_users_with_roles_with_ingestors(
176136
continue;
177137
}
178138
let url = format!(
179-
"{}{}/user/{}/role",
139+
"{}{}/user/{}/role/sync",
180140
ingestor.domain_name,
181141
base_path_without_preceding_slash(),
182142
username
@@ -224,7 +184,7 @@ pub async fn sync_user_deletion_with_ingestors(username: &String) -> Result<(),
224184
continue;
225185
}
226186
let url = format!(
227-
"{}{}/user/{}",
187+
"{}{}/user/{}/sync",
228188
ingestor.domain_name,
229189
base_path_without_preceding_slash(),
230190
username
@@ -285,7 +245,7 @@ pub async fn sync_user_creation_with_ingestors(
285245
continue;
286246
}
287247
let url = format!(
288-
"{}{}/user/{}",
248+
"{}{}/user/{}/sync",
289249
ingestor.domain_name,
290250
base_path_without_preceding_slash(),
291251
username
@@ -333,7 +293,7 @@ pub async fn sync_password_reset_with_ingestors(username: &String) -> Result<(),
333293
continue;
334294
}
335295
let url = format!(
336-
"{}{}/user/{}/generate-new-password",
296+
"{}{}/user/{}/generate-new-password/sync",
337297
ingestor.domain_name,
338298
base_path_without_preceding_slash(),
339299
username
@@ -389,7 +349,7 @@ pub async fn sync_role_update_with_ingestors(
389349
continue;
390350
}
391351
let url = format!(
392-
"{}{}/role/{}",
352+
"{}{}/role/{}/sync",
393353
ingestor.domain_name,
394354
base_path_without_preceding_slash(),
395355
name

0 commit comments

Comments
 (0)