Skip to content

Commit 0ba0fff

Browse files
committed
Simplify source cleanup rule
1 parent 4af8a0e commit 0ba0fff

File tree

2 files changed

+22
-21
lines changed

2 files changed

+22
-21
lines changed

quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,8 @@ pub(super) async fn prune_file_sources(
326326
metastore: &mut MetastoreServiceClient,
327327
index_metadata: IndexMetadata,
328328
) -> anyhow::Result<()> {
329-
let prunable_sources = filter_prunable_lambda_source_ids(index_metadata.sources.keys())?;
329+
let prunable_sources: Vec<_> =
330+
filter_prunable_lambda_source_ids(index_metadata.sources.keys())?.collect();
330331
for src_id in prunable_sources {
331332
metastore
332333
.delete_source(DeleteSourceRequest {

quickwit/quickwit-lambda/src/indexer/ingest/source_id.rs

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@ use anyhow::{bail, Context};
2323
use chrono::{DateTime, LocalResult, TimeZone, Utc};
2424

2525
const LAMBDA_SOURCE_ID_PREFIX: &str = "ingest-lambda-source-";
26-
const MIN_FILE_SOURCES_TO_KEEP: usize = 20;
27-
const MIN_FILE_SOURCE_RETENTION_HOURS: usize = 12;
26+
27+
/// This duration should be large enough to prevent repeated notification
28+
/// deliveries from causing duplicates
29+
const FILE_SOURCE_RETENTION_HOURS: usize = 6;
2830

2931
/// Create a source id for a Lambda file source, with the provided timestamp encoded in it
3032
pub(crate) fn create_lambda_source_id(time: DateTime<Utc>) -> String {
@@ -42,9 +44,8 @@ fn parse_source_id_timestamp(source_id: &str) -> anyhow::Result<DateTime<Utc>> {
4244
}
4345
}
4446

45-
/// Parse the provided source ids and return the ones that are prunable:
46-
/// - There are at least `MIN_FILE_SOURCES_TO_KEEP` Lambda file sources
47-
/// - The file source is older than `MIN_FILE_SOURCE_RETENTION_HOURS`` hours
47+
/// Parse the provided source ids and return those where the file source is
48+
/// older than `MIN_FILE_SOURCE_RETENTION_HOURS` hours
4849
pub(crate) fn filter_prunable_lambda_source_ids<'a>(
4950
source_ids: impl Iterator<Item = &'a String>,
5051
) -> anyhow::Result<impl Iterator<Item = &'a String>> {
@@ -56,8 +57,7 @@ pub(crate) fn filter_prunable_lambda_source_ids<'a>(
5657
let prunable_sources = src_timestamps
5758
.into_iter()
5859
.rev()
59-
.skip(MIN_FILE_SOURCES_TO_KEEP)
60-
.filter(|(ts, _)| (Utc::now() - *ts).num_hours() > MIN_FILE_SOURCE_RETENTION_HOURS as i64)
60+
.filter(|(ts, _)| (Utc::now() - *ts).num_hours() > FILE_SOURCE_RETENTION_HOURS as i64)
6161
.map(|(_, src_id)| src_id);
6262

6363
Ok(prunable_sources)
@@ -86,7 +86,7 @@ mod tests {
8686

8787
#[test]
8888
fn test_dont_filter_recent() {
89-
let source_ids: Vec<String> = (0..MIN_FILE_SOURCES_TO_KEEP + 5)
89+
let source_ids: Vec<String> = (0..20)
9090
.map(|i| {
9191
// only recent timestamps
9292
Utc::now() - chrono::Duration::try_seconds(i as i64).unwrap()
@@ -100,13 +100,11 @@ mod tests {
100100
}
101101

102102
#[test]
103-
fn test_filter_old_but_keep_min_number() {
104-
let source_ids: Vec<String> = (0..MIN_FILE_SOURCES_TO_KEEP + 3)
103+
fn test_filter_old() {
104+
let source_ids: Vec<String> = (0..5)
105105
.map(|i| {
106-
// old timestamps so that MIN_FILE_SOURCES_TO_KEEP is the limit
107-
Utc::now()
108-
- chrono::Duration::try_hours(MIN_FILE_SOURCE_RETENTION_HOURS as i64).unwrap()
109-
- chrono::Duration::try_hours(i as i64).unwrap()
106+
let hours_ago = i * FILE_SOURCE_RETENTION_HOURS * 2;
107+
Utc::now() - chrono::Duration::try_hours(hours_ago as i64).unwrap()
110108
})
111109
.map(create_lambda_source_id)
112110
.collect();
@@ -115,18 +113,20 @@ mod tests {
115113
let prunable_sources = filter_prunable_lambda_source_ids(source_ids.iter())
116114
.unwrap()
117115
.collect::<HashSet<_>>();
118-
assert_eq!(prunable_sources.len(), 3);
119-
for source_id in source_ids.iter().take(3) {
120-
assert!(!prunable_sources.contains(source_id));
116+
assert_eq!(prunable_sources.len(), 4);
117+
assert!(!prunable_sources.contains(&source_ids[0]));
118+
for source_id in source_ids.iter().skip(1) {
119+
assert!(prunable_sources.contains(source_id));
121120
}
122121

123122
// Prune source ids that happen to be from oldest to newst
124123
let prunable_sources = filter_prunable_lambda_source_ids(source_ids.iter().rev())
125124
.unwrap()
126125
.collect::<HashSet<_>>();
127-
assert_eq!(prunable_sources.len(), 3);
128-
for source_id in source_ids.iter().take(3) {
129-
assert!(!prunable_sources.contains(source_id));
126+
assert_eq!(prunable_sources.len(), 4);
127+
assert!(!prunable_sources.contains(&source_ids[0]));
128+
for source_id in source_ids.iter().skip(1) {
129+
assert!(prunable_sources.contains(source_id));
130130
}
131131
}
132132

0 commit comments

Comments
 (0)