@@ -26,6 +26,7 @@ use crate::handlers::http::logstream::error::StreamError;
26
26
use crate :: option:: CONFIG ;
27
27
28
28
use crate :: metrics:: prom_utils:: Metrics ;
29
+ use crate :: rbac:: user:: User ;
29
30
use crate :: stats:: Stats ;
30
31
use crate :: storage:: object_storage:: ingestor_metadata_path;
31
32
use crate :: storage:: { ObjectStorageError , STREAM_ROOT_DIRECTORY } ;
@@ -39,13 +40,15 @@ use itertools::Itertools;
39
40
use relative_path:: RelativePathBuf ;
40
41
use serde:: de:: Error ;
41
42
use serde_json:: error:: Error as SerdeError ;
42
- use serde_json:: Value as JsonValue ;
43
+ use serde_json:: { to_vec , Value as JsonValue } ;
43
44
use url:: Url ;
44
45
type IngestorMetadataArr = Vec < IngestorMetadata > ;
45
46
46
47
use self :: utils:: StorageStats ;
47
48
48
49
use super :: base_path_without_preceding_slash;
50
+ use super :: rbac:: RBACError ;
51
+ use std:: collections:: HashSet ;
49
52
use std:: time:: Duration ;
50
53
51
54
use super :: modal:: IngestorMetadata ;
@@ -94,7 +97,7 @@ pub async fn sync_cache_with_ingestors(
94
97
Ok ( ( ) )
95
98
}
96
99
97
- // forward the request to all ingestors to keep them in sync
100
+ // forward the create/update stream request to all ingestors to keep them in sync
98
101
pub async fn sync_streams_with_ingestors (
99
102
headers : HeaderMap ,
100
103
body : Bytes ,
@@ -142,7 +145,218 @@ pub async fn sync_streams_with_ingestors(
142
145
log:: error!(
143
146
"failed to forward upsert stream request to ingestor: {}\n Response Returned: {:?}" ,
144
147
ingestor. domain_name,
145
- res
148
+ res. text( ) . await
149
+ ) ;
150
+ }
151
+ }
152
+
153
+ Ok ( ( ) )
154
+ }
155
+
156
+ // forward the role update request to all ingestors to keep them in sync
157
+ pub async fn sync_users_with_roles_with_ingestors (
158
+ username : & String ,
159
+ role : & HashSet < String > ,
160
+ ) -> Result < ( ) , RBACError > {
161
+ let ingestor_infos = get_ingestor_info ( ) . await . map_err ( |err| {
162
+ log:: error!( "Fatal: failed to get ingestor info: {:?}" , err) ;
163
+ RBACError :: Anyhow ( err)
164
+ } ) ?;
165
+
166
+ let client = reqwest:: Client :: new ( ) ;
167
+ let role = to_vec ( & role. clone ( ) ) . map_err ( |err| {
168
+ log:: error!( "Fatal: failed to serialize role: {:?}" , err) ;
169
+ RBACError :: SerdeError ( err)
170
+ } ) ?;
171
+ for ingestor in ingestor_infos. iter ( ) {
172
+ if !utils:: check_liveness ( & ingestor. domain_name ) . await {
173
+ log:: warn!( "Ingestor {} is not live" , ingestor. domain_name) ;
174
+ continue ;
175
+ }
176
+ let url = format ! (
177
+ "{}{}/user/{}/role" ,
178
+ ingestor. domain_name,
179
+ base_path_without_preceding_slash( ) ,
180
+ username
181
+ ) ;
182
+
183
+ let res = client
184
+ . put ( url)
185
+ . header ( header:: AUTHORIZATION , & ingestor. token )
186
+ . header ( header:: CONTENT_TYPE , "application/json" )
187
+ . body ( role. clone ( ) )
188
+ . send ( )
189
+ . await
190
+ . map_err ( |err| {
191
+ log:: error!(
192
+ "Fatal: failed to forward request to ingestor: {}\n Error: {:?}" ,
193
+ ingestor. domain_name,
194
+ err
195
+ ) ;
196
+ RBACError :: Network ( err)
197
+ } ) ?;
198
+
199
+ if !res. status ( ) . is_success ( ) {
200
+ log:: error!(
201
+ "failed to forward request to ingestor: {}\n Response Returned: {:?}" ,
202
+ ingestor. domain_name,
203
+ res. text( ) . await
204
+ ) ;
205
+ }
206
+ }
207
+
208
+ Ok ( ( ) )
209
+ }
210
+
211
+ // forward the delete user request to all ingestors to keep them in sync
212
+ pub async fn sync_user_deletion_with_ingestors ( username : & String ) -> Result < ( ) , RBACError > {
213
+ let ingestor_infos = get_ingestor_info ( ) . await . map_err ( |err| {
214
+ log:: error!( "Fatal: failed to get ingestor info: {:?}" , err) ;
215
+ RBACError :: Anyhow ( err)
216
+ } ) ?;
217
+
218
+ let client = reqwest:: Client :: new ( ) ;
219
+ for ingestor in ingestor_infos. iter ( ) {
220
+ if !utils:: check_liveness ( & ingestor. domain_name ) . await {
221
+ log:: warn!( "Ingestor {} is not live" , ingestor. domain_name) ;
222
+ continue ;
223
+ }
224
+ let url = format ! (
225
+ "{}{}/user/{}" ,
226
+ ingestor. domain_name,
227
+ base_path_without_preceding_slash( ) ,
228
+ username
229
+ ) ;
230
+
231
+ let res = client
232
+ . delete ( url)
233
+ . header ( header:: AUTHORIZATION , & ingestor. token )
234
+ . send ( )
235
+ . await
236
+ . map_err ( |err| {
237
+ log:: error!(
238
+ "Fatal: failed to forward request to ingestor: {}\n Error: {:?}" ,
239
+ ingestor. domain_name,
240
+ err
241
+ ) ;
242
+ RBACError :: Network ( err)
243
+ } ) ?;
244
+
245
+ if !res. status ( ) . is_success ( ) {
246
+ log:: error!(
247
+ "failed to forward request to ingestor: {}\n Response Returned: {:?}" ,
248
+ ingestor. domain_name,
249
+ res. text( ) . await
250
+ ) ;
251
+ }
252
+ }
253
+
254
+ Ok ( ( ) )
255
+ }
256
+
257
+ // forward the create user request to all ingestors to keep them in sync
258
+ pub async fn sync_user_creation_with_ingestors (
259
+ user : User ,
260
+ role : & Option < HashSet < String > > ,
261
+ ) -> Result < ( ) , RBACError > {
262
+ let ingestor_infos = get_ingestor_info ( ) . await . map_err ( |err| {
263
+ log:: error!( "Fatal: failed to get ingestor info: {:?}" , err) ;
264
+ RBACError :: Anyhow ( err)
265
+ } ) ?;
266
+
267
+ let mut user = user. clone ( ) ;
268
+
269
+ if let Some ( role) = role {
270
+ user. roles . clone_from ( role) ;
271
+ }
272
+ let username = user. username ( ) ;
273
+ let client = reqwest:: Client :: new ( ) ;
274
+
275
+ let user = to_vec ( & user) . map_err ( |err| {
276
+ log:: error!( "Fatal: failed to serialize user: {:?}" , err) ;
277
+ RBACError :: SerdeError ( err)
278
+ } ) ?;
279
+
280
+ for ingestor in ingestor_infos. iter ( ) {
281
+ if !utils:: check_liveness ( & ingestor. domain_name ) . await {
282
+ log:: warn!( "Ingestor {} is not live" , ingestor. domain_name) ;
283
+ continue ;
284
+ }
285
+ let url = format ! (
286
+ "{}{}/user/{}" ,
287
+ ingestor. domain_name,
288
+ base_path_without_preceding_slash( ) ,
289
+ username
290
+ ) ;
291
+
292
+ let res = client
293
+ . post ( url)
294
+ . header ( header:: AUTHORIZATION , & ingestor. token )
295
+ . header ( header:: CONTENT_TYPE , "application/json" )
296
+ . body ( user. clone ( ) )
297
+ . send ( )
298
+ . await
299
+ . map_err ( |err| {
300
+ log:: error!(
301
+ "Fatal: failed to forward request to ingestor: {}\n Error: {:?}" ,
302
+ ingestor. domain_name,
303
+ err
304
+ ) ;
305
+ RBACError :: Network ( err)
306
+ } ) ?;
307
+
308
+ if !res. status ( ) . is_success ( ) {
309
+ log:: error!(
310
+ "failed to forward request to ingestor: {}\n Response Returned: {:?}" ,
311
+ ingestor. domain_name,
312
+ res. text( ) . await
313
+ ) ;
314
+ }
315
+ }
316
+
317
+ Ok ( ( ) )
318
+ }
319
+
320
+ // forward the password reset request to all ingestors to keep them in sync
321
+ pub async fn sync_password_reset_with_ingestors ( username : & String ) -> Result < ( ) , RBACError > {
322
+ let ingestor_infos = get_ingestor_info ( ) . await . map_err ( |err| {
323
+ log:: error!( "Fatal: failed to get ingestor info: {:?}" , err) ;
324
+ RBACError :: Anyhow ( err)
325
+ } ) ?;
326
+ let client = reqwest:: Client :: new ( ) ;
327
+
328
+ for ingestor in ingestor_infos. iter ( ) {
329
+ if !utils:: check_liveness ( & ingestor. domain_name ) . await {
330
+ log:: warn!( "Ingestor {} is not live" , ingestor. domain_name) ;
331
+ continue ;
332
+ }
333
+ let url = format ! (
334
+ "{}{}/user/{}/generate-new-password" ,
335
+ ingestor. domain_name,
336
+ base_path_without_preceding_slash( ) ,
337
+ username
338
+ ) ;
339
+
340
+ let res = client
341
+ . post ( url)
342
+ . header ( header:: AUTHORIZATION , & ingestor. token )
343
+ . header ( header:: CONTENT_TYPE , "application/json" )
344
+ . send ( )
345
+ . await
346
+ . map_err ( |err| {
347
+ log:: error!(
348
+ "Fatal: failed to forward request to ingestor: {}\n Error: {:?}" ,
349
+ ingestor. domain_name,
350
+ err
351
+ ) ;
352
+ RBACError :: Network ( err)
353
+ } ) ?;
354
+
355
+ if !res. status ( ) . is_success ( ) {
356
+ log:: error!(
357
+ "failed to forward request to ingestor: {}\n Response Returned: {:?}" ,
358
+ ingestor. domain_name,
359
+ res. text( ) . await
146
360
) ;
147
361
}
148
362
}
0 commit comments