Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DbSnapshot to sov-db-schema #1112

Merged
merged 12 commits into from
Oct 30, 2023
2 changes: 1 addition & 1 deletion full-node/db/sov-db/src/native_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl NativeDB {
&self,
key_value_pairs: impl IntoIterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
) -> anyhow::Result<()> {
let batch = SchemaBatch::default();
let mut batch = SchemaBatch::default();
for (key, value) in key_value_pairs {
batch.put::<ModuleAccessoryState>(&key, &value)?;
}
Expand Down
121 changes: 78 additions & 43 deletions full-node/db/sov-schema-db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
mod iterator;
mod metrics;
pub mod schema;
pub mod snapshot;

use std::collections::HashMap;
use std::path::Path;
use std::sync::Mutex;

use anyhow::format_err;
use iterator::ScanDirection;
Expand Down Expand Up @@ -144,7 +144,7 @@ impl DB {
) -> anyhow::Result<()> {
// Not necessary to use a batch, but we'd like a central place to bump counters.
// Used in tests only anyway.
let batch = SchemaBatch::new();
let mut batch = SchemaBatch::new();
batch.put::<S>(key, value)?;
self.write_schemas(batch)
}
Expand Down Expand Up @@ -192,15 +192,13 @@ impl DB {
let _timer = SCHEMADB_BATCH_COMMIT_LATENCY_SECONDS
.with_label_values(&[self.name])
.start_timer();
let rows_locked = batch.rows.lock().expect("Lock must not be poisoned");

let mut db_batch = rocksdb::WriteBatch::default();
for (cf_name, rows) in rows_locked.iter() {
for (cf_name, rows) in batch.last_writes.iter() {
let cf_handle = self.get_cf_handle(cf_name)?;
for write_op in rows {
match write_op {
WriteOp::Value { key, value } => db_batch.put_cf(cf_handle, key, value),
WriteOp::Deletion { key } => db_batch.delete_cf(cf_handle, key),
for (key, operation) in rows {
match operation {
Operation::Put { value } => db_batch.put_cf(cf_handle, key, value),
Operation::Delete => db_batch.delete_cf(cf_handle, key),
}
}
}
Expand All @@ -209,15 +207,15 @@ impl DB {
self.inner.write_opt(db_batch, &default_write_options())?;

// Bump counters only after DB write succeeds.
for (cf_name, rows) in rows_locked.iter() {
for write_op in rows {
match write_op {
WriteOp::Value { key, value } => {
for (cf_name, rows) in batch.last_writes.iter() {
for (key, operation) in rows {
match operation {
Operation::Put { value } => {
SCHEMADB_PUT_BYTES
.with_label_values(&[cf_name])
.observe((key.len() + value.len()) as f64);
}
WriteOp::Deletion { key: _ } => {
Operation::Delete => {
SCHEMADB_DELETES.with_label_values(&[cf_name]).inc();
}
}
Expand Down Expand Up @@ -266,19 +264,28 @@ impl DB {
}
}

type SchemaKey = Vec<u8>;
type SchemaValue = Vec<u8>;

#[cfg_attr(feature = "arbitrary", derive(proptest_derive::Arbitrary))]
#[derive(Debug, PartialEq, Eq, Hash)]
enum WriteOp {
Value { key: Vec<u8>, value: Vec<u8> },
Deletion { key: Vec<u8> },
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
/// Represents operation written to the database
pub enum Operation {
/// Writing a value to the DB.
Put {
/// Value to write
value: SchemaValue,
},
/// Deleting a value
Delete,
}

/// [`SchemaBatch`] holds a collection of updates that can be applied to a DB
/// ([`Schema`]) atomically. The updates will be applied in the order in which
/// they are added to the [`SchemaBatch`].
#[derive(Debug, Default)]
pub struct SchemaBatch {
rows: Mutex<HashMap<ColumnFamilyName, Vec<WriteOp>>>,
last_writes: HashMap<ColumnFamilyName, HashMap<SchemaKey, Operation>>,
}

impl SchemaBatch {
Expand All @@ -289,60 +296,66 @@ impl SchemaBatch {

/// Adds an insert/update operation to the batch.
pub fn put<S: Schema>(
&self,
&mut self,
key: &impl KeyCodec<S>,
value: &impl ValueCodec<S>,
) -> anyhow::Result<()> {
let _timer = SCHEMADB_BATCH_PUT_LATENCY_SECONDS
.with_label_values(&["unknown"])
.start_timer();
let key = key.encode_key()?;
let value = value.encode_value()?;
self.rows
.lock()
.expect("Lock must not be poisoned")
.entry(S::COLUMN_FAMILY_NAME)
.or_default()
.push(WriteOp::Value { key, value });

let put_operation = Operation::Put {
value: value.encode_value()?,
};
self.insert_operation::<S>(key, put_operation);
Ok(())
}

/// Adds a delete operation to the batch.
pub fn delete<S: Schema>(&self, key: &impl KeyCodec<S>) -> anyhow::Result<()> {
pub fn delete<S: Schema>(&mut self, key: &impl KeyCodec<S>) -> anyhow::Result<()> {
let key = key.encode_key()?;
self.rows
.lock()
.expect("Lock must not be poisoned")
.entry(S::COLUMN_FAMILY_NAME)
.or_default()
.push(WriteOp::Deletion { key });
self.insert_operation::<S>(key, Operation::Delete);

Ok(())
}

fn insert_operation<S: Schema>(&mut self, key: SchemaKey, operation: Operation) {
let column_writes = self.last_writes.entry(S::COLUMN_FAMILY_NAME).or_default();
column_writes.insert(key, operation);
}

#[allow(dead_code)]
pub(crate) fn read<S: Schema>(
&self,
key: &impl KeyCodec<S>,
) -> anyhow::Result<Option<Operation>> {
let key = key.encode_key()?;
if let Some(column_writes) = self.last_writes.get(&S::COLUMN_FAMILY_NAME) {
return Ok(column_writes.get(&key).cloned());
}
Ok(None)
}
}

#[cfg(feature = "arbitrary")]
impl proptest::arbitrary::Arbitrary for SchemaBatch {
type Parameters = &'static [ColumnFamilyName];
type Strategy = proptest::strategy::BoxedStrategy<Self>;

fn arbitrary_with(columns: Self::Parameters) -> Self::Strategy {
use proptest::prelude::any;
use proptest::strategy::Strategy;

proptest::collection::vec(any::<Vec<WriteOp>>(), columns.len())
proptest::collection::vec(any::<HashMap<SchemaKey, Operation>>(), columns.len())
.prop_map::<SchemaBatch, _>(|vec_vec_write_ops| {
let mut rows = HashMap::new();
for (col, write_ops) in columns.iter().zip(vec_vec_write_ops.into_iter()) {
rows.insert(*col, write_ops);
}
SchemaBatch {
rows: Mutex::new(rows),
for (col, write_op) in columns.iter().zip(vec_vec_write_ops.into_iter()) {
rows.insert(*col, write_op);
}
SchemaBatch { last_writes: rows }
})
.boxed()
}

type Strategy = proptest::strategy::BoxedStrategy<Self>;
}

/// An error that occurred during (de)serialization of a [`Schema`]'s keys or
Expand Down Expand Up @@ -371,3 +384,25 @@ fn default_write_options() -> rocksdb::WriteOptions {
opts.set_sync(true);
opts
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_db_debug_output() {
let tmpdir = tempfile::tempdir().unwrap();
let column_families = vec![DEFAULT_COLUMN_FAMILY_NAME];

let mut db_opts = rocksdb::Options::default();
db_opts.create_if_missing(true);
db_opts.create_missing_column_families(true);

let db = DB::open(tmpdir.path(), "test_db_debug", column_families, &db_opts)
.expect("Failed to open DB.");

let db_debug = format!("{:?}", db);
assert!(db_debug.contains("test_db_debug"));
assert!(db_debug.contains(tmpdir.path().to_str().unwrap()));
}
}
146 changes: 146 additions & 0 deletions full-node/db/sov-schema-db/src/snapshot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
//! Snapshot related logic

use std::sync::{Arc, LockResult, Mutex, RwLock, RwLockReadGuard};

use crate::schema::{KeyCodec, ValueCodec};
use crate::{Operation, Schema, SchemaBatch};

/// Id of database snapshot
pub type SnapshotId = u64;

/// A trait to make nested calls to several [`SchemaBatch`]s and eventually [`crate::DB`]
pub trait QueryManager {
/// Get a value from snapshot or its parents
fn get<S: Schema>(
&self,
snapshot_id: SnapshotId,
key: &impl KeyCodec<S>,
) -> anyhow::Result<Option<S::Value>>;
}

/// Simple wrapper around `RwLock` that only allows read access.
pub struct ReadOnlyLock<T> {
lock: Arc<RwLock<T>>,
}

impl<T> ReadOnlyLock<T> {
/// Create new [`ReadOnlyLock`] from [`Arc<RwLock<T>>`].
pub fn new(lock: Arc<RwLock<T>>) -> Self {
Self { lock }
}

/// Acquires a read lock on the underlying `RwLock`.
pub fn read(&self) -> LockResult<RwLockReadGuard<'_, T>> {
self.lock.read()
}
}

/// Wrapper around [`QueryManager`] that allows to read from snapshots
pub struct DbSnapshot<Q> {
id: SnapshotId,
cache: Mutex<SchemaBatch>,
parents_manager: ReadOnlyLock<Q>,
}

impl<Q: QueryManager> DbSnapshot<Q> {
/// Create new [`DbSnapshot`]
pub fn new(id: SnapshotId, manager: ReadOnlyLock<Q>) -> Self {
Self {
id,
cache: Mutex::new(SchemaBatch::default()),
parents_manager: manager,
}
}

/// Get a value from current snapshot, its parents or underlying database
pub fn read<S: Schema>(&self, key: &impl KeyCodec<S>) -> anyhow::Result<Option<S::Value>> {
// Some(Operation) means that key was touched,
// but in case of deletion we early return None
// Only in case of not finding operation for key,
// we go deeper

// Hold local cache lock explicitly, so reads are atomic
let local_cache = self
.cache
.lock()
citizen-stig marked this conversation as resolved.
Show resolved Hide resolved
.expect("SchemaBatch lock should not be poisoned");

// 1. Check in cache
if let Some(operation) = local_cache.read(key)? {
return decode_operation::<S>(operation);
}

// 2. Check parent
let parent = self
.parents_manager
.read()
.expect("Parent lock must not be poisoned");
parent.get::<S>(self.id, key)
}

/// Store a value in snapshot
pub fn put<S: Schema>(
&self,
key: &impl KeyCodec<S>,
value: &impl ValueCodec<S>,
) -> anyhow::Result<()> {
self.cache
.lock()
.expect("SchemaBatch lock must not be poisoned")
.put(key, value)
}

/// Delete given key from snapshot
pub fn delete<S: Schema>(&self, key: &impl KeyCodec<S>) -> anyhow::Result<()> {
self.cache
.lock()
.expect("SchemaBatch lock must not be poisoned")
.delete(key)
}
}

/// Read only version of [`DbSnapshot`], for usage inside [`QueryManager`]
pub struct FrozenDbSnapshot {
id: SnapshotId,
cache: SchemaBatch,
}

impl FrozenDbSnapshot {
/// Get value from its own cache
pub fn get<S: Schema>(&self, key: &impl KeyCodec<S>) -> anyhow::Result<Option<Operation>> {
self.cache.read(key)
}

/// Get id of this Snapshot
pub fn get_id(&self) -> SnapshotId {
self.id
}
}

impl<Q> From<DbSnapshot<Q>> for FrozenDbSnapshot {
fn from(snapshot: DbSnapshot<Q>) -> Self {
Self {
id: snapshot.id,
cache: snapshot
.cache
.into_inner()
.expect("SchemaBatch lock must not be poisoned"),
}
}
}

impl From<FrozenDbSnapshot> for SchemaBatch {
fn from(value: FrozenDbSnapshot) -> Self {
value.cache
}
}

fn decode_operation<S: Schema>(operation: Operation) -> anyhow::Result<Option<S::Value>> {
match operation {
Operation::Put { value } => {
let value = S::Value::decode_value(&value)?;
Ok(Some(value))
}
Operation::Delete => Ok(None),
}
}
Loading