Skip to content

Commit

Permalink
closes #88
Browse files Browse the repository at this point in the history
  • Loading branch information
marvin-j97 committed Oct 23, 2024
1 parent 0b49c44 commit 61cac02
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 11 deletions.
35 changes: 35 additions & 0 deletions src/iter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use crate::snapshot_nonce::SnapshotNonce;

/// A wrapper around iterators to hold a snapshot moment
///
/// We need to hold the snapshot nonce so the GC watermark does not
/// move past this snapshot moment, removing data that may still be read.
///
/// This may not be strictly needed because an iterator holds a read lock to a memtable anyway
/// but for correctness it's probably better.
pub struct Iter<T, I: DoubleEndedIterator<Item = crate::Result<T>>> {
iter: I,

#[allow(unused)]
nonce: SnapshotNonce,
}

impl<T, I: DoubleEndedIterator<Item = crate::Result<T>>> Iter<T, I> {
pub fn new(nonce: SnapshotNonce, iter: I) -> Self {
Self { iter, nonce }
}
}

impl<T, I: DoubleEndedIterator<Item = crate::Result<T>>> Iterator for Iter<T, I> {
type Item = crate::Result<T>;

fn next(&mut self) -> Option<Self::Item> {
self.iter.next()
}
}

impl<T, I: DoubleEndedIterator<Item = crate::Result<T>>> DoubleEndedIterator for Iter<T, I> {
fn next_back(&mut self) -> Option<Self::Item> {
self.iter.next_back()
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ mod error;
mod file;
mod flush;
mod gc;
mod iter;
mod journal;
mod keyspace;
mod monitor;
Expand Down
12 changes: 12 additions & 0 deletions src/snapshot_nonce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,18 @@ pub struct SnapshotNonce {
tracker: SnapshotTracker,
}

impl Clone for SnapshotNonce {
fn clone(&self) -> Self {
// IMPORTANT: Increment snapshot count in tracker
self.tracker.open(self.instant);

Self {
instant: self.instant,
tracker: self.tracker.clone(),
}
}
}

impl Drop for SnapshotNonce {
fn drop(&mut self) {
self.tracker.close(self.instant);
Expand Down
3 changes: 2 additions & 1 deletion src/tx/keyspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use super::oracle::Oracle;
#[derive(Clone)]
#[allow(clippy::module_name_repetitions)]
pub struct TransactionalKeyspace {
pub(crate) inner: Keyspace,
#[doc(hidden)]
pub inner: Keyspace,

#[cfg(feature = "ssi_tx")]
pub(super) oracle: Arc<Oracle>,
Expand Down
30 changes: 20 additions & 10 deletions src/tx/read_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,11 +248,13 @@ impl ReadTransaction {
&'a self,
partition: &'a TxPartitionHandle,
) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
partition
let iter = partition
.inner
.tree
.iter_with_seqno(self.nonce.instant, None)
.map(|item| Ok(item?))
.map(|item| Ok(item?));

crate::iter::Iter::new(self.nonce.clone(), iter)
}

/// Iterates over the transaction's state, returning keys only.
Expand All @@ -263,11 +265,13 @@ impl ReadTransaction {
&'a self,
partition: &'a TxPartitionHandle,
) -> impl DoubleEndedIterator<Item = crate::Result<UserKey>> + 'static {
partition
let iter = partition
.inner
.tree
.keys_with_seqno(self.nonce.instant, None)
.map(|item| Ok(item?))
.map(|item| Ok(item?));

crate::iter::Iter::new(self.nonce.clone(), iter)
}

/// Iterates over the transaction's state, returning values only.
Expand All @@ -278,11 +282,13 @@ impl ReadTransaction {
&'a self,
partition: &'a TxPartitionHandle,
) -> impl DoubleEndedIterator<Item = crate::Result<UserValue>> + 'static {
partition
let iter = partition
.inner
.tree
.values_with_seqno(self.nonce.instant, None)
.map(|item| Ok(item?))
.map(|item| Ok(item?));

crate::iter::Iter::new(self.nonce.clone(), iter)
}

/// Iterates over a range of the transaction's state.
Expand Down Expand Up @@ -311,11 +317,13 @@ impl ReadTransaction {
partition: &'a TxPartitionHandle,
range: R,
) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
partition
let iter = partition
.inner
.tree
.range_with_seqno(range, self.nonce.instant, None)
.map(|item| Ok(item?))
.map(|item| Ok(item?));

crate::iter::Iter::new(self.nonce.clone(), iter)
}

/// Iterates over a range of the transaction's state.
Expand Down Expand Up @@ -344,10 +352,12 @@ impl ReadTransaction {
partition: &'a TxPartitionHandle,
prefix: K,
) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
partition
let iter = partition
.inner
.tree
.prefix_with_seqno(prefix, self.nonce.instant, None)
.map(|item| Ok(item?))
.map(|item| Ok(item?));

crate::iter::Iter::new(self.nonce.clone(), iter)
}
}

0 comments on commit 61cac02

Please sign in to comment.