Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add sqllite as an option to store signing keys #723

Merged
merged 8 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
447 changes: 433 additions & 14 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ bytes = { version = "1.4.0", default-features = false }
bincode2 = { version = "2.0.1", default-features = false }
structopt = { version = "0.3.26", default-features = false }
hash-db = { version = "0.16.0", default-features = false }
sqlx = { version = "0.7.1", default-features = false, features = ["macros"] }

[profile.release]
panic = 'unwind'
Binary file added alice
Binary file not shown.
3 changes: 2 additions & 1 deletion dkg-gadget/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ itertools = { workspace = true }
sync_wrapper = { workspace = true }
async-stream = { workspace = true }
lazy_static = { workspace = true }

log = { workspace = true }
sqlx = { workspace = true, features = ["runtime-tokio-native-tls", "sqlite", "json"] }
hash-db = { workspace = true, optional = true }
webb-proposals = { workspace = true }

Expand Down
2 changes: 1 addition & 1 deletion dkg-gadget/src/async_protocols/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use curv::elliptic::curves::Secp256k1;
use dkg_runtime_primitives::{SessionId, StoredUnsignedProposalBatch};
use multi_party_ecdsa::protocols::multi_party_ecdsa::gg_2020::state_machine::keygen::LocalKey;
use serde::{Deserialize, Serialize};
use sqlx::{FromRow, Sqlite};
use std::fmt::{Debug, Formatter};
use wsts::{
common::{PolyCommitment, Signature},
Expand All @@ -17,7 +18,6 @@ pub enum LocalKeyType {
ECDSA(LocalKey<Secp256k1>),
FROST(Vec<PolyCommitment>, PartyState),
}

impl Debug for LocalKeyType {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Expand Down
2 changes: 2 additions & 0 deletions dkg-gadget/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ use dkg_primitives::{types::DKGError, SessionId};

mod mem;
mod offchain_storage;
mod sql_storage;

use crate::async_protocols::types::LocalKeyType;
pub use mem::DKGInMemoryDb;
pub use offchain_storage::DKGOffchainStorageDb;
pub use sql_storage::{BackendConfig, SqlBackend, SqliteBackendConfig};

/// A Database backend, specifically for the DKG to store and load important state
///
Expand Down
199 changes: 199 additions & 0 deletions dkg-gadget/src/db/sql_storage.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
use super::*;
use crate::{async_protocols::types::LocalKeyType, debug_logger::DebugLogger, DKGKeystore};
use dkg_primitives::{
types::DKGError,
utils::{decrypt_data, encrypt_data},
SessionId,
};
use dkg_runtime_primitives::offchain::crypto::{Pair as AppPair, Public};
use sc_client_api::Backend;
use sc_keystore::LocalKeystore;
use sp_core::{offchain::OffchainStorage, Pair};
use sp_runtime::{
generic::BlockId,
traits::{BlakeTwo256, Block as BlockT, Header as HeaderT, UniqueSaturatedInto, Zero},
};
use sqlx::{
query::Query,
sqlite::{
SqliteArguments, SqliteConnectOptions, SqlitePool, SqlitePoolOptions, SqliteQueryResult,
},
ConnectOptions, Error, Execute, QueryBuilder, Row, Sqlite,
};
use std::{cmp::Ordering, collections::HashSet, num::NonZeroU32, str::FromStr, sync::Arc};

// Represents the Sqlite connection options that are
/// used to establish a database connection.
#[derive(Debug)]
pub struct SqliteBackendConfig<'a> {
pub path: &'a str,
pub create_if_missing: bool,
pub thread_count: u32,
pub cache_size: u64,
}

/// Represents the backend configurations.
#[derive(Debug)]
pub enum BackendConfig<'a> {
Sqlite(SqliteBackendConfig<'a>),
}

#[derive(Clone)]
pub struct SqlBackend {
/// The Sqlite connection.
pool: SqlitePool,

/// The number of allowed operations for the Sqlite filter call.
/// A value of `0` disables the timeout.
num_ops_timeout: i32,
}

impl SqlBackend {
pub fn new(
config: BackendConfig<'_>,
pool_size: u32,
num_ops_timeout: Option<NonZeroU32>,
) -> Result<Self, Error> {
futures::executor::block_on(async {
Self::new_inner(config, pool_size, num_ops_timeout).await
})
}
/// Creates a new instance of the SQL backend.
pub async fn new_inner(
config: BackendConfig<'_>,
pool_size: u32,
num_ops_timeout: Option<NonZeroU32>,
) -> Result<Self, Error> {
let any_pool = SqlitePoolOptions::new()
.max_connections(pool_size)
.connect_lazy_with(Self::connect_options(&config)?.disable_statement_logging());
let _ = Self::create_database_if_not_exists(&any_pool).await?;
let _ = Self::create_indexes_if_not_exist(&any_pool).await?;
Ok(Self {
pool: any_pool,
num_ops_timeout: num_ops_timeout
.map(|n| n.get())
.unwrap_or(0)
.try_into()
.unwrap_or(i32::MAX),
})
}

fn connect_options(config: &BackendConfig) -> Result<SqliteConnectOptions, Error> {
match config {
BackendConfig::Sqlite(config) => {
log::info!(target: "dkg-gadget", "📑 Connection configuration: {config:?}");
let config = sqlx::sqlite::SqliteConnectOptions::from_str(config.path)?
.create_if_missing(config.create_if_missing)
// https://www.sqlite.org/pragma.html#pragma_busy_timeout
.busy_timeout(std::time::Duration::from_secs(8))
// 200MB, https://www.sqlite.org/pragma.html#pragma_cache_size
.pragma("cache_size", format!("-{}", config.cache_size))
// https://www.sqlite.org/pragma.html#pragma_analysis_limit
.pragma("analysis_limit", "1000")
// https://www.sqlite.org/pragma.html#pragma_threads
.pragma("threads", config.thread_count.to_string())
// https://www.sqlite.org/pragma.html#pragma_threads
.pragma("temp_store", "memory")
// https://www.sqlite.org/wal.html
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
// https://www.sqlite.org/pragma.html#pragma_synchronous
.synchronous(sqlx::sqlite::SqliteSynchronous::Normal);
Ok(config)
},
}
}

/// Get the underlying Sqlite pool.
pub fn pool(&self) -> &SqlitePool {
&self.pool
}

/// Create the Sqlite database if it does not already exist.
async fn create_database_if_not_exists(pool: &SqlitePool) -> Result<SqliteQueryResult, Error> {
sqlx::query(
"BEGIN;
CREATE TABLE IF NOT EXISTS dkg_keys (
id INTEGER PRIMARY KEY,
session_id INTEGER NOT NULL,
local_key BLOB NOT NULL,
);
COMMIT;",
)
.execute(pool)
.await
}

/// Create the Sqlite database indices if it does not already exist.
async fn create_indexes_if_not_exist(pool: &SqlitePool) -> Result<SqliteQueryResult, Error> {
sqlx::query(
"BEGIN;
CREATE INDEX IF NOT EXISTS session_id_index ON dkg_keys (
session_id
);
COMMIT;",
)
.execute(pool)
.await
}

async fn get_local_key(&self, session_id: SessionId) -> Result<Option<LocalKeyType>, DKGError> {
log::info!("{}", format!("SQL Storage : Fetching local keys for session {session_id:?}"));
let session_id: i64 = session_id as i64;
match sqlx::query("SELECT local_key FROM dkg_keys WHERE session_id = ?")
.bind(session_id)
.fetch_optional(self.pool())
.await
{
Ok(result) => {
if let Some(row) = result {
let local_key_json: String = row.get(0);
let local_key: LocalKeyType = serde_json::from_str(&local_key_json).unwrap();
return Ok(Some(local_key))
}
return Err(DKGError::LocalKeyNotFound)
},
Err(err) => {
log::debug!(target: "dkg-gadget", "Failed retrieving key for session_id {err:?}");
return Err(DKGError::LocalKeyNotFound)
},
}
}

async fn store_local_key(
&self,
session_id: SessionId,
local_key: LocalKeyType,
) -> Result<(), DKGError> {
log::info!(
"{}",
format!(
"SQL Storage : Store local keys for session {session_id:?}, Key : {local_key:?}"
)
);
let session_id: i64 = session_id as i64;
let mut tx = self.pool().begin().await.map_err(|_| DKGError::StoringLocalKeyFailed)?;
let local_key = serde_json::to_string(&local_key).unwrap();
sqlx::query("INSERT INTO dkg_keys(session_id, local_key) VALUES (?)")
.bind(session_id)
.bind(local_key)
.execute(&mut *tx)
.await
.map_err(|_| DKGError::StoringLocalKeyFailed)?;
tx.commit().await.map_err(|_| DKGError::StoringLocalKeyFailed)
}
}

impl super::DKGDbBackend for SqlBackend {
fn get_local_key(&self, session_id: SessionId) -> Result<Option<LocalKeyType>, DKGError> {
futures::executor::block_on(async { self.get_local_key(session_id).await })
}

fn store_local_key(
&self,
session_id: SessionId,
local_key: LocalKeyType,
) -> Result<(), DKGError> {
futures::executor::block_on(async { self.store_local_key(session_id, local_key).await })
}
}
40 changes: 30 additions & 10 deletions dkg-gadget/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{marker::PhantomData, sync::Arc};

use debug_logger::DebugLogger;
use dkg_runtime_primitives::{crypto::AuthorityId, DKGApi, MaxAuthorities, MaxProposalLength};
use parking_lot::RwLock;
Expand All @@ -26,6 +23,11 @@ use sp_api::{NumberFor, ProvideRuntimeApi};
use sp_blockchain::HeaderBackend;
use sp_keystore::KeystorePtr;
use sp_runtime::traits::Block;
use std::{
marker::PhantomData,
path::{Path, PathBuf},
sync::Arc,
};

mod error;
/// Stores keypairs for DKG
Expand Down Expand Up @@ -107,6 +109,8 @@ where
pub prometheus_registry: Option<Registry>,
/// For logging
pub debug_logger: DebugLogger,
/// database path
pub db_path: PathBuf,
/// Phantom block type
pub _block: PhantomData<B>,
}
Expand Down Expand Up @@ -134,6 +138,7 @@ where
local_keystore,
_block,
debug_logger,
db_path,
} = dkg_params;

let dkg_keystore: DKGKeystore = DKGKeystore::new(key_store, debug_logger.clone());
Expand Down Expand Up @@ -180,13 +185,28 @@ where

// In memory backend, not used for now
// let db_backend = Arc::new(db::DKGInMemoryDb::new());
let offchain_db_backend = db::DKGOffchainStorageDb::new(
backend.clone(),
dkg_keystore.clone(),
local_keystore.clone(),
debug_logger.clone(),
);
let db_backend = Arc::new(offchain_db_backend);
// let offchain_db_backend = db::DKGOffchainStorageDb::new(
// backend.clone(),
// dkg_keystore.clone(),
// local_keystore.clone(),
// debug_logger.clone(),
// );

let path = Path::new("sqlite:///").join(db_path).join("frontier.db3");

let sql_backend = db::SqlBackend::new(
db::BackendConfig::Sqlite(db::SqliteBackendConfig {
path: path.to_str().unwrap(),
create_if_missing: true,
cache_size: 20480,
thread_count: 4,
}),
1,
None,
)
.unwrap();

let db_backend = Arc::new(sql_backend);
let worker_params = worker::WorkerParams {
latest_header,
client,
Expand Down
4 changes: 4 additions & 0 deletions dkg-primitives/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ pub enum DKGError {
InvalidSigningSet,
InputOutOfBounds,
CannotSign,
LocalKeyNotFound,
StoringLocalKeyFailed,
}

impl fmt::Display for DKGError {
Expand All @@ -242,6 +244,8 @@ impl fmt::Display for DKGError {
InvalidSigningSet => "Invalid Signing Set!".to_string(),
InputOutOfBounds => "Input value out of bounds set by runtime".to_string(),
CannotSign => "Could not sign public key".to_string(),
LocalKeyNotFound => "Local key not found!".to_string(),
StoringLocalKeyFailed => "Local key not be saved!".to_string(),
};
write!(f, "DKGError of type {label}")
}
Expand Down
10 changes: 5 additions & 5 deletions scripts/run-local-testnet.sh
Original file line number Diff line number Diff line change
Expand Up @@ -46,28 +46,28 @@ echo "*** Start Webb DKG Node ***"
# Alice
./target/release/dkg-standalone-node --tmp --chain local --validator -lerror --alice --output-path=./tmp/alice/output.log \
--rpc-cors all --rpc-external --rpc-methods=unsafe \
--port 30333 \
--port 30333 --db-path=./tmp/alice/db \
--rpc-port 9944 --node-key 0000000000000000000000000000000000000000000000000000000000000001 &
# Bob
./target/release/dkg-standalone-node --tmp --chain local --validator -lerror --bob --output-path=./tmp/bob/output.log \
--rpc-cors all --rpc-external --rpc-methods=unsafe \
--port 30305 \
--port 30305 --db-path=./tmp/bob/db \
--rpc-port 9945 --bootnodes /ip4/127.0.0.1/tcp/30333/p2p/12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp &
# Charlie
./target/release/dkg-standalone-node --tmp --chain local --validator -lerror --charlie --output-path=./tmp/charlie/output.log \
--rpc-cors all --rpc-external --rpc-methods=unsafe \
--port 30308 \
--port 30308 --db-path=./tmp/charlie/db \
--rpc-port 9946 --bootnodes /ip4/127.0.0.1/tcp/30333/p2p/12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp &
# Dave
./target/release/dkg-standalone-node --tmp --chain local --validator -lerror --dave --output-path=./tmp/dave/output.log \
--rpc-cors all --rpc-external --rpc-methods=unsafe \
--port 30309 \
--port 30309 --db-path=./tmp/dave/db \
--rpc-port 9947 --bootnodes /ip4/127.0.0.1/tcp/30333/p2p/12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp &
# Eve
./target/release/dkg-standalone-node --tmp --chain local --validator -linfo --eve --output-path=./tmp/eve/output.log \
--rpc-cors all --rpc-external \
--rpc-port 9948 \
--port 30310 \
--port 30310 --db-path=./tmp/eve/db \
-ldkg=debug \
-ldkg_gadget::worker=debug \
-lruntime::dkg_metadata=debug \
Expand Down
2 changes: 2 additions & 0 deletions standalone/node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ pub struct Cli {
pub subcommand: Option<Subcommand>,
#[clap(flatten)]
pub run: RunCmd,
#[arg(long, short = 'd')]
pub db_path: std::path::PathBuf,
#[arg(long, short = 'o')]
pub output_path: Option<std::path::PathBuf>,
#[clap(flatten)]
Expand Down
1 change: 1 addition & 0 deletions standalone/node/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ pub fn run() -> sc_cli::Result<()> {
runner.run_node_until_exit(|config| async move {
service::new_full(service::RunFullParams {
config,
db_path: cli.db_path,
debug_output: cli.output_path,
relayer_cmd: cli.relayer_cmd,
})
Expand Down
Loading
Loading