From 738f05db03d04e9741110e0e4aafe71cd1ea701d Mon Sep 17 00:00:00 2001 From: Nikolai Golub Date: Thu, 26 Oct 2023 16:20:08 +0200 Subject: [PATCH 01/12] [no ci] Switch to HashMap of last operation from vec --- full-node/db/sov-schema-db/src/lib.rs | 66 +++++++++++++-------------- 1 file changed, 33 insertions(+), 33 deletions(-) diff --git a/full-node/db/sov-schema-db/src/lib.rs b/full-node/db/sov-schema-db/src/lib.rs index 97d0ca3b8..ed8f68773 100644 --- a/full-node/db/sov-schema-db/src/lib.rs +++ b/full-node/db/sov-schema-db/src/lib.rs @@ -192,15 +192,15 @@ impl DB { let _timer = SCHEMADB_BATCH_COMMIT_LATENCY_SECONDS .with_label_values(&[self.name]) .start_timer(); - let rows_locked = batch.rows.lock().expect("Lock must not be poisoned"); + let rows_locked = batch.last_writes.lock().expect("Lock must not be poisoned"); let mut db_batch = rocksdb::WriteBatch::default(); for (cf_name, rows) in rows_locked.iter() { let cf_handle = self.get_cf_handle(cf_name)?; - for write_op in rows { - match write_op { - WriteOp::Value { key, value } => db_batch.put_cf(cf_handle, key, value), - WriteOp::Deletion { key } => db_batch.delete_cf(cf_handle, key), + for (key, operation) in rows { + match operation { + Operation::Put { value } => db_batch.put_cf(cf_handle, key, value), + Operation::Delete => db_batch.delete_cf(cf_handle, key), } } } @@ -210,14 +210,14 @@ impl DB { // Bump counters only after DB write succeeds. 
for (cf_name, rows) in rows_locked.iter() { - for write_op in rows { - match write_op { - WriteOp::Value { key, value } => { + for (key, operation) in rows { + match operation { + Operation::Put { value } => { SCHEMADB_PUT_BYTES .with_label_values(&[cf_name]) .observe((key.len() + value.len()) as f64); } - WriteOp::Deletion { key: _ } => { + Operation::Delete => { SCHEMADB_DELETES.with_label_values(&[cf_name]).inc(); } } @@ -266,11 +266,14 @@ impl DB { } } +type SchemaKey = Vec; +type SchemaValue = Vec; + #[cfg_attr(feature = "arbitrary", derive(proptest_derive::Arbitrary))] #[derive(Debug, PartialEq, Eq, Hash)] -enum WriteOp { - Value { key: Vec, value: Vec }, - Deletion { key: Vec }, +enum Operation { + Put { value: SchemaValue }, + Delete, } /// [`SchemaBatch`] holds a collection of updates that can be applied to a DB @@ -278,7 +281,7 @@ enum WriteOp { /// they are added to the [`SchemaBatch`]. #[derive(Debug, Default)] pub struct SchemaBatch { - rows: Mutex>>, + last_writes: Mutex>>, } impl SchemaBatch { @@ -297,52 +300,49 @@ impl SchemaBatch { .with_label_values(&["unknown"]) .start_timer(); let key = key.encode_key()?; - let value = value.encode_value()?; - self.rows - .lock() - .expect("Lock must not be poisoned") - .entry(S::COLUMN_FAMILY_NAME) - .or_default() - .push(WriteOp::Value { key, value }); - + let put_operation = Operation::Put { + value: value.encode_value()?, + }; + self.insert_operation::(key, put_operation); Ok(()) } /// Adds a delete operation to the batch. 
pub fn delete(&self, key: &impl KeyCodec) -> anyhow::Result<()> { let key = key.encode_key()?; - self.rows - .lock() - .expect("Lock must not be poisoned") - .entry(S::COLUMN_FAMILY_NAME) - .or_default() - .push(WriteOp::Deletion { key }); + self.insert_operation::(key, Operation::Delete); Ok(()) } + + fn insert_operation(&self, key: SchemaKey, operation: Operation) { + let mut last_writes = self.last_writes.lock().expect("Lock must not be poisoned"); + let column_writes = last_writes.entry(S::COLUMN_FAMILY_NAME).or_default(); + column_writes.insert(key, operation); + } } #[cfg(feature = "arbitrary")] impl proptest::arbitrary::Arbitrary for SchemaBatch { type Parameters = &'static [ColumnFamilyName]; - type Strategy = proptest::strategy::BoxedStrategy; - fn arbitrary_with(columns: Self::Parameters) -> Self::Strategy { use proptest::prelude::any; use proptest::strategy::Strategy; - proptest::collection::vec(any::>(), columns.len()) + proptest::collection::vec(any::>(), columns.len()) .prop_map::(|vec_vec_write_ops| { let mut rows = HashMap::new(); - for (col, write_ops) in columns.iter().zip(vec_vec_write_ops.into_iter()) { - rows.insert(*col, write_ops); + for (col, write_op) in columns.iter().zip(vec_vec_write_ops.into_iter()) { + rows.insert(*col, write_op); } SchemaBatch { - rows: Mutex::new(rows), + last_writes: Mutex::new(rows), } }) .boxed() } + + type Strategy = proptest::strategy::BoxedStrategy; } /// An error that occurred during (de)serialization of a [`Schema`]'s keys or From 3a8000dc1cb8a5b0a444900446b4ae7432d78c7a Mon Sep 17 00:00:00 2001 From: Nikolai Golub Date: Thu, 26 Oct 2023 21:00:54 +0200 Subject: [PATCH 02/12] [no ci] Adding a DbSnapshot --- full-node/db/sov-schema-db/src/lib.rs | 25 ++++- full-node/db/sov-schema-db/src/snapshot.rs | 108 +++++++++++++++++++++ 2 files changed, 130 insertions(+), 3 deletions(-) create mode 100644 full-node/db/sov-schema-db/src/snapshot.rs diff --git a/full-node/db/sov-schema-db/src/lib.rs 
b/full-node/db/sov-schema-db/src/lib.rs index ed8f68773..3db2b4d1f 100644 --- a/full-node/db/sov-schema-db/src/lib.rs +++ b/full-node/db/sov-schema-db/src/lib.rs @@ -17,6 +17,7 @@ mod iterator; mod metrics; pub mod schema; +pub mod snapshot; use std::collections::HashMap; use std::path::Path; @@ -270,9 +271,15 @@ type SchemaKey = Vec; type SchemaValue = Vec; #[cfg_attr(feature = "arbitrary", derive(proptest_derive::Arbitrary))] -#[derive(Debug, PartialEq, Eq, Hash)] -enum Operation { - Put { value: SchemaValue }, +#[derive(Debug, PartialEq, Eq, Hash, Clone)] +/// Represents operation written to the database +pub enum Operation { + /// Writing a value to the DB. + Put { + /// Value to write + value: SchemaValue, + }, + /// Deleting a value Delete, } @@ -281,6 +288,7 @@ enum Operation { /// they are added to the [`SchemaBatch`]. #[derive(Debug, Default)] pub struct SchemaBatch { + // TODO: Why do we need a mutex here? last_writes: Mutex>>, } @@ -320,6 +328,17 @@ impl SchemaBatch { let column_writes = last_writes.entry(S::COLUMN_FAMILY_NAME).or_default(); column_writes.insert(key, operation); } + + #[allow(dead_code)] + pub(crate) fn read( + &self, + key: &impl KeyCodec, + ) -> anyhow::Result> { + let key = key.encode_key()?; + let last_writes = self.last_writes.lock().expect("Lock must not be poisoned"); + let column_writes = last_writes.get(&S::COLUMN_FAMILY_NAME).unwrap(); + Ok(column_writes.get(&key).cloned()) + } } #[cfg(feature = "arbitrary")] diff --git a/full-node/db/sov-schema-db/src/snapshot.rs b/full-node/db/sov-schema-db/src/snapshot.rs new file mode 100644 index 000000000..880b243e9 --- /dev/null +++ b/full-node/db/sov-schema-db/src/snapshot.rs @@ -0,0 +1,108 @@ +//! 
Snapshot related logic + +use std::sync::{Arc, LockResult, RwLock, RwLockReadGuard}; + +use crate::schema::{KeyCodec, ValueCodec}; +use crate::{Operation, Schema, SchemaBatch, DB}; + +/// Id of database snapshot +pub type SnapshotId = u64; + +/// A trait to make nested calls to several [`Schema`] +pub trait QueryManager { + /// Get a value from snapshot or its parents + fn get( + &self, + snapshot_id: SnapshotId, + key: &impl KeyCodec, + ) -> anyhow::Result>; +} + +/// Simple wrapper around `RwLock` that only allows read access. +pub struct ReadOnlyLock { + lock: Arc>, +} + +impl ReadOnlyLock { + #[allow(dead_code)] + /// Create new [`ReadOnlyLock`] from [`Arc>`]. + pub fn new(lock: Arc>) -> Self { + Self { lock } + } + + /// Acquires a read lock on the underlying `RwLock`. + pub fn read(&self) -> LockResult> { + self.lock.read() + } +} + +/// Wrapper around [`DB`] that allows to read from snapshots +#[allow(dead_code)] +pub struct DbSnapshot { + id: SnapshotId, + cache: SchemaBatch, + manager: ReadOnlyLock, + db_reader: Arc, +} + +#[allow(dead_code)] +impl DbSnapshot { + /// Create new [`DbSnapshot`] + pub fn new(id: SnapshotId, manager: ReadOnlyLock, db_reader: Arc) -> Self { + Self { + id, + cache: SchemaBatch::default(), + manager, + db_reader, + } + } + + /// Get a value from current snapshot, its parents or underlying database + pub fn read(&self, key: &impl KeyCodec) -> anyhow::Result> { + // Some(Operation) means that key was touched, + // but in case of deletion we early return None + // Only in case of not finding operation for key, + // we go deeper + + // 1. Check in cache + if let Some(operation) = self.cache.read(key)? { + return decode_operation::(operation); + } + + // Check parent + { + let parent = self + .manager + .read() + .expect("Parent lock must not be poisoned"); + if let Some(operation) = parent.get(self.id, key)? 
{ + return decode_operation::(operation); + } + } + + // Check db + + self.db_reader.get(key) + } + + /// Store a value in snapshot + pub fn put( + &self, + key: &impl KeyCodec, + value: &impl ValueCodec, + ) -> anyhow::Result<()> { + // TODO: Lock here? + self.cache.put(key, value)?; + Ok(()) + } +} + +fn decode_operation(operation: Operation) -> anyhow::Result> { + match operation { + Operation::Put { value } => { + let value = S::Value::decode_value(&value)?; + Ok(Some(value)) + } + Operation::Delete => Ok(None), + } +} From ab44ed401a19fa1f98728df412e95d8c3a5969bd Mon Sep 17 00:00:00 2001 From: Nikolai Golub Date: Thu, 26 Oct 2023 21:37:07 +0200 Subject: [PATCH 03/12] [no ci] Remove lock from inside SchemaBatch --- full-node/db/sov-db/src/native_db.rs | 2 +- full-node/db/sov-schema-db/src/lib.rs | 27 ++++++++-------------- full-node/db/sov-schema-db/src/snapshot.rs | 19 ++++++++++----- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/full-node/db/sov-db/src/native_db.rs b/full-node/db/sov-db/src/native_db.rs index 7cb3dceec..97e627b23 100644 --- a/full-node/db/sov-db/src/native_db.rs +++ b/full-node/db/sov-db/src/native_db.rs @@ -48,7 +48,7 @@ impl NativeDB { &self, key_value_pairs: impl IntoIterator, Option>)>, ) -> anyhow::Result<()> { - let batch = SchemaBatch::default(); + let mut batch = SchemaBatch::default(); for (key, value) in key_value_pairs { batch.put::(&key, &value)?; } diff --git a/full-node/db/sov-schema-db/src/lib.rs b/full-node/db/sov-schema-db/src/lib.rs index 3db2b4d1f..031caeb33 100644 --- a/full-node/db/sov-schema-db/src/lib.rs +++ b/full-node/db/sov-schema-db/src/lib.rs @@ -21,7 +21,6 @@ pub mod snapshot; use std::collections::HashMap; use std::path::Path; -use std::sync::Mutex; use anyhow::format_err; use iterator::ScanDirection; @@ -145,7 +144,7 @@ impl DB { ) -> anyhow::Result<()> { // Not necessary to use a batch, but we'd like a central place to bump counters. // Used in tests only anyway. 
- let batch = SchemaBatch::new(); + let mut batch = SchemaBatch::new(); batch.put::(key, value)?; self.write_schemas(batch) } @@ -193,10 +192,8 @@ impl DB { let _timer = SCHEMADB_BATCH_COMMIT_LATENCY_SECONDS .with_label_values(&[self.name]) .start_timer(); - let rows_locked = batch.last_writes.lock().expect("Lock must not be poisoned"); - let mut db_batch = rocksdb::WriteBatch::default(); - for (cf_name, rows) in rows_locked.iter() { + for (cf_name, rows) in batch.last_writes.iter() { let cf_handle = self.get_cf_handle(cf_name)?; for (key, operation) in rows { match operation { @@ -210,7 +207,7 @@ impl DB { self.inner.write_opt(db_batch, &default_write_options())?; // Bump counters only after DB write succeeds. - for (cf_name, rows) in rows_locked.iter() { + for (cf_name, rows) in batch.last_writes.iter() { for (key, operation) in rows { match operation { Operation::Put { value } => { @@ -289,7 +286,7 @@ pub enum Operation { #[derive(Debug, Default)] pub struct SchemaBatch { // TODO: Why do we need a mutex here? - last_writes: Mutex>>, + last_writes: HashMap>, } impl SchemaBatch { @@ -300,7 +297,7 @@ impl SchemaBatch { /// Adds an insert/update operation to the batch. pub fn put( - &self, + &mut self, key: &impl KeyCodec, value: &impl ValueCodec, ) -> anyhow::Result<()> { @@ -316,16 +313,15 @@ impl SchemaBatch { } /// Adds a delete operation to the batch. 
- pub fn delete(&self, key: &impl KeyCodec) -> anyhow::Result<()> { + pub fn delete(&mut self, key: &impl KeyCodec) -> anyhow::Result<()> { let key = key.encode_key()?; self.insert_operation::(key, Operation::Delete); Ok(()) } - fn insert_operation(&self, key: SchemaKey, operation: Operation) { - let mut last_writes = self.last_writes.lock().expect("Lock must not be poisoned"); - let column_writes = last_writes.entry(S::COLUMN_FAMILY_NAME).or_default(); + fn insert_operation(&mut self, key: SchemaKey, operation: Operation) { + let column_writes = self.last_writes.entry(S::COLUMN_FAMILY_NAME).or_default(); column_writes.insert(key, operation); } @@ -335,8 +331,7 @@ impl SchemaBatch { key: &impl KeyCodec, ) -> anyhow::Result> { let key = key.encode_key()?; - let last_writes = self.last_writes.lock().expect("Lock must not be poisoned"); - let column_writes = last_writes.get(&S::COLUMN_FAMILY_NAME).unwrap(); + let column_writes = self.last_writes.get(&S::COLUMN_FAMILY_NAME).unwrap(); Ok(column_writes.get(&key).cloned()) } } @@ -354,9 +349,7 @@ impl proptest::arbitrary::Arbitrary for SchemaBatch { for (col, write_op) in columns.iter().zip(vec_vec_write_ops.into_iter()) { rows.insert(*col, write_op); } - SchemaBatch { - last_writes: Mutex::new(rows), - } + SchemaBatch { last_writes: rows } }) .boxed() } diff --git a/full-node/db/sov-schema-db/src/snapshot.rs b/full-node/db/sov-schema-db/src/snapshot.rs index 880b243e9..f52263ab1 100644 --- a/full-node/db/sov-schema-db/src/snapshot.rs +++ b/full-node/db/sov-schema-db/src/snapshot.rs @@ -1,6 +1,6 @@ //! 
Snapshot related logic -use std::sync::{Arc, LockResult, RwLock, RwLockReadGuard}; +use std::sync::{Arc, LockResult, Mutex, RwLock, RwLockReadGuard}; use crate::schema::{KeyCodec, ValueCodec}; use crate::{Operation, Schema, SchemaBatch, DB}; @@ -40,7 +40,7 @@ impl ReadOnlyLock { #[allow(dead_code)] pub struct DbSnapshot { id: SnapshotId, - cache: SchemaBatch, + cache: Mutex, manager: ReadOnlyLock, db_reader: Arc, } @@ -51,7 +51,7 @@ impl DbSnapshot { pub fn new(id: SnapshotId, manager: ReadOnlyLock, db_reader: Arc) -> Self { Self { id, - cache: SchemaBatch::default(), + cache: Mutex::new(SchemaBatch::default()), manager, db_reader, } @@ -65,7 +65,12 @@ impl DbSnapshot { // we go deeper // 1. Check in cache - if let Some(operation) = self.cache.read(key)? { + if let Some(operation) = self + .cache + .lock() + .expect("SchemaBatch lock should not be poisoned") + .read(key)? + { return decode_operation::(operation); } @@ -91,8 +96,10 @@ impl DbSnapshot { key: &impl KeyCodec, value: &impl ValueCodec, ) -> anyhow::Result<()> { - // TODO: Lock here? 
- self.cache.put(key, value)?; + self.cache + .lock() + .expect("SchemaBatch lock must not be poisoned") + .put(key, value)?; Ok(()) } } From 0d3daa4751ff3595c95f67d7468716a901cea8d9 Mon Sep 17 00:00:00 2001 From: Nikolai Golub Date: Thu, 26 Oct 2023 21:43:41 +0200 Subject: [PATCH 04/12] [no ci] Fix lint and add FrozenDbSnapshot --- full-node/db/sov-schema-db/src/snapshot.rs | 40 +++++++++++++++++++++ full-node/db/sov-schema-db/tests/db_test.rs | 8 ++--- 2 files changed, 44 insertions(+), 4 deletions(-) diff --git a/full-node/db/sov-schema-db/src/snapshot.rs b/full-node/db/sov-schema-db/src/snapshot.rs index f52263ab1..f7dad0356 100644 --- a/full-node/db/sov-schema-db/src/snapshot.rs +++ b/full-node/db/sov-schema-db/src/snapshot.rs @@ -104,6 +104,46 @@ impl DbSnapshot { } } +/// Read only version of [`DbSnapshot`], for usage inside [`QueryManager`] +pub struct FrozenDbSnapshot { + id: SnapshotId, + cache: SchemaBatch, +} + +impl FrozenDbSnapshot { + /// Get value from its own cache + pub fn get(&self, key: &impl KeyCodec) -> anyhow::Result> { + if let Some(operation) = self.cache.read(key)? 
{ + return decode_operation::(operation); + } + + Ok(None) + } + + /// Get id of this Snapshot + pub fn get_id(&self) -> SnapshotId { + self.id + } +} + +impl From> for FrozenDbSnapshot { + fn from(snapshot: DbSnapshot) -> Self { + Self { + id: snapshot.id, + cache: snapshot + .cache + .into_inner() + .expect("SchemaBatch lock must not be poisoned"), + } + } +} + +impl From for SchemaBatch { + fn from(value: FrozenDbSnapshot) -> Self { + value.cache + } +} + fn decode_operation(operation: Operation) -> anyhow::Result> { match operation { Operation::Put { value } => { diff --git a/full-node/db/sov-schema-db/tests/db_test.rs b/full-node/db/sov-schema-db/tests/db_test.rs index fcfb3c2b5..44d634244 100644 --- a/full-node/db/sov-schema-db/tests/db_test.rs +++ b/full-node/db/sov-schema-db/tests/db_test.rs @@ -209,7 +209,7 @@ fn gen_expected_values(values: &[(u32, u32)]) -> Vec<(TestField, TestField)> { fn test_single_schema_batch() { let db = TestDB::new(); - let db_batch = SchemaBatch::new(); + let mut db_batch = SchemaBatch::new(); db_batch .put::(&TestField(0), &TestField(0)) .unwrap(); @@ -247,7 +247,7 @@ fn test_single_schema_batch() { fn test_two_schema_batches() { let db = TestDB::new(); - let db_batch1 = SchemaBatch::new(); + let mut db_batch1 = SchemaBatch::new(); db_batch1 .put::(&TestField(0), &TestField(0)) .unwrap(); @@ -265,7 +265,7 @@ fn test_two_schema_batches() { gen_expected_values(&[(0, 0), (1, 1)]), ); - let db_batch2 = SchemaBatch::new(); + let mut db_batch2 = SchemaBatch::new(); db_batch2.delete::(&TestField(3)).unwrap(); db_batch2 .put::(&TestField(3), &TestField(3)) @@ -345,7 +345,7 @@ fn test_report_size() { let db = TestDB::new(); for i in 0..1000 { - let db_batch = SchemaBatch::new(); + let mut db_batch = SchemaBatch::new(); db_batch .put::(&TestField(i), &TestField(i)) .unwrap(); From 84ab9ce87d0a52bcea8c869f313e0e04985c2d1b Mon Sep 17 00:00:00 2001 From: Nikolai Golub Date: Fri, 27 Oct 2023 12:15:12 +0200 Subject: [PATCH 05/12] Add test for 
snapshot lifecycle --- full-node/db/sov-schema-db/src/lib.rs | 6 +- full-node/db/sov-schema-db/src/snapshot.rs | 17 ++- .../db/sov-schema-db/tests/snapshot_test.rs | 142 ++++++++++++++++++ 3 files changed, 158 insertions(+), 7 deletions(-) create mode 100644 full-node/db/sov-schema-db/tests/snapshot_test.rs diff --git a/full-node/db/sov-schema-db/src/lib.rs b/full-node/db/sov-schema-db/src/lib.rs index 031caeb33..5270d193c 100644 --- a/full-node/db/sov-schema-db/src/lib.rs +++ b/full-node/db/sov-schema-db/src/lib.rs @@ -331,8 +331,10 @@ impl SchemaBatch { key: &impl KeyCodec, ) -> anyhow::Result> { let key = key.encode_key()?; - let column_writes = self.last_writes.get(&S::COLUMN_FAMILY_NAME).unwrap(); - Ok(column_writes.get(&key).cloned()) + if let Some(column_writes) = self.last_writes.get(&S::COLUMN_FAMILY_NAME) { + return Ok(column_writes.get(&key).cloned()); + } + Ok(None) } } diff --git a/full-node/db/sov-schema-db/src/snapshot.rs b/full-node/db/sov-schema-db/src/snapshot.rs index f7dad0356..4b57cb29a 100644 --- a/full-node/db/sov-schema-db/src/snapshot.rs +++ b/full-node/db/sov-schema-db/src/snapshot.rs @@ -80,7 +80,7 @@ impl DbSnapshot { .manager .read() .expect("Parent lock must not be poisoned"); - if let Some(operation) = parent.get(self.id, key)? { + if let Some(operation) = parent.get::(self.id, key)? 
{ return decode_operation::(operation); } } @@ -99,8 +99,15 @@ impl DbSnapshot { self.cache .lock() .expect("SchemaBatch lock must not be poisoned") - .put(key, value)?; - Ok(()) + .put(key, value) + } + + /// Delete given key from snapshot + pub fn delete(&self, key: &impl KeyCodec) -> anyhow::Result<()> { + self.cache + .lock() + .expect("SchemaBatch lock must not be poisoned") + .delete(key) } } @@ -112,9 +119,9 @@ pub struct FrozenDbSnapshot { impl FrozenDbSnapshot { /// Get value from its own cache - pub fn get(&self, key: &impl KeyCodec) -> anyhow::Result> { + pub fn get(&self, key: &impl KeyCodec) -> anyhow::Result> { if let Some(operation) = self.cache.read(key)? { - return decode_operation::(operation); + return Ok(Some(operation)); } Ok(None) diff --git a/full-node/db/sov-schema-db/tests/snapshot_test.rs b/full-node/db/sov-schema-db/tests/snapshot_test.rs new file mode 100644 index 000000000..b37f5f2e9 --- /dev/null +++ b/full-node/db/sov-schema-db/tests/snapshot_test.rs @@ -0,0 +1,142 @@ +use std::path::Path; +use std::sync::{Arc, RwLock}; + +use byteorder::{BigEndian, ReadBytesExt}; +use rocksdb::DEFAULT_COLUMN_FAMILY_NAME; +use sov_schema_db::schema::{ColumnFamilyName, KeyCodec, KeyDecoder, KeyEncoder, ValueCodec}; +use sov_schema_db::snapshot::{ + DbSnapshot, FrozenDbSnapshot, QueryManager, ReadOnlyLock, SnapshotId, +}; +use sov_schema_db::{define_schema, CodecError, Operation, Schema, DB}; + +define_schema!(TestSchema1, TestField, TestField, "TestCF1"); + +#[derive(Debug, Eq, PartialEq, Clone)] +pub(crate) struct TestField(u32); + +impl TestField { + fn to_bytes(&self) -> Vec { + self.0.to_be_bytes().to_vec() + } + + fn from_bytes(data: &[u8]) -> sov_schema_db::schema::Result { + let mut reader = std::io::Cursor::new(data); + Ok(TestField( + reader + .read_u32::() + .map_err(|e| CodecError::Wrapped(e.into()))?, + )) + } +} + +impl KeyEncoder for TestField { + fn encode_key(&self) -> sov_schema_db::schema::Result> { + Ok(self.to_bytes()) + } +} + 
+impl KeyDecoder for TestField { + fn decode_key(data: &[u8]) -> sov_schema_db::schema::Result { + Self::from_bytes(data) + } +} + +impl ValueCodec for TestField { + fn encode_value(&self) -> sov_schema_db::schema::Result> { + Ok(self.to_bytes()) + } + + fn decode_value(data: &[u8]) -> sov_schema_db::schema::Result { + Self::from_bytes(data) + } +} + +#[derive(Default)] +struct LinearSnapshotManager { + snapshots: Vec, +} + +impl LinearSnapshotManager { + fn add_snapshot(&mut self, snapshot: FrozenDbSnapshot) { + self.snapshots.push(snapshot); + } +} + +impl QueryManager for LinearSnapshotManager { + fn get( + &self, + snapshot_id: SnapshotId, + key: &impl KeyCodec, + ) -> anyhow::Result> { + for snapshot in self.snapshots[..snapshot_id as usize].iter().rev() { + if let Some(operation) = snapshot.get(key)? { + return Ok(Some(operation)); + } + } + Ok(None) + } +} + +fn get_column_families() -> Vec { + vec![DEFAULT_COLUMN_FAMILY_NAME, TestSchema1::COLUMN_FAMILY_NAME] +} + +fn open_db(dir: impl AsRef) -> DB { + let mut db_opts = rocksdb::Options::default(); + db_opts.create_if_missing(true); + db_opts.create_missing_column_families(true); + + DB::open(dir, "test", get_column_families(), &db_opts).expect("Failed to open DB.") +} + +#[test] +fn snapshot_lifecycle() { + let tmpdir = tempfile::tempdir().unwrap(); + let db = Arc::new(open_db(&tmpdir)); + let manager = Arc::new(RwLock::new(LinearSnapshotManager::default())); + + let key_1 = TestField(1); + let value_1 = TestField(1); + // 1 => 1 in db + db.put::(&key_1, &value_1).unwrap(); + + // Snapshot 1: reads from DB first, then sets 1 => 2 + let snapshot_1 = + DbSnapshot::::new(0, ReadOnlyLock::new(manager.clone()), db.clone()); + assert_eq!( + Some(value_1.clone()), + snapshot_1.read::(&key_1).unwrap(), + "Incorrect value, should be fetched from DB" + ); + // 1 => 2 in snapshot 1 + let value_2 = TestField(2); + snapshot_1.put(&key_1, &value_2).unwrap(); + assert_eq!( + Some(value_2.clone()), + 
snapshot_1.read::(&key_1).unwrap(), + "Incorrect value, should be fetched from local cache" + ); + { + let mut manager = manager.write().unwrap(); + manager.add_snapshot(snapshot_1.into()); + } + + // Snapshot 2: reads value from snapshot 1, then deletes it + let snapshot_2 = + DbSnapshot::::new(1, ReadOnlyLock::new(manager.clone()), db.clone()); + assert_eq!( + Some(value_2.clone()), + snapshot_2.read::(&key_1).unwrap() + ); + snapshot_2.delete(&key_1).unwrap(); + assert_eq!(None, snapshot_2.read::(&key_1).unwrap()); + { + let mut manager = manager.write().unwrap(); + manager.add_snapshot(snapshot_2.into()); + } + + // Snapshot 3: gets empty result, event value is still in DB, but it was deleted in previous snapshot + let snapshot_3 = + DbSnapshot::::new(2, ReadOnlyLock::new(manager.clone()), db.clone()); + assert_eq!(None, snapshot_3.read::(&key_1).unwrap()); +} From cb970639b6909fd67feb52b488cc7d5a3a8d072f Mon Sep 17 00:00:00 2001 From: Nikolai Golub Date: Fri, 27 Oct 2023 12:16:36 +0200 Subject: [PATCH 06/12] Formatting --- full-node/db/sov-schema-db/src/snapshot.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/full-node/db/sov-schema-db/src/snapshot.rs b/full-node/db/sov-schema-db/src/snapshot.rs index 4b57cb29a..f6883ad64 100644 --- a/full-node/db/sov-schema-db/src/snapshot.rs +++ b/full-node/db/sov-schema-db/src/snapshot.rs @@ -74,7 +74,7 @@ impl DbSnapshot { return decode_operation::(operation); } - // Check parent + // 2. Check parent { let parent = self .manager @@ -85,8 +85,7 @@ impl DbSnapshot { } } - // Check db - + // 3. 
Check db self.db_reader.get(key) } From 0a342e1f77b0c9b2bac1cf2dfa25df1dd55d893f Mon Sep 17 00:00:00 2001 From: Nikolai Golub Date: Fri, 27 Oct 2023 12:28:32 +0200 Subject: [PATCH 07/12] Remove unrelated TODO --- full-node/db/sov-schema-db/src/lib.rs | 1 - full-node/db/sov-schema-db/src/snapshot.rs | 3 --- 2 files changed, 4 deletions(-) diff --git a/full-node/db/sov-schema-db/src/lib.rs b/full-node/db/sov-schema-db/src/lib.rs index 5270d193c..ae3cb16d8 100644 --- a/full-node/db/sov-schema-db/src/lib.rs +++ b/full-node/db/sov-schema-db/src/lib.rs @@ -285,7 +285,6 @@ pub enum Operation { /// they are added to the [`SchemaBatch`]. #[derive(Debug, Default)] pub struct SchemaBatch { - // TODO: Why do we need a mutex here? last_writes: HashMap>, } diff --git a/full-node/db/sov-schema-db/src/snapshot.rs b/full-node/db/sov-schema-db/src/snapshot.rs index f6883ad64..ad5e04336 100644 --- a/full-node/db/sov-schema-db/src/snapshot.rs +++ b/full-node/db/sov-schema-db/src/snapshot.rs @@ -24,7 +24,6 @@ pub struct ReadOnlyLock { } impl ReadOnlyLock { - #[allow(dead_code)] /// Create new [`ReadOnlyLock`] from [`Arc>`]. 
pub fn new(lock: Arc>) -> Self { Self { lock } @@ -37,7 +36,6 @@ impl ReadOnlyLock { } /// Wrapper around [`DB`] that allows to read from snapshots -#[allow(dead_code)] pub struct DbSnapshot { id: SnapshotId, cache: Mutex, @@ -45,7 +43,6 @@ pub struct DbSnapshot { db_reader: Arc, } -#[allow(dead_code)] impl DbSnapshot { /// Create new [`DbSnapshot`] pub fn new(id: SnapshotId, manager: ReadOnlyLock, db_reader: Arc) -> Self { From 23791e5fa87050eebc938c07b0f90c3aac849300 Mon Sep 17 00:00:00 2001 From: Nikolai Golub Date: Fri, 27 Oct 2023 12:33:06 +0200 Subject: [PATCH 08/12] I want bigger coverage --- full-node/db/sov-schema-db/src/lib.rs | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/full-node/db/sov-schema-db/src/lib.rs b/full-node/db/sov-schema-db/src/lib.rs index ae3cb16d8..8b9ad1e06 100644 --- a/full-node/db/sov-schema-db/src/lib.rs +++ b/full-node/db/sov-schema-db/src/lib.rs @@ -384,3 +384,25 @@ fn default_write_options() -> rocksdb::WriteOptions { opts.set_sync(true); opts } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_db_debug_output() { + let tmpdir = tempfile::tempdir().unwrap(); + let column_families = vec![DEFAULT_COLUMN_FAMILY_NAME]; + + let mut db_opts = rocksdb::Options::default(); + db_opts.create_if_missing(true); + db_opts.create_missing_column_families(true); + + let db = DB::open(&tmpdir.path(), "test_db_debug", column_families, &db_opts) + .expect("Failed to open DB."); + + let db_debug = format!("{:?}", db); + assert!(db_debug.contains("test_db_debug")); + assert!(db_debug.contains(tmpdir.path().to_str().unwrap())); + } +} From ca4ff6da28e4e7db2f49a48e4ecd63abfb651f04 Mon Sep 17 00:00:00 2001 From: Nikolai Golub Date: Fri, 27 Oct 2023 12:41:48 +0200 Subject: [PATCH 09/12] Fix lint! 
--- full-node/db/sov-schema-db/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/full-node/db/sov-schema-db/src/lib.rs b/full-node/db/sov-schema-db/src/lib.rs index 8b9ad1e06..0532e392c 100644 --- a/full-node/db/sov-schema-db/src/lib.rs +++ b/full-node/db/sov-schema-db/src/lib.rs @@ -398,7 +398,7 @@ mod tests { db_opts.create_if_missing(true); db_opts.create_missing_column_families(true); - let db = DB::open(&tmpdir.path(), "test_db_debug", column_families, &db_opts) + let db = DB::open(tmpdir.path(), "test_db_debug", column_families, &db_opts) .expect("Failed to open DB."); let db_debug = format!("{:?}", db); From 3352ccff4e05de69f915a859289bcd5b6f2a8a55 Mon Sep 17 00:00:00 2001 From: Nikolai Golub Date: Fri, 27 Oct 2023 13:37:01 +0200 Subject: [PATCH 10/12] Simplify snapshot get --- full-node/db/sov-schema-db/src/snapshot.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/full-node/db/sov-schema-db/src/snapshot.rs b/full-node/db/sov-schema-db/src/snapshot.rs index ad5e04336..288cfc044 100644 --- a/full-node/db/sov-schema-db/src/snapshot.rs +++ b/full-node/db/sov-schema-db/src/snapshot.rs @@ -116,11 +116,7 @@ pub struct FrozenDbSnapshot { impl FrozenDbSnapshot { /// Get value from its own cache pub fn get(&self, key: &impl KeyCodec) -> anyhow::Result> { - if let Some(operation) = self.cache.read(key)? { - return Ok(Some(operation)); - } - - Ok(None) + Ok(self.cache.read(key)?) 
} /// Get id of this Snapshot From 9c5c303b5116e3e9d06d4d3613b31c842321db6d Mon Sep 17 00:00:00 2001 From: Nikolai Golub Date: Fri, 27 Oct 2023 14:54:08 +0200 Subject: [PATCH 11/12] Lint fix and explicit hold of the lock --- full-node/db/sov-schema-db/src/snapshot.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/full-node/db/sov-schema-db/src/snapshot.rs b/full-node/db/sov-schema-db/src/snapshot.rs index 288cfc044..f5bec8061 100644 --- a/full-node/db/sov-schema-db/src/snapshot.rs +++ b/full-node/db/sov-schema-db/src/snapshot.rs @@ -61,13 +61,14 @@ impl DbSnapshot { // Only in case of not finding operation for key, // we go deeper - // 1. Check in cache - if let Some(operation) = self + // Hold local cache lock explicitly, so reads are atomic + let local_cache = self .cache .lock() - .expect("SchemaBatch lock should not be poisoned") - .read(key)? - { + .expect("SchemaBatch lock should not be poisoned"); + + // 1. Check in cache + if let Some(operation) = local_cache.read(key)? { return decode_operation::(operation); } @@ -116,7 +117,7 @@ pub struct FrozenDbSnapshot { impl FrozenDbSnapshot { /// Get value from its own cache pub fn get(&self, key: &impl KeyCodec) -> anyhow::Result> { - Ok(self.cache.read(key)?) 
+ self.cache.read(key) } /// Get id of this Snapshot From 693724f385c7aa4c1b636c401e22d8463abada3a Mon Sep 17 00:00:00 2001 From: Nikolai Golub Date: Mon, 30 Oct 2023 12:20:20 +0100 Subject: [PATCH 12/12] Change QueryManager interface and DbSnapshot implementation --- full-node/db/sov-schema-db/src/snapshot.rs | 33 ++++----- .../db/sov-schema-db/tests/snapshot_test.rs | 67 +++++++------------ 2 files changed, 37 insertions(+), 63 deletions(-) diff --git a/full-node/db/sov-schema-db/src/snapshot.rs b/full-node/db/sov-schema-db/src/snapshot.rs index f5bec8061..6bfbe3f7b 100644 --- a/full-node/db/sov-schema-db/src/snapshot.rs +++ b/full-node/db/sov-schema-db/src/snapshot.rs @@ -3,19 +3,19 @@ use std::sync::{Arc, LockResult, Mutex, RwLock, RwLockReadGuard}; use crate::schema::{KeyCodec, ValueCodec}; -use crate::{Operation, Schema, SchemaBatch, DB}; +use crate::{Operation, Schema, SchemaBatch}; /// Id of database snapshot pub type SnapshotId = u64; -/// A trait to make nested calls to several [`Schema`] +/// A trait to make nested calls to several [`SchemaBatch`]s and eventually [`crate::DB`] pub trait QueryManager { /// Get a value from snapshot or its parents fn get( &self, snapshot_id: SnapshotId, key: &impl KeyCodec, - ) -> anyhow::Result>; + ) -> anyhow::Result>; } /// Simple wrapper around `RwLock` that only allows read access. 
@@ -35,22 +35,20 @@ impl ReadOnlyLock { } } -/// Wrapper around [`DB`] that allows to read from snapshots +/// Wrapper around [`QueryManager`] that allows to read from snapshots pub struct DbSnapshot { id: SnapshotId, cache: Mutex, - manager: ReadOnlyLock, - db_reader: Arc, + parents_manager: ReadOnlyLock, } impl DbSnapshot { /// Create new [`DbSnapshot`] - pub fn new(id: SnapshotId, manager: ReadOnlyLock, db_reader: Arc) -> Self { + pub fn new(id: SnapshotId, manager: ReadOnlyLock) -> Self { Self { id, cache: Mutex::new(SchemaBatch::default()), - manager, - db_reader, + parents_manager: manager, } } @@ -73,18 +71,11 @@ impl DbSnapshot { } // 2. Check parent - { - let parent = self - .manager - .read() - .expect("Parent lock must not be poisoned"); - if let Some(operation) = parent.get::(self.id, key)? { - return decode_operation::(operation); - } - } - - // 3. Check db - self.db_reader.get(key) + let parent = self + .parents_manager + .read() + .expect("Parent lock must not be poisoned"); + parent.get::(self.id, key) } /// Store a value in snapshot diff --git a/full-node/db/sov-schema-db/tests/snapshot_test.rs b/full-node/db/sov-schema-db/tests/snapshot_test.rs index b37f5f2e9..24df1a7a2 100644 --- a/full-node/db/sov-schema-db/tests/snapshot_test.rs +++ b/full-node/db/sov-schema-db/tests/snapshot_test.rs @@ -1,13 +1,11 @@ -use std::path::Path; use std::sync::{Arc, RwLock}; use byteorder::{BigEndian, ReadBytesExt}; -use rocksdb::DEFAULT_COLUMN_FAMILY_NAME; -use sov_schema_db::schema::{ColumnFamilyName, KeyCodec, KeyDecoder, KeyEncoder, ValueCodec}; +use sov_schema_db::schema::{KeyCodec, KeyDecoder, KeyEncoder, ValueCodec}; use sov_schema_db::snapshot::{ DbSnapshot, FrozenDbSnapshot, QueryManager, ReadOnlyLock, SnapshotId, }; -use sov_schema_db::{define_schema, CodecError, Operation, Schema, DB}; +use sov_schema_db::{define_schema, CodecError, Operation, Schema}; define_schema!(TestSchema1, TestField, TestField, "TestCF1"); @@ -67,53 +65,38 @@ impl QueryManager for 
LinearSnapshotManager { &self, snapshot_id: SnapshotId, key: &impl KeyCodec, - ) -> anyhow::Result> { + ) -> anyhow::Result> { for snapshot in self.snapshots[..snapshot_id as usize].iter().rev() { if let Some(operation) = snapshot.get(key)? { - return Ok(Some(operation)); + return match operation { + Operation::Put { value } => Ok(Some(S::Value::decode_value(&value)?)), + Operation::Delete => Ok(None), + }; } } Ok(None) } } -fn get_column_families() -> Vec { - vec![DEFAULT_COLUMN_FAMILY_NAME, TestSchema1::COLUMN_FAMILY_NAME] -} - -fn open_db(dir: impl AsRef) -> DB { - let mut db_opts = rocksdb::Options::default(); - db_opts.create_if_missing(true); - db_opts.create_missing_column_families(true); - - DB::open(dir, "test", get_column_families(), &db_opts).expect("Failed to open DB.") -} - #[test] fn snapshot_lifecycle() { - let tmpdir = tempfile::tempdir().unwrap(); - let db = Arc::new(open_db(&tmpdir)); let manager = Arc::new(RwLock::new(LinearSnapshotManager::default())); - let key_1 = TestField(1); - let value_1 = TestField(1); - // 1 => 1 in db - db.put::(&key_1, &value_1).unwrap(); + let key = TestField(1); + let value = TestField(1); - // Snapshot 1: reads from DB first, then sets 1 => 2 let snapshot_1 = - DbSnapshot::::new(0, ReadOnlyLock::new(manager.clone()), db.clone()); + DbSnapshot::::new(0, ReadOnlyLock::new(manager.clone())); assert_eq!( - Some(value_1.clone()), - snapshot_1.read::(&key_1).unwrap(), - "Incorrect value, should be fetched from DB" + None, + snapshot_1.read::(&key).unwrap(), + "Incorrect value, should find nothing" ); - // 1 => 2 in snapshot 1 - let value_2 = TestField(2); - snapshot_1.put(&key_1, &value_2).unwrap(); + + snapshot_1.put(&key, &value).unwrap(); assert_eq!( - Some(value_2.clone()), - snapshot_1.read::(&key_1).unwrap(), + Some(value.clone()), + snapshot_1.read::(&key).unwrap(), "Incorrect value, should be fetched from local cache" ); { @@ -123,20 +106,20 @@ fn snapshot_lifecycle() { // Snapshot 2: reads value from snapshot 1, 
then deletes it let snapshot_2 = - DbSnapshot::::new(1, ReadOnlyLock::new(manager.clone()), db.clone()); + DbSnapshot::::new(1, ReadOnlyLock::new(manager.clone())); assert_eq!( - Some(value_2.clone()), - snapshot_2.read::(&key_1).unwrap() + Some(value.clone()), + snapshot_2.read::(&key).unwrap() ); - snapshot_2.delete(&key_1).unwrap(); - assert_eq!(None, snapshot_2.read::(&key_1).unwrap()); + snapshot_2.delete(&key).unwrap(); + assert_eq!(None, snapshot_2.read::(&key).unwrap()); { let mut manager = manager.write().unwrap(); manager.add_snapshot(snapshot_2.into()); } - // Snapshot 3: gets empty result, event value is still in DB, but it was deleted in previous snapshot + // Snapshot 3: gets empty result, even though the value is present in some previous snapshot let snapshot_3 = - DbSnapshot::::new(2, ReadOnlyLock::new(manager.clone()), db.clone()); - assert_eq!(None, snapshot_3.read::(&key_1).unwrap()); + DbSnapshot::::new(2, ReadOnlyLock::new(manager.clone())); + assert_eq!(None, snapshot_3.read::(&key).unwrap()); }