@@ -25,7 +25,6 @@ use anyhow::{bail, Context};
25
25
use chitchat:: transport:: ChannelTransport ;
26
26
use chitchat:: FailureDetectorConfig ;
27
27
use quickwit_actors:: { ActorHandle , Mailbox , Universe } ;
28
- use quickwit_cli:: run_index_checklist;
29
28
use quickwit_cluster:: { Cluster , ClusterMember } ;
30
29
use quickwit_common:: pubsub:: EventBroker ;
31
30
use quickwit_common:: runtimes:: RuntimesConfig ;
@@ -34,7 +33,7 @@ use quickwit_config::merge_policy_config::MergePolicyConfig;
34
33
use quickwit_config:: service:: QuickwitService ;
35
34
use quickwit_config:: {
36
35
load_index_config_from_user_config, ConfigFormat , IndexConfig , NodeConfig , SourceConfig ,
37
- SourceInputFormat , SourceParams , TransformConfig , CLI_SOURCE_ID ,
36
+ SourceInputFormat , SourceParams , TransformConfig ,
38
37
} ;
39
38
use quickwit_index_management:: IndexService ;
40
39
use quickwit_indexing:: actors:: {
@@ -44,10 +43,13 @@ use quickwit_indexing::models::{DetachIndexingPipeline, DetachMergePipeline, Spa
44
43
use quickwit_indexing:: IndexingPipeline ;
45
44
use quickwit_ingest:: IngesterPool ;
46
45
use quickwit_janitor:: { start_janitor_service, JanitorService } ;
47
- use quickwit_metastore:: CreateIndexRequestExt ;
46
+ use quickwit_metastore:: {
47
+ CreateIndexRequestExt , CreateIndexResponseExt , IndexMetadata , IndexMetadataResponseExt ,
48
+ } ;
48
49
use quickwit_proto:: indexing:: CpuCapacity ;
49
50
use quickwit_proto:: metastore:: {
50
- CreateIndexRequest , MetastoreError , MetastoreService , MetastoreServiceClient ,
51
+ CreateIndexRequest , IndexMetadataRequest , MetastoreError , MetastoreService ,
52
+ MetastoreServiceClient , ResetSourceCheckpointRequest ,
51
53
} ;
52
54
use quickwit_proto:: types:: { NodeId , PipelineUid } ;
53
55
use quickwit_search:: SearchJobPlacer ;
@@ -56,7 +58,11 @@ use quickwit_telemetry::payload::{QuickwitFeature, QuickwitTelemetryInfo, Teleme
56
58
use tracing:: { debug, info, instrument} ;
57
59
58
60
use crate :: environment:: INDEX_ID ;
59
- use crate :: indexer:: environment:: { DISABLE_JANITOR , DISABLE_MERGE , INDEX_CONFIG_URI } ;
61
+ use crate :: indexer:: environment:: {
62
+ DISABLE_JANITOR , DISABLE_MERGE , INDEX_CONFIG_URI , MAX_CHECKPOINTS ,
63
+ } ;
64
+
65
+ const LAMBDA_SOURCE_ID : & str = "_ingest-lambda-source" ;
60
66
61
67
/// The indexing service needs to update its cluster chitchat state so that the control plane is
62
68
/// aware of the running tasks. We thus create a fake cluster to instantiate the indexing service
@@ -131,66 +137,71 @@ pub(super) async fn send_telemetry() {
131
137
quickwit_telemetry:: send_telemetry_event ( TelemetryEvent :: RunCommand ) . await ;
132
138
}
133
139
134
- pub ( super ) fn configure_source (
140
+ /// Convert the incomming file path to a source config
141
+ pub ( super ) async fn configure_source (
135
142
input_path : PathBuf ,
136
143
input_format : SourceInputFormat ,
137
144
vrl_script : Option < String > ,
138
- ) -> SourceConfig {
139
- let source_params = SourceParams :: file ( input_path) ;
145
+ ) -> anyhow:: Result < SourceConfig > {
140
146
let transform_config = vrl_script. map ( |vrl_script| TransformConfig :: new ( vrl_script, None ) ) ;
141
- SourceConfig {
142
- source_id : CLI_SOURCE_ID . to_string ( ) ,
147
+ let source_params = SourceParams :: file ( input_path. clone ( ) ) ;
148
+ Ok ( SourceConfig {
149
+ source_id : LAMBDA_SOURCE_ID . to_owned ( ) ,
143
150
num_pipelines : NonZeroUsize :: new ( 1 ) . expect ( "1 is always non-zero." ) ,
144
151
enabled : true ,
145
152
source_params,
146
153
transform_config,
147
154
input_format,
148
- }
155
+ } )
149
156
}
150
157
151
158
/// Check if the index exists, creating or overwriting it if necessary
152
159
pub ( super ) async fn init_index_if_necessary (
153
160
metastore : & mut MetastoreServiceClient ,
154
161
storage_resolver : & StorageResolver ,
155
- source_config : & SourceConfig ,
156
162
default_index_root_uri : & Uri ,
157
163
overwrite : bool ,
158
- ) -> anyhow:: Result < ( ) > {
159
- let checklist_result =
160
- run_index_checklist ( metastore, storage_resolver, & INDEX_ID , Some ( source_config) ) . await ;
161
- if let Err ( e) = checklist_result {
162
- let is_not_found = e
163
- . downcast_ref ( )
164
- . is_some_and ( |meta_error| matches ! ( meta_error, MetastoreError :: NotFound ( _) ) ) ;
165
- if !is_not_found {
166
- bail ! ( e) ;
164
+ ) -> anyhow:: Result < IndexMetadata > {
165
+ let metadata_result = metastore
166
+ . index_metadata ( IndexMetadataRequest :: for_index_id ( INDEX_ID . clone ( ) ) )
167
+ . await ;
168
+ let metadata = match metadata_result {
169
+ Ok ( _) if overwrite => {
170
+ info ! (
171
+ index_id = * INDEX_ID ,
172
+ "Overwrite enabled, clearing existing index" ,
173
+ ) ;
174
+ let mut index_service = IndexService :: new ( metastore. clone ( ) , storage_resolver. clone ( ) ) ;
175
+ index_service. clear_index ( & INDEX_ID ) . await ?;
176
+ metastore
177
+ . index_metadata ( IndexMetadataRequest :: for_index_id ( INDEX_ID . clone ( ) ) )
178
+ . await ?
179
+ . deserialize_index_metadata ( ) ?
167
180
}
168
- info ! (
169
- index_id = * INDEX_ID ,
170
- index_config_uri = * INDEX_CONFIG_URI ,
171
- "Index not found, creating it"
172
- ) ;
173
- let index_config = load_index_config ( storage_resolver, default_index_root_uri) . await ?;
174
- if index_config. index_id != * INDEX_ID {
175
- bail ! (
176
- "Expected index ID was {} but config file had {}" ,
177
- * INDEX_ID ,
178
- index_config. index_id,
181
+ Ok ( metadata_resp) => metadata_resp. deserialize_index_metadata ( ) ?,
182
+ Err ( MetastoreError :: NotFound ( _) ) => {
183
+ info ! (
184
+ index_id = * INDEX_ID ,
185
+ index_config_uri = * INDEX_CONFIG_URI ,
186
+ "Index not found, creating it"
179
187
) ;
188
+ let index_config = load_index_config ( storage_resolver, default_index_root_uri) . await ?;
189
+ if index_config. index_id != * INDEX_ID {
190
+ bail ! (
191
+ "Expected index ID was {} but config file had {}" ,
192
+ * INDEX_ID ,
193
+ index_config. index_id,
194
+ ) ;
195
+ }
196
+ let create_resp = metastore
197
+ . create_index ( CreateIndexRequest :: try_from_index_config ( & index_config) ?)
198
+ . await ?;
199
+ info ! ( "index created" ) ;
200
+ create_resp. deserialize_index_metadata ( ) ?
180
201
}
181
- metastore
182
- . create_index ( CreateIndexRequest :: try_from_index_config ( & index_config) ?)
183
- . await ?;
184
- info ! ( "index created" ) ;
185
- } else if overwrite {
186
- info ! (
187
- index_id = * INDEX_ID ,
188
- "Overwrite enabled, clearing existing index" ,
189
- ) ;
190
- let mut index_service = IndexService :: new ( metastore. clone ( ) , storage_resolver. clone ( ) ) ;
191
- index_service. clear_index ( & INDEX_ID ) . await ?;
192
- }
193
- Ok ( ( ) )
202
+ Err ( e) => bail ! ( e) ,
203
+ } ;
204
+ Ok ( metadata)
194
205
}
195
206
196
207
pub ( super ) async fn spawn_services (
@@ -249,6 +260,7 @@ pub(super) async fn spawn_services(
249
260
Ok ( ( indexing_service_handle, janitor_service_opt) )
250
261
}
251
262
263
+ /// Spawn and split an indexing pipeline
252
264
pub ( super ) async fn spawn_pipelines (
253
265
indexing_server_mailbox : & Mailbox < IndexingService > ,
254
266
source_config : SourceConfig ,
@@ -271,6 +283,43 @@ pub(super) async fn spawn_pipelines(
271
283
Ok ( ( indexing_pipeline_handle, merge_pipeline_handle) )
272
284
}
273
285
286
+ /// Prune old Lambda file checkpoints if there are too many
287
+ ///
288
+ /// Without pruning checkpoints accumulate indifinitely. This is particularly
289
+ /// problematic when indexing a lot of small files, as the metastore will grow
290
+ /// large even for a small index.
291
+ ///
292
+ /// The current implementation just deletes all checkpoints if there are more
293
+ /// than QW_LAMBDA_MAX_CHECKPOINTS. When this purging is performed, the Lambda
294
+ /// indexer might ingest the same file again if it receives a duplicate
295
+ /// notification.
296
+ pub ( super ) async fn prune_lambda_source (
297
+ metastore : & mut MetastoreServiceClient ,
298
+ index_metadata : IndexMetadata ,
299
+ ) -> anyhow:: Result < ( ) > {
300
+ let lambda_checkpoint_opt = index_metadata
301
+ . checkpoint
302
+ . source_checkpoint ( LAMBDA_SOURCE_ID ) ;
303
+
304
+ if let Some ( lambda_checkpoint) = lambda_checkpoint_opt {
305
+ if lambda_checkpoint. num_partitions ( ) > * MAX_CHECKPOINTS {
306
+ info ! (
307
+ partitions = lambda_checkpoint. num_partitions( ) ,
308
+ "prune Lambda checkpoints"
309
+ ) ;
310
+ metastore
311
+ . reset_source_checkpoint ( ResetSourceCheckpointRequest {
312
+ index_uid : Some ( index_metadata. index_uid . clone ( ) ) ,
313
+ source_id : LAMBDA_SOURCE_ID . to_owned ( ) ,
314
+ } )
315
+ . await ?;
316
+ }
317
+ }
318
+
319
+ Ok ( ( ) )
320
+ }
321
+
322
+ /// Observe the merge pipeline until there are no more ongoing merges
274
323
pub ( super ) async fn wait_for_merges (
275
324
merge_pipeline_handle : ActorHandle < MergePipeline > ,
276
325
) -> anyhow:: Result < ( ) > {
0 commit comments