From 61cac020df6d64fc8b30e2e3f857e46ee2927ea4 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Wed, 23 Oct 2024 18:24:54 +0200 Subject: [PATCH] closes #88 --- src/iter.rs | 35 +++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + src/snapshot_nonce.rs | 12 ++++++++++++ src/tx/keyspace.rs | 3 ++- src/tx/read_tx.rs | 30 ++++++++++++++++++++---------- 5 files changed, 70 insertions(+), 11 deletions(-) create mode 100644 src/iter.rs diff --git a/src/iter.rs b/src/iter.rs new file mode 100644 index 00000000..4b29913b --- /dev/null +++ b/src/iter.rs @@ -0,0 +1,35 @@ +use crate::snapshot_nonce::SnapshotNonce; + +/// A wrapper around iterators to hold a snapshot moment +/// +/// We need to hold the snapshot nonce so the GC watermark does not +/// move past this snapshot moment, removing data that may still be read. +/// +/// This may not be strictly needed because an iterator holds a read lock to a memtable anyway +/// but for correctness it's probably better. +pub struct Iter>> { + iter: I, + + #[allow(unused)] + nonce: SnapshotNonce, +} + +impl>> Iter { + pub fn new(nonce: SnapshotNonce, iter: I) -> Self { + Self { iter, nonce } + } +} + +impl>> Iterator for Iter { + type Item = crate::Result; + + fn next(&mut self) -> Option { + self.iter.next() + } +} + +impl>> DoubleEndedIterator for Iter { + fn next_back(&mut self) -> Option { + self.iter.next_back() + } +} diff --git a/src/lib.rs b/src/lib.rs index c742fabd..043296d4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -93,6 +93,7 @@ mod error; mod file; mod flush; mod gc; +mod iter; mod journal; mod keyspace; mod monitor; diff --git a/src/snapshot_nonce.rs b/src/snapshot_nonce.rs index efbf3809..329fd414 100644 --- a/src/snapshot_nonce.rs +++ b/src/snapshot_nonce.rs @@ -10,6 +10,18 @@ pub struct SnapshotNonce { tracker: SnapshotTracker, } +impl Clone for SnapshotNonce { + fn clone(&self) -> Self { + // IMPORTANT: Increment snapshot count in tracker + self.tracker.open(self.instant); + + Self { + instant: self.instant, + tracker: self.tracker.clone(), + } + } +} + impl Drop for SnapshotNonce { fn drop(&mut self) { self.tracker.close(self.instant); diff --git a/src/tx/keyspace.rs b/src/tx/keyspace.rs index d6c8f938..7c938043 100644 --- a/src/tx/keyspace.rs +++ b/src/tx/keyspace.rs @@ -16,7 +16,8 @@ use super::oracle::Oracle; #[derive(Clone)] #[allow(clippy::module_name_repetitions)] pub struct TransactionalKeyspace { - pub(crate) inner: Keyspace, + #[doc(hidden)] + pub inner: Keyspace, #[cfg(feature = "ssi_tx")] pub(super) oracle: Arc, diff --git a/src/tx/read_tx.rs b/src/tx/read_tx.rs index e469c8a5..978f6ad5 100644 --- a/src/tx/read_tx.rs +++ b/src/tx/read_tx.rs @@ -248,11 +248,13 @@ impl ReadTransaction { &'a self, partition: &'a TxPartitionHandle, ) -> impl DoubleEndedIterator> + 'static { - partition + let iter = partition .inner .tree .iter_with_seqno(self.nonce.instant, None) - .map(|item| Ok(item?)) + .map(|item| Ok(item?)); + + crate::iter::Iter::new(self.nonce.clone(), iter) } /// Iterates over the transaction's state, returning keys only. @@ -263,11 +265,13 @@ impl ReadTransaction { &'a self, partition: &'a TxPartitionHandle, ) -> impl DoubleEndedIterator> + 'static { - partition + let iter = partition .inner .tree .keys_with_seqno(self.nonce.instant, None) - .map(|item| Ok(item?)) + .map(|item| Ok(item?)); + + crate::iter::Iter::new(self.nonce.clone(), iter) } /// Iterates over the transaction's state, returning values only. @@ -278,11 +282,13 @@ impl ReadTransaction { &'a self, partition: &'a TxPartitionHandle, ) -> impl DoubleEndedIterator> + 'static { - partition + let iter = partition .inner .tree .values_with_seqno(self.nonce.instant, None) - .map(|item| Ok(item?)) + .map(|item| Ok(item?)); + + crate::iter::Iter::new(self.nonce.clone(), iter) } /// Iterates over a range of the transaction's state. @@ -311,11 +317,13 @@ impl ReadTransaction { partition: &'a TxPartitionHandle, range: R, ) -> impl DoubleEndedIterator> + 'static { - partition + let iter = partition .inner .tree .range_with_seqno(range, self.nonce.instant, None) - .map(|item| Ok(item?)) + .map(|item| Ok(item?)); + + crate::iter::Iter::new(self.nonce.clone(), iter) } /// Iterates over a range of the transaction's state. @@ -344,10 +352,12 @@ impl ReadTransaction { partition: &'a TxPartitionHandle, prefix: K, ) -> impl DoubleEndedIterator> + 'static { - partition + let iter = partition .inner .tree .prefix_with_seqno(prefix, self.nonce.instant, None) - .map(|item| Ok(item?)) + .map(|item| Ok(item?)); + + crate::iter::Iter::new(self.nonce.clone(), iter) } }