Skip to content

Commit f6065a4

Browse files
committed
Minor adjustments
1 parent 0ba0fff commit f6065a4

File tree

2 files changed

+16
-24
lines changed

2 files changed

+16
-24
lines changed

quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -321,13 +321,18 @@ pub(super) async fn spawn_pipelines(
321321
Ok((indexing_pipeline_handle, merge_pipeline_handle))
322322
}
323323

324-
/// Delete Lambda file sources if they are old and there are too many of them
324+
/// Delete old Lambda file sources
325325
pub(super) async fn prune_file_sources(
326326
metastore: &mut MetastoreServiceClient,
327327
index_metadata: IndexMetadata,
328328
) -> anyhow::Result<()> {
329329
let prunable_sources: Vec<_> =
330330
filter_prunable_lambda_source_ids(index_metadata.sources.keys())?.collect();
331+
info!(
332+
existing = index_metadata.sources.len(),
333+
prunable = prunable_sources.len(),
334+
"prune file sources"
335+
);
331336
for src_id in prunable_sources {
332337
metastore
333338
.delete_source(DeleteSourceRequest {

quickwit/quickwit-lambda/src/indexer/ingest/source_id.rs

Lines changed: 10 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,14 @@
1717
// You should have received a copy of the GNU Affero General Public License
1818
// along with this program. If not, see <http://www.gnu.org/licenses/>.
1919

20-
use std::collections::BTreeMap;
21-
2220
use anyhow::{bail, Context};
2321
use chrono::{DateTime, LocalResult, TimeZone, Utc};
2422

2523
const LAMBDA_SOURCE_ID_PREFIX: &str = "ingest-lambda-source-";
2624

2725
/// This duration should be large enough to prevent repeated notification
2826
/// deliveries from causing duplicates
29-
const FILE_SOURCE_RETENTION_HOURS: usize = 6;
27+
const FILE_SOURCE_RETENTION_SECONDS: i64 = 6 * 3600;
3028

3129
/// Create a source id for a Lambda file source, with the provided timestamp encoded in it
3230
pub(crate) fn create_lambda_source_id(time: DateTime<Utc>) -> String {
@@ -44,20 +42,19 @@ fn parse_source_id_timestamp(source_id: &str) -> anyhow::Result<DateTime<Utc>> {
4442
}
4543
}
4644

47-
/// Parse the provided source ids and return those where the file source is
48-
/// older than `MIN_FILE_SOURCE_RETENTION_HOURS` hours
45+
/// Parse the provided source ids and return those where it's a Lambda file
46+
/// source older than `FILE_SOURCE_RETENTION_SECONDS`
4947
pub(crate) fn filter_prunable_lambda_source_ids<'a>(
5048
source_ids: impl Iterator<Item = &'a String>,
5149
) -> anyhow::Result<impl Iterator<Item = &'a String>> {
52-
let src_timestamps = source_ids
53-
.filter(|src_id| src_id.starts_with(LAMBDA_SOURCE_ID_PREFIX))
50+
let src_timestamps: Vec<_> = source_ids
51+
.filter(|src_id| is_lambda_source_id(src_id))
5452
.map(|src_id| Ok((parse_source_id_timestamp(src_id)?, src_id)))
55-
.collect::<anyhow::Result<BTreeMap<_, _>>>()?;
53+
.collect::<anyhow::Result<_>>()?;
5654

5755
let prunable_sources = src_timestamps
5856
.into_iter()
59-
.rev()
60-
.filter(|(ts, _)| (Utc::now() - *ts).num_hours() > FILE_SOURCE_RETENTION_HOURS as i64)
57+
.filter(|(ts, _)| (Utc::now() - *ts).num_seconds() > FILE_SOURCE_RETENTION_SECONDS)
6158
.map(|(_, src_id)| src_id);
6259

6360
Ok(prunable_sources)
@@ -103,13 +100,13 @@ mod tests {
103100
fn test_filter_old() {
104101
let source_ids: Vec<String> = (0..5)
105102
.map(|i| {
106-
let hours_ago = i * FILE_SOURCE_RETENTION_HOURS * 2;
107-
Utc::now() - chrono::Duration::try_hours(hours_ago as i64).unwrap()
103+
let secs_ago = i * FILE_SOURCE_RETENTION_SECONDS * 2;
104+
Utc::now() - chrono::Duration::try_seconds(secs_ago).unwrap()
108105
})
109106
.map(create_lambda_source_id)
110107
.collect();
111108

112-
// Prune source ids that happen to be from newest to oldest
109+
// Prune source ids, only keeping the most recent one
113110
let prunable_sources = filter_prunable_lambda_source_ids(source_ids.iter())
114111
.unwrap()
115112
.collect::<HashSet<_>>();
@@ -118,16 +115,6 @@ mod tests {
118115
for source_id in source_ids.iter().skip(1) {
119116
assert!(prunable_sources.contains(source_id));
120117
}
121-
122-
// Prune source ids that happen to be from oldest to newst
123-
let prunable_sources = filter_prunable_lambda_source_ids(source_ids.iter().rev())
124-
.unwrap()
125-
.collect::<HashSet<_>>();
126-
assert_eq!(prunable_sources.len(), 4);
127-
assert!(!prunable_sources.contains(&source_ids[0]));
128-
for source_id in source_ids.iter().skip(1) {
129-
assert!(prunable_sources.contains(source_id));
130-
}
131118
}
132119

133120
#[test]

0 commit comments

Comments
 (0)