Skip to content

fix(iota-data-ingestion-core): make file updates atomic #6918

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 33 additions & 10 deletions crates/iota-data-ingestion-core/src/progress_store/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use async_trait::async_trait;
use iota_types::messages_checkpoint::CheckpointSequenceNumber;
use serde_json::{Number, Value};
use tokio::{
fs::{File, OpenOptions},
fs::File,
io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
};

Expand All @@ -35,6 +35,8 @@ use crate::{IngestionError, IngestionResult, progress_store::ProgressStore};
/// }
/// ```
pub struct FileProgressStore {
/// The path to the progress file. It is kept next to the handle because
/// writes derive the temporary file path from it and rename the temporary
/// file onto it.
path: PathBuf,
/// The [`File`] handle used to interact with the progress file. It is
/// replaced by a freshly opened handle after every successful write.
file: File,
}
Expand All @@ -43,14 +45,15 @@ impl FileProgressStore {
/// Creates a new `FileProgressStore` by opening or creating the file at the
/// specified path.
///
/// The path is stored together with the opened [`File`] handle so that later
/// writes can replace the file on disk by path.
///
/// # Errors
///
/// Returns an error if the file can neither be opened nor created.
pub async fn new(path: impl Into<PathBuf>) -> IngestionResult<Self> {
    let path = path.into();
    // Borrow the path for opening so ownership can move into the store.
    let file = Self::open_or_create_file(&path).await?;
    Ok(Self { path, file })
}

/// Open or create the file at the specified path.
async fn open_or_create_file(path: PathBuf) -> IngestionResult<File> {
Ok(OpenOptions::new()
async fn open_or_create_file(path: &PathBuf) -> IngestionResult<File> {
Ok(File::options()
.read(true)
.write(true)
.create(true)
Expand Down Expand Up @@ -95,11 +98,31 @@ impl FileProgressStore {

/// Writes the given data to the progress file, atomically replacing any
/// existing content.
///
/// The data is written to a sibling temporary file (the progress file path
/// with its extension replaced by `tmp`), flushed and synced, and only then
/// renamed over the progress file. The progress file itself is never
/// truncated or written in place, so an interruption before the rename
/// leaves the previous content untouched.
///
/// After the rename, `self.file` is re-opened on the new file so subsequent
/// reads observe the new content.
///
/// # Errors
///
/// Returns an error if creating, writing, flushing or syncing the temporary
/// file fails, or if the rename or the re-open of the progress file fails.
async fn write_to_file(&mut self, data: impl AsRef<[u8]>) -> IngestionResult<()> {
    let tmp_path = self.path.with_extension("tmp");

    {
        let mut tmp_file = File::options()
            .write(true)
            .create(true)
            .truncate(true)
            .open(&tmp_path)
            .await?;
        tmp_file.write_all(data.as_ref()).await?;
        // tokio's `File` performs writes on a background task, so
        // `write_all` may return before the data reached the OS. Flushing
        // waits for the write to finish and reports its error here, before
        // the temporary file is allowed to replace the progress file.
        tmp_file.flush().await?;
        tmp_file.sync_data().await?;

        // only for testing add a small delay, useful for simulating crashes
        if cfg!(test) {
            tokio::time::sleep(std::time::Duration::from_nanos(10)).await;
        }
    }

    // Atomically replace the original file
    tokio::fs::rename(&tmp_path, &self.path).await?;

    // The old handle still refers to the replaced file; re-open it for
    // further reads
    self.file = File::open(&self.path).await?;

    Ok(())
}
}

Expand Down
71 changes: 70 additions & 1 deletion crates/iota-data-ingestion-core/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ use iota_types::{
use prometheus::Registry;
use rand::{SeedableRng, prelude::StdRng};
use tempfile::NamedTempFile;
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;

use crate::{
DataIngestionMetrics, FileProgressStore, IndexerExecutor, IngestionError, IngestionResult,
ReaderOptions, Reducer, Worker, WorkerPool, progress_store::ExecutorProgress,
ProgressStore, ReaderOptions, Reducer, Worker, WorkerPool, progress_store::ExecutorProgress,
};

async fn add_worker_pool<W: Worker + 'static>(
Expand Down Expand Up @@ -328,6 +329,74 @@ async fn graceful_shutdown_faulty_reducer() {
assert_eq!(result.unwrap().get("test"), Some(&0));
}

/// Checks that an interrupted save leaves the stored progress untouched.
///
/// A first value is saved and read back. A second save is then wrapped in a
/// timeout that is far too short for it to finish, which stands in for a
/// crash in the middle of the operation. The test expects the timeout to
/// fire and the value read afterwards to still be the first one, i.e. the
/// progress file is neither partially written nor corrupted.
#[tokio::test]
async fn file_progress_store_save_timeout_simulates_crash() {
    // Keep the temporary file alive for the whole test.
    let backing_file = NamedTempFile::new().unwrap();
    let mut store = FileProgressStore::new(backing_file.path().to_path_buf())
        .await
        .unwrap();

    // Seed the store and make sure the value can be read back.
    store.save("task1".to_string(), 42).await.unwrap();
    assert_eq!(store.load("task1".to_string()).await.unwrap(), 42);

    // Interrupt the second save almost immediately to mimic a crash.
    let deadline = Duration::from_nanos(5);
    let interrupted = timeout(deadline, store.save("task1".to_string(), 100))
        .await
        .is_err();
    assert!(interrupted, "Save did not time out as expected");

    // The interrupted save must not have changed what is stored.
    let after_interrupt = store.load("task1".to_string()).await.unwrap();
    assert_eq!(
        after_interrupt, 42,
        "Value should remain unchanged after interrupted save"
    );
}

/// Exercises the save/load round trip of FileProgressStore.
///
/// Two values are saved one after the other under the same task name; after
/// each save the value is loaded again and must equal what was just stored.
#[tokio::test]
async fn file_progress_store() {
    // Keep the temporary file alive for the whole test.
    let backing_file = NamedTempFile::new().unwrap();
    let mut store = FileProgressStore::new(backing_file.path().to_path_buf())
        .await
        .unwrap();

    // First write, then read it back.
    store.save("task1".to_string(), 42).await.unwrap();
    let first = store.load("task1".to_string()).await.unwrap();
    assert_eq!(first, 42);

    // Overwrite with a new value, then read it back.
    store.save("task1".to_string(), 100).await.unwrap();
    let second = store.load("task1".to_string()).await.unwrap();
    assert_eq!(second, 100);
}

fn temp_dir() -> std::path::PathBuf {
tempfile::tempdir()
.expect("Failed to open temporary directory")
Expand Down
Loading