@@ -24,7 +24,6 @@ use std::path::{Path, PathBuf};
24
24
use anyhow:: { bail, Context } ;
25
25
use chitchat:: transport:: ChannelTransport ;
26
26
use chitchat:: FailureDetectorConfig ;
27
- use chrono:: Utc ;
28
27
use quickwit_actors:: { ActorHandle , Mailbox , Universe } ;
29
28
use quickwit_cluster:: { Cluster , ClusterMember } ;
30
29
use quickwit_common:: pubsub:: EventBroker ;
@@ -49,20 +48,21 @@ use quickwit_metastore::{
49
48
} ;
50
49
use quickwit_proto:: indexing:: CpuCapacity ;
51
50
use quickwit_proto:: metastore:: {
52
- AddSourceRequest , CreateIndexRequest , DeleteSourceRequest , IndexMetadataRequest ,
53
- MetastoreError , MetastoreService , MetastoreServiceClient ,
51
+ CreateIndexRequest , DeleteSourceRequest , IndexMetadataRequest , MetastoreError ,
52
+ MetastoreService , MetastoreServiceClient ,
54
53
} ;
55
54
use quickwit_proto:: types:: { NodeId , PipelineUid } ;
56
55
use quickwit_search:: SearchJobPlacer ;
57
56
use quickwit_storage:: StorageResolver ;
58
57
use quickwit_telemetry:: payload:: { QuickwitFeature , QuickwitTelemetryInfo , TelemetryEvent } ;
59
58
use tracing:: { debug, info, instrument} ;
60
59
61
- use super :: source_id:: {
62
- create_lambda_source_id, filter_prunable_lambda_source_ids, is_lambda_source_id,
63
- } ;
64
60
use crate :: environment:: INDEX_ID ;
65
- use crate :: indexer:: environment:: { DISABLE_JANITOR , DISABLE_MERGE , INDEX_CONFIG_URI } ;
61
+ use crate :: indexer:: environment:: {
62
+ DISABLE_JANITOR , DISABLE_MERGE , INDEX_CONFIG_URI , MAX_CHECKPOINTS ,
63
+ } ;
64
+
65
+ const LAMBDA_SOURCE_ID : & str = "_ingest-lambda-source" ;
66
66
67
67
/// The indexing service needs to update its cluster chitchat state so that the control plane is
68
68
/// aware of the running tasks. We thus create a fake cluster to instantiate the indexing service
@@ -137,60 +137,22 @@ pub(super) async fn send_telemetry() {
137
137
quickwit_telemetry:: send_telemetry_event ( TelemetryEvent :: RunCommand ) . await ;
138
138
}
139
139
140
- /// Convert the incomming file path to a source config and save it to the metastore
141
- ///
142
- /// If a Lambda file source already exists with the same path, format and transform, reuse it.
140
+ /// Convert the incoming file path to a source config
143
141
pub ( super ) async fn configure_source (
144
- metastore : & mut MetastoreServiceClient ,
145
142
input_path : PathBuf ,
146
143
input_format : SourceInputFormat ,
147
- index_metadata : & IndexMetadata ,
148
144
vrl_script : Option < String > ,
149
145
) -> anyhow:: Result < SourceConfig > {
150
146
let transform_config = vrl_script. map ( |vrl_script| TransformConfig :: new ( vrl_script, None ) ) ;
151
147
let source_params = SourceParams :: file ( input_path. clone ( ) ) ;
152
-
153
- let existing_sources_for_config: Vec < _ > = index_metadata
154
- . sources
155
- . iter ( )
156
- . filter ( |( src_id, src_config) | {
157
- is_lambda_source_id ( src_id)
158
- && src_config. source_params == source_params
159
- && src_config. input_format == input_format
160
- && src_config. transform_config == transform_config
161
- } )
162
- . map ( |( src_id, _) | src_id)
163
- . collect ( ) ;
164
-
165
- let source_id = match existing_sources_for_config. len ( ) {
166
- 0 => create_lambda_source_id ( Utc :: now ( ) ) ,
167
- 1 => existing_sources_for_config[ 0 ] . clone ( ) ,
168
- n => bail ! (
169
- "Found {} existing Lambda sources for file {:?}, expected at most 1" ,
170
- n,
171
- input_path,
172
- ) ,
173
- } ;
174
-
175
- let src_config = SourceConfig {
176
- source_id,
148
+ Ok ( SourceConfig {
149
+ source_id : LAMBDA_SOURCE_ID . to_owned ( ) ,
177
150
num_pipelines : NonZeroUsize :: new ( 1 ) . expect ( "1 is always non-zero." ) ,
178
151
enabled : true ,
179
152
source_params,
180
153
transform_config,
181
154
input_format,
182
- } ;
183
-
184
- if existing_sources_for_config. is_empty ( ) {
185
- metastore
186
- . add_source ( AddSourceRequest {
187
- index_uid : Some ( index_metadata. index_uid . clone ( ) ) ,
188
- source_config_json : serde_json:: to_string ( & src_config) ?,
189
- } )
190
- . await ?;
191
- }
192
-
193
- Ok ( src_config)
155
+ } )
194
156
}
195
157
196
158
/// Check if the index exists, creating or overwriting it if necessary
@@ -321,26 +283,39 @@ pub(super) async fn spawn_pipelines(
321
283
Ok ( ( indexing_pipeline_handle, merge_pipeline_handle) )
322
284
}
323
285
324
- /// Delete old Lambda file sources
325
- pub ( super ) async fn prune_file_sources (
286
+ /// Prune old Lambda file checkpoints if there are too many
287
+ ///
288
+ /// Without pruning, checkpoints accumulate indefinitely. This is particularly
289
+ /// problematic when indexing a lot of small files, as the metastore will grow
290
+ /// large even for a small index.
291
+ ///
292
+ /// The current implementation just deletes all checkpoints if there are more
293
+ /// than QW_LAMBDA_MAX_CHECKPOINTS. When this purging is performed, the Lambda
294
+ /// indexer might ingest the same file again if it receives a duplicate
295
+ /// notification.
296
+ pub ( super ) async fn prune_lambda_source (
326
297
metastore : & mut MetastoreServiceClient ,
327
298
index_metadata : IndexMetadata ,
328
299
) -> anyhow:: Result < ( ) > {
329
- let prunable_sources: Vec < _ > =
330
- filter_prunable_lambda_source_ids ( index_metadata. sources . keys ( ) ) ?. collect ( ) ;
331
- info ! (
332
- existing = index_metadata. sources. len( ) ,
333
- prunable = prunable_sources. len( ) ,
334
- "prune file sources"
335
- ) ;
336
- for src_id in prunable_sources {
337
- metastore
338
- . delete_source ( DeleteSourceRequest {
339
- index_uid : Some ( index_metadata. index_uid . clone ( ) ) ,
340
- source_id : src_id. clone ( ) ,
341
- } )
342
- . await ?;
300
+ let lambda_checkpoint_opt = index_metadata
301
+ . checkpoint
302
+ . source_checkpoint ( LAMBDA_SOURCE_ID ) ;
303
+
304
+ if let Some ( lambda_checkpoint) = lambda_checkpoint_opt {
305
+ if lambda_checkpoint. num_partitions ( ) > * MAX_CHECKPOINTS {
306
+ info ! (
307
+ partitions = lambda_checkpoint. num_partitions( ) ,
308
+ "prune Lambda checkpoints"
309
+ ) ;
310
+ metastore
311
+ . delete_source ( DeleteSourceRequest {
312
+ index_uid : Some ( index_metadata. index_uid . clone ( ) ) ,
313
+ source_id : LAMBDA_SOURCE_ID . to_owned ( ) ,
314
+ } )
315
+ . await ?;
316
+ }
343
317
}
318
+
344
319
Ok ( ( ) )
345
320
}
346
321
0 commit comments