From 71747315f28c03a568883b05ae32628455efc74d Mon Sep 17 00:00:00 2001 From: Jordan Santell Date: Wed, 2 Aug 2023 15:13:58 -0700 Subject: [PATCH] feat: sqlite-backed Storage implementation, SqliteStorage. --- .github/workflows/run_test_suite.yaml | 6 +- Cargo.lock | 90 ++++++ rust/noosphere-storage/Cargo.toml | 3 + rust/noosphere-storage/examples/bench/main.rs | 14 +- .../src/implementation/mod.rs | 5 + .../src/implementation/sqlite.rs | 263 ++++++++++++++++++ rust/noosphere/build.rs | 5 +- rust/noosphere/src/platform.rs | 4 + 8 files changed, 380 insertions(+), 10 deletions(-) create mode 100644 rust/noosphere-storage/src/implementation/sqlite.rs diff --git a/.github/workflows/run_test_suite.yaml b/.github/workflows/run_test_suite.yaml index f6311545e..a49919eb5 100644 --- a/.github/workflows/run_test_suite.yaml +++ b/.github/workflows/run_test_suite.yaml @@ -98,7 +98,7 @@ jobs: - name: 'Run Rust native target tests' run: NOOSPHERE_LOG=deafening cargo test --features test-kubo,headers - run-test-suite-linux-rocksdb: + run-test-suite-linux-sqlite: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 @@ -115,8 +115,8 @@ jobs: with: ipfs_version: v0.17.0 run_daemon: true - - name: 'Run Rust native target tests (RocksDB)' - run: NOOSPHERE_LOG=defeaning cargo test -p noosphere -p noosphere-storage --features rocksdb,test-kubo + - name: 'Run Rust native target tests (Sqlite)' + run: NOOSPHERE_LOG=defeaning cargo test -p noosphere -p noosphere-storage --features sqlite,test-kubo run-test-suite-linux-c: runs-on: ubuntu-latest diff --git a/Cargo.lock b/Cargo.lock index d2cc1e7db..f14e61b33 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -159,6 +159,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" + [[package]] name = "anstream" version = "0.5.0" @@ -1582,6 +1588,18 @@ dependencies = [ "ext-trait", ] +[[package]] +name = "fallible-iterator" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" + +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + [[package]] name = "fastcdc" version = "3.1.0" @@ -2015,6 +2033,19 @@ name = "hashbrown" version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" +dependencies = [ + "ahash 0.8.3", + "allocator-api2", +] + +[[package]] +name = "hashlink" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7" +dependencies = [ + "hashbrown 0.14.0", +] [[package]] name = "headers" @@ -2993,6 +3024,17 @@ dependencies = [ "zstd-sys", ] +[[package]] +name = "libsqlite3-sys" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afc22eff61b133b115c6e8c74e818c628d6d5e7a502afea6f64dee076dd94326" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "libz-sys" version = "1.1.12" @@ -3716,6 +3758,8 @@ dependencies = [ "libipld-core", "noosphere-common", "noosphere-core", + "r2d2", + "r2d2_sqlite", "rand", "rexie", "rocksdb", @@ -4329,6 +4373,28 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r2d2" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93" +dependencies = [ + "log", + "parking_lot 0.12.1", + "scheduled-thread-pool", +] + +[[package]] +name = "r2d2_sqlite" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99f31323d6161385f385046738df520e0e8694fa74852d35891fc0be08348ddc" +dependencies = [ + "r2d2", + "rusqlite", + "uuid", +] + [[package]] name = "rand" version = "0.8.5" @@ -4640,6 +4706,20 @@ dependencies = [ "webrtc-util", ] +[[package]] +name = "rusqlite" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "549b9d036d571d42e6e85d1c1425e2ac83491075078ca9a15be021c56b1641f2" +dependencies = [ + "bitflags 2.4.0", + "fallible-iterator", + "fallible-streaming-iterator", + "hashlink", + "libsqlite3-sys", + "smallvec", +] + [[package]] name = "rustc-demangle" version = "0.1.23" @@ -4815,6 +4895,15 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "scheduled-thread-pool" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" +dependencies = [ + "parking_lot 0.12.1", +] + [[package]] name = "scoped-tls" version = "1.0.1" @@ -6024,6 +6113,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d" dependencies = [ "getrandom 0.2.10", + "rand", "serde", ] diff --git a/rust/noosphere-storage/Cargo.toml b/rust/noosphere-storage/Cargo.toml index b47f66c7f..cc88088f8 100644 --- a/rust/noosphere-storage/Cargo.toml +++ b/rust/noosphere-storage/Cargo.toml @@ -31,6 +31,8 @@ libipld-cbor = { workspace = true } serde = { workspace = true } base64 = "=0.21.4" url = { version = "^2" } +r2d2 = { version = "0.8.10", optional = true } +r2d2_sqlite = { version = "0.22.0", optional = true, features = ["bundled"] } [dev-dependencies] witty-phrase-generator = "~0.2" @@ -69,6 +71,7 @@ features = [ default = [] rocksdb = ["dep:rocksdb"] rocksdb-multi-thread = ["dep:rocksdb"] +sqlite = ["dep:r2d2", "dep:r2d2_sqlite"] [[example]] name = "bench" diff --git a/rust/noosphere-storage/examples/bench/main.rs b/rust/noosphere-storage/examples/bench/main.rs index 36484568e..0cacc492c 100644 --- a/rust/noosphere-storage/examples/bench/main.rs +++ b/rust/noosphere-storage/examples/bench/main.rs @@ -102,11 +102,7 @@ async fn create_sphere_with_large_files(db: SphereDb) - type ActiveStoragePrimitive = noosphere_storage::SledStorage; #[cfg(all(not(target_arch = "wasm32"), feature = "rocksdb"))] type ActiveStoragePrimitive = noosphere_storage::RocksDbStorage; -#[cfg(all( - not(target_arch = "wasm32"), - feature = "sqlite", - not(feature = "rocksdb") -))] +#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))] type ActiveStoragePrimitive = noosphere_storage::SqliteStorage; #[cfg(target_arch = "wasm32")] type ActiveStoragePrimitive = noosphere_storage::IndexedDbStorage; @@ -145,6 +141,14 @@ impl BenchmarkStorage { ) }; + #[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))] + let (storage, storage_name) = { + ( + noosphere_storage::SqliteStorage::new(&storage_path)?, + "SqliteStorage", + ) + }; + #[cfg(target_arch = "wasm32")] let (storage, storage_name) = { let temp_name: String = witty_phrase_generator::WPGen::new() diff --git a/rust/noosphere-storage/src/implementation/mod.rs b/rust/noosphere-storage/src/implementation/mod.rs index c6491d6f5..8e960acf6 100644 --- a/rust/noosphere-storage/src/implementation/mod.rs +++ b/rust/noosphere-storage/src/implementation/mod.rs @@ -18,3 +18,8 @@ pub use rocks_db::*; mod indexed_db; #[cfg(target_arch = "wasm32")] pub use indexed_db::*; + +#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))] +mod sqlite; +#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))] +pub use sqlite::*; diff --git a/rust/noosphere-storage/src/implementation/sqlite.rs b/rust/noosphere-storage/src/implementation/sqlite.rs new file mode 100644 index 000000000..40dbe77ab --- /dev/null +++ b/rust/noosphere-storage/src/implementation/sqlite.rs @@ -0,0 +1,263 @@ +use crate::{ + config::{ConfigurableStorage, StorageConfig}, + storage::Storage, + store::Store, + SPHERE_DB_STORE_NAMES, +}; +use anyhow::{anyhow, Error, Result}; +use async_trait::async_trait; +use noosphere_common::ConditionalSend; +use r2d2::{Pool, PooledConnection}; +use r2d2_sqlite::{rusqlite, SqliteConnectionManager}; +use rusqlite::{params, OptionalExtension, Transaction}; +use std::{ + fmt::Display, + path::{Path, PathBuf}, + sync::Arc, +}; + +/// Within the directory provided to [SqliteStorage], the +/// name of the sqlite database file. +const SQLITE_DB_PATH: &str = "database.db"; + +/// Sqlite-backed [Storage] implementation. +#[derive(Clone, Debug)] +pub struct SqliteStorage { + client: Arc, +} + +impl SqliteStorage { + pub fn new>(path: P) -> Result { + Self::with_config(path, StorageConfig::default()) + } + + pub fn with_config>(path: P, storage_config: StorageConfig) -> Result { + let client = Arc::new(SqliteClient::new(path.as_ref().to_owned(), storage_config)?); + Ok(SqliteStorage { client }) + } +} + +#[async_trait] +impl ConfigurableStorage for SqliteStorage { + async fn open_with_config + ConditionalSend>( + path: P, + storage_config: StorageConfig, + ) -> Result { + Self::with_config(path, storage_config) + } +} + +#[async_trait] +impl Storage for SqliteStorage { + type BlockStore = SqliteStore; + + type KeyValueStore = SqliteStore; + + async fn get_block_store(&self, name: &str) -> Result { + SqliteStore::new(self.client.clone(), name) + } + + async fn get_key_value_store(&self, name: &str) -> Result { + SqliteStore::new(self.client.clone(), name) + } +} + +#[derive(Clone)] +pub struct SqliteStore { + client: Arc, + queries: Arc, +} + +impl SqliteStore { + fn new(client: Arc, table_name: &str) -> Result { + let queries = Arc::new(Queries::new(table_name)?); + client.transaction(|tx| { + tx.prepare_cached(&Queries::generate_create_table_query(table_name))? + .execute(())?; + Ok(None) + })?; + Ok(SqliteStore { client, queries }) + } +} + +#[async_trait] +impl Store for SqliteStore { + async fn read(&self, key: &[u8]) -> Result>> { + self.client.transaction(|tx| { + let value = tx + .prepare_cached(self.queries.read_query())? + .query_row(params![key], |r| r.get(0)) + .optional()?; + Ok(value) + }) + } + + async fn write(&mut self, key: &[u8], bytes: &[u8]) -> Result>> { + self.client.transaction(|tx| { + let previous = tx + .prepare_cached(self.queries.read_query())? + .query_row(params![key], |r| r.get(0)) + .optional()?; + tx.prepare_cached(self.queries.write_query())? + .execute(params![key, bytes])?; + Ok(previous) + }) + } + + async fn remove(&mut self, key: &[u8]) -> Result>> { + self.client.transaction(|tx| { + let previous = tx + .prepare_cached(self.queries.read_query())? + .query_row(params![key], |r| r.get(0)) + .optional()?; + tx.prepare_cached(self.queries.delete_query())? + .execute(params![key])?; + Ok(previous) + }) + } + + async fn flush(&self) -> Result<()> { + Ok(()) + } +} + +/// Manages a connection pool for a database. +#[derive(Debug)] +pub struct SqliteClient { + pub(crate) path: PathBuf, + connection_pool: Pool, +} + +impl SqliteClient { + /// Create a new [SqliteClient], opening a pool of connections + /// for database at `path`. + pub fn new(path: PathBuf, config: StorageConfig) -> Result { + std::fs::create_dir_all(&path)?; + let connection_pool = Pool::builder() + .build(SqliteConnectionManager::file(path.join(SQLITE_DB_PATH))) + .map_err(rusqlite_into_anyhow)?; + let client = SqliteClient { + path, + connection_pool, + }; + + { + let db = client.db()?; + (match db + .pragma_update_and_check(None, "journal_mode", "wal", |row| { + row.get::(0) + })? + .as_ref() + { + "wal" => Ok(()), + _ => Err(anyhow!("Could not set journal_mode to WAL.")), + })?; + + if let Some(memory_cache_limit) = config.memory_cache_limit { + if memory_cache_limit >= 1024 { + // `cache_size` pragma takes either the number of pages to use in memory cache, + // or a negative value, indicating how many kibibytes should be used, deriving a page + // value from that limit. If we configure the page size after this, we should + // re-set the cache size. + // https://www.sqlite.org/pragma.html#pragma_cache_size + let kb = memory_cache_limit / 1024; + db.pragma_update_and_check(None, "cache_size", format!("-{}", kb), |_| Ok(()))?; + } + } + } + Ok(client) + } + + /// Get a [rusqlite::Connection] from the pool. + pub fn db(&self) -> Result> { + self.connection_pool.get().map_err(rusqlite_into_anyhow) + } + + /// Starts a [Transaction], passing it into the provided `callback`. + /// On success completion, the transaction is committed. + pub fn transaction(&self, callback: F) -> Result>> + where + F: FnOnce(&mut Transaction) -> Result>, rusqlite::Error>, + { + let mut db = self.db()?; + let mut tx = db.transaction().map_err(rusqlite_into_anyhow)?; + let result = callback(&mut tx).map_err(rusqlite_into_anyhow); + tx.commit().map_err(rusqlite_into_anyhow)?; + result + } +} + +/// Provides cached queries into table operations, and +/// static methods that generate queries. +/// +/// The purpose here is two-fold: While [rusqlite] provides +/// safe parameter interpolation, this doesn't cover e.g. dynamic table names, +/// so we cache them here rather than recreating on every query. Additionally, +/// the static methods consolidate our queries to one place for further inspection. +struct Queries { + read_stmt: String, + write_stmt: String, + delete_stmt: String, +} + +impl Queries { + fn new(table_name: &str) -> Result { + Queries::is_sanitized(table_name)?; + + let read_stmt = format!("SELECT VALUE FROM {} WHERE key = $1", table_name); + let write_stmt = format!( + "INSERT INTO {} (key, value) + VALUES($1, $2) + ON CONFLICT(key) + DO UPDATE SET value=excluded.value", + table_name + ); + let delete_stmt = format!("DELETE FROM {} WHERE key = $1", table_name); + Ok(Queries { + read_stmt, + write_stmt, + delete_stmt, + }) + } + + pub fn generate_create_table_query(table_name: &str) -> String { + format!( + "CREATE TABLE IF NOT EXISTS {} ( + _id INTEGER PRIMARY KEY, + key TEXT NOT NULL UNIQUE, + value BLOB + )", + table_name + ) + } + + pub fn read_query(&self) -> &str { + &self.read_stmt + } + pub fn write_query(&self) -> &str { + &self.write_stmt + } + pub fn delete_query(&self) -> &str { + &self.delete_stmt + } + + /// Only allow SphereDb table names. Can expand in the future, + /// but must be sanitized. + fn is_sanitized(table_name: &str) -> Result<()> { + match SPHERE_DB_STORE_NAMES.contains(&table_name) { + true => Ok(()), + false => Err(anyhow!("Invalid table name.")), + } + } +} + +#[async_trait] +impl crate::Space for SqliteStorage { + async fn get_space_usage(&self) -> Result { + crate::get_dir_size(&self.client.path).await + } +} + +fn rusqlite_into_anyhow(error: T) -> Error { + anyhow!(error.to_string()) +} diff --git a/rust/noosphere/build.rs b/rust/noosphere/build.rs index 3747db77c..da759e519 100644 --- a/rust/noosphere/build.rs +++ b/rust/noosphere/build.rs @@ -13,8 +13,9 @@ fn main() { }, // Backends - rocksdb: { all(feature = "rocksdb", native) }, - sled: { all(not(any(rocksdb)), native) }, + rocksdb: { all(feature = "rocksdb", native, not(sqlite)) }, + sqlite: { all(feature = "sqlite", native) }, + sled: { all(not(any(sqlite, rocksdb)), native) }, indexeddb: { wasm }, // Other diff --git a/rust/noosphere/src/platform.rs b/rust/noosphere/src/platform.rs index af409887a..251697e00 100644 --- a/rust/noosphere/src/platform.rs +++ b/rust/noosphere/src/platform.rs @@ -20,6 +20,8 @@ mod inner { pub(crate) type PrimitiveStorage = noosphere_storage::SledStorage; #[cfg(rocksdb)] pub(crate) type PrimitiveStorage = noosphere_storage::RocksDbStorage; + #[cfg(sqlite)] + pub(crate) type PrimitiveStorage = noosphere_storage::SqliteStorage; #[cfg(not(ipfs_storage))] pub type PlatformStorage = PrimitiveStorage; @@ -115,6 +117,8 @@ mod inner { pub(crate) type PrimitiveStorage = noosphere_storage::SledStorage; #[cfg(rocksdb)] pub(crate) type PrimitiveStorage = noosphere_storage::RocksDbStorage; + #[cfg(sqlite)] + pub(crate) type PrimitiveStorage = noosphere_storage::SqliteStorage; /// The default backing [noosphere_storage::Storage] in use for this /// platform