@@ -20,6 +20,7 @@ pub mod utils;
20
20
21
21
use futures:: { future, stream, StreamExt } ;
22
22
use std:: collections:: HashSet ;
23
+ use std:: sync:: Arc ;
23
24
use std:: time:: Duration ;
24
25
25
26
use actix_web:: http:: header:: { self , HeaderMap } ;
@@ -31,7 +32,7 @@ use clokwerk::{AsyncScheduler, Interval};
31
32
use http:: { header as http_header, StatusCode } ;
32
33
use itertools:: Itertools ;
33
34
use relative_path:: RelativePathBuf ;
34
- use serde:: de:: Error ;
35
+ use serde:: de:: { DeserializeOwned , Error } ;
35
36
use serde_json:: error:: Error as SerdeError ;
36
37
use serde_json:: { to_vec, Value as JsonValue } ;
37
38
use tracing:: { error, info, warn} ;
@@ -45,7 +46,8 @@ use crate::rbac::role::model::DefaultPrivilege;
45
46
use crate :: rbac:: user:: User ;
46
47
use crate :: stats:: Stats ;
47
48
use crate :: storage:: {
48
- ObjectStorageError , ObjectStoreFormat , PARSEABLE_ROOT_DIRECTORY , STREAM_ROOT_DIRECTORY ,
49
+ ObjectStorage , ObjectStorageError , ObjectStoreFormat , PARSEABLE_ROOT_DIRECTORY ,
50
+ STREAM_ROOT_DIRECTORY ,
49
51
} ;
50
52
use crate :: HTTP_CLIENT ;
51
53
@@ -642,6 +644,9 @@ async fn fetch_nodes_info<T: Metadata>(
642
644
nodes : Vec < T > ,
643
645
) -> Result < Vec < utils:: ClusterInfo > , StreamError > {
644
646
let nodes_len = nodes. len ( ) ;
647
+ if nodes_len == 0 {
648
+ return Ok ( vec ! [ ] ) ;
649
+ }
645
650
let results = stream:: iter ( nodes)
646
651
. map ( |node| async move { fetch_node_info ( & node) . await } )
647
652
. buffer_unordered ( nodes_len) // No concurrency limit
@@ -702,52 +707,71 @@ pub async fn get_indexer_info() -> anyhow::Result<IndexerMetadataArr> {
702
707
Ok ( arr)
703
708
}
704
709
705
- pub async fn remove_ingestor ( ingestor : Path < String > ) -> Result < impl Responder , PostError > {
706
- let domain_name = to_url_string ( ingestor . into_inner ( ) ) ;
710
+ pub async fn remove_node ( node_url : Path < String > ) -> Result < impl Responder , PostError > {
711
+ let domain_name = to_url_string ( node_url . into_inner ( ) ) ;
707
712
708
713
if check_liveness ( & domain_name) . await {
709
714
return Err ( PostError :: Invalid ( anyhow:: anyhow!(
710
- "The ingestor is currently live and cannot be removed"
715
+ "The node is currently live and cannot be removed"
711
716
) ) ) ;
712
717
}
713
718
let object_store = PARSEABLE . storage . get_object_store ( ) ;
714
719
715
- let ingestor_metadatas = object_store
720
+ // Delete ingestor metadata
721
+ let removed_ingestor =
722
+ remove_node_metadata :: < IngestorMetadata > ( & object_store, & domain_name) . await ?;
723
+
724
+ // Delete indexer metadata
725
+ let removed_indexer =
726
+ remove_node_metadata :: < IndexerMetadata > ( & object_store, & domain_name) . await ?;
727
+
728
+ let msg = if removed_ingestor || removed_indexer {
729
+ format ! ( "node {} removed successfully" , domain_name)
730
+ } else {
731
+ format ! ( "node {} is not found" , domain_name)
732
+ } ;
733
+
734
+ info ! ( "{}" , & msg) ;
735
+ Ok ( ( msg, StatusCode :: OK ) )
736
+ }
737
+
738
+ // Helper function to remove a specific type of node metadata
739
+ async fn remove_node_metadata < T : Metadata + DeserializeOwned + Default > (
740
+ object_store : & Arc < dyn ObjectStorage > ,
741
+ domain_name : & str ,
742
+ ) -> Result < bool , PostError > {
743
+ let node_type = T :: default ( ) . node_type ( ) . to_string ( ) ;
744
+
745
+ let metadatas = object_store
716
746
. get_objects (
717
747
Some ( & RelativePathBuf :: from ( PARSEABLE_ROOT_DIRECTORY ) ) ,
718
- Box :: new ( |file_name| file_name. starts_with ( "ingestor" ) ) ,
748
+ Box :: new ( move |file_name| file_name. starts_with ( & node_type ) ) ,
719
749
)
720
750
. await ?;
721
751
722
- let ingestor_metadata = ingestor_metadatas
752
+ let node_metadatas = metadatas
723
753
. iter ( )
724
- . map ( |elem| serde_json:: from_slice :: < IngestorMetadata > ( elem) . unwrap_or_default ( ) )
725
- . collect_vec ( ) ;
754
+ . filter_map ( |elem| match serde_json:: from_slice :: < T > ( elem) {
755
+ Ok ( meta) if meta. domain_name ( ) == domain_name => Some ( meta) ,
756
+ _ => None ,
757
+ } )
758
+ . collect :: < Vec < _ > > ( ) ;
726
759
727
- let ingestor_metadata = ingestor_metadata
728
- . iter ( )
729
- . filter ( |elem| elem. domain_name == domain_name)
730
- . collect_vec ( ) ;
760
+ if node_metadatas. is_empty ( ) {
761
+ return Ok ( false ) ;
762
+ }
731
763
732
- let ingestor_meta_filename = ingestor_metadata[ 0 ] . file_path ( ) . to_string ( ) ;
733
- let msg = match object_store
734
- . try_delete_ingestor_meta ( ingestor_meta_filename)
735
- . await
736
- {
737
- Ok ( _) => {
738
- format ! ( "Ingestor {} removed successfully" , domain_name)
739
- }
764
+ let node_meta_filename = node_metadatas[ 0 ] . file_path ( ) . to_string ( ) ;
765
+ match object_store. try_delete_node_meta ( node_meta_filename) . await {
766
+ Ok ( _) => Ok ( true ) ,
740
767
Err ( err) => {
741
768
if matches ! ( err, ObjectStorageError :: IoError ( _) ) {
742
- format ! ( "Ingestor {} is not found" , domain_name )
769
+ Ok ( false )
743
770
} else {
744
- format ! ( "Error removing ingestor {} \n Reason: {}" , domain_name , err)
771
+ Err ( PostError :: ObjectStorageError ( err) )
745
772
}
746
773
}
747
- } ;
748
-
749
- info ! ( "{}" , & msg) ;
750
- Ok ( ( msg, StatusCode :: OK ) )
774
+ }
751
775
}
752
776
753
777
/// Fetches metrics from a node (ingestor or indexer)
0 commit comments