diff --git a/.config/nextest.toml b/.config/nextest.toml new file mode 100644 index 00000000..8bccd51c --- /dev/null +++ b/.config/nextest.toml @@ -0,0 +1,2 @@ +[profile.default] +slow-timeout = "1m" diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 5ae32bd2..a0303d64 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -65,8 +65,12 @@ jobs: run: cargo test --features __internal_whitebox -- whitebox_ --test-threads=1 - name: Run tests run: cargo nextest run --features lz4,miniz,single_writer_tx,bloom + - name: Run SSI tests + run: cargo nextest run --no-default-features --features ssi_tx tx_ssi_ - name: Run doc tests run: cargo test --doc + - name: Run SSI doc tests + run: cargo test --no-default-features --features ssi_tx --doc - name: Build & test examples run: node compile_examples.mjs cross: diff --git a/Cargo.toml b/Cargo.toml index 25247475..9a27d0cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ lz4 = ["lsm-tree/lz4"] miniz = ["lsm-tree/miniz"] bloom = ["lsm-tree/bloom"] single_writer_tx = [] +ssi_tx = [] __internal_whitebox = [] [dependencies] @@ -42,6 +43,7 @@ rand = "0.8.5" [package.metadata.cargo-all-features] denylist = ["__internal_whitebox"] +skip_feature_sets = [["ssi_tx", "single_writer_tx"]] [[bench]] name = "lsmt" diff --git a/README.md b/README.md index ca8373f2..3ae06c0e 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ Fjall is an LSM-based embeddable key-value storage engine written in Rust. It fe - Automatic background maintenance - Partitions (a.k.a. column families) with cross-partition atomic semantics - Built-in compression (default = LZ4) -- Single-writer, multi-reader transactions (optional) +- Serializable transactions (optional) - Key-value separation for large blob use cases (optional) Each `Keyspace` is a single logical database and is split into `partitions` (a.k.a. column families) - you should probably only use a single keyspace for your application. Each partition is physically a single LSM-tree and its own logical collection (a persistent, sorted map); however, write operations across partitions are atomic as they are persisted in a single keyspace-level journal, which will be recovered on restart. @@ -117,6 +117,13 @@ Allows opening a transactional Keyspace for single-writer (serialized) transacti *Enabled by default.* +### ssi_tx + +Allows opening a transactional Keyspace for multi-writer, serializable transactions, allowing RYOW (read-your-own-write), fetch-and-update and other atomic operations. +Conflict checking is done using optimistic concurrency control. + +*Disabled by default.* + ## Stable disk format The disk format is stable as of 1.0.0. diff --git a/examples/tx-atomic-counter/src/main.rs b/examples/tx-atomic-counter/src/main.rs index a4cb9fd7..faeea40b 100644 --- a/examples/tx-atomic-counter/src/main.rs +++ b/examples/tx-atomic-counter/src/main.rs @@ -38,7 +38,6 @@ fn main() -> fjall::Result<()> { write_tx.insert(&counters, "c1", next.to_be_bytes()); write_tx.commit()?; - keyspace.persist(PersistMode::Buffer)?; println!("worker {idx} incremented to {next}"); diff --git a/examples/tx-mpmc-queue/src/main.rs b/examples/tx-mpmc-queue/src/main.rs index 04744332..a3397ffa 100644 --- a/examples/tx-mpmc-queue/src/main.rs +++ b/examples/tx-mpmc-queue/src/main.rs @@ -32,7 +32,6 @@ fn main() -> fjall::Result<()> { let task_id = scru128::new_string(); tasks.insert(&task_id, &task_id)?; - keyspace.persist(PersistMode::Buffer)?; println!("producer {idx} created task {task_id}"); @@ -67,7 +66,6 @@ fn main() -> fjall::Result<()> { tx.remove(&tasks, &key); tx.commit()?; - keyspace.persist(PersistMode::Buffer)?; let task_id = std::str::from_utf8(&key).unwrap(); println!("consumer {idx} completed task {task_id}"); diff --git a/examples/tx-partition-move/src/main.rs b/examples/tx-partition-move/src/main.rs index 826d8a8b..3d80d494 100644 --- a/examples/tx-partition-move/src/main.rs +++ b/examples/tx-partition-move/src/main.rs @@ -37,7 +37,6 @@ fn main() -> fjall::Result<()> { tx.insert(&dst, &key, &value); tx.commit()?; - keyspace.persist(PersistMode::Buffer)?; let task_id = std::str::from_utf8(&key).unwrap(); println!("consumer {idx} moved {task_id}"); diff --git a/examples/tx-ssi-atomic-counter/.run b/examples/tx-ssi-atomic-counter/.run new file mode 100644 index 00000000..e69de29b diff --git a/examples/tx-ssi-atomic-counter/Cargo.toml b/examples/tx-ssi-atomic-counter/Cargo.toml new file mode 100644 index 00000000..9aa1e79a --- /dev/null +++ b/examples/tx-ssi-atomic-counter/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "tx-atomic-counter" +version = "0.0.1" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +fjall = { path = "../../", default-features = false, features = [ + "bloom", + "lz4", + "ssi_tx", +] } +rand = "0.8.5" diff --git a/examples/tx-ssi-atomic-counter/README.md b/examples/tx-ssi-atomic-counter/README.md new file mode 100644 index 00000000..6e3b3e93 --- /dev/null +++ b/examples/tx-ssi-atomic-counter/README.md @@ -0,0 +1,3 @@ +# tx-ssi-atomic-counter + +This example demonstrates using transactions for atomic updates. diff --git a/examples/tx-ssi-atomic-counter/src/main.rs b/examples/tx-ssi-atomic-counter/src/main.rs new file mode 100644 index 00000000..7040176c --- /dev/null +++ b/examples/tx-ssi-atomic-counter/src/main.rs @@ -0,0 +1,58 @@ +use fjall::{Config, PersistMode}; +use std::path::Path; + +const LIMIT: u64 = 100; + +fn main() -> fjall::Result<()> { + let path = Path::new(".fjall_data"); + + let keyspace = Config::new(path).temporary(true).open_transactional()?; + let counters = keyspace.open_partition("counters", Default::default())?; + + counters.insert("c1", 0_u64.to_be_bytes())?; + + let workers = (0_u8..4) + .map(|idx| { + let keyspace = keyspace.clone(); + let counters = counters.clone(); + + std::thread::spawn(move || { + use rand::Rng; + + let mut rng = rand::thread_rng(); + + loop { + let mut write_tx = keyspace.write_tx().unwrap(); + + let item = write_tx.get(&counters, "c1")?.unwrap(); + + let mut bytes = [0; 8]; + bytes.copy_from_slice(&item); + let prev = u64::from_be_bytes(bytes); + + if prev >= LIMIT { + return Ok::<_, fjall::Error>(()); + } + + let next = prev + 1; + + write_tx.insert(&counters, "c1", next.to_be_bytes()); + write_tx.commit()?.ok(); + + println!("worker {idx} incremented to {next}"); + + let ms = rng.gen_range(10..400); + std::thread::sleep(std::time::Duration::from_millis(ms)); + } + }) + }) + .collect::>(); + + for worker in workers { + worker.join().unwrap()?; + } + + assert_eq!(&*counters.get("c1").unwrap().unwrap(), LIMIT.to_be_bytes()); + + Ok(()) +} diff --git a/examples/tx-ssi-cc/.run b/examples/tx-ssi-cc/.run new file mode 100644 index 00000000..e69de29b diff --git a/examples/tx-ssi-cc/Cargo.toml b/examples/tx-ssi-cc/Cargo.toml new file mode 100644 index 00000000..4fc985de --- /dev/null +++ b/examples/tx-ssi-cc/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "tx-ssi-cc" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +fjall = { path = "../../", default-features = false, features = [ + "bloom", + "lz4", + "ssi_tx", +] } diff --git a/examples/tx-ssi-cc/README.md b/examples/tx-ssi-cc/README.md new file mode 100644 index 00000000..932c461f --- /dev/null +++ b/examples/tx-ssi-cc/README.md @@ -0,0 +1,3 @@ +# tx-ssi-cc + +This example demonstrates concurrent transactions using SSI (serializable snapshot isolation). diff --git a/examples/tx-ssi-cc/src/main.rs b/examples/tx-ssi-cc/src/main.rs new file mode 100644 index 00000000..00e88b6b --- /dev/null +++ b/examples/tx-ssi-cc/src/main.rs @@ -0,0 +1,53 @@ +use std::time::{Duration, Instant}; + +fn main() -> fjall::Result<()> { + let keyspace = fjall::Config::default() + .temporary(true) + .open_transactional()?; + let items = keyspace.open_partition("items", Default::default())?; + + let start = Instant::now(); + + let t1 = { + let keyspace = keyspace.clone(); + let items = items.clone(); + + std::thread::spawn(move || { + let mut wtx = keyspace.write_tx().unwrap(); + println!("Started tx1"); + std::thread::sleep(Duration::from_secs(3)); + wtx.insert(&items, "a", "a"); + wtx.commit() + }) + }; + + let t2 = { + let keyspace = keyspace.clone(); + let items = items.clone(); + + std::thread::spawn(move || { + let mut wtx = keyspace.write_tx().unwrap(); + println!("Started tx2"); + std::thread::sleep(Duration::from_secs(3)); + wtx.insert(&items, "b", "b"); + wtx.commit() + }) + }; + + t1.join() + .expect("should join")? + .expect("tx should not fail"); + + t2.join() + .expect("should join")? + .expect("tx should not fail"); + + // NOTE: We would expect a single writer tx implementation to finish in + // ~6 seconds + println!("Done in {:?}, items.len={}", start.elapsed(), { + let rtx = keyspace.read_tx(); + rtx.len(&items)? + }); + + Ok(()) +} diff --git a/examples/tx-ssi-mpmc-queue/.run b/examples/tx-ssi-mpmc-queue/.run new file mode 100644 index 00000000..e69de29b diff --git a/examples/tx-ssi-mpmc-queue/Cargo.toml b/examples/tx-ssi-mpmc-queue/Cargo.toml new file mode 100644 index 00000000..33dfd4c8 --- /dev/null +++ b/examples/tx-ssi-mpmc-queue/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "tx-mpmc-queue" +version = "0.0.1" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +fjall = { path = "../../", default-features = false, features = [ + "bloom", + "lz4", + "ssi_tx", +] } +rand = "0.8.5" +scru128 = "3.0.2" diff --git a/examples/tx-ssi-mpmc-queue/README.md b/examples/tx-ssi-mpmc-queue/README.md new file mode 100644 index 00000000..689d3c0b --- /dev/null +++ b/examples/tx-ssi-mpmc-queue/README.md @@ -0,0 +1,3 @@ +# tx-ssi-mpmc-queue + +This example demonstrates implementing a FIFO-MPMC queue using transactions. diff --git a/examples/tx-ssi-mpmc-queue/src/main.rs b/examples/tx-ssi-mpmc-queue/src/main.rs new file mode 100644 index 00000000..8a9e3541 --- /dev/null +++ b/examples/tx-ssi-mpmc-queue/src/main.rs @@ -0,0 +1,98 @@ +use fjall::{Config, PersistMode}; +use std::path::Path; +use std::sync::{ + atomic::{AtomicUsize, Ordering::Relaxed}, + Arc, +}; + +const PRODUCER_COUNT: usize = 4; +const PRODUCING_COUNT: usize = 100; + +const EXPECTED_COUNT: usize = PRODUCER_COUNT * PRODUCING_COUNT; + +fn main() -> fjall::Result<()> { + let path = Path::new(".fjall_data"); + + let keyspace = Config::new(path).temporary(true).open_transactional()?; + let tasks = keyspace.open_partition("tasks", Default::default())?; + + let counter = Arc::new(AtomicUsize::default()); + + let producers = (0..PRODUCER_COUNT) + .map(|idx| { + let keyspace = keyspace.clone(); + let tasks = tasks.clone(); + + std::thread::spawn(move || { + use rand::Rng; + + let mut rng = rand::thread_rng(); + + for _ in 0..PRODUCING_COUNT { + let task_id = scru128::new_string(); + + tasks.insert(&task_id, &task_id)?; + + println!("producer {idx} created task {task_id}"); + + let ms = rng.gen_range(10..100); + std::thread::sleep(std::time::Duration::from_millis(ms)); + } + + println!("producer {idx} done"); + + Ok::<_, fjall::Error>(()) + }) + }) + .collect::>(); + + let consumers = (0..4) + .map(|idx| { + let keyspace = keyspace.clone(); + let tasks = tasks.clone(); + let counter = counter.clone(); + + std::thread::spawn(move || { + use rand::Rng; + + let mut rng = rand::thread_rng(); + + loop { + let mut tx = keyspace.write_tx().unwrap(); + + // TODO: NOTE: + // Tombstones will add up over time, making first KV slower + // Something like SingleDelete https://github.com/facebook/rocksdb/wiki/Single-Delete + // would be good for this type of workload + if let Some((key, _)) = tx.first_key_value(&tasks)? { + tx.remove(&tasks, &key); + + if tx.commit()?.is_ok() { + counter.fetch_add(1, Relaxed); + } + + let task_id = std::str::from_utf8(&key).unwrap(); + println!("consumer {idx} completed task {task_id}"); + + let ms = rng.gen_range(50..200); + std::thread::sleep(std::time::Duration::from_millis(ms)); + } else if counter.load(Relaxed) == EXPECTED_COUNT { + return Ok::<_, fjall::Error>(()); + } + } + }) + }) + .collect::>(); + + for t in producers { + t.join().unwrap()?; + } + + for t in consumers { + t.join().unwrap()?; + } + + assert_eq!(EXPECTED_COUNT, counter.load(Relaxed)); + + Ok(()) +} diff --git a/examples/tx-ssi-partition-move/.run b/examples/tx-ssi-partition-move/.run new file mode 100644 index 00000000..e69de29b diff --git a/examples/tx-ssi-partition-move/Cargo.toml b/examples/tx-ssi-partition-move/Cargo.toml new file mode 100644 index 00000000..b190bb14 --- /dev/null +++ b/examples/tx-ssi-partition-move/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "tx-partition-move" +version = "0.0.1" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +fjall = { path = "../../", default-features = false, features = [ + "bloom", + "lz4", + "ssi_tx", +] } +rand = "0.8.5" +scru128 = "3.0.2" diff --git a/examples/tx-ssi-partition-move/README.md b/examples/tx-ssi-partition-move/README.md new file mode 100644 index 00000000..e046d0af --- /dev/null +++ b/examples/tx-ssi-partition-move/README.md @@ -0,0 +1,3 @@ +# tx-ssi-partition-move + +This example demonstrates atomically moving items between partitions using transactions. diff --git a/examples/tx-ssi-partition-move/src/main.rs b/examples/tx-ssi-partition-move/src/main.rs new file mode 100644 index 00000000..358927bf --- /dev/null +++ b/examples/tx-ssi-partition-move/src/main.rs @@ -0,0 +1,62 @@ +use fjall::{Config, PersistMode}; +use std::path::Path; + +const ITEM_COUNT: u64 = 200; + +fn main() -> fjall::Result<()> { + let path = Path::new(".fjall_data"); + + let keyspace = Config::new(path).temporary(true).open_transactional()?; + let src = keyspace.open_partition("src", Default::default())?; + let dst = keyspace.open_partition("dst", Default::default())?; + + for _ in 0..ITEM_COUNT { + src.insert(scru128::new_string(), "")?; + } + + let movers = (0..4) + .map(|idx| { + let keyspace = keyspace.clone(); + let src = src.clone(); + let dst = dst.clone(); + + std::thread::spawn(move || { + use rand::Rng; + + let mut rng = rand::thread_rng(); + + loop { + let mut tx = keyspace.write_tx().unwrap(); + + // TODO: NOTE: + // Tombstones will add up over time, making first KV slower + // Something like SingleDelete https://github.com/facebook/rocksdb/wiki/Single-Delete + // would be good for this type of workload + if let Some((key, value)) = tx.first_key_value(&src)? { + tx.remove(&src, &key); + tx.insert(&dst, &key, &value); + + tx.commit()?.ok(); + + let task_id = std::str::from_utf8(&key).unwrap(); + println!("consumer {idx} moved {task_id}"); + + let ms = rng.gen_range(10..100); + std::thread::sleep(std::time::Duration::from_millis(ms)); + } else { + return Ok::<_, fjall::Error>(()); + } + } + }) + }) + .collect::>(); + + for t in movers { + t.join().unwrap()?; + } + + assert_eq!(ITEM_COUNT, keyspace.read_tx().len(&dst)? as u64); + assert!(keyspace.read_tx().is_empty(&src)?); + + Ok(()) +} diff --git a/src/config.rs b/src/config.rs index a49361f6..3c446f64 100644 --- a/src/config.rs +++ b/src/config.rs @@ -233,7 +233,7 @@ impl Config { /// # Errors /// /// Will return `Err` if an IO error occurs. - #[cfg(feature = "single_writer_tx")] + #[cfg(any(feature = "single_writer_tx", feature = "ssi_tx"))] pub fn open_transactional(self) -> crate::Result { crate::TxKeyspace::open(self) } diff --git a/src/error.rs b/src/error.rs index ed65f125..eaf55e02 100644 --- a/src/error.rs +++ b/src/error.rs @@ -26,7 +26,7 @@ pub enum Error { /// Invalid or unparsable data format version InvalidVersion(Option), - /// A previous flush operation failed, indicating a hardware-related failure + /// A previous flush / commit operation failed, indicating a hardware-related failure /// /// Future writes will not be accepted as consistency cannot be guaranteed. /// diff --git a/src/lib.rs b/src/lib.rs index 08663230..c742fabd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -103,7 +103,7 @@ mod snapshot_nonce; mod snapshot_tracker; mod tracked_snapshot; -#[cfg(feature = "single_writer_tx")] +#[cfg(any(feature = "single_writer_tx", feature = "ssi_tx"))] mod tx; mod version; @@ -127,7 +127,7 @@ pub use { version::Version, }; -#[cfg(feature = "single_writer_tx")] +#[cfg(any(feature = "single_writer_tx", feature = "ssi_tx"))] pub use tx::{ keyspace::{TransactionalKeyspace, TxKeyspace}, partition::TransactionalPartitionHandle, @@ -142,15 +142,15 @@ pub type WriteBatch = Batch; pub type Partition = PartitionHandle; /// Alias for [`TransactionalPartitionHandle`] -#[cfg(feature = "single_writer_tx")] +#[cfg(any(feature = "single_writer_tx", feature = "ssi_tx"))] pub type TxPartition = TransactionalPartitionHandle; /// Alias for [`TransactionalPartitionHandle`] -#[cfg(feature = "single_writer_tx")] +#[cfg(any(feature = "single_writer_tx", feature = "ssi_tx"))] pub type TxPartitionHandle = TransactionalPartitionHandle; /// Alias for [`TransactionalPartitionHandle`] -#[cfg(feature = "single_writer_tx")] +#[cfg(any(feature = "single_writer_tx", feature = "ssi_tx"))] pub type TransactionalPartition = TransactionalPartitionHandle; /// A snapshot moment diff --git a/src/snapshot_tracker.rs b/src/snapshot_tracker.rs index 9d8bbfda..41dacff2 100644 --- a/src/snapshot_tracker.rs +++ b/src/snapshot_tracker.rs @@ -35,8 +35,8 @@ impl Default for SnapshotTrackerInner { fn default() -> Self { Self { data: DashMap::default(), + safety_gap: 50, freed_count: AtomicU64::default(), - safety_gap: 100, lowest_freed_instant: RwLock::default(), } } diff --git a/src/tx/conflict_manager.rs b/src/tx/conflict_manager.rs new file mode 100644 index 00000000..c2b1f6c9 --- /dev/null +++ b/src/tx/conflict_manager.rs @@ -0,0 +1,183 @@ +use crate::batch::PartitionKey; +use core::ops::Bound; +use lsm_tree::Slice; +use std::{ + collections::{BTreeMap, BTreeSet}, + ops::RangeBounds, +}; + +#[derive(Clone, Debug)] +enum Read { + Single(Slice), + Range { + start: Bound, + end: Bound, + }, + All, +} + +#[derive(Default, Debug)] +pub struct ConflictManager { + reads: BTreeMap>, + conflict_keys: BTreeMap>, +} + +impl ConflictManager { + fn push_read(&mut self, partition: &PartitionKey, read: Read) { + if let Some(tbl) = self.reads.get_mut(partition) { + tbl.push(read); + } else { + self.reads.entry(partition.clone()).or_default().push(read); + } + } + + pub fn mark_read(&mut self, partition: &PartitionKey, key: &Slice) { + self.push_read(partition, Read::Single(key.clone())); + } + + pub fn mark_conflict(&mut self, partition: &PartitionKey, key: &[u8]) { + if let Some(tbl) = self.conflict_keys.get_mut(partition) { + tbl.insert(key.into()); + } else { + self.conflict_keys + .entry(partition.clone()) + .or_default() + .insert(key.into()); + } + } + + pub fn mark_range(&mut self, partition: &PartitionKey, range: impl RangeBounds) { + let start = match range.start_bound() { + Bound::Included(k) => Bound::Included(k.clone()), + Bound::Excluded(k) => Bound::Excluded(k.clone()), + Bound::Unbounded => Bound::Unbounded, + }; + + let end = match range.end_bound() { + Bound::Included(k) => Bound::Included(k.clone()), + Bound::Excluded(k) => Bound::Excluded(k.clone()), + Bound::Unbounded => Bound::Unbounded, + }; + + let read = if start == Bound::Unbounded && end == Bound::Unbounded { + Read::All + } else { + Read::Range { start, end } + }; + + self.push_read(partition, read); + } + + #[allow(clippy::too_many_lines)] + pub fn has_conflict(&self, other: &Self) -> bool { + if self.reads.is_empty() { + return false; + } + + for (partition, keys) in &self.reads { + if let Some(other_conflict_keys) = other.conflict_keys.get(partition) { + for ro in keys { + match ro { + Read::Single(k) => { + if other_conflict_keys.contains(k) { + return true; + } + } + Read::Range { start, end } => match (start, end) { + (Bound::Included(start), Bound::Included(end)) => { + if other_conflict_keys + .range::(( + Bound::Included(start), + Bound::Included(end), + )) + .next() + .is_some() + { + return true; + } + } + (Bound::Included(start), Bound::Excluded(end)) => { + if other_conflict_keys + .range::(( + Bound::Included(start), + Bound::Excluded(end), + )) + .next() + .is_some() + { + return true; + } + } + (Bound::Included(start), Bound::Unbounded) => { + if other_conflict_keys + .range::((Bound::Included(start), Bound::Unbounded)) + .next() + .is_some() + { + return true; + } + } + (Bound::Excluded(start), Bound::Included(end)) => { + if other_conflict_keys + .range::(( + Bound::Excluded(start), + Bound::Included(end), + )) + .next() + .is_some() + { + return true; + } + } + (Bound::Excluded(start), Bound::Excluded(end)) => { + if other_conflict_keys + .range::(( + Bound::Excluded(start), + Bound::Excluded(end), + )) + .next() + .is_some() + { + return true; + } + } + (Bound::Excluded(start), Bound::Unbounded) => { + if other_conflict_keys + .range::((Bound::Excluded(start), Bound::Unbounded)) + .next() + .is_some() + { + return true; + } + } + (Bound::Unbounded, Bound::Included(end)) => { + let range = ..=end; + for write in other_conflict_keys { + if range.contains(&write) { + return true; + } + } + } + (Bound::Unbounded, Bound::Excluded(end)) => { + let range = ..end; + for write in other_conflict_keys { + if range.contains(&write) { + return true; + } + } + } + (Bound::Unbounded, Bound::Unbounded) => unreachable!(), + }, + Read::All => { + if !other_conflict_keys.is_empty() { + return true; + } + } + } + } + } + } + + false + } +} diff --git a/src/tx/keyspace.rs b/src/tx/keyspace.rs index 5ee141a6..d867ef92 100644 --- a/src/tx/keyspace.rs +++ b/src/tx/keyspace.rs @@ -9,12 +9,20 @@ use crate::{ }; use std::sync::{Arc, Mutex}; -/// Transaction keyspace +#[cfg(feature = "ssi_tx")] +use super::oracle::Oracle; + +/// Transactional keyspace #[derive(Clone)] #[allow(clippy::module_name_repetitions)] pub struct TransactionalKeyspace { - inner: Keyspace, - tx_lock: Arc>, + pub(crate) inner: Keyspace, + + #[cfg(feature = "ssi_tx")] + pub(super) oracle: Arc, + + #[cfg(feature = "single_writer_tx")] + lock: Arc>, } /// Alias for [`TransactionalKeyspace`] @@ -23,17 +31,16 @@ pub type TxKeyspace = TransactionalKeyspace; impl TxKeyspace { /// Starts a new writeable transaction. + #[cfg(feature = "single_writer_tx")] #[must_use] pub fn write_tx(&self) -> WriteTransaction { - let lock = self.tx_lock.lock().expect("lock is poisoned"); - - // IMPORTANT: Get the seqno *after* getting the lock + let guard = self.lock.lock().expect("poisoned tx lock"); let instant = self.inner.instant(); let mut write_tx = WriteTransaction::new( - self.inner.clone(), - lock, + self.clone(), SnapshotNonce::new(instant, self.inner.snapshot_tracker.clone()), + guard, ); if !self.inner.config.manual_journal_persist { @@ -43,6 +50,34 @@ impl TxKeyspace { write_tx } + /// Starts a new writeable transaction. + /// + /// # Errors + /// + /// Will return `Err` if creation failed. + #[cfg(feature = "ssi_tx")] + pub fn write_tx(&self) -> crate::Result { + let instant = { + // acquire a lock here to prevent getting a stale snapshot seqno + // this will drain at least part of the commit queue, but ordering + // is platform-dependent since we use std::sync::Mutex + let _guard = self.oracle.write_serialize_lock()?; + + self.inner.instant() + }; + + let mut write_tx = WriteTransaction::new( + self.clone(), + SnapshotNonce::new(instant, self.inner.snapshot_tracker.clone()), + ); + + if !self.inner.config.manual_journal_persist { + write_tx = write_tx.durability(Some(PersistMode::Buffer)); + } + + Ok(write_tx) + } + /// Starts a new read-only transaction. #[must_use] pub fn read_tx(&self) -> ReadTransaction { @@ -100,7 +135,7 @@ impl TxKeyspace { Ok(TxPartitionHandle { inner: partition, - tx_lock: self.tx_lock.clone(), + keyspace: self.clone(), }) } @@ -159,8 +194,15 @@ impl TxKeyspace { inner.start_background_threads()?; Ok(Self { + #[cfg(feature = "ssi_tx")] + oracle: Arc::new(Oracle { + write_serialize_lock: Mutex::default(), + seqno: inner.seqno.clone(), + snapshot_tracker: inner.snapshot_tracker.clone(), + }), inner, - tx_lock: Arc::default(), + #[cfg(feature = "single_writer_tx")] + lock: Default::default(), }) } } diff --git a/src/tx/mod.rs b/src/tx/mod.rs index ddcf8c6d..967cdd11 100644 --- a/src/tx/mod.rs +++ b/src/tx/mod.rs @@ -10,3 +10,11 @@ pub mod read_tx; #[allow(clippy::module_name_repetitions)] pub mod write_tx; + +#[cfg(feature = "ssi_tx")] +mod conflict_manager; + +#[cfg(feature = "ssi_tx")] +mod oracle; + +pub(crate) mod write; diff --git a/src/tx/oracle.rs b/src/tx/oracle.rs new file mode 100644 index 00000000..2c446b6f --- /dev/null +++ b/src/tx/oracle.rs @@ -0,0 +1,116 @@ +use crate::snapshot_tracker::SnapshotTracker; +use crate::Instant; + +use super::conflict_manager::ConflictManager; +use lsm_tree::SequenceNumberCounter; +use std::collections::BTreeMap; +use std::fmt; +use std::sync::{Mutex, MutexGuard, PoisonError}; + +pub enum CommitOutcome { + Ok, + Aborted(E), + Conflicted, +} + +pub struct Oracle { + pub(super) write_serialize_lock: Mutex>, + pub(super) seqno: SequenceNumberCounter, + pub(super) snapshot_tracker: SnapshotTracker, +} + +impl Oracle { + #[allow(clippy::nursery)] + pub(super) fn with_commit Result<(), E>>( + &self, + instant: Instant, + conflict_checker: ConflictManager, + f: F, + ) -> crate::Result> { + let mut committed_txns = self + .write_serialize_lock + .lock() + .map_err(|_| crate::Error::Poisoned)?; + + // If the committed_txn.ts is less than Instant that implies that the + // committed_txn finished before the current transaction started. + // We don't need to check for conflict in that case. + // This change assumes linearizability. Lack of linearizability could + // cause the read ts of a new txn to be lower than the commit ts of + // a txn before it. + let conflicted = + committed_txns + .range((instant + 1)..) + .any(|(_ts, other_conflict_checker)| { + conflict_checker.has_conflict(other_conflict_checker) + }); + + self.snapshot_tracker.close(instant); + let safe_to_gc = self.snapshot_tracker.get_seqno_safe_to_gc(); + committed_txns.retain(|ts, _| *ts > safe_to_gc); + + if conflicted { + return Ok(CommitOutcome::Conflicted); + } + + if let Err(e) = f() { + return Ok(CommitOutcome::Aborted(e)); + } + + committed_txns.insert(self.seqno.get(), conflict_checker); + + Ok(CommitOutcome::Ok) + } + + pub(super) fn write_serialize_lock( + &self, + ) -> crate::Result>> { + self.write_serialize_lock + .lock() + .map_err(|_| crate::Error::Poisoned) + } +} + +#[cfg(test)] +mod tests { + use crate::{Config, PartitionCreateOptions, TxKeyspace, TxPartitionHandle}; + + #[allow(clippy::unwrap_used)] + #[test] + fn oracle_committed_txns_does_not_leak() -> crate::Result<()> { + let tmpdir = tempfile::tempdir()?; + let ks = Config::new(tmpdir.path()).open_transactional()?; + + let part = ks.open_partition("foo", PartitionCreateOptions::default())?; + + for _ in 0..250 { + run_tx(&ks, &part).unwrap(); + } + + assert!(dbg!(ks.oracle.write_serialize_lock.lock().unwrap().len()) < 200); + + for _ in 0..200 { + run_tx(&ks, &part).unwrap(); + } + + assert!(dbg!(ks.oracle.write_serialize_lock.lock().unwrap().len()) < 200); + + Ok(()) + } + + fn run_tx(ks: &TxKeyspace, part: &TxPartitionHandle) -> Result<(), Box> { + let mut tx1 = ks.write_tx()?; + let mut tx2 = ks.write_tx()?; + tx1.insert(part, "hello", "world"); + + tx1.commit()??; + assert!(part.contains_key("hello")?); + + _ = tx2.get(part, "hello")?; + + tx2.insert(part, "hello", "world2"); + assert!(tx2.commit()?.is_err()); // intended to conflict + + Ok(()) + } +} diff --git a/src/tx/partition.rs b/src/tx/partition.rs index aca2d265..077a1315 100644 --- a/src/tx/partition.rs +++ b/src/tx/partition.rs @@ -2,18 +2,15 @@ // This source code is licensed under both the Apache 2.0 and MIT License // (found in the LICENSE-* files in the repository) -use crate::{gc::GarbageCollection, PartitionHandle}; +use crate::{gc::GarbageCollection, PartitionHandle, TxKeyspace}; use lsm_tree::{GcReport, UserValue}; -use std::{ - path::PathBuf, - sync::{Arc, Mutex}, -}; +use std::path::PathBuf; /// Access to a partition of a transactional keyspace #[derive(Clone)] pub struct TransactionalPartitionHandle { pub(crate) inner: PartitionHandle, - pub(crate) tx_lock: Arc>, + pub(crate) keyspace: TxKeyspace, } impl GarbageCollection for TransactionalPartitionHandle { @@ -22,12 +19,10 @@ impl GarbageCollection for TransactionalPartitionHandle { } fn gc_with_space_amp_target(&self, factor: f32) -> crate::Result { - let _lock = self.tx_lock.lock().expect("lock is poisoned"); crate::gc::GarbageCollector::with_space_amp_target(self.inner(), factor) } fn gc_with_staleness_threshold(&self, threshold: f32) -> crate::Result { - let _lock = self.tx_lock.lock().expect("lock is poisoned"); crate::gc::GarbageCollector::with_staleness_threshold(self.inner(), threshold) } @@ -78,6 +73,12 @@ impl TransactionalPartitionHandle { /// /// The operation will run wrapped in a transaction. /// + /// # Note + /// + /// The provided closure can be called multiple times as this function + /// automatically retries on conflict. Since this is an `FnMut`, make sure + /// it is idempotent and will not cause side-effects. + /// /// # Examples /// /// ``` @@ -119,23 +120,30 @@ impl TransactionalPartitionHandle { /// # Errors /// /// Will return `Err` if an IO error occurs. - pub fn fetch_update, F: Fn(Option<&UserValue>) -> Option>( + #[allow(unused_mut)] + pub fn fetch_update, F: FnMut(Option<&UserValue>) -> Option>( &self, key: K, - f: F, + mut f: F, ) -> crate::Result> { - let _lock = self.tx_lock.lock().expect("lock is poisoned"); + #[cfg(feature = "single_writer_tx")] + { + let mut tx = self.keyspace.write_tx(); - let prev = self.inner.get(&key)?; - let updated = f(prev.as_ref()); + let prev = tx.fetch_update(self, key, f)?; + tx.commit()?; - if let Some(value) = updated { - self.inner.insert(&key, value)?; - } else if prev.is_some() { - self.inner.remove(&key)?; + Ok(prev) } - Ok(prev) + #[cfg(feature = "ssi_tx")] + loop { + let mut tx = self.keyspace.write_tx()?; + let prev = tx.fetch_update(self, key.as_ref(), &mut f)?; + if tx.commit()?.is_ok() { + return Ok(prev); + } + } } /// Atomically updates an item and returns the new value. @@ -144,6 +152,12 @@ impl TransactionalPartitionHandle { /// /// The operation will run wrapped in a transaction. /// + /// # Note + /// + /// The provided closure can be called multiple times as this function + /// automatically retries on conflict. Since this is an `FnMut`, make sure + /// it is idempotent and will not cause side-effects. + /// /// # Examples /// /// ``` @@ -185,23 +199,29 @@ impl TransactionalPartitionHandle { /// # Errors /// /// Will return `Err` if an IO error occurs. - pub fn update_fetch, F: Fn(Option<&UserValue>) -> Option>( + #[allow(unused_mut)] + pub fn update_fetch, F: FnMut(Option<&UserValue>) -> Option>( &self, key: K, - f: F, + mut f: F, ) -> crate::Result> { - let _lock = self.tx_lock.lock().expect("lock is poisoned"); - - let prev = self.inner.get(&key)?; - let updated = f(prev.as_ref()); + #[cfg(feature = "single_writer_tx")] + { + let mut tx = self.keyspace.write_tx(); + let updated = tx.update_fetch(self, key, f)?; + tx.commit()?; - if let Some(value) = &updated { - self.inner.insert(&key, value)?; - } else if prev.is_some() { - self.inner.remove(&key)?; + Ok(updated) } - Ok(updated) + #[cfg(feature = "ssi_tx")] + loop { + let mut tx = self.keyspace.write_tx()?; + let updated = tx.update_fetch(self, key.as_ref(), &mut f)?; + if tx.commit()?.is_ok() { + return Ok(updated); + } + } } /// Inserts a key-value pair into the partition. @@ -232,8 +252,21 @@ impl TransactionalPartitionHandle { /// /// Will return `Err` if an IO error occurs. pub fn insert, V: AsRef<[u8]>>(&self, key: K, value: V) -> crate::Result<()> { - let _lock = self.tx_lock.lock().expect("lock is poisoned"); - self.inner.insert(key, value) + #[cfg(feature = "single_writer_tx")] + { + let mut tx = self.keyspace.write_tx(); + tx.insert(self, key, value); + tx.commit()?; + Ok(()) + } + + #[cfg(feature = "ssi_tx")] + { + let mut tx = self.keyspace.write_tx()?; + tx.insert(self, key.as_ref(), value.as_ref()); + tx.commit()?.expect("blind insert should not conflict ever"); + Ok(()) + } } /// Removes an item from the partition. @@ -264,8 +297,21 @@ impl TransactionalPartitionHandle { /// /// Will return `Err` if an IO error occurs. pub fn remove>(&self, key: K) -> crate::Result<()> { - let _lock = self.tx_lock.lock().expect("lock is poisoned"); - self.inner.remove(key) + #[cfg(feature = "single_writer_tx")] + { + let mut tx = self.keyspace.write_tx(); + tx.remove(self, key); + tx.commit()?; + Ok(()) + } + + #[cfg(feature = "ssi_tx")] + { + let mut tx = self.keyspace.write_tx()?; + tx.remove(self, key.as_ref()); + tx.commit()?.expect("blind remove should not conflict ever"); + Ok(()) + } } /// Retrieves an item from the partition. diff --git a/src/tx/write/mod.rs b/src/tx/write/mod.rs new file mode 100644 index 00000000..d9f88202 --- /dev/null +++ b/src/tx/write/mod.rs @@ -0,0 +1,453 @@ +// Copyright (c) 2024-present, fjall-rs +// This source code is licensed under both the Apache 2.0 and MIT License +// (found in the LICENSE-* files in the repository) + +#[cfg(feature = "single_writer_tx")] +pub mod single_writer; + +#[cfg(feature = "ssi_tx")] +pub mod ssi; + +use crate::{ + batch::{item::Item, PartitionKey}, + snapshot_nonce::SnapshotNonce, + Batch, HashMap, PersistMode, TxKeyspace, TxPartitionHandle, +}; +use lsm_tree::{AbstractTree, InternalValue, KvPair, Memtable, SeqNo, UserKey, UserValue}; +use std::{ops::RangeBounds, sync::Arc}; + +fn ignore_tombstone_value(item: InternalValue) -> Option { + if item.is_tombstone() { + None + } else { + Some(item) + } +} + +/// A single-writer (serialized) cross-partition transaction +/// +/// Use [`WriteTransaction::commit`] to commit changes to the keyspace. +/// +/// Drop the transaction to rollback changes. +pub(super) struct BaseTransaction { + durability: Option, + pub(super) keyspace: TxKeyspace, + memtables: HashMap>, + + nonce: SnapshotNonce, +} + +impl BaseTransaction { + pub(crate) fn new(keyspace: TxKeyspace, nonce: SnapshotNonce) -> Self { + Self { + keyspace, + memtables: HashMap::default(), + nonce, + durability: None, + } + } + + /// Sets the durability level. + #[must_use] + pub(super) fn durability(mut self, mode: Option) -> Self { + self.durability = mode; + self + } + + /// Removes an item and returns its value if it existed. + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub(super) fn take>( + &mut self, + partition: &TxPartitionHandle, + key: K, + ) -> crate::Result> { + self.fetch_update(partition, key, |_| None) + } + + /// Atomically updates an item and returns the new value. + /// + /// Returning `None` removes the item if it existed before. + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub(super) fn update_fetch< + K: AsRef<[u8]>, + F: FnMut(Option<&UserValue>) -> Option, + >( + &mut self, + partition: &TxPartitionHandle, + key: K, + mut f: F, + ) -> crate::Result> { + let prev = self.get(partition, &key)?; + let updated = f(prev.as_ref()); + + if let Some(value) = &updated { + // NOTE: Skip insert if the value hasn't changed + if updated != prev { + self.insert(partition, &key, value); + } + } else if prev.is_some() { + self.remove(partition, &key); + } + + Ok(updated) + } + + /// Atomically updates an item and returns the previous value. + /// + /// Returning `None` removes the item if it existed before. + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub(super) fn fetch_update< + K: AsRef<[u8]>, + F: FnMut(Option<&UserValue>) -> Option, + >( + &mut self, + partition: &TxPartitionHandle, + key: K, + mut f: F, + ) -> crate::Result> { + let prev = self.get(partition, &key)?; + let updated = f(prev.as_ref()); + + if let Some(value) = &updated { + // NOTE: Skip insert if the value hasn't changed + if updated != prev { + self.insert(partition, &key, value); + } + } else if prev.is_some() { + self.remove(partition, &key); + } + + Ok(prev) + } + + /// Retrieves an item from the transaction's state. + /// + /// The transaction allows reading your own writes (RYOW). + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub(super) fn get>( + &self, + partition: &TxPartitionHandle, + key: K, + ) -> crate::Result> { + if let Some(memtable) = self.memtables.get(&partition.inner.name) { + if let Some(item) = memtable.get(&key, None) { + return Ok(ignore_tombstone_value(item).map(|x| x.value)); + } + } + + let res = partition + .inner + .snapshot_at(self.nonce.instant) + .get(key.as_ref())?; + + Ok(res) + } + + /// Returns `true` if the transaction's state contains the specified key. + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub(super) fn contains_key>( + &self, + partition: &TxPartitionHandle, + key: K, + ) -> crate::Result { + if let Some(memtable) = self.memtables.get(&partition.inner.name) { + if let Some(item) = memtable.get(&key, None) { + return Ok(!item.key.is_tombstone()); + } + } + + let contains = partition + .inner + .snapshot_at(self.nonce.instant) + .contains_key(key.as_ref())?; + + Ok(contains) + } + + /// Returns the first key-value pair in the transaction's state. + /// The key in this pair is the minimum key in the transaction's state. + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub(super) fn first_key_value( + &self, + partition: &TxPartitionHandle, + ) -> crate::Result> { + // TODO: calling .iter will mark the partition as fully read, is that what we want? + self.iter(partition).next().transpose() + } + + /// Returns the last key-value pair in the transaction's state. + /// The key in this pair is the maximum key in the transaction's state. + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub(super) fn last_key_value( + &self, + partition: &TxPartitionHandle, + ) -> crate::Result> { + // TODO: calling .iter will mark the partition as fully read, is that what we want? + self.iter(partition).next_back().transpose() + } + + /// Scans the entire partition, returning the amount of items. + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub(super) fn len(&self, partition: &TxPartitionHandle) -> crate::Result { + let mut count = 0; + + // TODO: calling .iter will mark the partition as fully read, is that what we want? + let iter = self.iter(partition); + + for kv in iter { + let _ = kv?; + count += 1; + } + + Ok(count) + } + + /// Iterates over the transaction's state. + /// + /// Avoid using this function, or limit it as otherwise it may scan a lot of items. + #[must_use] + pub(super) fn iter( + &self, + partition: &TxPartitionHandle, + ) -> impl DoubleEndedIterator> + 'static { + partition + .inner + .tree + .iter_with_seqno( + self.nonce.instant, + self.memtables.get(&partition.inner.name).cloned(), + ) + .map(|item| item.map_err(Into::into)) + } + + /// Iterates over the transaction's state, returning keys only. + /// + /// Avoid using this function, or limit it as otherwise it may scan a lot of items. + #[must_use] + pub(super) fn keys( + &self, + partition: &TxPartitionHandle, + ) -> impl DoubleEndedIterator> + 'static { + partition + .inner + .tree + .keys_with_seqno(self.nonce.instant, None) + .map(|item| item.map_err(Into::into)) + } + + /// Iterates over the transaction's state, returning values only. + /// + /// Avoid using this function, or limit it as otherwise it may scan a lot of items. + #[must_use] + pub(super) fn values( + &self, + partition: &TxPartitionHandle, + ) -> impl DoubleEndedIterator> + 'static { + partition + .inner + .tree + .values_with_seqno(self.nonce.instant, None) + .map(|item| item.map_err(Into::into)) + } + + /// Iterates over a range of the transaction's state. + /// + /// Avoid using full or unbounded ranges as they may scan a lot of items (unless limited). + #[must_use] + pub(super) fn range<'b, K: AsRef<[u8]> + 'b, R: RangeBounds + 'b>( + &'b self, + partition: &'b TxPartitionHandle, + range: R, + ) -> impl DoubleEndedIterator> + 'static { + partition + .inner + .tree + .range_with_seqno( + range, + self.nonce.instant, + self.memtables.get(&partition.inner.name).cloned(), + ) + .map(|item| item.map_err(Into::into)) + } + + /// Iterates over a range of the transaction's state. + /// + /// Avoid using an empty prefix as it may scan a lot of items (unless limited). + #[must_use] + pub(super) fn prefix<'b, K: AsRef<[u8]> + 'b>( + &'b self, + partition: &'b TxPartitionHandle, + prefix: K, + ) -> impl DoubleEndedIterator> + 'static { + partition + .inner + .tree + .prefix_with_seqno( + prefix, + self.nonce.instant, + self.memtables.get(&partition.inner.name).cloned(), + ) + .map(|item| item.map_err(Into::into)) + } + + /// Inserts a key-value pair into the partition. + /// + /// Keys may be up to 65536 bytes long, values up to 2^32 bytes. + /// Shorter keys and values result in better performance. + /// + /// If the key already exists, the item will be overwritten. + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub(super) fn insert, V: AsRef<[u8]>>( + &mut self, + partition: &TxPartitionHandle, + key: K, + value: V, + ) { + // TODO: PERF: slow?? + self.memtables + .entry(partition.inner.name.clone()) + .or_default() + .insert(lsm_tree::InternalValue::from_components( + key.as_ref(), + value.as_ref(), + // NOTE: Just take the max seqno, which should never be reached + // that way, the write is definitely always the newest + SeqNo::MAX, + lsm_tree::ValueType::Value, + )); + } + + /// Removes an item from the partition. + /// + /// The key may be up to 65536 bytes long. + /// Shorter keys result in better performance. + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub(super) fn remove>(&mut self, partition: &TxPartitionHandle, key: K) { + // TODO: PERF: slow?? + self.memtables + .entry(partition.inner.name.clone()) + .or_default() + .insert(lsm_tree::InternalValue::new_tombstone( + key.as_ref(), + // NOTE: Just take the max seqno, which should never be reached + // that way, the write is definitely always the newest + SeqNo::MAX, + )); + } + + /// Commits the transaction. + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub(super) fn commit(self) -> crate::Result<()> { + // skip all the logic if no keys were written to + if self.memtables.is_empty() { + return Ok(()); + } + + let mut batch = Batch::new(self.keyspace.inner).durability(self.durability); + + for (partition_key, memtable) in self.memtables { + for item in memtable.iter() { + batch.data.push(Item::new( + partition_key.clone(), + item.key.user_key.clone(), + item.value.clone(), + item.key.value_type, + )); + } + } + + // TODO: instead of using batch, write batch::commit as a generic function that takes + // a impl Iterator + // that way, we don't have to move the memtable(s) into the batch first to commit + batch.commit()?; + + Ok(()) + } + + /// More explicit alternative to dropping the transaction + /// to roll it back. + #[allow(clippy::unused_self)] + pub(super) fn rollback(self) {} +} + +#[cfg(test)] +mod tests { + use crate::{ + snapshot_nonce::SnapshotNonce, Config, PartitionCreateOptions, + TransactionalPartitionHandle, TxKeyspace, + }; + use tempfile::TempDir; + + struct TestEnv { + ks: TxKeyspace, + part: TransactionalPartitionHandle, + + #[allow(unused)] + tmpdir: TempDir, + } + + fn setup() -> Result> { + let tmpdir = tempfile::tempdir()?; + let ks = Config::new(tmpdir.path()).open_transactional()?; + + let part = ks.open_partition("foo", PartitionCreateOptions::default())?; + + Ok(TestEnv { ks, part, tmpdir }) + } + + #[test] + fn update_fetch() -> Result<(), Box> { + let env = setup()?; + + env.part.insert([2u8], [20u8])?; + + let mut tx = super::BaseTransaction::new( + env.ks.clone(), + SnapshotNonce::new( + env.ks.inner.instant(), + env.ks.inner.snapshot_tracker.clone(), + ), + ); + + let new = tx.update_fetch(&env.part, [2u8], |v| { + v.and_then(|v| v.first().copied()).map(|v| [v + 5].into()) + })?; + assert_eq!(new, Some([25u8].into())); + tx.commit()?; + + Ok(()) + } +} diff --git a/src/tx/write/single_writer.rs b/src/tx/write/single_writer.rs new file mode 100644 index 00000000..bac3ecec --- /dev/null +++ b/src/tx/write/single_writer.rs @@ -0,0 +1,578 @@ +use super::BaseTransaction as InnerWriteTransaction; +use crate::{snapshot_nonce::SnapshotNonce, PersistMode, TxKeyspace, TxPartitionHandle}; +use lsm_tree::{KvPair, UserKey, UserValue}; +use std::{ops::RangeBounds, sync::MutexGuard}; + +/// A single-writer (serialized) cross-partition transaction +/// +/// Use [`WriteTransaction::commit`] to commit changes to the partition(s). +/// +/// Drop the transaction to rollback changes. +pub struct WriteTransaction<'a> { + _guard: MutexGuard<'a, ()>, + inner: InnerWriteTransaction, +} + +impl<'a> WriteTransaction<'a> { + pub(crate) fn new( + keyspace: TxKeyspace, + nonce: SnapshotNonce, + guard: MutexGuard<'a, ()>, + ) -> Self { + Self { + _guard: guard, + inner: InnerWriteTransaction::new(keyspace, nonce), + } + } + + /// Sets the durability level. + #[must_use] + pub fn durability(mut self, mode: Option) -> Self { + self.inner = self.inner.durability(mode); + self + } + + /// Removes an item and returns its value if it existed. + /// + /// ``` + /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; + /// # use std::sync::Arc; + /// # + /// # let folder = tempfile::tempdir()?; + /// # let keyspace = Config::new(folder).open_transactional()?; + /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; + /// partition.insert("a", "abc")?; + /// + /// let mut tx = keyspace.write_tx(); + /// + /// let taken = tx.take(&partition, "a")?.unwrap(); + /// assert_eq!(b"abc", &*taken); + /// tx.commit()?; + /// + /// let item = partition.get("a")?; + /// assert!(item.is_none()); + /// # + /// # Ok::<(), fjall::Error>(()) + /// ``` + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn take>( + &mut self, + partition: &TxPartitionHandle, + key: K, + ) -> crate::Result> { + self.inner.take(partition, key) + } + + /// Atomically updates an item and returns the new value. + /// + /// Returning `None` removes the item if it existed before. + /// + /// # Examples + /// + /// ``` + /// # use fjall::{Config, Keyspace, PartitionCreateOptions, Slice}; + /// # + /// # let folder = tempfile::tempdir()?; + /// # let keyspace = Config::new(folder).open_transactional()?; + /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; + /// partition.insert("a", "abc")?; + /// + /// let mut tx = keyspace.write_tx(); + /// + /// let updated = tx.update_fetch(&partition, "a", |_| Some(Slice::from(*b"def")))?.unwrap(); + /// assert_eq!(b"def", &*updated); + /// tx.commit()?; + /// + /// let item = partition.get("a")?; + /// assert_eq!(Some("def".as_bytes().into()), item); + /// # + /// # Ok::<(), fjall::Error>(()) + /// ``` + /// + /// ``` + /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; + /// # use std::sync::Arc; + /// # + /// # let folder = tempfile::tempdir()?; + /// # let keyspace = Config::new(folder).open_transactional()?; + /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; + /// partition.insert("a", "abc")?; + /// + /// let mut tx = keyspace.write_tx(); + /// + /// let updated = tx.update_fetch(&partition, "a", |_| None)?; + /// assert!(updated.is_none()); + /// tx.commit()?; + /// + /// let item = partition.get("a")?; + /// assert!(item.is_none()); + /// # + /// # Ok::<(), fjall::Error>(()) + /// ``` + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn update_fetch, F: FnMut(Option<&UserValue>) -> Option>( + &mut self, + partition: &TxPartitionHandle, + key: K, + f: F, + ) -> crate::Result> { + self.inner.update_fetch(partition, key, f) + } + + /// Atomically updates an item and returns the previous value. + /// + /// Returning `None` removes the item if it existed before. + /// + /// # Examples + /// + /// ``` + /// # use fjall::{Config, Keyspace, PartitionCreateOptions, Slice}; + /// # + /// # let folder = tempfile::tempdir()?; + /// # let keyspace = Config::new(folder).open_transactional()?; + /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; + /// partition.insert("a", "abc")?; + /// + /// let mut tx = keyspace.write_tx(); + /// + /// let prev = tx.fetch_update(&partition, "a", |_| Some(Slice::from(*b"def")))?.unwrap(); + /// assert_eq!(b"abc", &*prev); + /// tx.commit()?; + /// + /// let item = partition.get("a")?; + /// assert_eq!(Some("def".as_bytes().into()), item); + /// # + /// # Ok::<(), fjall::Error>(()) + /// ``` + /// + /// ``` + /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; + /// # use std::sync::Arc; + /// # + /// # let folder = tempfile::tempdir()?; + /// # let keyspace = Config::new(folder).open_transactional()?; + /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; + /// partition.insert("a", "abc")?; + /// + /// let mut tx = keyspace.write_tx(); + /// + /// let prev = tx.fetch_update(&partition, "a", |_| None)?.unwrap(); + /// assert_eq!(b"abc", &*prev); + /// tx.commit()?; + /// + /// let item = partition.get("a")?; + /// assert!(item.is_none()); + /// # + /// # Ok::<(), fjall::Error>(()) + /// ``` + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn fetch_update, F: FnMut(Option<&UserValue>) -> Option>( + &mut self, + partition: &TxPartitionHandle, + key: K, + f: F, + ) -> crate::Result> { + self.inner.fetch_update(partition, key, f) + } + + /// Retrieves an item from the transaction's state. + /// + /// The transaction allows reading your own writes (RYOW). + /// + /// # Examples + /// + /// ``` + /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; + /// # + /// # let folder = tempfile::tempdir()?; + /// # let keyspace = Config::new(folder).open_transactional()?; + /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; + /// partition.insert("a", "previous_value")?; + /// assert_eq!(b"previous_value", &*partition.get("a")?.unwrap()); + /// + /// let mut tx = keyspace.write_tx(); + /// tx.insert(&partition, "a", "new_value"); + /// + /// // Read-your-own-write + /// let item = tx.get(&partition, "a")?; + /// assert_eq!(Some("new_value".as_bytes().into()), item); + /// + /// drop(tx); + /// + /// // Write was not committed + /// assert_eq!(b"previous_value", &*partition.get("a")?.unwrap()); + /// # + /// # Ok::<(), fjall::Error>(()) + /// ``` + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn get>( + &self, + partition: &TxPartitionHandle, + key: K, + ) -> crate::Result> { + self.inner.get(partition, key) + } + + /// Returns `true` if the transaction's state contains the specified key. + /// + /// # Examples + /// + /// ``` + /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; + /// # + /// # let folder = tempfile::tempdir()?; + /// # let keyspace = Config::new(folder).open_transactional()?; + /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; + /// partition.insert("a", "my_value")?; + /// assert!(keyspace.read_tx().contains_key(&partition, "a")?); + /// + /// let mut tx = keyspace.write_tx(); + /// assert!(tx.contains_key(&partition, "a")?); + /// + /// tx.insert(&partition, "b", "my_value2"); + /// assert!(tx.contains_key(&partition, "b")?); + /// + /// // Transaction not committed yet + /// assert!(!keyspace.read_tx().contains_key(&partition, "b")?); + /// + /// tx.commit()?; + /// assert!(keyspace.read_tx().contains_key(&partition, "b")?); + /// # + /// # Ok::<(), fjall::Error>(()) + /// ``` + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn contains_key>( + &self, + partition: &TxPartitionHandle, + key: K, + ) -> crate::Result { + self.inner.contains_key(partition, key) + } + + /// Returns the first key-value pair in the transaction's state. + /// The key in this pair is the minimum key in the transaction's state. + /// + /// # Examples + /// + /// ``` + /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; + /// # + /// # let folder = tempfile::tempdir()?; + /// # let keyspace = Config::new(folder).open_transactional()?; + /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; + /// # + /// let mut tx = keyspace.write_tx(); + /// tx.insert(&partition, "1", "abc"); + /// tx.insert(&partition, "3", "abc"); + /// tx.insert(&partition, "5", "abc"); + /// + /// let (key, _) = tx.first_key_value(&partition)?.expect("item should exist"); + /// assert_eq!(&*key, "1".as_bytes()); + /// + /// assert!(keyspace.read_tx().first_key_value(&partition)?.is_none()); + /// # + /// # Ok::<(), fjall::Error>(()) + /// ``` + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn first_key_value(&self, partition: &TxPartitionHandle) -> crate::Result> { + self.inner.first_key_value(partition) + } + + /// Returns the last key-value pair in the transaction's state. + /// The key in this pair is the maximum key in the transaction's state. + /// + /// # Examples + /// + /// ``` + /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; + /// # + /// # let folder = tempfile::tempdir()?; + /// # let keyspace = Config::new(folder).open_transactional()?; + /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; + /// # + /// let mut tx = keyspace.write_tx(); + /// tx.insert(&partition, "1", "abc"); + /// tx.insert(&partition, "3", "abc"); + /// tx.insert(&partition, "5", "abc"); + /// + /// let (key, _) = tx.last_key_value(&partition)?.expect("item should exist"); + /// assert_eq!(&*key, "5".as_bytes()); + /// + /// assert!(keyspace.read_tx().last_key_value(&partition)?.is_none()); + /// # + /// # Ok::<(), fjall::Error>(()) + /// ``` + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn last_key_value(&self, partition: &TxPartitionHandle) -> crate::Result> { + self.inner.last_key_value(partition) + } + + /// Scans the entire partition, returning the amount of items. + /// + /// # Examples + /// + /// ``` + /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; + /// # + /// # let folder = tempfile::tempdir()?; + /// # let keyspace = Config::new(folder).open_transactional()?; + /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; + /// partition.insert("a", "my_value")?; + /// partition.insert("b", "my_value2")?; + /// + /// let mut tx = keyspace.write_tx(); + /// assert_eq!(2, tx.len(&partition)?); + /// + /// tx.insert(&partition, "c", "my_value3"); + /// + /// // read-your-own write + /// assert_eq!(3, tx.len(&partition)?); + /// + /// // Transaction is not committed yet + /// assert_eq!(2, keyspace.read_tx().len(&partition)?); + /// + /// tx.commit()?; + /// assert_eq!(3, keyspace.read_tx().len(&partition)?); + /// # + /// # Ok::<(), fjall::Error>(()) + /// ``` + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn len(&self, partition: &TxPartitionHandle) -> crate::Result { + self.inner.len(partition) + } + + /// Iterates over the transaction's state. + /// + /// Avoid using this function, or limit it as otherwise it may scan a lot of items. + /// + /// # Examples + /// + /// ``` + /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; + /// # + /// # let folder = tempfile::tempdir()?; + /// # let keyspace = Config::new(folder).open_transactional()?; + /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; + /// # + /// let mut tx = keyspace.write_tx(); + /// tx.insert(&partition, "a", "abc"); + /// tx.insert(&partition, "f", "abc"); + /// tx.insert(&partition, "g", "abc"); + /// + /// assert_eq!(3, tx.iter(&partition).count()); + /// assert_eq!(0, keyspace.read_tx().iter(&partition).count()); + /// # + /// # Ok::<(), fjall::Error>(()) + /// ``` + #[must_use] + pub fn iter<'b>( + &'b self, + partition: &TxPartitionHandle, + ) -> impl DoubleEndedIterator> + 'b { + self.inner.iter(partition) + } + + /// Iterates over the transaction's state, returning keys only. + /// + /// Avoid using this function, or limit it as otherwise it may scan a lot of items. + #[must_use] + pub fn keys<'b>( + &'b self, + partition: &TxPartitionHandle, + ) -> impl DoubleEndedIterator> + 'b { + self.inner.keys(partition) + } + + /// Iterates over the transaction's state, returning values only. + /// + /// Avoid using this function, or limit it as otherwise it may scan a lot of items. + #[must_use] + pub fn values<'b>( + &'b self, + partition: &TxPartitionHandle, + ) -> impl DoubleEndedIterator> + 'b { + self.inner.values(partition) + } + + // Iterates over a range of the transaction's state. + /// + /// Avoid using full or unbounded ranges as they may scan a lot of items (unless limited). + /// + /// # Examples + /// + /// ``` + /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; + /// # + /// # let folder = tempfile::tempdir()?; + /// # let keyspace = Config::new(folder).open_transactional()?; + /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; + /// # + /// let mut tx = keyspace.write_tx(); + /// tx.insert(&partition, "a", "abc"); + /// tx.insert(&partition, "f", "abc"); + /// tx.insert(&partition, "g", "abc"); + /// + /// assert_eq!(2, tx.range(&partition, "a"..="f").count()); + /// assert_eq!(0, keyspace.read_tx().range(&partition, "a"..="f").count()); + /// # + /// # Ok::<(), fjall::Error>(()) + /// ``` + #[must_use] + pub fn range<'b, K: AsRef<[u8]> + 'b, R: RangeBounds + 'b>( + &'b self, + partition: &'b TxPartitionHandle, + range: R, + ) -> impl DoubleEndedIterator> + 'b { + self.inner.range(partition, range) + } + + /// Iterates over a range of the transaction's state. + /// + /// Avoid using an empty prefix as it may scan a lot of items (unless limited). + /// + /// # Examples + /// + /// ``` + /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; + /// # + /// # let folder = tempfile::tempdir()?; + /// # let keyspace = Config::new(folder).open_transactional()?; + /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; + /// # + /// let mut tx = keyspace.write_tx(); + /// tx.insert(&partition, "a", "abc"); + /// tx.insert(&partition, "ab", "abc"); + /// tx.insert(&partition, "abc", "abc"); + /// + /// assert_eq!(2, tx.prefix(&partition, "ab").count()); + /// assert_eq!(0, keyspace.read_tx().prefix(&partition, "ab").count()); + /// # + /// # Ok::<(), fjall::Error>(()) + /// ``` + #[must_use] + pub fn prefix<'b, K: AsRef<[u8]> + 'b>( + &'b self, + partition: &'b TxPartitionHandle, + prefix: K, + ) -> impl DoubleEndedIterator> + 'b { + self.inner.prefix(partition, prefix) + } + + // Inserts a key-value pair into the partition. + /// + /// Keys may be up to 65536 bytes long, values up to 2^32 bytes. + /// Shorter keys and values result in better performance. + /// + /// If the key already exists, the item will be overwritten. + /// + /// # Examples + /// + /// ``` + /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; + /// # + /// # let folder = tempfile::tempdir()?; + /// # let keyspace = Config::new(folder).open_transactional()?; + /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; + /// partition.insert("a", "previous_value")?; + /// assert_eq!(b"previous_value", &*partition.get("a")?.unwrap()); + /// + /// let mut tx = keyspace.write_tx(); + /// tx.insert(&partition, "a", "new_value"); + /// + /// drop(tx); + /// + /// // Write was not committed + /// assert_eq!(b"previous_value", &*partition.get("a")?.unwrap()); + /// # + /// # Ok::<(), fjall::Error>(()) + /// ``` + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn insert, V: AsRef<[u8]>>( + &mut self, + partition: &TxPartitionHandle, + key: K, + value: V, + ) { + self.inner.insert(partition, key, value); + } + + /// Removes an item from the partition. + /// + /// The key may be up to 65536 bytes long. + /// Shorter keys result in better performance. + /// + /// # Examples + /// + /// ``` + /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; + /// # + /// # let folder = tempfile::tempdir()?; + /// # let keyspace = Config::new(folder).open_transactional()?; + /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; + /// partition.insert("a", "previous_value")?; + /// assert_eq!(b"previous_value", &*partition.get("a")?.unwrap()); + /// + /// let mut tx = keyspace.write_tx(); + /// tx.remove(&partition, "a"); + /// + /// // Read-your-own-write + /// let item = tx.get(&partition, "a")?; + /// assert_eq!(None, item); + /// + /// drop(tx); + /// + /// // Deletion was not committed + /// assert_eq!(b"previous_value", &*partition.get("a")?.unwrap()); + /// # + /// # Ok::<(), fjall::Error>(()) + /// ``` + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn remove>(&mut self, partition: &TxPartitionHandle, key: K) { + self.inner.remove(partition, key); + } + + /// Commits the transaction. + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn commit(self) -> crate::Result<()> { + self.inner.commit() + } + + /// More explicit alternative to dropping the transaction + /// to roll it back. + pub fn rollback(self) { + self.inner.rollback(); + } +} diff --git a/src/tx/write/ssi.rs b/src/tx/write/ssi.rs new file mode 100644 index 00000000..0b174770 --- /dev/null +++ b/src/tx/write/ssi.rs @@ -0,0 +1,1133 @@ +use super::BaseTransaction; +use crate::{ + snapshot_nonce::SnapshotNonce, + tx::{conflict_manager::ConflictManager, oracle::CommitOutcome}, + PersistMode, TxKeyspace, TxPartitionHandle, +}; +use lsm_tree::{KvPair, Slice, UserKey, UserValue}; +use std::{ + fmt, + ops::{Bound, RangeBounds, RangeFull}, +}; + +#[derive(Debug)] +pub struct Conflict; + +impl std::error::Error for Conflict {} + +impl fmt::Display for Conflict { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + "transaction conflict".fmt(f) + } +} + +/// A SSI (Serializable Snapshot Isolation) cross-partition transaction +/// +/// Use [`WriteTransaction::commit`] to commit changes to the partition(s). +/// +/// Drop the transaction to rollback changes. +pub struct WriteTransaction { + inner: BaseTransaction, + cm: ConflictManager, +} + +impl WriteTransaction { + pub(crate) fn new(keyspace: TxKeyspace, nonce: SnapshotNonce) -> Self { + Self { + inner: BaseTransaction::new(keyspace, nonce), + cm: ConflictManager::default(), + } + } + + /// Sets the durability level. + #[must_use] + pub fn durability(mut self, mode: Option) -> Self { + self.inner = self.inner.durability(mode); + self + } + + /// Removes an item and returns its value if it existed. + /// + /// ``` + /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; + /// # use std::sync::Arc; + /// # + /// # let folder = tempfile::tempdir()?; + /// # let keyspace = Config::new(folder).open_transactional()?; + /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; + /// partition.insert("a", "abc")?; + /// + /// let mut tx = keyspace.write_tx()?; + /// + /// let taken = tx.take(&partition, "a")?.unwrap(); + /// assert_eq!(b"abc", &*taken); + /// tx.commit()?; + /// + /// let item = partition.get("a")?; + /// assert!(item.is_none()); + /// # + /// # Ok::<(), fjall::Error>(()) + /// ``` + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn take>( + &mut self, + partition: &TxPartitionHandle, + key: K, + ) -> crate::Result> { + self.fetch_update(partition, key, |_| None) + } + + /// Atomically updates an item and returns the new value. + /// + /// Returning `None` removes the item if it existed before. + /// + /// # Examples + /// + /// ``` + /// # use fjall::{Config, Keyspace, PartitionCreateOptions, Slice}; + /// # + /// # let folder = tempfile::tempdir()?; + /// # let keyspace = Config::new(folder).open_transactional()?; + /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; + /// partition.insert("a", "abc")?; + /// + /// let mut tx = keyspace.write_tx()?; + /// + /// let updated = tx.update_fetch(&partition, "a", |_| Some(Slice::from(*b"def")))?.unwrap(); + /// assert_eq!(b"def", &*updated); + /// tx.commit()?; + /// + /// let item = partition.get("a")?; + /// assert_eq!(Some("def".as_bytes().into()), item); + /// # + /// # Ok::<(), fjall::Error>(()) + /// ``` + /// + /// ``` + /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; + /// # use std::sync::Arc; + /// # + /// # let folder = tempfile::tempdir()?; + /// # let keyspace = Config::new(folder).open_transactional()?; + /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; + /// partition.insert("a", "abc")?; + /// + /// let mut tx = keyspace.write_tx()?; + /// + /// let updated = tx.update_fetch(&partition, "a", |_| None)?; + /// assert!(updated.is_none()); + /// tx.commit()?; + /// + /// let item = partition.get("a")?; + /// assert!(item.is_none()); + /// # + /// # Ok::<(), fjall::Error>(()) + /// ``` + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn update_fetch, F: FnMut(Option<&UserValue>) -> Option>( + &mut self, + partition: &TxPartitionHandle, + key: K, + f: F, + ) -> crate::Result> { + let key = key.as_ref(); + + let updated = self.inner.update_fetch(partition, key, f)?; + + self.cm.mark_read(&partition.inner.name, &key.into()); + self.cm.mark_conflict(&partition.inner.name, key); + + Ok(updated) + } + + /// Atomically updates an item and returns the previous value. + /// + /// Returning `None` removes the item if it existed before. + /// + /// # Examples + /// + /// ``` + /// # use fjall::{Config, Keyspace, PartitionCreateOptions, Slice}; + /// # + /// # let folder = tempfile::tempdir()?; + /// # let keyspace = Config::new(folder).open_transactional()?; + /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; + /// partition.insert("a", "abc")?; + /// + /// let mut tx = keyspace.write_tx()?; + /// + /// let prev = tx.fetch_update(&partition, "a", |_| Some(Slice::from(*b"def")))?.unwrap(); + /// assert_eq!(b"abc", &*prev); + /// tx.commit()?; + /// + /// let item = partition.get("a")?; + /// assert_eq!(Some("def".as_bytes().into()), item); + /// # + /// # Ok::<(), fjall::Error>(()) + /// ``` + /// + /// ``` + /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; + /// # use std::sync::Arc; + /// # + /// # let folder = tempfile::tempdir()?; + /// # let keyspace = Config::new(folder).open_transactional()?; + /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; + /// partition.insert("a", "abc")?; + /// + /// let mut tx = keyspace.write_tx()?; + /// + /// let prev = tx.fetch_update(&partition, "a", |_| None)?.unwrap(); + /// assert_eq!(b"abc", &*prev); + /// tx.commit()?; + /// + /// let item = partition.get("a")?; + /// assert!(item.is_none()); + /// # + /// # Ok::<(), fjall::Error>(()) + /// ``` + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn fetch_update, F: FnMut(Option<&UserValue>) -> Option>( + &mut self, + partition: &TxPartitionHandle, + key: K, + f: F, + ) -> crate::Result> { + let key = key.as_ref(); + + let prev = self.inner.fetch_update(partition, key, f)?; + + self.cm.mark_read(&partition.inner.name, &key.into()); + self.cm.mark_conflict(&partition.inner.name, key); + + Ok(prev) + } + + /// Retrieves an item from the transaction's state. + /// + /// The transaction allows reading your own writes (RYOW). + /// + /// # Examples + /// + /// ``` + /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; + /// # + /// # let folder = tempfile::tempdir()?; + /// # let keyspace = Config::new(folder).open_transactional()?; + /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; + /// partition.insert("a", "previous_value")?; + /// assert_eq!(b"previous_value", &*partition.get("a")?.unwrap()); + /// + /// let mut tx = keyspace.write_tx()?; + /// tx.insert(&partition, "a", "new_value"); + /// + /// // Read-your-own-write + /// let item = tx.get(&partition, "a")?; + /// assert_eq!(Some("new_value".as_bytes().into()), item); + /// + /// drop(tx); + /// + /// // Write was not committed + /// assert_eq!(b"previous_value", &*partition.get("a")?.unwrap()); + /// # + /// # Ok::<(), fjall::Error>(()) + /// ``` + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn get>( + &mut self, + partition: &TxPartitionHandle, + key: K, + ) -> crate::Result> { + let res = self.inner.get(partition, key.as_ref())?; + + self.cm + .mark_read(&partition.inner.name, &key.as_ref().into()); + + Ok(res) + } + + /// Returns `true` if the transaction's state contains the specified key. + /// + /// # Examples + /// + /// ``` + /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; + /// # + /// # let folder = tempfile::tempdir()?; + /// # let keyspace = Config::new(folder).open_transactional()?; + /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; + /// partition.insert("a", "my_value")?; + /// assert!(keyspace.read_tx().contains_key(&partition, "a")?); + /// + /// let mut tx = keyspace.write_tx()?; + /// assert!(tx.contains_key(&partition, "a")?); + /// + /// tx.insert(&partition, "b", "my_value2"); + /// assert!(tx.contains_key(&partition, "b")?); + /// + /// // Transaction not committed yet + /// assert!(!keyspace.read_tx().contains_key(&partition, "b")?); + /// + /// tx.commit()?; + /// assert!(keyspace.read_tx().contains_key(&partition, "b")?); + /// # + /// # Ok::<(), fjall::Error>(()) + /// ``` + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn contains_key>( + &mut self, + partition: &TxPartitionHandle, + key: K, + ) -> crate::Result { + let contains = self.inner.contains_key(partition, key.as_ref())?; + + self.cm + .mark_read(&partition.inner.name, &key.as_ref().into()); + + Ok(contains) + } + + /// Returns the first key-value pair in the transaction's state. + /// The key in this pair is the minimum key in the transaction's state. + /// + /// # Examples + /// + /// ``` + /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; + /// # + /// # let folder = tempfile::tempdir()?; + /// # let keyspace = Config::new(folder).open_transactional()?; + /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; + /// # + /// let mut tx = keyspace.write_tx()?; + /// tx.insert(&partition, "1", "abc"); + /// tx.insert(&partition, "3", "abc"); + /// tx.insert(&partition, "5", "abc"); + /// + /// let (key, _) = tx.first_key_value(&partition)?.expect("item should exist"); + /// assert_eq!(&*key, "1".as_bytes()); + /// + /// assert!(keyspace.read_tx().first_key_value(&partition)?.is_none()); + /// # + /// # Ok::<(), fjall::Error>(()) + /// ``` + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn first_key_value( + &mut self, + partition: &TxPartitionHandle, + ) -> crate::Result> { + self.iter(partition).next().transpose() + } + + /// Returns the last key-value pair in the transaction's state. + /// The key in this pair is the maximum key in the transaction's state. + /// + /// # Examples + /// + /// ``` + /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; + /// # + /// # let folder = tempfile::tempdir()?; + /// # let keyspace = Config::new(folder).open_transactional()?; + /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; + /// # + /// let mut tx = keyspace.write_tx()?; + /// tx.insert(&partition, "1", "abc"); + /// tx.insert(&partition, "3", "abc"); + /// tx.insert(&partition, "5", "abc"); + /// + /// let (key, _) = tx.last_key_value(&partition)?.expect("item should exist"); + /// assert_eq!(&*key, "5".as_bytes()); + /// + /// assert!(keyspace.read_tx().last_key_value(&partition)?.is_none()); + /// # + /// # Ok::<(), fjall::Error>(()) + /// ``` + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn last_key_value( + &mut self, + partition: &TxPartitionHandle, + ) -> crate::Result> { + self.iter(partition).next_back().transpose() + } + + /// Scans the entire partition, returning the amount of items. + /// + /// # Examples + /// + /// ``` + /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; + /// # + /// # let folder = tempfile::tempdir()?; + /// # let keyspace = Config::new(folder).open_transactional()?; + /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; + /// partition.insert("a", "my_value")?; + /// partition.insert("b", "my_value2")?; + /// + /// let mut tx = keyspace.write_tx()?; + /// assert_eq!(2, tx.len(&partition)?); + /// + /// tx.insert(&partition, "c", "my_value3"); + /// + /// // read-your-own write + /// assert_eq!(3, tx.len(&partition)?); + /// + /// // Transaction is not committed yet + /// assert_eq!(2, keyspace.read_tx().len(&partition)?); + /// + /// tx.commit()?; + /// assert_eq!(3, keyspace.read_tx().len(&partition)?); + /// # + /// # Ok::<(), fjall::Error>(()) + /// ``` + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn len(&mut self, partition: &TxPartitionHandle) -> crate::Result { + let mut count = 0; + + let iter = self.iter(partition); + + for kv in iter { + let _ = kv?; + count += 1; + } + + Ok(count) + } + + /// Iterates over the transaction's state. + /// + /// Avoid using this function, or limit it as otherwise it may scan a lot of items. + /// + /// # Examples + /// + /// ``` + /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; + /// # + /// # let folder = tempfile::tempdir()?; + /// # let keyspace = Config::new(folder).open_transactional()?; + /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; + /// # + /// let mut tx = keyspace.write_tx()?; + /// tx.insert(&partition, "a", "abc"); + /// tx.insert(&partition, "f", "abc"); + /// tx.insert(&partition, "g", "abc"); + /// + /// assert_eq!(3, tx.iter(&partition).count()); + /// assert_eq!(0, keyspace.read_tx().iter(&partition).count()); + /// # + /// # Ok::<(), fjall::Error>(()) + /// ``` + #[must_use] + pub fn iter<'a>( + &'a mut self, + partition: &TxPartitionHandle, + ) -> impl DoubleEndedIterator> + 'a { + self.cm.mark_range(&partition.inner.name, RangeFull); + + self.inner.iter(partition) + } + + /// Iterates over the transaction's state, returning keys only. + /// + /// Avoid using this function, or limit it as otherwise it may scan a lot of items. + #[must_use] + pub fn keys<'a>( + &'a mut self, + partition: &TxPartitionHandle, + ) -> impl DoubleEndedIterator> + 'a { + self.cm.mark_range(&partition.inner.name, RangeFull); + + self.inner.keys(partition) + } + + /// Iterates over the transaction's state, returning values only. + /// + /// Avoid using this function, or limit it as otherwise it may scan a lot of items. + #[must_use] + pub fn values<'a>( + &'a mut self, + partition: &TxPartitionHandle, + ) -> impl DoubleEndedIterator> + 'a { + self.cm.mark_range(&partition.inner.name, RangeFull); + + self.inner.values(partition) + } + + /// Iterates over a range of the transaction's state. + /// + /// Avoid using full or unbounded ranges as they may scan a lot of items (unless limited). + /// + /// # Examples + /// + /// ``` + /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; + /// # + /// # let folder = tempfile::tempdir()?; + /// # let keyspace = Config::new(folder).open_transactional()?; + /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; + /// # + /// let mut tx = keyspace.write_tx()?; + /// tx.insert(&partition, "a", "abc"); + /// tx.insert(&partition, "f", "abc"); + /// tx.insert(&partition, "g", "abc"); + /// + /// assert_eq!(2, tx.range(&partition, "a"..="f").count()); + /// assert_eq!(0, keyspace.read_tx().range(&partition, "a"..="f").count()); + /// # + /// # Ok::<(), fjall::Error>(()) + /// ``` + #[must_use] + pub fn range<'b, K: AsRef<[u8]> + 'b, R: RangeBounds + 'b>( + &'b mut self, + partition: &'b TxPartitionHandle, + range: R, + ) -> impl DoubleEndedIterator> + 'b { + // TODO: Bound::map 1.77 + let start: Bound = match range.start_bound() { + Bound::Included(k) => Bound::Included(k.as_ref().into()), + Bound::Excluded(k) => Bound::Excluded(k.as_ref().into()), + Bound::Unbounded => Bound::Unbounded, + }; + // TODO: Bound::map 1.77 + let end: Bound = match range.end_bound() { + Bound::Included(k) => Bound::Included(k.as_ref().into()), + Bound::Excluded(k) => Bound::Excluded(k.as_ref().into()), + Bound::Unbounded => Bound::Unbounded, + }; + self.cm.mark_range(&partition.inner.name, (start, end)); + + self.inner.range(partition, range) + } + + /// Iterates over a range of the transaction's state. + /// + /// Avoid using an empty prefix as it may scan a lot of items (unless limited). + /// + /// # Examples + /// + /// ``` + /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; + /// # + /// # let folder = tempfile::tempdir()?; + /// # let keyspace = Config::new(folder).open_transactional()?; + /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; + /// # + /// let mut tx = keyspace.write_tx()?; + /// tx.insert(&partition, "a", "abc"); + /// tx.insert(&partition, "ab", "abc"); + /// tx.insert(&partition, "abc", "abc"); + /// + /// assert_eq!(2, tx.prefix(&partition, "ab").count()); + /// assert_eq!(0, keyspace.read_tx().prefix(&partition, "ab").count()); + /// # + /// # Ok::<(), fjall::Error>(()) + /// ``` + #[must_use] + pub fn prefix<'b, K: AsRef<[u8]> + 'b>( + &'b mut self, + partition: &'b TxPartitionHandle, + prefix: K, + ) -> impl DoubleEndedIterator> + 'b { + self.range(partition, lsm_tree::range::prefix_to_range(prefix.as_ref())) + } + + /// Inserts a key-value pair into the partition. + /// + /// Keys may be up to 65536 bytes long, values up to 2^32 bytes. + /// Shorter keys and values result in better performance. + /// + /// If the key already exists, the item will be overwritten. + /// + /// # Examples + /// + /// ``` + /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; + /// # + /// # let folder = tempfile::tempdir()?; + /// # let keyspace = Config::new(folder).open_transactional()?; + /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; + /// partition.insert("a", "previous_value")?; + /// assert_eq!(b"previous_value", &*partition.get("a")?.unwrap()); + /// + /// let mut tx = keyspace.write_tx()?; + /// tx.insert(&partition, "a", "new_value"); + /// + /// drop(tx); + /// + /// // Write was not committed + /// assert_eq!(b"previous_value", &*partition.get("a")?.unwrap()); + /// # + /// # Ok::<(), fjall::Error>(()) + /// ``` + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn insert, V: AsRef<[u8]>>( + &mut self, + partition: &TxPartitionHandle, + key: K, + value: V, + ) { + let key = key.as_ref(); + + self.inner.insert(partition, key, value); + self.cm.mark_conflict(&partition.inner.name, key); + } + + /// Removes an item from the partition. + /// + /// The key may be up to 65536 bytes long. + /// Shorter keys result in better performance. + /// + /// # Examples + /// + /// ``` + /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; + /// # + /// # let folder = tempfile::tempdir()?; + /// # let keyspace = Config::new(folder).open_transactional()?; + /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; + /// partition.insert("a", "previous_value")?; + /// assert_eq!(b"previous_value", &*partition.get("a")?.unwrap()); + /// + /// let mut tx = keyspace.write_tx()?; + /// tx.remove(&partition, "a"); + /// + /// // Read-your-own-write + /// let item = tx.get(&partition, "a")?; + /// assert_eq!(None, item); + /// + /// drop(tx); + /// + /// // Deletion was not committed + /// assert_eq!(b"previous_value", &*partition.get("a")?.unwrap()); + /// # + /// # Ok::<(), fjall::Error>(()) + /// ``` + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn remove>(&mut self, partition: &TxPartitionHandle, key: K) { + let key = key.as_ref(); + + self.inner.remove(partition, key); + self.cm.mark_conflict(&partition.inner.name, key); + } + + /// Commits the transaction. + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn commit(self) -> crate::Result> { + // NOTE: We have no write set, so we are basically + // a read-only transaction, so nothing to do here + if self.inner.memtables.is_empty() { + return Ok(Ok(())); + } + + let oracle = self.inner.keyspace.oracle.clone(); + + match oracle.with_commit(self.inner.nonce.instant, self.cm, move || { + self.inner.commit() + })? { + CommitOutcome::Ok => Ok(Ok(())), + CommitOutcome::Aborted(e) => Err(e), + CommitOutcome::Conflicted => Ok(Err(Conflict)), + } + } + + /// More explicit alternative to dropping the transaction + /// to roll it back. + pub fn rollback(self) { + self.inner.rollback(); + } +} + +#[cfg(test)] +mod tests { + use crate::{ + tx::write::ssi::Conflict, Config, GarbageCollection, KvSeparationOptions, + PartitionCreateOptions, TransactionalPartitionHandle, TxKeyspace, + }; + use tempfile::TempDir; + use test_log::test; + + struct TestEnv { + ks: TxKeyspace, + part: TransactionalPartitionHandle, + + #[allow(unused)] + tmpdir: TempDir, + } + + impl TestEnv { + fn seed_hermitage_data(&self) -> crate::Result<()> { + self.part.insert([1u8], [10u8])?; + self.part.insert([2u8], [20u8])?; + Ok(()) + } + } + + fn setup() -> Result> { + let tmpdir = tempfile::tempdir()?; + let ks = Config::new(tmpdir.path()).open_transactional()?; + + let part = ks.open_partition("foo", PartitionCreateOptions::default())?; + + Ok(TestEnv { ks, part, tmpdir }) + } + + // Adapted from https://github.com/al8n/skipdb/issues/10 + #[test] + #[allow(clippy::unwrap_used)] + fn tx_ssi_arthur_1() -> Result<(), Box> { + let env = setup()?; + + let mut tx = env.ks.write_tx()?; + tx.insert(&env.part, "a1", 10u64.to_be_bytes()); + tx.insert(&env.part, "b1", 100u64.to_be_bytes()); + tx.insert(&env.part, "b2", 200u64.to_be_bytes()); + tx.commit()??; + + let mut tx1 = env.ks.write_tx()?; + let val = tx1 + .range(&env.part, "a".."b") + .map(|kv| { + let (_, v) = kv.unwrap(); + + let mut buf = [0u8; 8]; + buf.copy_from_slice(&v); + u64::from_be_bytes(buf) + }) + .sum::(); + tx1.insert(&env.part, "b3", 10u64.to_be_bytes()); + assert_eq!(10, val); + + let mut tx2 = env.ks.write_tx()?; + let val = tx2 + .range(&env.part, "b".."c") + .map(|kv| { + let (_, v) = kv.unwrap(); + + let mut buf = [0u8; 8]; + buf.copy_from_slice(&v); + u64::from_be_bytes(buf) + }) + .sum::(); + tx2.insert(&env.part, "a3", 300u64.to_be_bytes()); + assert_eq!(300, val); + tx2.commit()??; + assert!(matches!(tx1.commit()?, Err(Conflict))); + + let mut tx3 = env.ks.write_tx()?; + let val = tx3 + .iter(&env.part) + .filter_map(|kv| { + let (k, v) = kv.unwrap(); + + if k.starts_with(b"a") { + let mut buf = [0u8; 8]; + buf.copy_from_slice(&v); + Some(u64::from_be_bytes(buf)) + } else { + None + } + }) + .sum::(); + assert_eq!(310, val); + + Ok(()) + } + + // Adapted from https://github.com/al8n/skipdb/issues/10 + #[test] + #[allow(clippy::unwrap_used)] + fn tx_ssi_arthur_2() -> Result<(), Box> { + let env = setup()?; + + let mut tx = env.ks.write_tx()?; + tx.insert(&env.part, "b1", 100u64.to_be_bytes()); + tx.insert(&env.part, "b2", 200u64.to_be_bytes()); + tx.commit()??; + + let mut tx1 = env.ks.write_tx()?; + let val = tx1 + .range(&env.part, "a".."b") + .map(|kv| { + let (_, v) = kv.unwrap(); + + let mut buf = [0u8; 8]; + buf.copy_from_slice(&v); + u64::from_be_bytes(buf) + }) + .sum::(); + tx1.insert(&env.part, "b3", 0u64.to_be_bytes()); + assert_eq!(0, val); + + let mut tx2 = env.ks.write_tx()?; + let val = tx2 + .range(&env.part, "b".."c") + .map(|kv| { + let (_, v) = kv.unwrap(); + + let mut buf = [0u8; 8]; + buf.copy_from_slice(&v); + u64::from_be_bytes(buf) + }) + .sum::(); + tx2.insert(&env.part, "a3", 300u64.to_be_bytes()); + assert_eq!(300, val); + tx2.commit()??; + assert!(matches!(tx1.commit()?, Err(Conflict))); + + let mut tx3 = env.ks.write_tx()?; + let val = tx3 + .iter(&env.part) + .filter_map(|kv| { + let (k, v) = kv.unwrap(); + + if k.starts_with(b"a") { + let mut buf = [0u8; 8]; + buf.copy_from_slice(&v); + Some(u64::from_be_bytes(buf)) + } else { + None + } + }) + .sum::(); + assert_eq!(300, val); + + Ok(()) + } + + #[test] + fn tx_ssi_basic() -> Result<(), Box> { + let env = setup()?; + + let mut tx1 = env.ks.write_tx()?; + let mut tx2 = env.ks.write_tx()?; + + tx1.insert(&env.part, "hello", "world"); + + tx1.commit()??; + assert!(env.part.contains_key("hello")?); + + assert_eq!(tx2.get(&env.part, "hello")?, None); + + tx2.insert(&env.part, "hello", "world2"); + assert!(matches!(tx2.commit()?, Err(Conflict))); + + let mut tx1 = env.ks.write_tx()?; + let mut tx2 = env.ks.write_tx()?; + + tx1.iter(&env.part).next(); + tx2.insert(&env.part, "hello", "world2"); + + tx1.insert(&env.part, "hello2", "world1"); + tx1.commit()??; + + tx2.commit()??; + + Ok(()) + } + + #[test] + #[allow(clippy::unwrap_used)] + fn tx_ssi_ww() -> Result<(), Box> { + // https://en.wikipedia.org/wiki/Write%E2%80%93write_conflict + let env = setup()?; + + let mut tx1 = env.ks.write_tx()?; + let mut tx2 = env.ks.write_tx()?; + + tx1.insert(&env.part, "a", "a"); + tx2.insert(&env.part, "b", "c"); + tx1.insert(&env.part, "b", "b"); + tx1.commit()??; + + tx2.insert(&env.part, "a", "c"); + + tx2.commit()??; + assert_eq!(b"c", &*env.part.get("a")?.unwrap()); + assert_eq!(b"c", &*env.part.get("b")?.unwrap()); + + Ok(()) + } + + #[test] + #[allow(clippy::unwrap_used)] + fn tx_ssi_swap() -> Result<(), Box> { + let env = setup()?; + + env.part.insert("x", "x")?; + env.part.insert("y", "y")?; + + let mut tx1 = env.ks.write_tx()?; + let mut tx2 = env.ks.write_tx()?; + + { + let x = tx1.get(&env.part, "x")?.unwrap(); + tx1.insert(&env.part, "y", x); + } + + { + let y = tx2.get(&env.part, "y")?.unwrap(); + tx2.insert(&env.part, "x", y); + } + + tx1.commit()??; + assert!(matches!(tx2.commit()?, Err(Conflict))); + + Ok(()) + } + + #[test] + fn tx_ssi_write_cycles() -> Result<(), Box> { + let env = setup()?; + env.seed_hermitage_data()?; + + let mut t1 = env.ks.write_tx()?; + let mut t2 = env.ks.write_tx()?; + + t1.insert(&env.part, [1u8], [11u8]); + t2.insert(&env.part, [1u8], [12u8]); + t1.insert(&env.part, [2u8], [21u8]); + t1.commit()??; + + assert_eq!(env.part.get([1u8])?, Some([11u8].into())); + + t2.insert(&env.part, [2u8], [22u8]); + t2.commit()??; + + assert_eq!(env.part.get([1u8])?, Some([12u8].into())); + assert_eq!(env.part.get([2u8])?, Some([22u8].into())); + + Ok(()) + } + + #[test] + fn tx_ssi_aborted_reads() -> Result<(), Box> { + let env = setup()?; + env.seed_hermitage_data()?; + + let mut t1 = env.ks.write_tx()?; + let mut t2 = env.ks.write_tx()?; + + t1.insert(&env.part, [1u8], [101u8]); + + assert_eq!(t2.get(&env.part, [1u8])?, Some([10u8].into())); + + t1.rollback(); + + assert_eq!(t2.get(&env.part, [1u8])?, Some([10u8].into())); + + t2.commit()??; + + Ok(()) + } + + #[allow(clippy::unwrap_used)] + #[test] + fn tx_ssi_anti_dependency_cycles() -> Result<(), Box> { + let env = setup()?; + env.seed_hermitage_data()?; + + let mut t1 = env.ks.write_tx()?; + { + let mut iter = t1.iter(&env.part); + assert_eq!(iter.next().unwrap()?, ([1u8].into(), [10u8].into())); + assert_eq!(iter.next().unwrap()?, ([2u8].into(), [20u8].into())); + assert!(iter.next().is_none()); + } + + let mut t2 = env.ks.write_tx()?; + let new = t2.update_fetch(&env.part, [2u8], |v| { + v.and_then(|v| v.first().copied()).map(|v| [v + 5].into()) + })?; + assert_eq!(new, Some([25u8].into())); + t2.commit()??; + + let mut t3 = env.ks.write_tx()?; + { + let mut iter = t3.iter(&env.part); + assert_eq!(iter.next().unwrap()?, ([1u8].into(), [10u8].into())); + assert_eq!(iter.next().unwrap()?, ([2u8].into(), [25u8].into())); // changed here + assert!(iter.next().is_none()); + } + + t3.commit()??; + + t1.insert(&env.part, [1u8], [0u8]); + + assert!(matches!(t1.commit()?, Err(Conflict))); + + Ok(()) + } + + #[test] + fn tx_ssi_update_fetch_update() -> Result<(), Box> { + let env = setup()?; + + let mut t1 = env.ks.write_tx()?; + let mut t2 = env.ks.write_tx()?; + + let new = t1.update_fetch(&env.part, "hello", |_| Some("world".into()))?; + assert_eq!(new, Some("world".into())); + let old = t2.fetch_update(&env.part, "hello", |_| Some("world2".into()))?; + assert_eq!(old, None); + + t1.commit()??; + assert!(matches!(t2.commit()?, Err(Conflict))); + + assert_eq!(env.part.get("hello")?, Some("world".into())); + + let mut t1 = env.ks.write_tx()?; + let mut t2 = env.ks.write_tx()?; + + let old = t1.fetch_update(&env.part, "hello", |_| Some("world3".into()))?; + assert_eq!(old, Some("world".into())); + let new = t2.update_fetch(&env.part, "hello2", |_| Some("world2".into()))?; + assert_eq!(new, Some("world2".into())); + + t1.commit()??; + t2.commit()??; + + assert_eq!(env.part.get("hello")?, Some("world3".into())); + assert_eq!(env.part.get("hello2")?, Some("world2".into())); + + Ok(()) + } + + #[test] + fn tx_ssi_range() -> Result<(), Box> { + let env = setup()?; + + let mut t1 = env.ks.write_tx()?; + let mut t2 = env.ks.write_tx()?; + + _ = t1.range(&env.part, "h"..="hello"); + t1.insert(&env.part, "foo", "bar"); + + // insert a key INSIDE the range read by t1 + t2.insert(&env.part, "hello", "world"); + + t2.commit()??; + assert!(matches!(t1.commit()?, Err(Conflict))); + + let mut t1 = env.ks.write_tx()?; + let mut t2 = env.ks.write_tx()?; + + _ = t1.range(&env.part, "h"..="hello"); + t1.insert(&env.part, "foo", "bar"); + + // insert a key OUTSIDE the range read by t1 + t2.insert(&env.part, "hello2", "world"); + + t2.commit()??; + t1.commit()??; + + Ok(()) + } + + #[test] + fn tx_ssi_prefix() -> Result<(), Box> { + let env = setup()?; + + let mut t1 = env.ks.write_tx()?; + let mut t2 = env.ks.write_tx()?; + + _ = t1.prefix(&env.part, "hello"); + t1.insert(&env.part, "foo", "bar"); + + // insert a key MATCHING the prefix read by t1 + t2.insert(&env.part, "hello", "world"); + + t2.commit()??; + assert!(matches!(t1.commit()?, Err(Conflict))); + + let mut t1 = env.ks.write_tx()?; + let mut t2 = env.ks.write_tx()?; + + _ = t1.prefix(&env.part, "hello"); + t1.insert(&env.part, "foo", "bar"); + + // insert a key NOT MATCHING the range read by t1 + t2.insert(&env.part, "foobar", "world"); + + t2.commit()??; + t1.commit()??; + + Ok(()) + } + + #[test] + #[allow(clippy::unwrap_used)] + fn tx_ssi_gc_shadowing() -> Result<(), Box> { + let tmpdir = tempfile::tempdir()?; + let ks = Config::new(tmpdir.path()).open_transactional()?; + + let part = ks.open_partition( + "foo", + PartitionCreateOptions::default().with_kv_separation( + KvSeparationOptions::default() + .separation_threshold(/* IMPORTANT: always separate */ 1), + ), + )?; + + part.insert("a", "a")?; + part.inner().rotate_memtable_and_wait()?; // blob file #0 + + part.insert("a", "b")?; + part.inner().rotate_memtable_and_wait()?; // blob file #1 + + // NOTE: a->a is now stale + + let mut tx = ks.write_tx()?; + tx.insert(&part, "a", "tx"); + + log::info!("running GC"); + part.gc_scan()?; + part.gc_with_staleness_threshold(0.0)?; + // NOTE: The GC has now added a new value handle to the memtable + // because a->b was written into blob file #2 + + log::info!("committing tx"); + tx.commit()??; + + // NOTE: We should see the transaction's write + assert_eq!(b"tx", &*part.get("a")?.unwrap()); + + // NOTE: We should still see the transaction's write + part.inner().rotate_memtable_and_wait()?; + assert_eq!(b"tx", &*part.get("a")?.unwrap()); + + Ok(()) + } +} diff --git a/src/tx/write_tx.rs b/src/tx/write_tx.rs index c1880b36..68dca52e 100644 --- a/src/tx/write_tx.rs +++ b/src/tx/write_tx.rs @@ -2,742 +2,19 @@ // This source code is licensed under both the Apache 2.0 and MIT License // (found in the LICENSE-* files in the repository) -use crate::{ - batch::{item::Item, PartitionKey}, - snapshot_nonce::SnapshotNonce, - Batch, HashMap, Keyspace, PersistMode, TxPartitionHandle, -}; -use lsm_tree::{AbstractTree, InternalValue, KvPair, Memtable, SeqNo, UserKey, UserValue}; -use std::{ - ops::RangeBounds, - sync::{Arc, MutexGuard}, -}; - -fn ignore_tombstone_value(item: InternalValue) -> Option { - if item.is_tombstone() { - None - } else { - Some(item) - } -} - -/// A single-writer (serialized) cross-partition transaction -/// -/// Use [`WriteTransaction::commit`] to commit changes to the keyspace. -/// -/// Drop the transaction to rollback changes. -pub struct WriteTransaction<'a> { - durability: Option, - - keyspace: Keyspace, - memtables: HashMap>, - - nonce: SnapshotNonce, - - #[allow(unused)] - tx_lock: MutexGuard<'a, ()>, -} - -impl<'a> WriteTransaction<'a> { - pub(crate) fn new( - keyspace: Keyspace, - tx_lock: MutexGuard<'a, ()>, - nonce: SnapshotNonce, - ) -> Self { - Self { - keyspace, - memtables: HashMap::default(), - tx_lock, - nonce, - durability: None, - } - } - - /// Sets the durability level. - #[must_use] - pub fn durability(mut self, mode: Option) -> Self { - self.durability = mode; - self - } - - /// Removes an item and returns its value if it existed. - /// - /// The operation will run wrapped in a transaction. - /// - /// ``` - /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; - /// # use std::sync::Arc; - /// # - /// # let folder = tempfile::tempdir()?; - /// # let keyspace = Config::new(folder).open_transactional()?; - /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; - /// partition.insert("a", "abc")?; - /// - /// let mut tx = keyspace.write_tx(); - /// - /// let taken = tx.take(&partition, "a")?.unwrap(); - /// assert_eq!(b"abc", &*taken); - /// tx.commit()?; - /// - /// let item = partition.get("a")?; - /// assert!(item.is_none()); - /// # - /// # Ok::<(), fjall::Error>(()) - /// ``` - /// - /// # Errors - /// - /// Will return `Err` if an IO error occurs. - pub fn take>( - &mut self, - partition: &TxPartitionHandle, - key: K, - ) -> crate::Result> { - self.fetch_update(partition, key, |_| None) - } - - /// Atomically updates an item and returns the new value. - /// - /// Returning `None` removes the item if it existed before. - /// - /// The operation will run wrapped in a transaction. - /// - /// # Examples - /// - /// ``` - /// # use fjall::{Config, Keyspace, PartitionCreateOptions, Slice}; - /// # - /// # let folder = tempfile::tempdir()?; - /// # let keyspace = Config::new(folder).open_transactional()?; - /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; - /// partition.insert("a", "abc")?; - /// - /// let mut tx = keyspace.write_tx(); - /// - /// let updated = tx.update_fetch(&partition, "a", |_| Some(Slice::from(*b"def")))?.unwrap(); - /// assert_eq!(b"def", &*updated); - /// tx.commit()?; - /// - /// let item = partition.get("a")?; - /// assert_eq!(Some("def".as_bytes().into()), item); - /// # - /// # Ok::<(), fjall::Error>(()) - /// ``` - /// - /// ``` - /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; - /// # use std::sync::Arc; - /// # - /// # let folder = tempfile::tempdir()?; - /// # let keyspace = Config::new(folder).open_transactional()?; - /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; - /// partition.insert("a", "abc")?; - /// - /// let mut tx = keyspace.write_tx(); - /// - /// let updated = tx.update_fetch(&partition, "a", |_| None)?; - /// assert!(updated.is_none()); - /// tx.commit()?; - /// - /// let item = partition.get("a")?; - /// assert!(item.is_none()); - /// # - /// # Ok::<(), fjall::Error>(()) - /// ``` - /// - /// # Errors - /// - /// Will return `Err` if an IO error occurs. - pub fn update_fetch, F: Fn(Option<&UserValue>) -> Option>( - &mut self, - partition: &TxPartitionHandle, - key: K, - f: F, - ) -> crate::Result> { - let prev = self.get(partition, &key)?; - let updated = f(prev.as_ref()); - - if let Some(value) = &updated { - // NOTE: Skip insert if the value hasn't changed - if updated != prev { - self.insert(partition, &key, value); - } - } else if prev.is_some() { - self.remove(partition, &key); - } - - Ok(updated) - } - - /// Atomically updates an item and returns the previous value. - /// - /// Returning `None` removes the item if it existed before. - /// - /// The operation will run wrapped in a transaction. - /// - /// # Examples - /// - /// ``` - /// # use fjall::{Config, Keyspace, PartitionCreateOptions, Slice}; - /// # - /// # let folder = tempfile::tempdir()?; - /// # let keyspace = Config::new(folder).open_transactional()?; - /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; - /// partition.insert("a", "abc")?; - /// - /// let mut tx = keyspace.write_tx(); - /// - /// let prev = tx.fetch_update(&partition, "a", |_| Some(Slice::from(*b"def")))?.unwrap(); - /// assert_eq!(b"abc", &*prev); - /// tx.commit()?; - /// - /// let item = partition.get("a")?; - /// assert_eq!(Some("def".as_bytes().into()), item); - /// # - /// # Ok::<(), fjall::Error>(()) - /// ``` - /// - /// ``` - /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; - /// # use std::sync::Arc; - /// # - /// # let folder = tempfile::tempdir()?; - /// # let keyspace = Config::new(folder).open_transactional()?; - /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; - /// partition.insert("a", "abc")?; - /// - /// let mut tx = keyspace.write_tx(); - /// - /// let prev = tx.fetch_update(&partition, "a", |_| None)?.unwrap(); - /// assert_eq!(b"abc", &*prev); - /// tx.commit()?; - /// - /// let item = partition.get("a")?; - /// assert!(item.is_none()); - /// # - /// # Ok::<(), fjall::Error>(()) - /// ``` - /// - /// # Errors - /// - /// Will return `Err` if an IO error occurs. - pub fn fetch_update, F: Fn(Option<&UserValue>) -> Option>( - &mut self, - partition: &TxPartitionHandle, - key: K, - f: F, - ) -> crate::Result> { - let prev = self.get(partition, &key)?; - let updated = f(prev.as_ref()); - - if let Some(value) = &updated { - // NOTE: Skip insert if the value hasn't changed - if updated != prev { - self.insert(partition, &key, value); - } - } else if prev.is_some() { - self.remove(partition, &key); - } - - Ok(prev) - } - - /// Retrieves an item from the transaction's state. - /// - /// The transaction allows reading your own writes (RYOW). - /// - /// # Examples - /// - /// ``` - /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; - /// # - /// # let folder = tempfile::tempdir()?; - /// # let keyspace = Config::new(folder).open_transactional()?; - /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; - /// partition.insert("a", "previous_value")?; - /// assert_eq!(b"previous_value", &*partition.get("a")?.unwrap()); - /// - /// let mut tx = keyspace.write_tx(); - /// tx.insert(&partition, "a", "new_value"); - /// - /// // Read-your-own-write - /// let item = tx.get(&partition, "a")?; - /// assert_eq!(Some("new_value".as_bytes().into()), item); - /// - /// drop(tx); - /// - /// // Write was not committed - /// assert_eq!(b"previous_value", &*partition.get("a")?.unwrap()); - /// # - /// # Ok::<(), fjall::Error>(()) - /// ``` - /// - /// # Errors - /// - /// Will return `Err` if an IO error occurs. - pub fn get>( - &self, - partition: &TxPartitionHandle, - key: K, - ) -> crate::Result> { - if let Some(memtable) = self.memtables.get(&partition.inner.name) { - if let Some(item) = memtable.get(&key, None) { - return Ok(ignore_tombstone_value(item).map(|x| x.value)); - } - } - - partition - .inner - .snapshot_at(self.nonce.instant) - .get(key) - .map_err(Into::into) - } - - /// Returns `true` if the transaction's state contains the specified key. - /// - /// # Examples - /// - /// ``` - /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; - /// # - /// # let folder = tempfile::tempdir()?; - /// # let keyspace = Config::new(folder).open_transactional()?; - /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; - /// partition.insert("a", "my_value")?; - /// assert!(keyspace.read_tx().contains_key(&partition, "a")?); - /// - /// let mut tx = keyspace.write_tx(); - /// assert!(tx.contains_key(&partition, "a")?); - /// - /// tx.insert(&partition, "b", "my_value2"); - /// assert!(tx.contains_key(&partition, "b")?); - /// - /// // Transaction not committed yet - /// assert!(!keyspace.read_tx().contains_key(&partition, "b")?); - /// - /// tx.commit()?; - /// assert!(keyspace.read_tx().contains_key(&partition, "b")?); - /// # - /// # Ok::<(), fjall::Error>(()) - /// ``` - /// - /// # Errors - /// - /// Will return `Err` if an IO error occurs. - pub fn contains_key>( - &self, - partition: &TxPartitionHandle, - key: K, - ) -> crate::Result { - if let Some(memtable) = self.memtables.get(&partition.inner.name) { - if let Some(item) = memtable.get(&key, None) { - return Ok(!item.key.is_tombstone()); - } - } - - partition - .inner - .snapshot_at(self.nonce.instant) - .contains_key(key) - .map_err(Into::into) - } - - /// Returns the first key-value pair in the transaction's state. - /// The key in this pair is the minimum key in the transaction's state. - /// - /// # Examples - /// - /// ``` - /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; - /// # - /// # let folder = tempfile::tempdir()?; - /// # let keyspace = Config::new(folder).open_transactional()?; - /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; - /// # - /// let mut tx = keyspace.write_tx(); - /// tx.insert(&partition, "1", "abc"); - /// tx.insert(&partition, "3", "abc"); - /// tx.insert(&partition, "5", "abc"); - /// - /// let (key, _) = tx.first_key_value(&partition)?.expect("item should exist"); - /// assert_eq!(&*key, "1".as_bytes()); - /// - /// assert!(keyspace.read_tx().first_key_value(&partition)?.is_none()); - /// # - /// # Ok::<(), fjall::Error>(()) - /// ``` - /// - /// # Errors - /// - /// Will return `Err` if an IO error occurs. - pub fn first_key_value(&self, partition: &TxPartitionHandle) -> crate::Result> { - self.iter(partition).next().transpose() - } - - /// Returns the last key-value pair in the transaction's state. - /// The key in this pair is the maximum key in the transaction's state. - /// - /// # Examples - /// - /// ``` - /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; - /// # - /// # let folder = tempfile::tempdir()?; - /// # let keyspace = Config::new(folder).open_transactional()?; - /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; - /// # - /// let mut tx = keyspace.write_tx(); - /// tx.insert(&partition, "1", "abc"); - /// tx.insert(&partition, "3", "abc"); - /// tx.insert(&partition, "5", "abc"); - /// - /// let (key, _) = tx.last_key_value(&partition)?.expect("item should exist"); - /// assert_eq!(&*key, "5".as_bytes()); - /// - /// assert!(keyspace.read_tx().last_key_value(&partition)?.is_none()); - /// # - /// # Ok::<(), fjall::Error>(()) - /// ``` - /// - /// # Errors - /// - /// Will return `Err` if an IO error occurs. - pub fn last_key_value(&self, partition: &TxPartitionHandle) -> crate::Result> { - self.iter(partition).next_back().transpose() - } - - /// Scans the entire partition, returning the amount of items. - /// - /// # Examples - /// - /// ``` - /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; - /// # - /// # let folder = tempfile::tempdir()?; - /// # let keyspace = Config::new(folder).open_transactional()?; - /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; - /// partition.insert("a", "my_value")?; - /// partition.insert("b", "my_value2")?; - /// - /// let mut tx = keyspace.write_tx(); - /// assert_eq!(2, tx.len(&partition)?); - /// - /// tx.insert(&partition, "c", "my_value3"); - /// - /// // read-your-own write - /// assert_eq!(3, tx.len(&partition)?); - /// - /// // Transaction is not committed yet - /// assert_eq!(2, keyspace.read_tx().len(&partition)?); - /// - /// tx.commit()?; - /// assert_eq!(3, keyspace.read_tx().len(&partition)?); - /// # - /// # Ok::<(), fjall::Error>(()) - /// ``` - /// - /// # Errors - /// - /// Will return `Err` if an IO error occurs. - pub fn len(&self, partition: &TxPartitionHandle) -> crate::Result { - let mut count = 0; - - let iter = self.iter(partition); - - for kv in iter { - let _ = kv?; - count += 1; - } - - Ok(count) - } - - /// Iterates over the transaction's state. - /// - /// Avoid using this function, or limit it as otherwise it may scan a lot of items. - /// - /// # Examples - /// - /// ``` - /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; - /// # - /// # let folder = tempfile::tempdir()?; - /// # let keyspace = Config::new(folder).open_transactional()?; - /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; - /// # - /// let mut tx = keyspace.write_tx(); - /// tx.insert(&partition, "a", "abc"); - /// tx.insert(&partition, "f", "abc"); - /// tx.insert(&partition, "g", "abc"); - /// - /// assert_eq!(3, tx.iter(&partition).count()); - /// assert_eq!(0, keyspace.read_tx().iter(&partition).count()); - /// # - /// # Ok::<(), fjall::Error>(()) - /// ``` - #[must_use] - pub fn iter<'b>( - &'b self, - partition: &'b TxPartitionHandle, - ) -> impl DoubleEndedIterator> + 'static { - partition - .inner - .tree - .iter_with_seqno( - self.nonce.instant, - self.memtables.get(&partition.inner.name).cloned(), - ) - .map(|item| item.map_err(Into::into)) - } - - /// Iterates over the transaction's state, returning keys only. - /// - /// Avoid using this function, or limit it as otherwise it may scan a lot of items. - #[must_use] - pub fn keys( - &'a self, - partition: &'a TxPartitionHandle, - ) -> impl DoubleEndedIterator> + 'static { - partition - .inner - .tree - .keys_with_seqno(self.nonce.instant, None) - .map(|item| item.map_err(Into::into)) - } - - /// Iterates over the transaction's state, returning values only. - /// - /// Avoid using this function, or limit it as otherwise it may scan a lot of items. - #[must_use] - pub fn values( - &'a self, - partition: &'a TxPartitionHandle, - ) -> impl DoubleEndedIterator> + 'static { - partition - .inner - .tree - .values_with_seqno(self.nonce.instant, None) - .map(|item| item.map_err(Into::into)) - } - - /// Iterates over a range of the transaction's state. - /// - /// Avoid using full or unbounded ranges as they may scan a lot of items (unless limited). - /// - /// # Examples - /// - /// ``` - /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; - /// # - /// # let folder = tempfile::tempdir()?; - /// # let keyspace = Config::new(folder).open_transactional()?; - /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; - /// # - /// let mut tx = keyspace.write_tx(); - /// tx.insert(&partition, "a", "abc"); - /// tx.insert(&partition, "f", "abc"); - /// tx.insert(&partition, "g", "abc"); - /// - /// assert_eq!(2, tx.range(&partition, "a"..="f").count()); - /// assert_eq!(0, keyspace.read_tx().range(&partition, "a"..="f").count()); - /// # - /// # Ok::<(), fjall::Error>(()) - /// ``` - #[must_use] - pub fn range<'b, K: AsRef<[u8]> + 'b, R: RangeBounds + 'b>( - &'b self, - partition: &'b TxPartitionHandle, - range: R, - ) -> impl DoubleEndedIterator> + 'static { - partition - .inner - .tree - .range_with_seqno( - range, - self.nonce.instant, - self.memtables.get(&partition.inner.name).cloned(), - ) - .map(|item| item.map_err(Into::into)) - } - - /// Iterates over a range of the transaction's state. - /// - /// Avoid using an empty prefix as it may scan a lot of items (unless limited). - /// - /// # Examples - /// - /// ``` - /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; - /// # - /// # let folder = tempfile::tempdir()?; - /// # let keyspace = Config::new(folder).open_transactional()?; - /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; - /// # - /// let mut tx = keyspace.write_tx(); - /// tx.insert(&partition, "a", "abc"); - /// tx.insert(&partition, "ab", "abc"); - /// tx.insert(&partition, "abc", "abc"); - /// - /// assert_eq!(2, tx.prefix(&partition, "ab").count()); - /// assert_eq!(0, keyspace.read_tx().prefix(&partition, "ab").count()); - /// # - /// # Ok::<(), fjall::Error>(()) - /// ``` - #[must_use] - pub fn prefix<'b, K: AsRef<[u8]> + 'b>( - &'b self, - partition: &'b TxPartitionHandle, - prefix: K, - ) -> impl DoubleEndedIterator> + 'static { - partition - .inner - .tree - .prefix_with_seqno( - prefix, - self.nonce.instant, - self.memtables.get(&partition.inner.name).cloned(), - ) - .map(|item| item.map_err(Into::into)) - } - - /// Inserts a key-value pair into the partition. - /// - /// Keys may be up to 65536 bytes long, values up to 2^32 bytes. - /// Shorter keys and values result in better performance. - /// - /// If the key already exists, the item will be overwritten. - /// - /// # Examples - /// - /// ``` - /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; - /// # - /// # let folder = tempfile::tempdir()?; - /// # let keyspace = Config::new(folder).open_transactional()?; - /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; - /// partition.insert("a", "previous_value")?; - /// assert_eq!(b"previous_value", &*partition.get("a")?.unwrap()); - /// - /// let mut tx = keyspace.write_tx(); - /// tx.insert(&partition, "a", "new_value"); - /// - /// drop(tx); - /// - /// // Write was not committed - /// assert_eq!(b"previous_value", &*partition.get("a")?.unwrap()); - /// # - /// # Ok::<(), fjall::Error>(()) - /// ``` - /// - /// # Errors - /// - /// Will return `Err` if an IO error occurs. - pub fn insert, V: AsRef<[u8]>>( - &mut self, - partition: &TxPartitionHandle, - key: K, - value: V, - ) { - self.memtables - .entry(partition.inner.name.clone()) - .or_default() - .insert(lsm_tree::InternalValue::from_components( - key.as_ref(), - value.as_ref(), - // NOTE: Just take the max seqno, which should never be reached - // that way, the write is definitely always the newest - SeqNo::MAX, - lsm_tree::ValueType::Value, - )); - } - - /// Removes an item from the partition. - /// - /// The key may be up to 65536 bytes long. - /// Shorter keys result in better performance. - /// - /// # Examples - /// - /// ``` - /// # use fjall::{Config, Keyspace, PartitionCreateOptions}; - /// # - /// # let folder = tempfile::tempdir()?; - /// # let keyspace = Config::new(folder).open_transactional()?; - /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?; - /// partition.insert("a", "previous_value")?; - /// assert_eq!(b"previous_value", &*partition.get("a")?.unwrap()); - /// - /// let mut tx = keyspace.write_tx(); - /// tx.remove(&partition, "a"); - /// - /// // Read-your-own-write - /// let item = tx.get(&partition, "a")?; - /// assert_eq!(None, item); - /// - /// drop(tx); - /// - /// // Deletion was not committed - /// assert_eq!(b"previous_value", &*partition.get("a")?.unwrap()); - /// # - /// # Ok::<(), fjall::Error>(()) - /// ``` - /// - /// # Errors - /// - /// Will return `Err` if an IO error occurs. - pub fn remove>(&mut self, partition: &TxPartitionHandle, key: K) { - self.memtables - .entry(partition.inner.name.clone()) - .or_default() - .insert(lsm_tree::InternalValue::new_tombstone( - key.as_ref(), - // NOTE: Just take the max seqno, which should never be reached - // that way, the write is definitely always the newest - SeqNo::MAX, - )); - } - - /// Commits the transaction. - /// - /// # Errors - /// - /// Will return `Err` if an IO error occurs. - pub fn commit(self) -> crate::Result<()> { - let mut batch = Batch::new(self.keyspace).durability(self.durability); - - /* - for (partition_key, memtable) in self.memtables { - let memtable = Arc::into_inner(memtable).expect("should be able to unwrap Arc"); - - for (internal_key, value) in memtable.items { - batch.data.push(Item::new( - partition_key.clone(), - internal_key.user_key, - value, - internal_key.value_type, - )); - } - } - */ - - for (partition_key, memtable) in self.memtables { - for item in memtable.iter() { - batch.data.push(Item::new( - partition_key.clone(), - item.key.user_key.clone(), - item.value.clone(), - item.key.value_type, - )); - } - } - - // TODO: instead of using batch, write batch::commit as a generic function that takes - // a impl Iterator - // that way, we don't have to move the memtable(s) into the batch first to commit - batch.commit() - } - - /// More explicit alternative to dropping the transaction - /// to roll it back. - pub fn rollback(self) {} -} +#[cfg(all(feature = "single_writer_tx", feature = "ssi_tx"))] +compile_error!("Either single_writer_tx or ssi_tx can be enabled at once"); + +#[cfg(feature = "single_writer_tx")] +pub use super::write::single_writer::WriteTransaction; + +#[cfg(feature = "ssi_tx")] +pub use super::write::ssi::WriteTransaction; + +// TODO: +// use https://github.com/rust-lang/rust/issues/43781 +// when stable +// +// #[doc(cfg(feature = "single_writer_tx"))] +// +// #[doc(cfg(feature = "ssi_tx"))]