Skip to content

Commit

Permalink
Merge pull request #79 from jeromegn/ssi-tx
Browse files Browse the repository at this point in the history
Serializable Snapshot Isolation (SSI)
  • Loading branch information
marvin-j97 authored Oct 19, 2024
2 parents d2ec49f + e868f20 commit e6c325d
Show file tree
Hide file tree
Showing 36 changed files with 2,982 additions and 795 deletions.
2 changes: 2 additions & 0 deletions .config/nextest.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[profile.default]
slow-timeout = "1m"
4 changes: 4 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,12 @@ jobs:
run: cargo test --features __internal_whitebox -- whitebox_ --test-threads=1
- name: Run tests
run: cargo nextest run --features lz4,miniz,single_writer_tx,bloom
- name: Run SSI tests
run: cargo nextest run --no-default-features --features ssi_tx tx_ssi_
- name: Run doc tests
run: cargo test --doc
- name: Run SSI doc tests
run: cargo test --no-default-features --features ssi_tx --doc
- name: Build & test examples
run: node compile_examples.mjs
cross:
Expand Down
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ lz4 = ["lsm-tree/lz4"]
miniz = ["lsm-tree/miniz"]
bloom = ["lsm-tree/bloom"]
single_writer_tx = []
ssi_tx = []
__internal_whitebox = []

[dependencies]
Expand All @@ -42,6 +43,7 @@ rand = "0.8.5"

[package.metadata.cargo-all-features]
denylist = ["__internal_whitebox"]
skip_feature_sets = [["ssi_tx", "single_writer_tx"]]

[[bench]]
name = "lsmt"
Expand Down
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Fjall is an LSM-based embeddable key-value storage engine written in Rust. It fe
- Automatic background maintenance
- Partitions (a.k.a. column families) with cross-partition atomic semantics
- Built-in compression (default = LZ4)
- Single-writer, multi-reader transactions (optional)
- Serializable transactions (optional)
- Key-value separation for large blob use cases (optional)

Each `Keyspace` is a single logical database and is split into `partitions` (a.k.a. column families) - you should probably only use a single keyspace for your application. Each partition is physically a single LSM-tree and its own logical collection (a persistent, sorted map); however, write operations across partitions are atomic as they are persisted in a single keyspace-level journal, which will be recovered on restart.
Expand Down Expand Up @@ -117,6 +117,13 @@ Allows opening a transactional Keyspace for single-writer (serialized) transacti

*Enabled by default.*

### ssi_tx

Allows opening a transactional Keyspace for multi-writer, serializable transactions, allowing RYOW (read-your-own-write), fetch-and-update and other atomic operations.
Conflict checking is done using optimistic concurrency control.

*Disabled by default.*

## Stable disk format

The disk format is stable as of 1.0.0.
Expand Down
1 change: 0 additions & 1 deletion examples/tx-atomic-counter/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ fn main() -> fjall::Result<()> {

write_tx.insert(&counters, "c1", next.to_be_bytes());
write_tx.commit()?;
keyspace.persist(PersistMode::Buffer)?;

println!("worker {idx} incremented to {next}");

Expand Down
2 changes: 0 additions & 2 deletions examples/tx-mpmc-queue/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ fn main() -> fjall::Result<()> {
let task_id = scru128::new_string();

tasks.insert(&task_id, &task_id)?;
keyspace.persist(PersistMode::Buffer)?;

println!("producer {idx} created task {task_id}");

Expand Down Expand Up @@ -67,7 +66,6 @@ fn main() -> fjall::Result<()> {
tx.remove(&tasks, &key);

tx.commit()?;
keyspace.persist(PersistMode::Buffer)?;

let task_id = std::str::from_utf8(&key).unwrap();
println!("consumer {idx} completed task {task_id}");
Expand Down
1 change: 0 additions & 1 deletion examples/tx-partition-move/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ fn main() -> fjall::Result<()> {
tx.insert(&dst, &key, &value);

tx.commit()?;
keyspace.persist(PersistMode::Buffer)?;

let task_id = std::str::from_utf8(&key).unwrap();
println!("consumer {idx} moved {task_id}");
Expand Down
Empty file.
14 changes: 14 additions & 0 deletions examples/tx-ssi-atomic-counter/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "tx-atomic-counter"
version = "0.0.1"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
fjall = { path = "../../", default-features = false, features = [
"bloom",
"lz4",
"ssi_tx",
] }
rand = "0.8.5"
3 changes: 3 additions & 0 deletions examples/tx-ssi-atomic-counter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# tx-ssi-atomic-counter

This example demonstrates using transactions for atomic updates.
58 changes: 58 additions & 0 deletions examples/tx-ssi-atomic-counter/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use fjall::{Config, PersistMode};
use std::path::Path;

const LIMIT: u64 = 100;

fn main() -> fjall::Result<()> {
let path = Path::new(".fjall_data");

let keyspace = Config::new(path).temporary(true).open_transactional()?;
let counters = keyspace.open_partition("counters", Default::default())?;

counters.insert("c1", 0_u64.to_be_bytes())?;

let workers = (0_u8..4)
.map(|idx| {
let keyspace = keyspace.clone();
let counters = counters.clone();

std::thread::spawn(move || {
use rand::Rng;

let mut rng = rand::thread_rng();

loop {
let mut write_tx = keyspace.write_tx().unwrap();

let item = write_tx.get(&counters, "c1")?.unwrap();

let mut bytes = [0; 8];
bytes.copy_from_slice(&item);
let prev = u64::from_be_bytes(bytes);

if prev >= LIMIT {
return Ok::<_, fjall::Error>(());
}

let next = prev + 1;

write_tx.insert(&counters, "c1", next.to_be_bytes());
write_tx.commit()?.ok();

println!("worker {idx} incremented to {next}");

let ms = rng.gen_range(10..400);
std::thread::sleep(std::time::Duration::from_millis(ms));
}
})
})
.collect::<Vec<_>>();

for worker in workers {
worker.join().unwrap()?;
}

assert_eq!(&*counters.get("c1").unwrap().unwrap(), LIMIT.to_be_bytes());

Ok(())
}
Empty file added examples/tx-ssi-cc/.run
Empty file.
13 changes: 13 additions & 0 deletions examples/tx-ssi-cc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "tx-ssi-cc"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
fjall = { path = "../../", default-features = false, features = [
"bloom",
"lz4",
"ssi_tx",
] }
3 changes: 3 additions & 0 deletions examples/tx-ssi-cc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# tx-ssi-cc

This example demonstrates concurrent transactions using SSI (serializable snapshot isolation).
53 changes: 53 additions & 0 deletions examples/tx-ssi-cc/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use std::time::{Duration, Instant};

fn main() -> fjall::Result<()> {
let keyspace = fjall::Config::default()
.temporary(true)
.open_transactional()?;
let items = keyspace.open_partition("items", Default::default())?;

let start = Instant::now();

let t1 = {
let keyspace = keyspace.clone();
let items = items.clone();

std::thread::spawn(move || {
let mut wtx = keyspace.write_tx().unwrap();
println!("Started tx1");
std::thread::sleep(Duration::from_secs(3));
wtx.insert(&items, "a", "a");
wtx.commit()
})
};

let t2 = {
let keyspace = keyspace.clone();
let items = items.clone();

std::thread::spawn(move || {
let mut wtx = keyspace.write_tx().unwrap();
println!("Started tx2");
std::thread::sleep(Duration::from_secs(3));
wtx.insert(&items, "b", "b");
wtx.commit()
})
};

t1.join()
.expect("should join")?
.expect("tx should not fail");

t2.join()
.expect("should join")?
.expect("tx should not fail");

// NOTE: We would expect a single writer tx implementation to finish in
// ~6 seconds
println!("Done in {:?}, items.len={}", start.elapsed(), {
let rtx = keyspace.read_tx();
rtx.len(&items)?
});

Ok(())
}
Empty file added examples/tx-ssi-mpmc-queue/.run
Empty file.
15 changes: 15 additions & 0 deletions examples/tx-ssi-mpmc-queue/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "tx-mpmc-queue"
version = "0.0.1"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
fjall = { path = "../../", default-features = false, features = [
"bloom",
"lz4",
"ssi_tx",
] }
rand = "0.8.5"
scru128 = "3.0.2"
3 changes: 3 additions & 0 deletions examples/tx-ssi-mpmc-queue/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# tx-ssi-mpmc-queue

This example demonstrates implementing a FIFO-MPMC queue using transactions.
98 changes: 98 additions & 0 deletions examples/tx-ssi-mpmc-queue/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use fjall::{Config, PersistMode};
use std::path::Path;
use std::sync::{
atomic::{AtomicUsize, Ordering::Relaxed},
Arc,
};

const PRODUCER_COUNT: usize = 4;
const PRODUCING_COUNT: usize = 100;

const EXPECTED_COUNT: usize = PRODUCER_COUNT * PRODUCING_COUNT;

fn main() -> fjall::Result<()> {
let path = Path::new(".fjall_data");

let keyspace = Config::new(path).temporary(true).open_transactional()?;
let tasks = keyspace.open_partition("tasks", Default::default())?;

let counter = Arc::new(AtomicUsize::default());

let producers = (0..PRODUCER_COUNT)
.map(|idx| {
let keyspace = keyspace.clone();
let tasks = tasks.clone();

std::thread::spawn(move || {
use rand::Rng;

let mut rng = rand::thread_rng();

for _ in 0..PRODUCING_COUNT {
let task_id = scru128::new_string();

tasks.insert(&task_id, &task_id)?;

println!("producer {idx} created task {task_id}");

let ms = rng.gen_range(10..100);
std::thread::sleep(std::time::Duration::from_millis(ms));
}

println!("producer {idx} done");

Ok::<_, fjall::Error>(())
})
})
.collect::<Vec<_>>();

let consumers = (0..4)
.map(|idx| {
let keyspace = keyspace.clone();
let tasks = tasks.clone();
let counter = counter.clone();

std::thread::spawn(move || {
use rand::Rng;

let mut rng = rand::thread_rng();

loop {
let mut tx = keyspace.write_tx().unwrap();

// TODO: NOTE:
// Tombstones will add up over time, making first KV slower
// Something like SingleDelete https://github.com/facebook/rocksdb/wiki/Single-Delete
// would be good for this type of workload
if let Some((key, _)) = tx.first_key_value(&tasks)? {
tx.remove(&tasks, &key);

if tx.commit()?.is_ok() {
counter.fetch_add(1, Relaxed);
}

let task_id = std::str::from_utf8(&key).unwrap();
println!("consumer {idx} completed task {task_id}");

let ms = rng.gen_range(50..200);
std::thread::sleep(std::time::Duration::from_millis(ms));
} else if counter.load(Relaxed) == EXPECTED_COUNT {
return Ok::<_, fjall::Error>(());
}
}
})
})
.collect::<Vec<_>>();

for t in producers {
t.join().unwrap()?;
}

for t in consumers {
t.join().unwrap()?;
}

assert_eq!(EXPECTED_COUNT, counter.load(Relaxed));

Ok(())
}
Empty file.
15 changes: 15 additions & 0 deletions examples/tx-ssi-partition-move/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "tx-partition-move"
version = "0.0.1"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
fjall = { path = "../../", default-features = false, features = [
"bloom",
"lz4",
"ssi_tx",
] }
rand = "0.8.5"
scru128 = "3.0.2"
3 changes: 3 additions & 0 deletions examples/tx-ssi-partition-move/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# tx-ssi-partition-move

This example demonstrates atomically moving items between partitions using transactions.
Loading

0 comments on commit e6c325d

Please sign in to comment.