diff --git a/crates/iota-data-ingestion-core/src/progress_store/file.rs b/crates/iota-data-ingestion-core/src/progress_store/file.rs index af7ff8f9c34..3d8b6b6363c 100644 --- a/crates/iota-data-ingestion-core/src/progress_store/file.rs +++ b/crates/iota-data-ingestion-core/src/progress_store/file.rs @@ -8,7 +8,7 @@ use async_trait::async_trait; use iota_types::messages_checkpoint::CheckpointSequenceNumber; use serde_json::{Number, Value}; use tokio::{ - fs::{File, OpenOptions}, + fs::File, io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}, }; @@ -35,6 +35,8 @@ use crate::{IngestionError, IngestionResult, progress_store::ProgressStore}; /// } /// ``` pub struct FileProgressStore { + /// The path to the progress file. + path: PathBuf, /// The [`File`] handle used to interact with the progress file. file: File, } @@ -43,14 +45,15 @@ impl FileProgressStore { /// Creates a new `FileProgressStore` by opening or creating the file at the /// specified path. pub async fn new(path: impl Into) -> IngestionResult { - Self::open_or_create_file(path.into()) + let path = path.into(); + Self::open_or_create_file(&path) .await - .map(|file| Self { file }) + .map(|file| Self { file, path }) } /// Open or create the file at the specified path. - async fn open_or_create_file(path: PathBuf) -> IngestionResult { - Ok(OpenOptions::new() + async fn open_or_create_file(path: &PathBuf) -> IngestionResult { + Ok(File::options() .read(true) .write(true) .create(true) @@ -95,11 +98,31 @@ impl FileProgressStore { /// Writes the given data to the file, overwriting any existing content. async fn write_to_file(&mut self, data: impl AsRef<[u8]>) -> IngestionResult<()> { - // before writing seek to the start of the file - self.file.seek(SeekFrom::Start(0)).await?; - // clear the file content - self.file.set_len(0).await?; - Ok(self.file.write_all(data.as_ref()).await?) 
+        // Append ".tmp" to the complete file name. `with_extension("tmp")` would
+        // return the progress path itself whenever that path already ends in `.tmp`.
+        let mut tmp_name = self.path.clone().into_os_string();
+        tmp_name.push(".tmp");
+        let tmp_path = PathBuf::from(tmp_name);
+        {
+            let mut tmp_file = File::options()
+                .write(true)
+                .create(true)
+                .truncate(true)
+                .open(&tmp_path)
+                .await?;
+            tmp_file.write_all(data.as_ref()).await?;
+            // Sync the temporary file's data before it replaces the original.
+            tmp_file.sync_data().await?;
+            // Test builds only: add a small delay, useful for simulating crashes.
+            if cfg!(test) {
+                tokio::time::sleep(std::time::Duration::from_nanos(10)).await;
+            }
+        }
+        // Atomically replace the original file.
+        tokio::fs::rename(&tmp_path, &self.path).await?;
+        // Re-open the file handle for further reads.
+        self.file = File::open(&self.path).await?;
+        Ok(())
     }
 }
diff --git a/crates/iota-data-ingestion-core/src/tests.rs b/crates/iota-data-ingestion-core/src/tests.rs
index d3ddfe5eab7..86e14fe8500 100644
--- a/crates/iota-data-ingestion-core/src/tests.rs
+++ b/crates/iota-data-ingestion-core/src/tests.rs
@@ -27,11 +27,12 @@ use iota_types::{
 use prometheus::Registry;
 use rand::{SeedableRng, prelude::StdRng};
 use tempfile::NamedTempFile;
+use tokio::time::timeout;
 use tokio_util::sync::CancellationToken;
 
 use crate::{
     DataIngestionMetrics, FileProgressStore, IndexerExecutor, IngestionError, IngestionResult,
-    ReaderOptions, Reducer, Worker, WorkerPool, progress_store::ExecutorProgress,
+    ProgressStore, ReaderOptions, Reducer, Worker, WorkerPool, progress_store::ExecutorProgress,
 };
 
 async fn add_worker_pool(
@@ -328,6 +329,74 @@ async fn graceful_shutdown_faulty_reducer() {
     assert_eq!(result.unwrap().get("test"), Some(&0));
 }
 
+/// Tests the atomicity of FileProgressStore's save operation by simulating a
+/// crash/interruption.
+///
+/// This test attempts to save a new value with a very short timeout, simulating
+/// a crash before the save completes. It verifies that if the save is
+/// interrupted, the original value remains unchanged, demonstrating that
+/// FileProgressStore does not leave the file in a partial or corrupted state
+/// even if the save is not completed.
+#[tokio::test]
+async fn file_progress_store_save_timeout_simulates_crash() {
+    // Setup: create a FileProgressStore backed by a fresh temporary file
+    let progress_file = NamedTempFile::new().unwrap();
+    let path = progress_file.path().to_path_buf();
+    let mut store = FileProgressStore::new(path).await.unwrap();
+
+    // Save an initial value
+    store.save("task1".to_string(), 42).await.unwrap();
+
+    // Confirm the value is present
+    let value = store.load("task1".to_string()).await.unwrap();
+    assert_eq!(value, 42);
+
+    // Attempt to save a new value, but with a very short timeout to simulate a
+    // crash/interruption
+    let result = timeout(
+        Duration::from_nanos(5),
+        store.save("task1".to_string(), 100),
+    )
+    .await;
+
+    // The operation should time out (simulated crash)
+    assert!(result.is_err(), "Save did not time out as expected");
+
+    // The value should still be the old value, as the save was interrupted
+    let value = store.load("task1".to_string()).await.unwrap();
+    assert_eq!(
+        value, 42,
+        "Value should remain unchanged after interrupted save"
+    );
+}
+
+/// Tests the basic save and load functionality of FileProgressStore.
+///
+/// This test saves an initial value, verifies it, then saves a new value and
+/// verifies the update. It demonstrates that FileProgressStore correctly
+/// persists and retrieves checkpoint data.
+#[tokio::test]
+async fn file_progress_store() {
+    // Setup: create a FileProgressStore backed by a fresh temporary file
+    let progress_file = NamedTempFile::new().unwrap();
+    let path = progress_file.path().to_path_buf();
+    let mut store = FileProgressStore::new(path).await.unwrap();
+
+    // Save an initial value
+    store.save("task1".to_string(), 42).await.unwrap();
+
+    // Confirm the value is present
+    let value = store.load("task1".to_string()).await.unwrap();
+    assert_eq!(value, 42);
+
+    // Save a new value
+    store.save("task1".to_string(), 100).await.unwrap();
+
+    // Confirm the value is updated
+    let value = store.load("task1".to_string()).await.unwrap();
+    assert_eq!(value, 100);
+}
+
 fn temp_dir() -> std::path::PathBuf {
     tempfile::tempdir()
         .expect("Failed to open temporary directory")