diff --git a/CHANGELOG.md b/CHANGELOG.md index d4ac243..3d0d556 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,11 @@ In Development - Add `--list-dates` option - The `` command-line argument is now optional and defaults to the current directory +- The `--inventory-jobs` and `--object-jobs` options have been eliminated in + favor of a new `--jobs` option +- Files & directories in the backup tree that are not listed in the inventory + are deleted +- Increased MSRV to 1.81 v0.1.0-alpha.2 (2025-01-06) --------------------------- diff --git a/Cargo.lock b/Cargo.lock index 5adf174..cab046e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -70,9 +70,9 @@ checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" [[package]] name = "aws-config" -version = "1.5.10" +version = "1.5.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b49afaa341e8dd8577e1a2200468f98956d6eda50bcf4a53246cc00174ba924" +checksum = "c03a50b30228d3af8865ce83376b4e99e1ffa34728220fe2860e4df0bb5278d6" dependencies = [ "aws-credential-types", "aws-runtime", @@ -81,7 +81,7 @@ dependencies = [ "aws-sdk-sts", "aws-smithy-async", "aws-smithy-http", - "aws-smithy-json 0.60.7", + "aws-smithy-json", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -112,9 +112,9 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.4.4" +version = "1.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5ac934720fbb46206292d2c75b57e67acfc56fe7dfd34fb9a02334af08409ea" +checksum = "b16d1aa50accc11a4b4d5c50f7fb81cc0cf60328259c587d0e6b0f11385bde46" dependencies = [ "aws-credential-types", "aws-sigv4", @@ -138,9 +138,9 @@ dependencies = [ [[package]] name = "aws-sdk-s3" -version = "1.65.0" +version = "1.68.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3ba2c5c0f2618937ce3d4a5ad574b86775576fa24006bcb3128c6e2cbf3c34e" +checksum = "bc5ddf1dc70287dc9a2f953766a1fe15e3e74aef02fd1335f2afa475c9b4f4fc" dependencies = [ "aws-credential-types", "aws-runtime", @@ -149,7 +149,7 @@ dependencies = [ "aws-smithy-checksums", "aws-smithy-eventstream", "aws-smithy-http", - "aws-smithy-json 0.61.1", + "aws-smithy-json", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -172,15 +172,15 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.50.0" +version = "1.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05ca43a4ef210894f93096039ef1d6fa4ad3edfabb3be92b80908b9f2e4b4eab" +checksum = "1605dc0bf9f0a4b05b451441a17fcb0bda229db384f23bf5cead3adbab0664ac" dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", "aws-smithy-http", - "aws-smithy-json 0.61.1", + "aws-smithy-json", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -194,15 +194,15 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.51.0" +version = "1.54.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "abaf490c2e48eed0bb8e2da2fb08405647bd7f253996e0f93b981958ea0f73b0" +checksum = "59f3f73466ff24f6ad109095e0f3f2c830bfb4cd6c8b12f744c8e61ebf4d3ba1" dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", "aws-smithy-http", - "aws-smithy-json 0.61.1", + "aws-smithy-json", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -216,15 +216,15 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.51.0" +version = "1.54.1" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "b68fde0d69c8bfdc1060ea7da21df3e39f6014da316783336deff0a9ec28f4bf" +checksum = "861d324ef69247c6f3c6823755f408a68877ffb1a9afaff6dd8b0057c760de60" dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", "aws-smithy-http", - "aws-smithy-json 0.61.1", + "aws-smithy-json", "aws-smithy-query", "aws-smithy-runtime", "aws-smithy-runtime-api", @@ -254,7 +254,7 @@ dependencies = [ "hex", "hmac", "http 0.2.12", - "http 1.1.0", + "http 1.2.0", "once_cell", "p256", "percent-encoding", @@ -330,15 +330,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "aws-smithy-json" -version = "0.60.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4683df9469ef09468dad3473d129960119a0d3593617542b7d52086c8486f2d6" -dependencies = [ - "aws-smithy-types", -] - [[package]] name = "aws-smithy-json" version = "0.61.1" @@ -395,7 +386,7 @@ dependencies = [ "aws-smithy-types", "bytes", "http 0.2.12", - "http 1.1.0", + "http 1.2.0", "pin-project-lite", "tokio", "tracing", @@ -413,7 +404,7 @@ dependencies = [ "bytes-utils", "futures-core", "http 0.2.12", - "http 1.1.0", + "http 1.2.0", "http-body 0.4.6", "http-body 1.0.1", "http-body-util", @@ -810,7 +801,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33d852cb9b869c2a9b3df2f71a3074817f01e1844f839a144f5fcef059a4eb5d" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -1051,9 +1042,9 @@ checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" [[package]] name = "hashbrown" -version = "0.15.1" +version = "0.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a9bfc1af68b1726ea47d3d5109de126281def866b33970e10fbab11b5dafab3" +checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" dependencies = [ "allocator-api2", "equivalent", @@ -1094,9 +1085,9 @@ dependencies = [ [[package]] name = "http" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +checksum = "f16ca2af56261c99fba8bac40a10251ce8188205a4c448fbb745a2e4daa76fea" dependencies = [ "bytes", "fnv", @@ -1121,7 +1112,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", - "http 1.1.0", + "http 1.2.0", ] [[package]] @@ -1132,7 +1123,7 @@ checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" dependencies = [ "bytes", "futures-util", - "http 1.1.0", + "http 1.2.0", "http-body 1.0.1", "pin-project-lite", ] @@ -1335,7 +1326,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62f822373a4fe84d4bb149bf54e584a7f4abec90e072ed49cda0edea5b95471f" dependencies = [ "equivalent", - "hashbrown 0.15.1", + "hashbrown 0.15.2", ] [[package]] @@ -1402,7 +1393,7 @@ version = "0.12.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" dependencies = [ - "hashbrown 0.15.1", + "hashbrown 0.15.2", ] [[package]] @@ -1724,7 +1715,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -1796,6 +1787,7 @@ dependencies = [ "aws-smithy-runtime-api", "clap", "csv", + "either", "flate2", "fs-err", "futures-util", @@ -1804,6 +1796,7 @@ 
dependencies = [ "md-5", "memory-stats", "percent-encoding", + "pin-project-lite", "regex", "rstest", "serde", @@ -2082,7 +2075,7 @@ dependencies = [ "getrandom", "once_cell", "rustix", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 9d1836b..0a10738 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ name = "s3invsync" version = "0.1.0-alpha.2" edition = "2021" -rust-version = "1.80" +rust-version = "1.81" description = "AWS S3 Inventory-based backup tool with efficient incremental & versionId support" authors = [ "DANDI Developers ", @@ -24,14 +24,16 @@ aws-smithy-async = "1.2.3" aws-smithy-runtime-api = "1.7.3" clap = { version = "4.5.26", default-features = false, features = ["derive", "error-context", "help", "std", "suggestions", "usage", "wrap_help"] } csv = "1.3.1" +either = "1.13.0" flate2 = "1.0.35" fs-err = { version = "3.0.0", features = ["tokio"] } -futures-util = "0.3.31" +futures-util = { version = "0.3.31", default-features = false, features = ["std"] } hex = "0.4.3" lockable = "0.1.1" md-5 = "0.10.6" memory-stats = "1.2.0" percent-encoding = "2.3.1" +pin-project-lite = "0.2.16" regex = "1.11.1" serde = { version = "1.0.217", features = ["derive"] } serde_json = "1.0.135" @@ -39,7 +41,7 @@ strum = { version = "0.26.3", features = ["derive"] } tempfile = "3.15.0" thiserror = "2.0.11" time = { version = "0.3.37", features = ["macros", "parsing"] } -tokio = { version = "1.43.0", features = ["macros", "rt-multi-thread", "signal"] } +tokio = { version = "1.43.0", features = ["macros", "rt-multi-thread", "signal", "sync"] } tokio-util = { version = "0.7.13", features = ["rt"] } tracing = "0.1.41" tracing-subscriber = { version = "0.3.19", features = ["local-time", "time"] } diff --git a/README.md b/README.md index bad085c..d52f60d 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ [![Project Status: WIP – Initial development is in progress, but there has not yet been a stable, usable release suitable for the public.](https://www.repostatus.org/badges/latest/wip.svg)](https://www.repostatus.org/#wip) [![CI Status](https://github.com/dandi/s3invsync/actions/workflows/test.yml/badge.svg)](https://github.com/dandi/s3invsync/actions/workflows/test.yml) [![codecov.io](https://codecov.io/gh/dandi/s3invsync/branch/main/graph/badge.svg)](https://codecov.io/gh/dandi/s3invsync) -[![Minimum Supported Rust Version](https://img.shields.io/badge/MSRV-1.80-orange)](https://www.rust-lang.org) +[![Minimum Supported Rust Version](https://img.shields.io/badge/MSRV-1.81-orange)](https://www.rust-lang.org) [![MIT License](https://img.shields.io/github/license/dandi/s3invsync.svg)](https://opensource.org/licenses/MIT) [GitHub](https://github.com/dandi/s3invsync) | [Issues](https://github.com/dandi/s3invsync/issues) | [Changelog](https://github.com/dandi/s3invsync/blob/main/CHANGELOG.md) @@ -92,7 +92,9 @@ When downloading a given key from S3, the latest version (if not deleted) is stored at `{outdir}/{key}`, and the versionIds and etags of all latest object versions in a given directory are stored in `.s3invsync.versions.json` in that directory. Each non-latest, non-deleted version of a given key is stored at -`{outdir}/{key}.old.{versionId}.{etag}`. +`{outdir}/{key}.old.{versionId}.{etag}`. Any other files or directories under +`` that do not correspond to an object listed in the inventory are +deleted. 
Options ------- @@ -110,8 +112,8 @@ Options inventory for the given date is used) or in the format `YYYY-MM-DDTHH-MMZ` (to specify a specific inventory). -- `-I `, `--inventory-jobs ` — Specify the maximum number of inventory - list files to download & process at once [default: 20] +- `-J `, `--jobs ` — Specify the maximum number of concurrent + download jobs [default: 20] - `--list-dates` — List available inventory manifest dates instead of backing anything up @@ -120,9 +122,6 @@ Options Possible values are "`ERROR`", "`WARN`", "`INFO`", "`DEBUG`", and "`TRACE`" (all case-insensitive). [default value: `DEBUG`] -- `-O `, `--object-jobs ` — Specify the maximum number of inventory - entries to download & process at once [default: 20] - - `--path-filter ` — Only download objects whose keys match the given [regular expression](https://docs.rs/regex/latest/regex/#syntax) diff --git a/src/consts.rs b/src/consts.rs index b66da2b..99dae40 100644 --- a/src/consts.rs +++ b/src/consts.rs @@ -1,3 +1,7 @@ /// The name of the file in which metadata (version ID and etag) are stored for /// the latest versions of objects in each directory pub(crate) static METADATA_FILENAME: &str = ".s3invsync.versions.json"; + +/// The number of initial bytes of an inventory csv.gz file to fetch when +/// peeking at just the first entry +pub(crate) const CSV_GZIP_PEEK_SIZE: usize = 1024; diff --git a/src/inventory/item.rs b/src/inventory/item.rs index c331ecc..bd8bbf3 100644 --- a/src/inventory/item.rs +++ b/src/inventory/item.rs @@ -1,5 +1,6 @@ use crate::keypath::KeyPath; use crate::s3::S3Location; +use crate::util::make_old_filename; use time::OffsetDateTime; /// An entry in an inventory list file @@ -9,6 +10,16 @@ pub(crate) enum InventoryEntry { Item(InventoryItem), } +impl InventoryEntry { + /// Returns the entry's key + pub(crate) fn key(&self) -> &str { + match self { + InventoryEntry::Directory(Directory { key, .. }) => key, + InventoryEntry::Item(InventoryItem { key, .. }) => key.as_ref(), + } + } +} + /// An entry in an inventory list file pointing to a directory object #[derive(Clone, Debug, Eq, PartialEq)] pub(crate) struct Directory { @@ -60,6 +71,20 @@ impl InventoryItem { S3Location::new(self.bucket.clone(), String::from(&self.key)) .with_version_id(self.version_id.clone()) } + + /// Returns whether the object is a delete marker + pub(crate) fn is_deleted(&self) -> bool { + self.details == ItemDetails::Deleted + } + + /// If the object is not a delete marker and is not the latest version of + /// the key, return the base filename at which it will be backed up. + pub(crate) fn old_filename(&self) -> Option { + let ItemDetails::Present { ref etag, .. 
} = self.details else { + return None; + }; + (!self.is_latest).then(|| make_old_filename(self.key.name(), &self.version_id, etag)) + } } /// Metadata about an object's content diff --git a/src/keypath.rs b/src/keypath.rs index 4078114..774ca41 100644 --- a/src/keypath.rs +++ b/src/keypath.rs @@ -15,6 +15,14 @@ use thiserror::Error; pub(crate) struct KeyPath(String); impl KeyPath { + /// Return the filename portion of the path + pub(crate) fn name(&self) -> &str { + self.0 + .split('/') + .next_back() + .expect("path should be nonempty") + } + /// Split the path into the directory component (if any) and filename pub(crate) fn split(&self) -> (Option<&str>, &str) { match self.0.rsplit_once('/') { @@ -162,6 +170,23 @@ mod tests { use assert_matches::assert_matches; use rstest::rstest; + #[rstest] + #[case("foo", "foo")] + #[case("foo/bar/baz", "baz")] + fn test_name(#[case] p: KeyPath, #[case] name: &str) { + assert_eq!(p.name(), name); + } + + #[rstest] + #[case("foo", None, "foo")] + #[case("foo/bar", Some("foo"), "bar")] + #[case("foo/bar/baz", Some("foo/bar"), "baz")] + fn test_split(#[case] p: KeyPath, #[case] dirname: Option<&str>, #[case] filename: &str) { + let (d, f) = p.split(); + assert_eq!(d, dirname); + assert_eq!(f, filename); + } + #[rstest] #[case("foo.nwb")] #[case("foo/bar.nwb")] @@ -200,6 +225,7 @@ mod tests { #[case(".old.bar.baz", false)] #[case("foo.old..baz", false)] #[case("foo.old..", false)] + #[case(".s3invsync.versions.json", true)] fn test_is_special_component(#[case] s: &str, #[case] r: bool) { assert_eq!(is_special_component(s), r); } diff --git a/src/main.rs b/src/main.rs index abfaa28..946e7bc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ mod consts; mod inventory; mod keypath; mod manifest; +mod nursery; mod s3; mod syncer; mod timestamps; @@ -39,10 +40,9 @@ struct Arguments { #[arg(short, long)] date: Option, - /// Set the maximum number of inventory list files to download & process at - /// once - #[arg(short = 'I', long, default_value = "20")] - inventory_jobs: NonZeroUsize, + /// Set the maximum number of concurrent download jobs + #[arg(short = 'J', long, default_value = "20")] + jobs: NonZeroUsize, /// List available inventory manifest dates instead of backing anything up #[arg(long)] @@ -57,11 +57,6 @@ struct Arguments { )] log_level: Level, - /// Set the maximum number of inventory entries to download & process at - /// once - #[arg(short = 'O', long, default_value = "20")] - object_jobs: NonZeroUsize, - /// Only download objects whose keys match the given regular expression #[arg(long, value_name = "REGEX")] path_filter: Option, @@ -134,8 +129,7 @@ async fn run(args: Arguments) -> anyhow::Result<()> { outdir, manifest_date, start_time, - args.inventory_jobs, - args.object_jobs, + args.jobs, args.path_filter, args.compress_filter_msgs, ); diff --git a/src/nursery.rs b/src/nursery.rs new file mode 100644 index 0000000..d45b56e --- /dev/null +++ b/src/nursery.rs @@ -0,0 +1,251 @@ +//! Simple tokio-based task group/nursery +use futures_util::{stream::FuturesUnordered, Stream, StreamExt}; +use pin_project_lite::pin_project; +use std::future::Future; +use std::pin::Pin; +use std::task::{ready, Context, Poll}; +use tokio::{ + sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, + task::{JoinError, JoinHandle}, +}; + +/// A handle for spawning new tasks in a task group/nursery. +/// +/// `Nursery` is cloneable and sendable, and so it can be used to spawn tasks +/// from inside other tasks in the nursery. 
The nursery returned by +/// [`Nursery::new()`] and all clones thereof must be dropped before the +/// corresponding [`NurseryStream`] can yield `None`. +#[derive(Debug)] +pub(crate) struct Nursery { + sender: UnboundedSender>, +} + +impl Nursery { + /// Create a new nursery and return a handle for spawning tasks and a + /// [`Stream`] of task return values. `T` is the `Output` type of the + /// futures that will be spawned in the nursery. + pub(crate) fn new() -> (Nursery, NurseryStream) { + let (sender, receiver) = unbounded_channel(); + ( + Nursery { sender }, + NurseryStream { + receiver, + tasks: FuturesUnordered::new(), + }, + ) + } + + /// Spawn a future that returns `T` in the nursery. + pub(crate) fn spawn(&self, fut: Fut) + where + Fut: Future + Send + 'static, + { + let _ = self.sender.send(FragileHandle::new(tokio::spawn(fut))); + } +} + +// Clone can't be derived, as that would erroneously add `T: Clone` bounds to +// the impl. +impl Clone for Nursery { + fn clone(&self) -> Nursery { + Nursery { + sender: self.sender.clone(), + } + } +} + +/// A [`Stream`] of the values returned by the tasks spawned in a nursery. +/// +/// The corresponding [`Nursery`] and all clones thereof must be dropped before +/// the stream can yield `None`. +/// +/// When a `NurseryStream` is dropped, all tasks in the nursery are aborted. +#[derive(Debug)] +pub(crate) struct NurseryStream { + receiver: UnboundedReceiver>, + tasks: FuturesUnordered>, +} + +impl Stream for NurseryStream { + type Item = T; + + /// Poll for one of the tasks in the nursery to complete and return its + /// return value. + /// + /// # Panics + /// + /// If a task panics, this method resumes unwinding the panic. + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let closed = loop { + match self.receiver.poll_recv(cx) { + Poll::Pending => break false, + Poll::Ready(Some(handle)) => self.tasks.push(handle), + Poll::Ready(None) => break true, + } + }; + match ready!(self.tasks.poll_next_unpin(cx)) { + Some(Ok(r)) => Some(r).into(), + Some(Err(e)) => match e.try_into_panic() { + Ok(barf) => std::panic::resume_unwind(barf), + Err(e) => unreachable!( + "Task in nursery should not have been aborted before dropping stream, but got {e:?}" + ), + }, + None => { + if closed { + // All Nursery clones dropped and all results yielded; end + // of stream + None.into() + } else { + Poll::Pending + } + } + } + } +} + +pin_project! { + /// A wrapper around `tokio::task::JoinHandle` that aborts the task on drop. 
+ #[derive(Debug)] + struct FragileHandle { + #[pin] + inner: JoinHandle + } + + impl PinnedDrop for FragileHandle { + fn drop(this: Pin<&mut Self>) { + this.project().inner.abort(); + } + } +} + +impl FragileHandle { + fn new(inner: JoinHandle) -> Self { + FragileHandle { inner } + } +} + +impl Future for FragileHandle { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + this.inner.poll(cx) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures_util::{FutureExt, StreamExt}; + use std::time::Duration; + use tokio::{sync::oneshot, time::timeout}; + + #[test] + fn nursery_is_send() { + #[allow(dead_code)] + fn require_send(_t: T) {} + + #[allow(dead_code)] + fn check_nursery_send() { + let (nursery, _) = Nursery::::new(); + require_send(nursery); + } + } + + #[tokio::test] + async fn collect() { + let (nursery, nursery_stream) = Nursery::new(); + nursery.spawn(std::future::ready(1)); + nursery.spawn(std::future::ready(2)); + nursery.spawn(std::future::ready(3)); + drop(nursery); + let mut values = nursery_stream.collect::>().await; + values.sort_unstable(); + assert_eq!(values, vec![1, 2, 3]); + } + + #[tokio::test] + async fn nested_spawn() { + let (nursery, nursery_stream) = Nursery::new(); + let inner = nursery.clone(); + nursery.spawn(async move { + inner.spawn(std::future::ready(0)); + std::future::ready(1).await + }); + nursery.spawn(std::future::ready(2)); + nursery.spawn(std::future::ready(3)); + drop(nursery); + let mut values = nursery_stream.collect::>().await; + values.sort_unstable(); + assert_eq!(values, vec![0, 1, 2, 3]); + } + + #[tokio::test] + async fn reraise_panic() { + let (nursery, mut nursery_stream) = Nursery::new(); + nursery.spawn(async { panic!("I can't take this anymore!") }); + drop(nursery); + let r = std::panic::AssertUnwindSafe(nursery_stream.next()) + .catch_unwind() + .await; + assert!(r.is_err()); + } + + #[tokio::test] + async fn no_close_until_drop() { + let (nursery, mut nursery_stream) = Nursery::new(); + nursery.spawn(std::future::ready(1)); + nursery.spawn(std::future::ready(2)); + nursery.spawn(std::future::ready(3)); + let mut values = Vec::new(); + values.push(nursery_stream.next().await.unwrap()); + values.push(nursery_stream.next().await.unwrap()); + values.push(nursery_stream.next().await.unwrap()); + values.sort_unstable(); + assert_eq!(values, vec![1, 2, 3]); + let r = timeout(Duration::from_millis(100), nursery_stream.next()).await; + assert!(r.is_err()); + drop(nursery); + let r = timeout(Duration::from_millis(100), nursery_stream.next()).await; + assert_eq!(r, Ok(None)); + } + + #[tokio::test] + async fn drop_tasks_on_drop_stream() { + enum Void {} + + let (nursery, nursery_stream) = Nursery::new(); + let (sender, receiver) = oneshot::channel::(); + nursery.spawn({ + async move { + std::future::pending::<()>().await; + drop(sender); + } + }); + drop(nursery); + drop(nursery_stream); + assert!(receiver.await.is_err()); + } + + #[tokio::test] + async fn nest_nurseries() { + let (nursery, nursery_stream) = Nursery::new(); + nursery.spawn(async { + let (nursery, nursery_stream) = Nursery::new(); + nursery.spawn(std::future::ready(1)); + nursery.spawn(std::future::ready(2)); + nursery.spawn(std::future::ready(3)); + drop(nursery); + nursery_stream + .fold(0, |accum, i| async move { accum + i }) + .await + }); + nursery.spawn(std::future::ready(4)); + nursery.spawn(std::future::ready(5)); + drop(nursery); + let mut values = nursery_stream.collect::>().await; + 
values.sort_unstable(); + assert_eq!(values, vec![4, 5, 6]); + } +} diff --git a/src/s3/mod.rs b/src/s3/mod.rs index 5247c0d..455dcbd 100644 --- a/src/s3/mod.rs +++ b/src/s3/mod.rs @@ -3,7 +3,8 @@ mod location; mod streams; pub(crate) use self::location::S3Location; use self::streams::{ListManifestDates, ListObjectsError}; -use crate::inventory::{CsvReader, InventoryList}; +use crate::consts::CSV_GZIP_PEEK_SIZE; +use crate::inventory::{CsvReader, CsvReaderError, InventoryEntry, InventoryList}; use crate::manifest::{CsvManifest, FileSpec}; use crate::timestamps::{Date, DateHM, DateMaybeHM}; use aws_credential_types::{ @@ -240,6 +241,39 @@ impl S3Client { Ok(InventoryList::for_downloaded_csv(path, url, reader)) } + /// Fetch the first [`CSV_GZIP_PEEK_SIZE`] bytes of the CSV inventory list + /// file described by `fspec` and extract the first line. Returns `None` + /// if the file is empty. + #[tracing::instrument(skip_all, fields(key = fspec.key))] + pub(crate) async fn peek_inventory_csv( + &self, + fspec: &FileSpec, + ) -> Result, CsvPeekError> { + tracing::debug!("Peeking at first {CSV_GZIP_PEEK_SIZE} bytes of file"); + let url = self.inventory_base.with_key(&fspec.key); + let obj = self.get_object(&url).await?; + let mut bytestream = obj.body; + let mut header = std::collections::VecDeque::with_capacity(CSV_GZIP_PEEK_SIZE); + while let Some(blob) = + bytestream + .try_next() + .await + .map_err(|source| CsvPeekError::Download { + url: url.clone(), + source, + })? + { + header.extend(blob); + if header.len() >= CSV_GZIP_PEEK_SIZE { + break; + } + } + CsvReader::from_gzipped_reader(header, fspec.file_schema.clone()) + .next() + .transpose() + .map_err(|source| CsvPeekError::Decode { url, source }) + } + /// Download the object at `url` and write its bytes to `outfile`. If /// `md5_digest` is non-`None` (in which case it must be a 32-character /// lowercase hexadecimal string), it is used to validate the download. 
@@ -442,6 +476,28 @@ pub(crate) enum CsvDownloadError { }, } +/// Error returned by [`S3Client::peek_inventory_csv()`] +#[derive(Debug, Error)] +pub(crate) enum CsvPeekError { + /// Failed to perform "Get Object" request + #[error(transparent)] + Get(#[from] GetError), + + /// Error while receiving bytes for the object + #[error("failed downloading contents for {url}")] + Download { + url: S3Location, + source: ByteStreamError, + }, + + /// Failed to read first line from header + #[error("failed to decode first line from peeking at {url}")] + Decode { + url: S3Location, + source: CsvReaderError, + }, +} + /// Error returned by [`S3Client::get_object()`] when a "Get Object" request /// fails #[derive(Debug, Error)] diff --git a/src/syncer/metadata.rs b/src/syncer/metadata.rs new file mode 100644 index 0000000..c96292d --- /dev/null +++ b/src/syncer/metadata.rs @@ -0,0 +1,153 @@ +use super::*; +use crate::util::make_old_filename; +use serde::{Deserialize, Serialize}; +use std::io::ErrorKind; + +/// Metadata about the latest version of a key +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub(super) struct Metadata { + /// The object's version ID + pub(super) version_id: String, + + /// The object's etag + pub(super) etag: String, +} + +impl Metadata { + /// Return the filename used for backing up a non-latest object that has + /// `self` as its metadata and `basename` as the filename portion of its + /// key + pub(super) fn old_filename(&self, basename: &str) -> String { + make_old_filename(basename, &self.version_id, &self.etag) + } +} + +/// Handle for manipulating the metadata for the latest version of a key in a +/// local JSON database +pub(super) struct FileMetadataManager<'a> { + syncer: &'a Syncer, + + /// The manager for the directory's database + inner: MetadataManager<'a>, + + /// The filename of the object + filename: &'a str, +} + +impl<'a> FileMetadataManager<'a> { + pub(super) fn new(syncer: &'a Syncer, parentdir: &'a Path, filename: &'a str) -> Self { + FileMetadataManager { + syncer, + inner: MetadataManager::new(parentdir), + filename, + } + } + + /// Acquire a lock on this JSON database + async fn lock(&self) -> Guard<'a> { + self.syncer.lock_path(self.database_path().to_owned()).await + } + + fn database_path(&self) -> &Path { + &self.inner.database_path + } + + /// Retrieve the metadata for the key from the database + pub(super) async fn get(&self) -> anyhow::Result { + tracing::trace!(file = self.filename, database = %self.database_path().display(), "Fetching object metadata for file from database"); + let mut data = { + let _guard = self.lock().await; + self.inner.load()? 
+ }; + let Some(md) = data.remove(self.filename) else { + anyhow::bail!( + "No entry for {:?} in {}", + self.filename, + self.database_path().display() + ); + }; + Ok(md) + } + + /// Set the metadata for the key in the database to `md` + pub(super) async fn set(&self, md: Metadata) -> anyhow::Result<()> { + tracing::trace!(file = self.filename, database = %self.database_path().display(), "Setting object metadata for file in database"); + let _guard = self.lock().await; + let mut data = self.inner.load()?; + data.insert(self.filename.to_owned(), md); + self.inner.store(data)?; + Ok(()) + } + + /// Remove the metadata for the key from the database + pub(super) async fn delete(&self) -> anyhow::Result<()> { + tracing::trace!(file = self.filename, database = %self.database_path().display(), "Deleting object metadata for file from database"); + let _guard = self.lock().await; + let mut data = self.inner.load()?; + if data.remove(self.filename).is_some() { + self.inner.store(data)?; + } + Ok(()) + } +} + +/// Handle for manipulating the metadata a local JSON database +pub(super) struct MetadataManager<'a> { + /// The local directory in which the downloaded object and the JSON + /// database are both located + dirpath: &'a Path, + + /// The path to the JSON database + database_path: PathBuf, +} + +impl<'a> MetadataManager<'a> { + pub(super) fn new(dirpath: &'a Path) -> MetadataManager<'a> { + MetadataManager { + dirpath, + database_path: dirpath.join(METADATA_FILENAME), + } + } + + /// Read & parse the database file. If the file does not exist, return an + /// empty map. + pub(super) fn load(&self) -> anyhow::Result> { + let content = match fs_err::read_to_string(&self.database_path) { + Ok(content) => content, + Err(e) if e.kind() == ErrorKind::NotFound => String::from("{}"), + Err(e) => return Err(e.into()), + }; + serde_json::from_str(&content).with_context(|| { + format!( + "failed to deserialize contents of {}", + self.database_path.display() + ) + }) + } + + /// Set the content of the database file to the serialized map + pub(super) fn store(&self, data: BTreeMap) -> anyhow::Result<()> { + let fp = tempfile::Builder::new() + .prefix(".s3invsync.versions.") + .tempfile_in(self.dirpath) + .with_context(|| { + format!( + "failed to create temporary database file for updating {}", + self.database_path.display() + ) + })?; + serde_json::to_writer_pretty(fp.as_file(), &data).with_context(|| { + format!( + "failed to serialize metadata to {}", + self.database_path.display() + ) + })?; + fp.persist(&self.database_path).with_context(|| { + format!( + "failed to persist temporary database file to {}", + self.database_path.display() + ) + })?; + Ok(()) + } +} diff --git a/src/syncer.rs b/src/syncer/mod.rs similarity index 63% rename from src/syncer.rs rename to src/syncer/mod.rs index 834913a..2f8cb32 100644 --- a/src/syncer.rs +++ b/src/syncer/mod.rs @@ -1,12 +1,19 @@ +mod metadata; +mod treetracker; +use self::metadata::*; +use self::treetracker::*; use crate::consts::METADATA_FILENAME; use crate::inventory::{InventoryEntry, InventoryItem, ItemDetails}; -use crate::manifest::CsvManifest; +use crate::keypath::is_special_component; +use crate::manifest::{CsvManifest, FileSpec}; +use crate::nursery::{Nursery, NurseryStream}; use crate::s3::S3Client; use crate::timestamps::DateHM; use crate::util::*; use anyhow::Context; -use serde::{Deserialize, Serialize}; +use futures_util::StreamExt; use std::collections::BTreeMap; +use std::future::Future; use std::io::ErrorKind; use std::num::NonZeroUsize; use 
std::path::{Path, PathBuf}; @@ -14,7 +21,7 @@ use std::sync::{ atomic::{AtomicBool, Ordering}, Arc, Mutex, }; -use tokio::task::JoinSet; +use tokio::sync::Notify; use tokio_util::sync::CancellationToken; /// Capacity of async channels @@ -23,6 +30,8 @@ const CHANNEL_SIZE: usize = 65535; /// Lock guard returned by [`Syncer::lock_path()`] type Guard<'a> = as lockable::Lockable>::Guard<'a>; +type ObjChannelItem = (InventoryItem, Option>); + /// Object responsible for syncing an S3 bucket to a local backup by means of /// the bucket's S3 Inventory pub(crate) struct Syncer { @@ -38,11 +47,8 @@ pub(crate) struct Syncer { /// The time at which the overall backup procedure started start_time: std::time::Instant, - /// The number of concurrent downloads of CSV inventory lists - inventory_jobs: NonZeroUsize, - - /// The number of concurrent downloads of S3 objects - object_jobs: NonZeroUsize, + /// The number of concurrent downloads jobs + jobs: NonZeroUsize, /// Only download objects whose keys match the given regex path_filter: Option, @@ -56,10 +62,10 @@ pub(crate) struct Syncer { /// A clone of the channel used for sending inventory entries off to be /// downloaded. This is set to `None` after spawning all of the inventory /// list download tasks. - obj_sender: Mutex>>, + obj_sender: Mutex>>, /// A clone of the channel used for receiving inventory entries to download - obj_receiver: async_channel::Receiver, + obj_receiver: async_channel::Receiver, /// Whether the backup was terminated by Ctrl-C terminated: AtomicBool, @@ -76,8 +82,7 @@ impl Syncer { outdir: PathBuf, manifest_date: DateHM, start_time: std::time::Instant, - inventory_jobs: NonZeroUsize, - object_jobs: NonZeroUsize, + jobs: NonZeroUsize, path_filter: Option, compress_filter_msgs: Option, ) -> Arc { @@ -87,8 +92,7 @@ impl Syncer { outdir, manifest_date, start_time, - inventory_jobs, - object_jobs, + jobs, path_filter, locks: lockable::LockPool::new(), token: CancellationToken::new(), @@ -100,6 +104,20 @@ impl Syncer { } pub(crate) async fn run(self: &Arc, manifest: CsvManifest) -> Result<(), MultiError> { + self.spawwn_cltrc_listener(); + let fspecs = self.sort_csvs_by_first_line(manifest.files).await?; + tracing::trace!(path = %self.outdir.display(), "Creating root output directory"); + fs_err::create_dir_all(&self.outdir).map_err(|e| MultiError(vec![e.into()]))?; + let (nursery, nursery_stream) = Nursery::new(); + self.spawn_inventory_task(&nursery, fspecs); + self.spawn_object_tasks(&nursery); + drop(nursery); + let r = self.await_nursery(nursery_stream).await; + self.filterlog.finish(); + r + } + + fn spawwn_cltrc_listener(self: &Arc) { tokio::spawn({ let this = self.clone(); async move { @@ -110,11 +128,13 @@ impl Syncer { } } }); + } - tracing::trace!(path = %self.outdir.display(), "Creating root output directory"); - fs_err::create_dir_all(&self.outdir).map_err(|e| MultiError(vec![e.into()]))?; - let mut joinset = JoinSet::new(); - let (fspec_sender, fspec_receiver) = async_channel::bounded(CHANNEL_SIZE); + fn spawn_inventory_task( + self: &Arc, + nursery: &Nursery>, + fspecs: Vec, + ) { let obj_sender = { let guard = self .obj_sender @@ -125,45 +145,52 @@ impl Syncer { .cloned() .expect("obj_sender should not be None") }; - - for _ in 0..self.inventory_jobs.get() { - let clnt = self.client.clone(); - let token = self.token.clone(); - let recv = fspec_receiver.clone(); - let sender = obj_sender.clone(); - joinset.spawn(async move { - while let Ok(fspec) = recv.recv().await { - let clnt = clnt.clone(); - let sender = 
sender.clone(); - let r = token - .run_until_cancelled(async move { - let entries = clnt.download_inventory_csv(fspec).await?; - for entry in entries { - match entry? { - InventoryEntry::Directory(d) => { - tracing::debug!(url = %d.url(), "Ignoring directory entry in inventory list"); - } - InventoryEntry::Item(item) => { - if sender.send(item).await.is_err() { - // Assume we're shutting down - return Ok(()); - } + let this = self.clone(); + let subnursery = nursery.clone(); + nursery.spawn( + self.until_cancelled_ok(async move { + let mut tracker = TreeTracker::new(); + for spec in fspecs { + let entries = this.client.download_inventory_csv(spec).await?; + for entry in entries { + match entry.context("error reading from inventory list file")? { + InventoryEntry::Directory(d) => { + tracing::debug!(url = %d.url(), "Ignoring directory entry in inventory list"); + } + InventoryEntry::Item(item) => { + let notify = if !item.is_deleted() { + let notify = Arc::new(Notify::new()); + for dir in tracker.add(&item.key, notify.clone(), item.old_filename())? { + subnursery.spawn({ + this.until_cancelled_ok({ + let this = this.clone(); + async move { this.cleanup_dir(dir).await } + }) + }); } + Some(notify) + } else { + None + }; + if obj_sender.send((item, notify)).await.is_err() { + // Assume we're shutting down + return Ok(()); } } - Ok(()) - }) - .await; - match r { - Some(Ok(())) => (), - Some(Err(e)) => return Err(e), - None => return Ok(()), + } } } + for dir in tracker.finish() { + subnursery.spawn({ + this.until_cancelled_ok({ + let this = this.clone(); + async move { this.cleanup_dir(dir).await } + }) + }); + } Ok(()) - }); - } - drop(obj_sender); + }) + ); { let mut guard = self .obj_sender @@ -171,49 +198,103 @@ impl Syncer { .expect("obj_sender mutex should not be poisoned"); *guard = None; } - drop(fspec_receiver); - - joinset.spawn(async move { - for fspec in manifest.files { - if fspec_sender.send(fspec).await.is_err() { - return Ok(()); - } - } - Ok(()) - }); + } - for _ in 0..self.object_jobs.get() { + fn spawn_object_tasks(self: &Arc, nursery: &Nursery>) { + for _ in 0..self.jobs.get() { let this = self.clone(); let recv = self.obj_receiver.clone(); - joinset.spawn(async move { - while let Ok(item) = recv.recv().await { + nursery.spawn(async move { + while let Ok((item, notify)) = recv.recv().await { if this.token.is_cancelled() { return Ok(()); } - Box::pin(this.process_item(item)).await?; + let r = Box::pin(this.process_item(item)).await; + if let Some(n) = notify { + n.notify_one(); + } + r?; } Ok(()) }); } + } - let mut errors = Vec::new(); - while let Some(r) = joinset.join_next().await { - match r { - Ok(Ok(())) => (), - Ok(Err(e)) => { - tracing::error!(error = ?e, "Error occurred"); - if errors.is_empty() { - tracing::info!("Shutting down in response to error"); - self.shutdown(); + /// Fetch the first line of each inventory list file in `specs` and sort + /// the list by the keys in those lines + async fn sort_csvs_by_first_line( + self: &Arc, + specs: Vec, + ) -> Result, MultiError> { + tracing::info!("Peeking at inventory lists in order to sort by first line ..."); + let (nursery, nursery_stream) = Nursery::new(); + let mut receiver = { + let specs = Arc::new(Mutex::new(specs)); + let (output_sender, output_receiver) = tokio::sync::mpsc::channel(CHANNEL_SIZE); + for _ in 0..self.jobs.get() { + let clnt = self.client.clone(); + let specs = specs.clone(); + let sender = output_sender.clone(); + nursery.spawn(self.until_cancelled_ok(async move { + while let Some(fspec) = { + 
let mut guard = specs.lock().expect("specs mutex should not be poisoned"); + guard.pop() + } { + if let Some(entry) = clnt.peek_inventory_csv(&fspec).await? { + if sender.send((fspec, entry)).await.is_err() { + // Assume we're shutting down + return Ok(()); + } + } } - errors.push(e); - } - Err(e) if e.is_panic() => std::panic::resume_unwind(e.into_panic()), - Err(_) => (), + Ok(()) + })); } + output_receiver + }; + drop(nursery); + let mut firsts2fspecs = BTreeMap::new(); + while let Some((fspec, entry)) = receiver.recv().await { + firsts2fspecs.insert(entry.key().to_owned(), fspec); } - self.filterlog.finish(); + self.await_nursery(nursery_stream).await?; + Ok(firsts2fspecs.into_values().collect()) + } + + /// Run the given future to completion, cancelling it if `token` is + /// cancelled, in which case `Ok(())` is returned. + fn until_cancelled_ok( + &self, + fut: Fut, + ) -> impl Future> + Send + 'static + where + Fut: Future> + Send + 'static, + { + // Use an async block instead of making the method async so that the + // future won't capture &self and thus will be 'static + let token = self.token.clone(); + async move { token.run_until_cancelled(fut).await.unwrap_or(Ok(())) } + } + /// Wait for all tasks in a nursery to complete. If any errors occur, + /// [`Syncer::shutdown()`] is called, and a [`MultiError`] of all errors + /// (including a message about Ctrl-C being received if that happened) is + /// returned. + async fn await_nursery( + &self, + mut stream: NurseryStream>, + ) -> Result<(), MultiError> { + let mut errors = Vec::new(); + while let Some(r) = stream.next().await { + if let Err(e) = r { + tracing::error!(error = ?e, "Error occurred"); + if errors.is_empty() { + tracing::info!("Shutting down in response to error"); + self.shutdown(); + } + errors.push(e); + } + } if self.terminated.load(Ordering::Acquire) { errors.push(anyhow::anyhow!("Shut down due to Ctrl-C")); } @@ -224,7 +305,7 @@ impl Syncer { } } - fn shutdown(self: &Arc) { + fn shutdown(&self) { if !self.token.is_cancelled() { self.token.cancel(); self.obj_receiver.close(); @@ -263,7 +344,7 @@ impl Syncer { } else { self.outdir.clone() }; - let mdmanager = MetadataManager::new(self, &parentdir, filename); + let mdmanager = FileMetadataManager::new(self, &parentdir, filename); if item.is_latest { tracing::info!("Object is latest version of key"); @@ -444,134 +525,72 @@ impl Syncer { "Process info", ); } -} - -/// Metadata about the latest version of a key -#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] -struct Metadata { - /// The object's version ID - version_id: String, - - /// The object's etag - etag: String, -} - -impl Metadata { - /// Return the filename used for backing up a non-latest object that has - /// `self` as its metadata and `basename` as the filename portion of its - /// key - fn old_filename(&self, basename: &str) -> String { - format!("{}.old.{}.{}", basename, self.version_id, self.etag) - } -} - -/// Handle for manipulating the metadata for the latest version of a key in a -/// local JSON database -struct MetadataManager<'a> { - syncer: &'a Syncer, - - /// The local directory in which the downloaded object and the JSON - /// database are both located - dirpath: &'a Path, - - /// The path to the JSON database - database_path: PathBuf, - - /// The filename of the object - filename: &'a str, -} -impl<'a> MetadataManager<'a> { - fn new(syncer: &'a Syncer, parentdir: &'a Path, filename: &'a str) -> Self { - MetadataManager { - syncer, - dirpath: parentdir, - database_path: 
parentdir.join(METADATA_FILENAME), - filename, + #[tracing::instrument(skip_all, fields(dirpath = %dir.path().unwrap_or("")))] + async fn cleanup_dir(&self, dir: Directory>) -> anyhow::Result<()> { + let mut notifiers = Vec::new(); + let dir = dir.map(|n| { + notifiers.push(n); + }); + for n in notifiers { + n.notified().await; } - } - - /// Acquire a lock on this JSON database - async fn lock(&self) -> Guard<'a> { - self.syncer.lock_path(self.database_path.clone()).await - } - - /// Read & parse the database file. If the file does not exist, return an - /// empty map. - fn load(&self) -> anyhow::Result> { - let content = match fs_err::read_to_string(&self.database_path) { - Ok(content) => content, - Err(e) if e.kind() == ErrorKind::NotFound => String::from("{}"), - Err(e) => return Err(e.into()), + let dirpath = match dir.path() { + Some(p) => self.outdir.join(p), + None => self.outdir.clone(), }; - serde_json::from_str(&content).with_context(|| { - format!( - "failed to deserialize contents of {}", - self.database_path.display() - ) - }) - } - - /// Set the content of the database file to the serialized map - fn store(&self, data: BTreeMap) -> anyhow::Result<()> { - let fp = tempfile::Builder::new() - .prefix(".s3invsync.versions.") - .tempfile_in(self.dirpath) - .with_context(|| { - format!( - "failed to create temporary database file for updating {}", - self.database_path.display() - ) - })?; - serde_json::to_writer_pretty(fp.as_file(), &data).with_context(|| { - format!( - "failed to serialize metadata to {}", - self.database_path.display() - ) - })?; - fp.persist(&self.database_path).with_context(|| { - format!( - "failed to persist temporary database file to {}", - self.database_path.display() - ) - })?; - Ok(()) - } - - /// Retrieve the metadata for the key from the database - async fn get(&self) -> anyhow::Result { - tracing::trace!(file = self.filename, database = %self.database_path.display(), "Fetching object metadata for file from database"); - let mut data = { - let _guard = self.lock().await; - self.load()? 
- }; - let Some(md) = data.remove(self.filename) else { - anyhow::bail!( - "No entry for {:?} in {}", - self.filename, - self.database_path.display() - ); + let mut files_to_delete = Vec::new(); + let mut dirs_to_delete = Vec::new(); + let mut dbdeletions = Vec::new(); + let iter = match fs_err::read_dir(&dirpath) { + Ok(iter) => iter, + Err(e) if e.kind() == ErrorKind::NotFound => return Ok(()), + Err(e) => return Err(e.into()), }; - Ok(md) - } - - /// Set the metadata for the key in the database to `md` - async fn set(&self, md: Metadata) -> anyhow::Result<()> { - tracing::trace!(file = self.filename, database = %self.database_path.display(), "Setting object metadata for file in database"); - let _guard = self.lock().await; - let mut data = self.load()?; - data.insert(self.filename.to_owned(), md); - self.store(data)?; - Ok(()) - } - - /// Remove the metadata for the key from the database - async fn delete(&self) -> anyhow::Result<()> { - tracing::trace!(file = self.filename, database = %self.database_path.display(), "Deleting object metadata for file from database"); - let _guard = self.lock().await; - let mut data = self.load()?; - if data.remove(self.filename).is_some() { - self.store(data)?; + for entry in iter { + let entry = entry?; + let is_dir = entry.file_type()?.is_dir(); + let to_delete = match entry.file_name().to_str() { + Some(name) => { + if is_dir { + !dir.contains_dir(name) + } else { + let b = !dir.contains_file(name) && name != METADATA_FILENAME; + if b && !is_special_component(name) { + dbdeletions.push(name.to_owned()); + } + b + } + } + None => true, + }; + if to_delete { + if is_dir { + dirs_to_delete.push(entry.path()); + } else { + files_to_delete.push(entry.path()); + } + } + } + for p in files_to_delete { + tracing::debug!(path = %p.display(), "File does not belong in backup; deleting"); + if let Err(e) = fs_err::remove_file(&p) { + tracing::warn!(error = %e, path = %p.display(), "Failed to delete file"); + } + } + for p in dirs_to_delete { + tracing::debug!(path = %p.display(), "Directory does not belong in backup; deleting"); + if let Err(e) = fs_err::tokio::remove_dir_all(&p).await { + tracing::warn!(error = %e, path = %p.display(), "Failed to delete directory"); + } + } + if !dbdeletions.is_empty() { + let manager = MetadataManager::new(&dirpath); + let mut data = manager.load()?; + for name in dbdeletions { + data.remove(&name); + } + manager.store(data)?; } Ok(()) } diff --git a/src/syncer/treetracker/inner.rs b/src/syncer/treetracker/inner.rs new file mode 100644 index 0000000..3bf25b0 --- /dev/null +++ b/src/syncer/treetracker/inner.rs @@ -0,0 +1,315 @@ +use crate::keypath::KeyPath; +use either::Either; +use std::cmp::Ordering; +use std::collections::HashMap; + +/// An "open" directory within a [`TreeTracker`][super::TreeTracker], i.e., one +/// to which keys are currently being added (either directly or to a descendant +/// directory) +#[derive(Clone, Debug, Eq, PartialEq)] +pub(super) struct PartialDirectory { + /// All files & directories in this directory that have been seen so far, + /// excluding `current_subdir` + pub(super) entries: Vec>, + + /// The name of the subdirectory of this directory that is currently + /// "open", if any + pub(super) current_subdir: Option, +} + +impl PartialDirectory { + /// Create a new, empty `PartialDirectory` + pub(super) fn new() -> Self { + PartialDirectory { + entries: Vec::new(), + current_subdir: None, + } + } + + /// Returns true if the directory is empty, i.e., if no entries have been + /// registered in it 
+ pub(super) fn is_empty(&self) -> bool { + self.entries.is_empty() && self.current_subdir.is_none() + } + + /// Mark the current "open" subdirectory as closed, adding to `entries` + /// + /// # Panics + /// + /// Panics if there is no current open subdirectory. + pub(super) fn close_current(&mut self) { + let Some(name) = self.current_subdir.take() else { + panic!("PartialDirectory::close_current() called without a current directory"); + }; + self.entries.push(Entry::dir(name)); + } + + /// Returns true if the last entry added to this directory is a subdirectory + pub(super) fn last_entry_is_dir(&self) -> bool { + self.current_subdir.is_some() + } + + /// Compare `cname` against the name of the last entry added to this + /// directory + pub(super) fn cmp_vs_last_entry(&self, cname: CmpName<'_>) -> Option { + self.current_subdir + .as_deref() + .map(|cd| cname.cmp(&CmpName::Dir(cd))) + .or_else(|| self.entries.last().map(|en| cname.cmp(&en.cmp_name()))) + } +} + +/// A file or directory entry in an "open" directory tracked by +/// [`TreeTracker`][super::TreeTracker] +#[derive(Clone, Debug, Eq, PartialEq)] +pub(super) enum Entry { + File { + /// The filename + name: String, + + /// If the latest version of the corresponding key has been added, this + /// is its payload. + value: Option, + + /// Mapping from "old filenames" registered for the key to their + /// payloads + old_filenames: HashMap, + }, + Dir { + /// The name of the directory + name: String, + }, +} + +impl Entry { + /// Create a new `Entry::File` with the given `name`. If `old_filename` is + /// `None`, `value` is used as the payload for the latest version of the + /// key; otherwise, the given old filename is registered with `value` as + /// its payload. + pub(super) fn file>( + name: S, + value: T, + old_filename: Option, + ) -> Entry { + if let Some(of) = old_filename { + Entry::File { + name: name.into(), + value: None, + old_filenames: HashMap::from([(of, value)]), + } + } else { + Entry::File { + name: name.into(), + value: Some(value), + old_filenames: HashMap::new(), + } + } + } + + /// Create a new `Entry::Dir` with the given `name` + pub(super) fn dir>(name: S) -> Entry { + Entry::Dir { name: name.into() } + } + + /// Returns the name of the entry + pub(super) fn name(&self) -> &str { + match self { + Entry::File { name, .. } => name, + Entry::Dir { name } => name, + } + } + + /// Returns the name of the entry as a [`CmpName`] + pub(super) fn cmp_name(&self) -> CmpName<'_> { + match self { + Entry::File { name, .. } => CmpName::File(name.as_ref()), + Entry::Dir { name } => CmpName::Dir(name.as_ref()), + } + } +} + +/// A wrapper around an individual path name component that compares it to +/// other components as though they were part of longer paths, i.e., directory +/// names have an implicit trailing '/' added. As an exception, if a file name +/// and a directory name are equal aside from the trailing '/', this type +/// compares them as equal. +#[derive(Clone, Copy, Debug)] +pub(super) enum CmpName<'a> { + File(&'a str), + Dir(&'a str), +} + +impl CmpName<'_> { + /// Returns the inner name, without any trailing slashes + pub(super) fn name(&self) -> &str { + match self { + CmpName::File(s) => s, + CmpName::Dir(s) => s, + } + } + + /// Returns an iterator over all characters in the name. If the name is + /// for a directory, a `'/'` is emitted at the end of the iterator. 
+ pub(super) fn chars(&self) -> impl Iterator + '_ { + match self { + CmpName::File(s) => Either::Left(s.chars()), + CmpName::Dir(s) => Either::Right(s.chars().chain(std::iter::once('/'))), + } + } +} + +impl PartialEq for CmpName<'_> { + fn eq(&self, other: &CmpName<'_>) -> bool { + self.cmp(other) == Ordering::Equal + } +} + +impl Eq for CmpName<'_> {} + +impl PartialOrd for CmpName<'_> { + fn partial_cmp(&self, other: &CmpName<'_>) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for CmpName<'_> { + fn cmp(&self, other: &CmpName<'_>) -> Ordering { + if self.name() == other.name() { + Ordering::Equal + } else { + self.chars().cmp(other.chars()) + } + } +} + +/// An iterator over the path components of a key path +#[derive(Clone, Debug, Eq, PartialEq)] +pub(super) struct KeyComponents<'a, T> { + i: usize, + path: &'a str, + value: Option, + old_filename: Option>, +} + +impl<'a, T> KeyComponents<'a, T> { + pub(super) fn new(key: &'a KeyPath, value: T, old_filename: Option) -> Self { + KeyComponents { + i: 0, + path: key.as_ref(), + value: Some(value), + old_filename: Some(old_filename), + } + } +} + +impl<'a, T> Iterator for KeyComponents<'a, T> { + type Item = (usize, Component<'a, T>); + + fn next(&mut self) -> Option { + let c = match self.path.find('/') { + Some(i) => { + let name = &self.path[..i]; + self.path = &self.path[(i + 1)..]; + Component::Dir(name) + } + None => Component::File(self.path, self.value.take()?, self.old_filename.take()?), + }; + let i = self.i; + self.i += 1; + Some((i, c)) + } +} + +/// A path component of a key path +#[derive(Clone, Debug, Eq, PartialEq)] +pub(super) enum Component<'a, T> { + /// `name` (no trailing slash) + Dir(&'a str), + + /// `name`, `value`, `old_filename` + File(&'a str, T, Option), +} + +impl<'a, T> Component<'a, T> { + pub(super) fn cmp_name(&self) -> CmpName<'a> { + match self { + Component::Dir(name) => CmpName::Dir(name), + Component::File(name, _, _) => CmpName::File(name), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + mod cmp_name { + use super::*; + + #[test] + fn dir_eq_file() { + assert!(CmpName::File("foo") == CmpName::Dir("foo")); + } + + #[test] + fn pre_slash_dir_before_dir() { + assert!(CmpName::Dir("apple!banana") < CmpName::Dir("apple")); + } + + #[test] + fn pre_slash_file_before_dir() { + assert!(CmpName::File("apple!banana") < CmpName::Dir("apple")); + } + + #[test] + fn pre_slash_dir_after_file() { + assert!(CmpName::Dir("apple!banana") > CmpName::File("apple")); + } + + #[test] + fn pre_slash_file_after_file() { + assert!(CmpName::File("apple!banana") > CmpName::File("apple")); + } + } + + mod key_components { + use super::*; + + #[test] + fn plain() { + let key = "foo/bar/quux.txt".parse::().unwrap(); + let mut iter = KeyComponents::new(&key, 1, None); + assert_eq!(iter.next(), Some((0, Component::Dir("foo")))); + assert_eq!(iter.next(), Some((1, Component::Dir("bar")))); + assert_eq!(iter.next(), Some((2, Component::File("quux.txt", 1, None)))); + assert_eq!(iter.next(), None); + assert_eq!(iter.next(), None); + } + + #[test] + fn filename_only() { + let key = "quux.txt".parse::().unwrap(); + let mut iter = KeyComponents::new(&key, 1, None); + assert_eq!(iter.next(), Some((0, Component::File("quux.txt", 1, None)))); + assert_eq!(iter.next(), None); + assert_eq!(iter.next(), None); + } + + #[test] + fn with_old_filename() { + let key = "foo/bar/quux.txt".parse::().unwrap(); + let mut iter = KeyComponents::new(&key, 1, Some("quux.old.1.2".into())); + assert_eq!(iter.next(), Some((0, 
Component::Dir("foo")))); + assert_eq!(iter.next(), Some((1, Component::Dir("bar")))); + assert_eq!( + iter.next(), + Some(( + 2, + Component::File("quux.txt", 1, Some("quux.old.1.2".into())) + )) + ); + assert_eq!(iter.next(), None); + assert_eq!(iter.next(), None); + } + } +} diff --git a/src/syncer/treetracker/mod.rs b/src/syncer/treetracker/mod.rs new file mode 100644 index 0000000..fc8f55c --- /dev/null +++ b/src/syncer/treetracker/mod.rs @@ -0,0 +1,1088 @@ +mod inner; +use self::inner::*; +use crate::keypath::KeyPath; +use std::cmp::Ordering; +use std::collections::{HashMap, HashSet}; +use thiserror::Error; + +/// A type for tracking keys as they're encountered and — assuming the keys are +/// in sorted order — reporting the contents of directories for which no +/// further keys will be seen. +/// +/// When a key is encountered, it can be for either its latest version or a +/// non-latest version; non-latest versions are identified by "old filename" +/// strings (the base filenames with which they are saved), which need not be +/// sorted, but must be unique. +/// +/// Each key version additionally has a payload of type `T`, used by +/// `s3invsync` for storing [`tokio::sync::Notify`] values. +#[derive(Clone, Debug, Eq, PartialEq)] +pub(super) struct TreeTracker( + /// A stack of currently "open" directories, i.e., directories for which + /// the `TreeTracker` is currently receiving keys. The first element + /// represents the root of the directory tree, the next element (if any) + /// represents the current "open" directory within that, etc. + /// + /// # Invariants + /// + /// - The stack is nonempty. (A `TreeTracker` with an empty stack is + /// referred to as "void" by the documentation and by error messages.) + /// + /// - For all elements `pd` other than the last, + /// `pd.current_subdirs.is_some()`. + /// + /// - For the last element `pd`, `pd.current_subdirs.is_none()`. + /// + /// - Once at least one key has been added to the tracker, for all elements + /// `pd`, either `pd.entries.last()` or `pd.current_subdir` is + /// non-`None`. + Vec>, +); + +impl TreeTracker { + /// Create a new, empty `TreeTracker` + pub(super) fn new() -> Self { + TreeTracker(vec![PartialDirectory::new()]) + } + + /// Register the key `key` with payload `value` and the given old filename + /// (if this is not the latest version of the key). + /// + /// If encountering the key indicates that the currently "open" directory and + /// zero or more of its parents will no longer be receiving any further + /// keys, those directories are returned, innermost first. + /// + /// # Errors + /// + /// This method returns `Err` if `key` is lexicographically less than the + /// previous distinct key, if `key` has already been encountered as a + /// directory path, if a parent directory of `key` has already been + /// encountered as a file path, or if `key` equals the previous key and + /// this is the second time that `add()` was called with that key & + /// `old_filename` value. 
+
+impl<T> TreeTracker<T> {
+    /// Create a new, empty `TreeTracker`
+    pub(super) fn new() -> Self {
+        TreeTracker(vec![PartialDirectory::new()])
+    }
+
+    /// Register the key `key` with payload `value` and the given old filename
+    /// (if this is not the latest version of the key).
+    ///
+    /// If encountering the key indicates that the currently "open" directory
+    /// and zero or more of its parents will no longer be receiving any further
+    /// keys, those directories are returned, innermost first.
+    ///
+    /// # Errors
+    ///
+    /// This method returns `Err` if `key` is lexicographically less than the
+    /// previous distinct key, if `key` has already been encountered as a
+    /// directory path, if a parent directory of `key` has already been
+    /// encountered as a file path, or if `key` equals the previous key and
+    /// this is the second time that `add()` was called with that key &
+    /// `old_filename` value.
+    pub(super) fn add(
+        &mut self,
+        key: &KeyPath,
+        value: T,
+        old_filename: Option<String>,
+    ) -> Result<Vec<Directory<T>>, TreeTrackerError> {
+        let mut popped_dirs = Vec::new();
+        let mut partiter = KeyComponents::new(key, value, old_filename);
+        while let Some((i, part)) = partiter.next() {
+            let Some(pd) = self.0.get_mut(i) else {
+                unreachable!(
+                    "TreeTracker::add() iteration should not go past the end of the stack"
+                );
+            };
+            let cmp_name = part.cmp_name();
+            match part {
+                Component::File(name, value, old_filename) => {
+                    match (pd.last_entry_is_dir(), pd.cmp_vs_last_entry(cmp_name)) {
+                        (in_dir, Some(Ordering::Greater)) => {
+                            if in_dir {
+                                // Close current dirs
+                                for _ in (i + 1)..(self.0.len()) {
+                                    popped_dirs.push(self.pop());
+                                }
+                            }
+                            self.push_file(name, value, old_filename)?;
+                            break;
+                        }
+                        (true, Some(Ordering::Equal)) => {
+                            return Err(TreeTrackerError::Conflict(key.into()));
+                        }
+                        (false, Some(Ordering::Equal)) => {
+                            self.push_file(name, value, old_filename)?;
+                        }
+                        (_, Some(Ordering::Less)) => {
+                            return Err(TreeTrackerError::Unsorted {
+                                before: self.last_key(),
+                                after: key.into(),
+                            });
+                        }
+                        (_, None) => {
+                            assert!(
+                                self.is_empty(),
+                                "top dir of TreeTracker should be root when empty"
+                            );
+                            self.push_file(name, value, old_filename)?;
+                            break;
+                        }
+                    }
+                }
+                Component::Dir(name) => {
+                    match (pd.last_entry_is_dir(), pd.cmp_vs_last_entry(cmp_name)) {
+                        (in_dir, Some(Ordering::Greater)) => {
+                            if in_dir {
+                                // Close current dirs
+                                for _ in (i + 1)..(self.0.len()) {
+                                    popped_dirs.push(self.pop());
+                                }
+                            }
+                            self.push_parts(name, partiter)?;
+                            break;
+                        }
+                        (true, Some(Ordering::Equal)) => continue,
+                        (false, Some(Ordering::Equal)) => {
+                            return Err(TreeTrackerError::Conflict(self.last_key()));
+                        }
+                        (_, Some(Ordering::Less)) => {
+                            return Err(TreeTrackerError::Unsorted {
+                                before: self.last_key(),
+                                after: key.into(),
+                            });
+                        }
+                        (_, None) => {
+                            assert!(
+                                self.is_empty(),
+                                "top dir of TreeTracker should be root when empty"
+                            );
+                            self.push_parts(name, partiter)?;
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+        Ok(popped_dirs)
+    }
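+
+    // Trace sketch (editorial illustration): suppose "a/b/x.txt" was just
+    // added. A follow-up add() of "a/c/y.txt" compares Dir("a") as Equal and
+    // descends; at depth 1, Dir("c") is Greater than the open subdir "b", so
+    // the open "a/b" is popped, then dir "c" and file "y.txt" are pushed.
+    // The popped `Directory` for "a/b" is what the call returns.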
+
+    /// Indicate to the `TreeTracker` that all keys have been encountered.
+    /// Returns all remaining "open" directories, innermost first.
+    pub(super) fn finish(mut self) -> Vec<Directory<T>> {
+        let mut dirs = Vec::new();
+        while !self.0.is_empty() {
+            dirs.push(self.pop());
+        }
+        dirs
+    }
+
+    /// Returns `true` if the `TreeTracker` is empty, i.e., if the stack is
+    /// empty or its only item is empty.
+    fn is_empty(&self) -> bool {
+        self.0.is_empty() || (self.0.len() == 1 && self.0[0].is_empty())
+    }
+
+    /// Call [`TreeTracker::push_dir()`] on `first_dirname`, and then call
+    /// [`TreeTracker::push_dir()`] or [`TreeTracker::push_file()`], as
+    /// appropriate, on each element of the iterator `rest`.
+    fn push_parts(
+        &mut self,
+        first_dirname: &str,
+        rest: KeyComponents<'_, T>,
+    ) -> Result<(), TreeTrackerError> {
+        self.push_dir(first_dirname);
+        for (_, part) in rest {
+            match part {
+                Component::Dir(name) => self.push_dir(name),
+                Component::File(name, value, old_filename) => {
+                    self.push_file(name, value, old_filename)?;
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// Open a directory named `name` inside the current innermost open
+    /// directory.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the tracker is void or if the current innermost directory
+    /// already contains an open directory.
+    fn push_dir(&mut self, name: &str) {
+        let Some(pd) = self.0.last_mut() else {
+            panic!("TreeTracker::push_dir() called on void tracker");
+        };
+        assert!(
+            pd.current_subdir.is_none(),
+            "TreeTracker::push_dir() called when top dir has subdir"
+        );
+        pd.current_subdir = Some(name.to_owned());
+        self.0.push(PartialDirectory::new());
+    }
+
+    /// Add the key with filename `name` to the current innermost open
+    /// directory if it is not already present. If `old_filename` is `None`,
+    /// `value` is used as the payload for the latest version of the key;
+    /// otherwise, the given old filename is added to the file's collection of
+    /// old filenames, and `value` is used as the corresponding payload.
+    ///
+    /// # Errors
+    ///
+    /// This method returns `Err` if `key` equals the previous key and there is
+    /// already a payload for the given `old_filename` value.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the tracker is void, if the current innermost directory
+    /// already contains an open directory, if `name` is less than the previous
+    /// name added to the innermost open directory, or if `name` equals the
+    /// previous name and the previously-added entry is a directory.
+    fn push_file(
+        &mut self,
+        name: &str,
+        value: T,
+        old_filename: Option<String>,
+    ) -> Result<(), TreeTrackerError> {
+        let Some(pd) = self.0.last_mut() else {
+            panic!("TreeTracker::push_file() called on void tracker");
+        };
+        assert!(
+            pd.current_subdir.is_none(),
+            "TreeTracker::push_file() called when top dir has subdir"
+        );
+        if let Some(en) = pd.entries.last_mut() {
+            match CmpName::File(name).cmp(&en.cmp_name()) {
+                Ordering::Less => {
+                    panic!("TreeTracker::push_file() called with filename less than previous name")
+                }
+                Ordering::Equal => {
+                    let Entry::File {
+                        old_filenames,
+                        value: envalue,
+                        ..
+                    } = en
+                    else {
+                        panic!("TreeTracker::push_file() called with filename equal to previous name and previous is not a file");
+                    };
+                    if let Some(of) = old_filename {
+                        if old_filenames.insert(of.clone(), value).is_some() {
+                            return Err(TreeTrackerError::DuplicateOldFile {
+                                key: self.last_key(),
+                                old_filename: of,
+                            });
+                        }
+                    } else if envalue.is_none() {
+                        *envalue = Some(value);
+                    } else {
+                        return Err(TreeTrackerError::DuplicateFile(self.last_key()));
+                    }
+                }
+                Ordering::Greater => pd.entries.push(Entry::file(name, value, old_filename)),
+            }
+        } else {
+            pd.entries.push(Entry::file(name, value, old_filename));
+        }
+        Ok(())
+    }
+
+    /// "Close" the current innermost open directory and return it as a
+    /// [`Directory`].
+    ///
+    /// # Panics
+    ///
+    /// Panics if the tracker is void or if the current innermost directory
+    /// already contains an open directory.
+    fn pop(&mut self) -> Directory<T> {
+        let Some(pd) = self.0.pop() else {
+            panic!("TreeTracker::pop() called on void tracker");
+        };
+        assert!(
+            pd.current_subdir.is_none(),
+            "TreeTracker::pop() called when top dir has subdir"
+        );
+        let entries = pd.entries;
+        let path = (!self.0.is_empty()).then(|| self.last_key());
+        if let Some(ppd) = self.0.last_mut() {
+            ppd.close_current();
+        }
+        let mut files = HashMap::new();
+        let mut directories = HashSet::new();
+        for en in entries {
+            match en {
+                Entry::File {
+                    name,
+                    value,
+                    old_filenames,
+                } => {
+                    if let Some(value) = value {
+                        files.insert(name, value);
+                    }
+                    files.extend(old_filenames);
+                }
+                Entry::Dir { name } => {
+                    directories.insert(name);
+                }
+            }
+        }
+        Directory {
+            path,
+            files,
+            directories,
+        }
+    }
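+
+    // Illustrative note: when a directory closes, old filenames are folded
+    // into `files` alongside latest versions. E.g. an `Entry::File` with
+    // name "bar.txt", value Some(2), and old_filenames {"bar.txt.old.1.2": 1}
+    // yields `files` containing both "bar.txt" -> 2 and "bar.txt.old.1.2" -> 1.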
+
+    /// Returns the key most recently added to the tracker.
+    ///
+    /// # Panics
+    ///
+    /// Panics if no keys have been added to the tracker or if the stack
+    /// contains more than one element yet one of them is empty.
+    fn last_key(&self) -> String {
+        let mut s = String::new();
+        for pd in &self.0 {
+            if let Some(name) = pd
+                .current_subdir
+                .as_deref()
+                .or_else(|| pd.entries.last().map(Entry::name))
+            {
+                if !s.is_empty() {
+                    s.push('/');
+                }
+                s.push_str(name);
+            } else {
+                assert!(
+                    self.is_empty(),
+                    "TreeTracker dir should be empty root when empty"
+                );
+                panic!("TreeTracker::last_key() called on empty tracker");
+            }
+        }
+        s
+    }
+}
+
+/// A directory, along with a collection of the names of the files &
+/// subdirectories within.
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub(super) struct Directory<T> {
+    /// The forward-slash-separated path to the directory, relative to the root
+    /// of the tree tracked by the creating [`TreeTracker`]. If the directory
+    /// is the root directory, this is `None`.
+    path: Option<String>,
+
+    /// A mapping from file names (including "old filenames") in the directory
+    /// to their payloads.
+    files: HashMap<String, T>,
+
+    /// A set of the names of the subdirectories within the directory
+    directories: HashSet<String>,
+}
+
+impl<T> Directory<T> {
+    /// Returns the forward-slash-separated path to the directory, relative to
+    /// the root of the tree tracked by the creating [`TreeTracker`]. If the
+    /// directory is the root directory, this is `None`.
+    pub(super) fn path(&self) -> Option<&str> {
+        self.path.as_deref()
+    }
+
+    /// Returns `true` if the directory contains a file with the given name
+    pub(super) fn contains_file(&self, name: &str) -> bool {
+        self.files.contains_key(name)
+    }
+
+    /// Returns `true` if the directory contains a subdirectory with the given
+    /// name
+    pub(super) fn contains_dir(&self, name: &str) -> bool {
+        self.directories.contains(name)
+    }
+
+    /// Apply the given function to all file payloads in the directory
+    pub(super) fn map<U, F: FnMut(T) -> U>(self, mut f: F) -> Directory<U> {
+        Directory {
+            path: self.path,
+            files: self
+                .files
+                .into_iter()
+                .map(|(name, value)| (name, f(value)))
+                .collect(),
+            directories: self.directories,
+        }
+    }
+}
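+
+// Usage sketch for `map()` (editorial illustration): payload conversion
+// leaves paths and subdirectory names untouched, e.g. a `Directory<u32>`
+// becomes a `Directory<String>` via `dir.map(|v| v.to_string())`.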
+ #[error("key {key:?} has multiple non-latest versions with filename {old_filename:?}")] + DuplicateOldFile { key: String, old_filename: String }, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn same_dir() { + let mut tracker = TreeTracker::new(); + assert_eq!( + tracker.add(&"foo/bar.txt".parse::().unwrap(), 1, None), + Ok(Vec::new()) + ); + assert_eq!( + tracker.add(&"foo/quux.txt".parse::().unwrap(), 2, None), + Ok(Vec::new()) + ); + let dirs = tracker.finish(); + assert_eq!(dirs.len(), 2); + assert_eq!(dirs[0].path(), Some("foo")); + assert_eq!( + dirs[0].files, + HashMap::from([("bar.txt".into(), 1), ("quux.txt".into(), 2),]) + ); + assert!(dirs[0].directories.is_empty()); + assert_eq!(dirs[1].path(), None); + assert!(dirs[1].files.is_empty()); + assert_eq!(dirs[1].directories, HashSet::from(["foo".into()])); + } + + #[test] + fn different_dir() { + let mut tracker = TreeTracker::new(); + assert_eq!( + tracker.add(&"foo/bar.txt".parse::().unwrap(), 1, None), + Ok(Vec::new()) + ); + let dirs = tracker + .add(&"glarch/quux.txt".parse::().unwrap(), 2, None) + .unwrap(); + assert_eq!(dirs.len(), 1); + assert_eq!(dirs[0].path(), Some("foo")); + assert_eq!(dirs[0].files, HashMap::from([("bar.txt".into(), 1)])); + assert!(dirs[0].directories.is_empty()); + let dirs = tracker.finish(); + assert_eq!(dirs.len(), 2); + assert_eq!(dirs[0].path(), Some("glarch")); + assert_eq!(dirs[0].files, HashMap::from([("quux.txt".into(), 2)])); + assert!(dirs[0].directories.is_empty()); + assert_eq!(dirs[1].path(), None); + assert!(dirs[1].files.is_empty()); + assert_eq!( + dirs[1].directories, + HashSet::from(["foo".into(), "glarch".into()]) + ); + } + + #[test] + fn different_subdir() { + let mut tracker = TreeTracker::new(); + assert_eq!( + tracker.add(&"foo/bar/apple.txt".parse::().unwrap(), 1, None), + Ok(Vec::new()) + ); + let dirs = tracker + .add(&"foo/quux/banana.txt".parse::().unwrap(), 2, None) + .unwrap(); + assert_eq!(dirs.len(), 1); + assert_eq!(dirs[0].path(), Some("foo/bar")); + assert_eq!(dirs[0].files, HashMap::from([("apple.txt".into(), 1)])); + assert!(dirs[0].directories.is_empty()); + let dirs = tracker.finish(); + assert_eq!(dirs.len(), 3); + assert_eq!(dirs[0].path(), Some("foo/quux")); + assert_eq!(dirs[0].files, HashMap::from([("banana.txt".into(), 2)])); + assert!(dirs[0].directories.is_empty()); + assert_eq!(dirs[1].path(), Some("foo")); + assert!(dirs[1].files.is_empty()); + assert_eq!( + dirs[1].directories, + HashSet::from(["bar".into(), "quux".into()]) + ); + assert_eq!(dirs[2].path(), None); + assert!(dirs[2].files.is_empty()); + assert_eq!(dirs[2].directories, HashSet::from(["foo".into()])); + } + + #[test] + fn preslash_dir_then_toslash_dir() { + let mut tracker = TreeTracker::new(); + assert_eq!( + tracker.add( + &"foo/apple!banana/gnusto.txt".parse::().unwrap(), + 1, + None + ), + Ok(Vec::new()) + ); + let dirs = tracker + .add(&"foo/apple/cleesh.txt".parse::().unwrap(), 2, None) + .unwrap(); + assert_eq!(dirs.len(), 1); + assert_eq!(dirs[0].path(), Some("foo/apple!banana")); + assert_eq!(dirs[0].files, HashMap::from([("gnusto.txt".into(), 1)])); + assert!(dirs[0].directories.is_empty()); + let dirs = tracker.finish(); + assert_eq!(dirs.len(), 3); + assert_eq!(dirs[0].path(), Some("foo/apple")); + assert_eq!(dirs[0].files, HashMap::from([("cleesh.txt".into(), 2)])); + assert!(dirs[0].directories.is_empty()); + assert_eq!(dirs[1].path(), Some("foo")); + assert!(dirs[1].files.is_empty()); + assert_eq!( + dirs[1].directories, + 
HashSet::from(["apple!banana".into(), "apple".into()]) + ); + assert_eq!(dirs[2].path(), None); + assert!(dirs[1].files.is_empty()); + assert_eq!(dirs[2].directories, HashSet::from([("foo".into())])); + } + + #[test] + fn preslash_file_then_toslash_file() { + let mut tracker = TreeTracker::new(); + assert_eq!( + tracker.add( + &"foo/bar/apple!banana.txt".parse::().unwrap(), + 1, + None + ), + Ok(Vec::new()) + ); + let e = tracker + .add(&"foo/bar/apple".parse::().unwrap(), 2, None) + .unwrap_err(); + assert_eq!( + e, + TreeTrackerError::Unsorted { + before: "foo/bar/apple!banana.txt".into(), + after: "foo/bar/apple".into() + } + ); + } + + #[test] + fn tostash_file_then_preslash_file() { + let mut tracker = TreeTracker::new(); + assert_eq!( + tracker.add(&"foo/bar/apple".parse::().unwrap(), 1, None), + Ok(Vec::new()) + ); + assert_eq!( + tracker.add( + &"foo/bar/apple!banana.txt".parse::().unwrap(), + 2, + None + ), + Ok(Vec::new()) + ); + let dirs = tracker.finish(); + assert_eq!(dirs.len(), 3); + assert_eq!(dirs[0].path(), Some("foo/bar")); + assert_eq!( + dirs[0].files, + HashMap::from([("apple".into(), 1), ("apple!banana.txt".into(), 2)]) + ); + assert!(dirs[0].directories.is_empty()); + assert_eq!(dirs[1].path(), Some("foo")); + assert!(dirs[1].files.is_empty()); + assert_eq!(dirs[1].directories, HashSet::from(["bar".into()])); + assert_eq!(dirs[2].path(), None); + assert!(dirs[2].files.is_empty()); + assert_eq!(dirs[2].directories, HashSet::from(["foo".into()])); + } + + #[test] + fn preslash_dir_then_toslash_file() { + let mut tracker = TreeTracker::new(); + assert_eq!( + tracker.add( + &"foo/apple!banana/gnusto.txt".parse::().unwrap(), + 1, + None, + ), + Ok(Vec::new()) + ); + let e = tracker + .add(&"foo/apple".parse::().unwrap(), 2, None) + .unwrap_err(); + assert_eq!( + e, + TreeTrackerError::Unsorted { + before: "foo/apple!banana/gnusto.txt".into(), + after: "foo/apple".into() + } + ); + } + + #[test] + fn preslash_file_then_toslash_dir() { + let mut tracker = TreeTracker::new(); + assert_eq!( + tracker.add( + &"foo/bar/apple!banana.txt".parse::().unwrap(), + 1, + None + ), + Ok(Vec::new()) + ); + assert_eq!( + tracker.add( + &"foo/bar/apple/apricot.txt".parse::().unwrap(), + 2, + None + ), + Ok(Vec::new()) + ); + let dirs = tracker.finish(); + assert_eq!(dirs.len(), 4); + assert_eq!(dirs[0].path(), Some("foo/bar/apple")); + assert_eq!(dirs[0].files, HashMap::from([("apricot.txt".into(), 2)])); + assert!(dirs[0].directories.is_empty()); + assert_eq!(dirs[1].path(), Some("foo/bar")); + assert_eq!( + dirs[1].files, + HashMap::from([("apple!banana.txt".into(), 1)]) + ); + assert_eq!(dirs[1].directories, HashSet::from(["apple".into()])); + assert_eq!(dirs[2].path(), Some("foo")); + assert!(dirs[2].files.is_empty()); + assert_eq!(dirs[2].directories, HashSet::from(["bar".into()])); + assert_eq!(dirs[3].path(), None); + assert!(dirs[3].files.is_empty()); + assert_eq!(dirs[3].directories, HashSet::from(["foo".into()])); + } + + #[test] + fn path_conflict_file_then_dir() { + let mut tracker = TreeTracker::new(); + assert_eq!( + tracker.add(&"foo/bar".parse::().unwrap(), 1, None), + Ok(Vec::new()) + ); + let e = tracker + .add(&"foo/bar/apple.txt".parse::().unwrap(), 2, None) + .unwrap_err(); + assert_eq!(e, TreeTrackerError::Conflict("foo/bar".into())); + } + + #[test] + fn path_conflict_dir_then_file() { + let mut tracker = TreeTracker::new(); + assert_eq!( + tracker.add(&"foo/bar/quux.txt".parse::().unwrap(), 1, None), + Ok(Vec::new()) + ); + let e = tracker + 
.add(&"foo/bar".parse::().unwrap(), 2, None) + .unwrap_err(); + assert_eq!(e, TreeTrackerError::Conflict("foo/bar".into())); + } + + #[test] + fn just_finish() { + let tracker = TreeTracker::<()>::new(); + let dirs = tracker.finish(); + assert_eq!(dirs.len(), 1); + assert_eq!(dirs[0].path(), None); + assert!(dirs[0].files.is_empty()); + assert!(dirs[0].directories.is_empty()); + } + + #[test] + fn multidir_finish() { + let mut tracker = TreeTracker::new(); + assert_eq!( + tracker.add( + &"apple/banana/coconut/date.txt".parse::().unwrap(), + 1, + None + ), + Ok(Vec::new()) + ); + let dirs = tracker.finish(); + assert_eq!(dirs.len(), 4); + assert_eq!(dirs[0].path(), Some("apple/banana/coconut")); + assert_eq!(dirs[0].files, HashMap::from([("date.txt".into(), 1)])); + assert!(dirs[0].directories.is_empty()); + assert_eq!(dirs[1].path(), Some("apple/banana")); + assert!(dirs[1].files.is_empty()); + assert_eq!(dirs[1].directories, HashSet::from(["coconut".into()])); + assert_eq!(dirs[2].path(), Some("apple")); + assert!(dirs[2].files.is_empty()); + assert_eq!(dirs[2].directories, HashSet::from(["banana".into()])); + assert_eq!(dirs[3].path(), None); + assert!(dirs[3].files.is_empty()); + assert_eq!(dirs[3].directories, HashSet::from(["apple".into()])); + } + + #[test] + fn closedir_then_files_in_parent() { + let mut tracker = TreeTracker::new(); + assert_eq!( + tracker.add( + &"apple/banana/coconut.txt".parse::().unwrap(), + 1, + None + ), + Ok(Vec::new()) + ); + let dirs = tracker + .add(&"apple/kumquat.txt".parse::().unwrap(), 2, None) + .unwrap(); + assert_eq!(dirs.len(), 1); + assert_eq!(dirs[0].path(), Some("apple/banana")); + assert_eq!(dirs[0].files, HashMap::from([("coconut.txt".into(), 1)])); + assert!(dirs[0].directories.is_empty()); + + assert_eq!( + tracker.add(&"apple/mango.txt".parse::().unwrap(), 3, None), + Ok(Vec::new()) + ); + let dirs = tracker.finish(); + assert_eq!(dirs.len(), 2); + assert_eq!(dirs[0].path(), Some("apple")); + assert_eq!( + dirs[0].files, + HashMap::from([("kumquat.txt".into(), 2), ("mango.txt".into(), 3)]) + ); + assert_eq!(dirs[0].directories, HashSet::from(["banana".into()])); + assert_eq!(dirs[1].path(), None); + assert!(dirs[1].files.is_empty()); + assert_eq!(dirs[1].directories, HashSet::from(["apple".into()])); + } + + #[test] + fn closedir_then_dirs_in_parent() { + let mut tracker = TreeTracker::new(); + assert_eq!( + tracker.add( + &"apple/banana/coconut.txt".parse::().unwrap(), + 1, + None + ), + Ok(Vec::new()) + ); + let dirs = tracker + .add( + &"apple/eggplant/kumquat.txt".parse::().unwrap(), + 2, + None, + ) + .unwrap(); + assert_eq!(dirs.len(), 1); + assert_eq!(dirs[0].path(), Some("apple/banana")); + assert_eq!(dirs[0].files, HashMap::from([("coconut.txt".into(), 1)])); + assert!(dirs[0].directories.is_empty()); + let dirs = tracker + .add( + &"apple/mango/tangerine.txt".parse::().unwrap(), + 3, + None, + ) + .unwrap(); + assert_eq!(dirs.len(), 1); + assert_eq!(dirs[0].path(), Some("apple/eggplant")); + assert_eq!(dirs[0].files, HashMap::from([("kumquat.txt".into(), 2)])); + assert!(dirs[0].directories.is_empty()); + let dirs = tracker.finish(); + assert_eq!(dirs.len(), 3); + assert_eq!(dirs[0].path(), Some("apple/mango")); + assert_eq!(dirs[0].files, HashMap::from([("tangerine.txt".into(), 3)])); + assert!(dirs[0].directories.is_empty()); + assert_eq!(dirs[1].path(), Some("apple")); + assert!(dirs[1].files.is_empty()); + assert_eq!( + dirs[1].directories, + HashSet::from(["banana".into(), "eggplant".into(), "mango".into()]) + ); + 
+        assert_eq!(dirs[2].path(), None);
+        assert!(dirs[2].files.is_empty());
+        assert_eq!(dirs[2].directories, HashSet::from(["apple".into()]));
+    }
+
+    #[test]
+    fn close_multiple_dirs() {
+        let mut tracker = TreeTracker::new();
+        assert_eq!(
+            tracker.add(
+                &"apple/banana/coconut/date.txt".parse::<KeyPath>().unwrap(),
+                1,
+                None
+            ),
+            Ok(Vec::new())
+        );
+        let dirs = tracker
+            .add(&"foo.txt".parse::<KeyPath>().unwrap(), 2, None)
+            .unwrap();
+        assert_eq!(dirs.len(), 3);
+        assert_eq!(dirs[0].path(), Some("apple/banana/coconut"));
+        assert_eq!(dirs[0].files, HashMap::from([("date.txt".into(), 1)]));
+        assert!(dirs[0].directories.is_empty());
+        assert_eq!(dirs[1].path(), Some("apple/banana"));
+        assert!(dirs[1].files.is_empty());
+        assert_eq!(dirs[1].directories, HashSet::from(["coconut".into()]));
+        assert_eq!(dirs[2].path(), Some("apple"));
+        assert!(dirs[2].files.is_empty());
+        assert_eq!(dirs[2].directories, HashSet::from(["banana".into()]));
+        let dirs = tracker.finish();
+        assert_eq!(dirs.len(), 1);
+        assert_eq!(dirs[0].path(), None);
+        assert_eq!(dirs[0].files, HashMap::from([("foo.txt".into(), 2)]));
+        assert_eq!(dirs[0].directories, HashSet::from(["apple".into()]));
+    }
+
+    #[test]
+    fn same_file_twice() {
+        let mut tracker = TreeTracker::new();
+        assert_eq!(
+            tracker.add(&"foo/bar/quux.txt".parse::<KeyPath>().unwrap(), 1, None),
+            Ok(Vec::new())
+        );
+        let e = tracker
+            .add(&"foo/bar/quux.txt".parse::<KeyPath>().unwrap(), 2, None)
+            .unwrap_err();
+        assert_eq!(
+            e,
+            TreeTrackerError::DuplicateFile("foo/bar/quux.txt".into())
+        );
+    }
+
+    #[test]
+    fn unsorted_parent_dirs() {
+        let mut tracker = TreeTracker::new();
+        assert_eq!(
+            tracker.add(&"foo/gnusto/quux.txt".parse::<KeyPath>().unwrap(), 1, None),
+            Ok(Vec::new())
+        );
+        let e = tracker
+            .add(&"foo/bar/cleesh.txt".parse::<KeyPath>().unwrap(), 2, None)
+            .unwrap_err();
+        assert_eq!(
+            e,
+            TreeTrackerError::Unsorted {
+                before: "foo/gnusto/quux.txt".into(),
+                after: "foo/bar/cleesh.txt".into()
+            }
+        );
+    }
+
+    #[test]
+    fn file_then_preceding_dir() {
+        let mut tracker = TreeTracker::new();
+        assert_eq!(
+            tracker.add(&"foo/gnusto.txt".parse::<KeyPath>().unwrap(), 1, None),
+            Ok(Vec::new())
+        );
+        let e = tracker
+            .add(&"foo/bar/cleesh.txt".parse::<KeyPath>().unwrap(), 2, None)
+            .unwrap_err();
+        assert_eq!(
+            e,
+            TreeTrackerError::Unsorted {
+                before: "foo/gnusto.txt".into(),
+                after: "foo/bar/cleesh.txt".into()
+            }
+        );
+    }
+
+    #[test]
+    fn files_in_root() {
+        let mut tracker = TreeTracker::new();
+        assert_eq!(
+            tracker.add(&"foo.txt".parse::<KeyPath>().unwrap(), 1, None),
+            Ok(Vec::new())
+        );
+        assert_eq!(
+            tracker.add(&"gnusto/cleesh.txt".parse::<KeyPath>().unwrap(), 2, None),
+            Ok(Vec::new())
+        );
+        let dirs = tracker
+            .add(&"quux.txt".parse::<KeyPath>().unwrap(), 3, None)
+            .unwrap();
+        assert_eq!(dirs.len(), 1);
+        assert_eq!(dirs[0].path(), Some("gnusto"));
+        assert_eq!(dirs[0].files, HashMap::from([("cleesh.txt".into(), 2)]));
+        assert!(dirs[0].directories.is_empty());
+        let dirs = tracker.finish();
+        assert_eq!(dirs.len(), 1);
+        assert_eq!(dirs[0].path(), None);
+        assert_eq!(
+            dirs[0].files,
+            HashMap::from([("foo.txt".into(), 1), ("quux.txt".into(), 3)])
+        );
+        assert_eq!(dirs[0].directories, HashSet::from(["gnusto".into()]));
+    }
+
+    #[test]
+    fn old_filename() {
+        let mut tracker = TreeTracker::new();
+        assert_eq!(
+            tracker.add(
+                &"foo/bar.txt".parse::<KeyPath>().unwrap(),
+                1,
+                Some("bar.txt.old.1.2".into())
+            ),
+            Ok(Vec::new())
+        );
+        let dirs = tracker.finish();
+        assert_eq!(dirs.len(), 2);
+        assert_eq!(
+            dirs[0],
+            Directory {
+                path: Some("foo".into()),
+                files: HashMap::from([("bar.txt.old.1.2".into(), 1)]),
+                directories: HashSet::new(),
+            }
+        );
+        assert_eq!(
+            dirs[1],
+            Directory {
+                path: None,
+                files: HashMap::new(),
+                directories: HashSet::from(["foo".into()]),
+            }
+        );
+    }
+
+    #[test]
+    fn multiple_old_filenames() {
+        let mut tracker = TreeTracker::new();
+        assert_eq!(
+            tracker.add(
+                &"foo/bar.txt".parse::<KeyPath>().unwrap(),
+                1,
+                Some("bar.txt.old.a.b".into())
+            ),
+            Ok(Vec::new())
+        );
+        assert_eq!(
+            tracker.add(
+                &"foo/bar.txt".parse::<KeyPath>().unwrap(),
+                2,
+                Some("bar.txt.old.1.2".into())
+            ),
+            Ok(Vec::new())
+        );
+        let dirs = tracker.finish();
+        assert_eq!(dirs.len(), 2);
+        assert_eq!(
+            dirs[0],
+            Directory {
+                path: Some("foo".into()),
+                files: HashMap::from([
+                    ("bar.txt.old.a.b".into(), 1),
+                    ("bar.txt.old.1.2".into(), 2),
+                ]),
+                directories: HashSet::new(),
+            }
+        );
+        assert_eq!(
+            dirs[1],
+            Directory {
+                path: None,
+                files: HashMap::new(),
+                directories: HashSet::from(["foo".into()]),
+            }
+        );
+    }
+
+    #[test]
+    fn old_and_non_old() {
+        let mut tracker = TreeTracker::new();
+        assert_eq!(
+            tracker.add(
+                &"foo/bar.txt".parse::<KeyPath>().unwrap(),
+                1,
+                Some("bar.txt.old.a.b".into())
+            ),
+            Ok(Vec::new())
+        );
+        assert_eq!(
+            tracker.add(&"foo/bar.txt".parse::<KeyPath>().unwrap(), 2, None),
+            Ok(Vec::new())
+        );
+        let dirs = tracker.finish();
+        assert_eq!(dirs.len(), 2);
+        assert_eq!(
+            dirs[0],
+            Directory {
+                path: Some("foo".into()),
+                files: HashMap::from([("bar.txt.old.a.b".into(), 1), ("bar.txt".into(), 2)]),
+                directories: HashSet::new(),
+            }
+        );
+        assert_eq!(
+            dirs[1],
+            Directory {
+                path: None,
+                files: HashMap::new(),
+                directories: HashSet::from(["foo".into()]),
+            }
+        );
+    }
+
+    #[test]
+    fn non_old_and_old() {
+        let mut tracker = TreeTracker::new();
+        assert_eq!(
+            tracker.add(&"foo/bar.txt".parse::<KeyPath>().unwrap(), 1, None),
+            Ok(Vec::new())
+        );
+        assert_eq!(
+            tracker.add(
+                &"foo/bar.txt".parse::<KeyPath>().unwrap(),
+                2,
+                Some("bar.txt.old.a.b".into())
+            ),
+            Ok(Vec::new())
+        );
+        let dirs = tracker.finish();
+        assert_eq!(dirs.len(), 2);
+        assert_eq!(
+            dirs[0],
+            Directory {
+                path: Some("foo".into()),
+                files: HashMap::from([("bar.txt".into(), 1), ("bar.txt.old.a.b".into(), 2)]),
+                directories: HashSet::new(),
+            }
+        );
+        assert_eq!(
+            dirs[1],
+            Directory {
+                path: None,
+                files: HashMap::new(),
+                directories: HashSet::from(["foo".into()]),
+            }
+        );
+    }
+
+    #[test]
+    fn duplicate_old_filenames() {
+        let mut tracker = TreeTracker::new();
+        assert_eq!(
+            tracker.add(
+                &"foo/bar.txt".parse::<KeyPath>().unwrap(),
+                1,
+                Some("bar.txt.old.1.2".into())
+            ),
+            Ok(Vec::new())
+        );
+        let e = tracker
+            .add(
+                &"foo/bar.txt".parse::<KeyPath>().unwrap(),
+                2,
+                Some("bar.txt.old.1.2".into()),
+            )
+            .unwrap_err();
+        assert_eq!(
+            e,
+            TreeTrackerError::DuplicateOldFile {
+                key: "foo/bar.txt".into(),
+                old_filename: "bar.txt.old.1.2".into()
+            }
+        );
+    }
+}
diff --git a/src/util.rs b/src/util.rs
index 4c60be1..1f7f753 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -113,6 +113,13 @@ pub(crate) fn force_create_dir_all<P: AsRef<Path>>(
     Ok(())
 }
 
+/// Construct the base filename for backing up an object that is not the latest
+/// version of its key, where `basename` is the filename portion of the key,
+/// `version_id` is the object's version ID, and `etag` is its etag.
+pub(crate) fn make_old_filename(basename: &str, version_id: &str, etag: &str) -> String {
+    format!("{basename}.old.{version_id}.{etag}")
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
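+
+    // Illustrative check (editorial sketch) of the documented format string;
+    // the version ID and etag values here are made up:
+    #[test]
+    fn make_old_filename_format() {
+        assert_eq!(
+            make_old_filename("quux.txt", "VersionId123", "0123abcd"),
+            "quux.txt.old.VersionId123.0123abcd"
+        );
+    }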