Skip to content

Commit

Permalink
Enable memdb instead of rocks with feature flags
Browse files Browse the repository at this point in the history
  • Loading branch information
aterentic-ethernal committed Aug 20, 2024
1 parent 4f5427b commit b5c5996
Show file tree
Hide file tree
Showing 19 changed files with 121 additions and 93 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/default.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: test
# TODO: Replace with "--benches --tests --all-features" when CI is fixed for other features
args: --workspace --benches --tests --features "default,crawl,kademlia-rocksdb"
args: --workspace --benches --tests --no-default-features
env:
RUSTFLAGS: "-C instrument-coverage"
LLVM_PROFILE_FILE: "profile-%p-%m.profraw"
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ avail-core = { version = "0.6", git = "https://github.com/availproject/avail-cor
avail-subxt = { version = "0.5", git = "https://github.com/availproject/avail.git", tag = "v2.1.0.0-rc1" }
dusk-plonk = { git = "https://github.com/availproject/plonk.git", tag = "v0.12.0-polygon-2" }
kate-recovery = { version = "0.9", git = "https://github.com/availproject/avail-core", tag = "node-v2100-rc1" }
avail-light-core = { version = "0.0.0", path = "./core" }
avail-light-core = { path = "./core" }

anyhow = "1.0.71"
async-std = { version = "1.12.0", features = ["attributes"] }
Expand Down
4 changes: 2 additions & 2 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ test-case = "3.2.1"

[features]
network-analysis = ["avail-light-core/network-analysis"]
kademlia-rocksdb = ["avail-light-core/kademlia-rocksdb"]
default = ["kademlia-rocksdb"]
rocksdb = ["avail-light-core/rocksdb"]
default = ["rocksdb"]
36 changes: 24 additions & 12 deletions client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,22 @@

use crate::cli::CliOpts;
use avail_core::AppId;
#[cfg(not(feature = "rocksdb"))]
use avail_light_core::data::MemoryDB as DB;
#[cfg(feature = "rocksdb")]
use avail_light_core::data::RocksDB as DB;
use avail_light_core::{
api,
consts::EXPECTED_SYSTEM_VERSION,
data::{IsFinalitySyncedKey, IsSyncedKey},
network,
sync_client::SyncClient,
sync_finality::SyncFinality,
};
use avail_light_core::{
data::{ClientIdKey, Database, LatestHeaderKey, RocksDB},
data::{ClientIdKey, Database, IsFinalitySyncedKey, IsSyncedKey, LatestHeaderKey},
network::{
p2p::{self, BOOTSTRAP_LIST_EMPTY_MESSAGE},
self,
p2p::{self, Store, BOOTSTRAP_LIST_EMPTY_MESSAGE},
rpc, Network,
},
shutdown::Controller,
sync_client::SyncClient,
sync_finality::SyncFinality,
telemetry::{self, MetricCounter, Metrics},
types::{
load_or_init_suri, IdentityConfig, MaintenanceConfig, MultiaddrConfig, RuntimeConfig,
Expand Down Expand Up @@ -50,7 +51,8 @@ static GLOBAL: Jemalloc = Jemalloc;
async fn run(
cfg: RuntimeConfig,
identity_cfg: IdentityConfig,
db: RocksDB,

db: DB,
shutdown: Controller<String>,
client_id: Uuid,
execution_id: Uuid,
Expand Down Expand Up @@ -92,15 +94,21 @@ async fn run(
// Create sender channel for P2P event loop commands
let (p2p_event_loop_sender, p2p_event_loop_receiver) = mpsc::unbounded_channel();

let store = Store::with_config(
peer_id,
(&cfg.libp2p).into(),
#[cfg(feature = "rocksdb")]
db.inner(),
);

let p2p_event_loop = p2p::EventLoop::new(
cfg.libp2p.clone(),
version,
&cfg.genesis_hash,
&id_keys,
false,
shutdown.clone(),
#[cfg(feature = "kademlia-rocksdb")]
db.inner(),
store,
);

spawn_in_span(
Expand Down Expand Up @@ -409,7 +417,11 @@ pub async fn main() -> Result<()> {
fs::remove_dir_all(&cfg.avail_path).wrap_err("Failed to remove local state directory")?;
}

let db = RocksDB::open(&cfg.avail_path).expect("Avail Light could not initialize database");
#[cfg(feature = "rocksdb")]
let db = DB::open(&cfg.avail_path).expect("Avail Light could not initialize database");

#[cfg(not(feature = "rocksdb"))]
let db = DB::default();

let client_id = db.get(ClientIdKey).unwrap_or_else(|| {
let client_id = Uuid::new_v4();
Expand Down
4 changes: 4 additions & 0 deletions compatibility-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,7 @@ kate-recovery = { workspace = true }
clap = { workspace = true }
color-eyre = { workspace = true }
tokio = { workspace = true }

[features]
rocksdb = ["avail-light-core/rocksdb"]
default = ["rocksdb"]
17 changes: 11 additions & 6 deletions compatibility-tests/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
use std::time::Duration;

#[cfg(not(feature = "rocksdb"))]
use avail_light_core::data::MemoryDB as DB;
#[cfg(feature = "rocksdb")]
use avail_light_core::data::RocksDB as DB;
use avail_light_core::{
data::RocksDB,
network::rpc::{
self,
configuration::{ExponentialConfig, RPCConfig, RetryConfig},
},
shutdown::Controller,
};
use clap::Parser;
use color_eyre::{eyre::Context, Result};
use color_eyre::Result;
use kate_recovery::matrix::Position;
use std::time::Duration;

#[derive(Parser)]
struct CommandArgs {
Expand All @@ -25,8 +27,11 @@ async fn main() -> Result<()> {
let command_args = CommandArgs::parse();
println!("Using URL: {}", command_args.url);
println!("Using Path: {}", command_args.avail_path);
let db = RocksDB::open(&command_args.avail_path)
.wrap_err("API Compatibility Test could not initialize database")?;

#[cfg(not(feature = "rocksdb"))]
let db = DB::default();
#[cfg(feature = "rocksdb")]
let db = DB::open(&command_args.avail_path)?;

let rpc_cfg = RPCConfig {
full_node_ws: vec![command_args.url],
Expand Down
4 changes: 2 additions & 2 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ num_cpus = "1.13.0"
pcap = "1.1.0"
rand = "0.8.4"
rand_chacha = "0.3"
rocksdb = { version = "0.21.0", features = ["snappy", "multi-threaded-cf"] }
rocksdb = { version = "0.21.0", features = ["snappy", "multi-threaded-cf"], optional = true }
semver = { workspace = true }
serde = { workspace = true }
serde_json = "1.0.68"
Expand Down Expand Up @@ -75,6 +75,6 @@ test-case = "3.2.1"

[features]
network-analysis = []
kademlia-rocksdb = []
rocksdb = ["dep:rocksdb"]
crawl = []
default = []
28 changes: 8 additions & 20 deletions core/src/data.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,30 @@
use self::rocks_db::RocksDBKey;
use crate::{
network::rpc::Node as RpcNode,
types::{BlockRange, Uuid},
};
use avail_subxt::primitives::Header;
use codec::{Decode, Encode};
#[cfg(test)]
use mem_db::HashMapKey;
use serde::{Deserialize, Serialize};
use sp_core::ed25519;

mod keys;
#[cfg(test)]

#[cfg(not(feature = "rocksdb"))]
mod mem_db;
mod rocks_db;
#[cfg(not(feature = "rocksdb"))]
pub use mem_db::*;

#[cfg(test)]
pub use mem_db::MemoryDB;
pub use rocks_db::RocksDB;
#[cfg(feature = "rocksdb")]
mod rocks_db;
#[cfg(feature = "rocksdb")]
pub use rocks_db::*;

/// Column family for application state
pub const APP_STATE_CF: &str = "app_state_cf";

/// Column family for Kademlia store
pub const KADEMLIA_STORE_CF: &str = "kademlia_store_cf";

#[cfg(not(test))]
/// Type of the database key which we can get from the custom key.
pub trait RecordKey: Into<RocksDBKey> {
type Type: Serialize + for<'a> Deserialize<'a> + Encode + Decode;
}

#[cfg(test)]
/// Type of the database key which we can get from the custom key.
pub trait RecordKey: Into<RocksDBKey> + Into<HashMapKey> {
type Type: Serialize + for<'a> Deserialize<'a> + Encode + Decode;
}

pub trait Database {
/// Puts value for given key into database.
/// Key is serialized into database key, value is serialized into type supported by database.
Expand Down
5 changes: 5 additions & 0 deletions core/src/data/mem_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ pub struct MemoryDB {
#[derive(Eq, Hash, PartialEq)]
pub struct HashMapKey(pub String);

/// Type of the database key which we can get from the custom key.
pub trait RecordKey: Into<HashMapKey> {
type Type: Serialize + for<'a> Deserialize<'a> + Encode + Decode;
}

impl Default for MemoryDB {
fn default() -> Self {
MemoryDB {
Expand Down
5 changes: 5 additions & 0 deletions core/src/data/rocks_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ pub struct RocksDB {
#[derive(Eq, Hash, PartialEq)]
pub struct RocksDBKey(Option<&'static str>, Vec<u8>);

/// Type of the database key which we can get from the custom key.
pub trait RecordKey: Into<RocksDBKey> {
type Type: Serialize + for<'a> Deserialize<'a> + Encode + Decode;
}

impl RocksDBKey {
fn app_state(key: &str) -> Self {
Self(Some(APP_STATE_CF), key.as_bytes().to_vec())
Expand Down
4 changes: 2 additions & 2 deletions core/src/maintenance.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use color_eyre::{eyre::WrapErr, Result};
use std::sync::Arc;
#[cfg(not(feature = "kademlia-rocksdb"))]
#[cfg(not(feature = "rocksdb"))]
use std::time::Instant;
use tokio::sync::broadcast;
use tracing::{debug, error, info};
Expand All @@ -18,7 +18,7 @@ pub async fn process_block(
maintenance_config: MaintenanceConfig,
metrics: &Arc<impl Metrics>,
) -> Result<()> {
#[cfg(not(feature = "kademlia-rocksdb"))]
#[cfg(not(feature = "rocksdb"))]
if block_number % maintenance_config.pruning_interval == 0 {
info!(block_number, "Pruning...");
match p2p_client.prune_expired_records(Instant::now()).await {
Expand Down
32 changes: 19 additions & 13 deletions core/src/network/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,31 @@ mod client;
pub mod configuration;
mod event_loop;
mod kad_mem_providers;
#[cfg(not(feature = "kademlia-rocksdb"))]

#[cfg(not(feature = "rocksdb"))]
mod kad_mem_store;
#[cfg(feature = "rocksdb")]
mod kad_rocksdb_store;

#[cfg(not(feature = "rocksdb"))]
pub use kad_mem_store::MemoryStoreConfig;
#[cfg(feature = "rocksdb")]
pub use kad_rocksdb_store::ExpirationCompactionFilterFactory;
#[cfg(feature = "rocksdb")]
pub use kad_rocksdb_store::RocksDBStoreConfig;

#[cfg(feature = "rocksdb")]
pub type Store = kad_rocksdb_store::RocksDBStore;
#[cfg(not(feature = "rocksdb"))]
pub type Store = kad_mem_store::MemoryStore;

use crate::{
data::{Database, P2PKeypairKey, RocksDB},
data::{Database, P2PKeypairKey},
types::SecretKey,
};
pub use client::Client;
pub use event_loop::EventLoop;
pub use kad_mem_providers::ProvidersConfig;
#[cfg(not(feature = "kademlia-rocksdb"))]
pub use kad_mem_store::MemoryStoreConfig;
pub use kad_rocksdb_store::ExpirationCompactionFilterFactory;
pub use kad_rocksdb_store::RocksDBStoreConfig;
use libp2p_allow_block_list as allow_block_list;

const MINIMUM_SUPPORTED_BOOTSTRAP_VERSION: &str = "0.1.1";
Expand Down Expand Up @@ -122,11 +133,6 @@ pub enum QueryChannel {

type Command = Box<dyn FnOnce(&mut EventLoop) -> Result<(), Report> + Send>;

#[cfg(not(feature = "kademlia-rocksdb"))]
type Store = kad_mem_store::MemoryStore;
#[cfg(feature = "kademlia-rocksdb")]
type Store = kad_rocksdb_store::RocksDBStore;

// Behaviour struct is used to derive delegated Libp2p behaviour implementation
#[derive(NetworkBehaviour)]
#[behaviour(event_process = false)]
Expand Down Expand Up @@ -310,7 +316,7 @@ pub fn is_multiaddr_global(address: &Multiaddr) -> bool {
.any(|protocol| matches!(protocol, libp2p::multiaddr::Protocol::Ip4(ip) if is_global(ip)))
}

fn get_or_init_keypair(cfg: &LibP2PConfig, db: RocksDB) -> Result<identity::Keypair> {
fn get_or_init_keypair(cfg: &LibP2PConfig, db: impl Database) -> Result<identity::Keypair> {
if let Some(secret_key) = cfg.secret_key.as_ref() {
return keypair(secret_key);
};
Expand All @@ -325,7 +331,7 @@ fn get_or_init_keypair(cfg: &LibP2PConfig, db: RocksDB) -> Result<identity::Keyp
Ok(id_keys)
}

pub fn identity(cfg: &LibP2PConfig, db: RocksDB) -> Result<(identity::Keypair, PeerId)> {
pub fn identity(cfg: &LibP2PConfig, db: impl Database) -> Result<(identity::Keypair, PeerId)> {
let keypair = get_or_init_keypair(cfg, db)?;
let peer_id = PeerId::from(keypair.public());
Ok((keypair, peer_id))
Expand Down
2 changes: 1 addition & 1 deletion core/src/network/p2p/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ impl Client {

pub async fn prune_expired_records(&self, now: Instant) -> Result<usize> {
self.execute_sync(|response_sender| {
if cfg!(feature = "kademlia-rocksdb") {
if cfg!(feature = "rocksdb") {
Box::new(move |_| {
response_sender.send(Ok(0)).map_err(|e| {
eyre!(
Expand Down
10 changes: 6 additions & 4 deletions core/src/network/p2p/configuration.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
#[cfg(not(feature = "kademlia-rocksdb"))]
use super::ProvidersConfig;
#[cfg(feature = "rocksdb")]
use super::RocksDBStoreConfig;
#[cfg(not(feature = "rocksdb"))]
use crate::network::p2p::MemoryStoreConfig;
use crate::types::{duration_seconds_format, KademliaMode, MultiaddrConfig, SecretKey};
use libp2p::{kad, multiaddr::Protocol, Multiaddr};
Expand All @@ -10,8 +13,6 @@ use std::{
time::Duration,
};

use super::{ProvidersConfig, RocksDBStoreConfig};

/// Libp2p AutoNAT configuration (see [RuntimeConfig] for details)
#[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(default)]
Expand Down Expand Up @@ -216,7 +217,7 @@ impl From<&LibP2PConfig> for kad::Config {
}
}

#[cfg(not(feature = "kademlia-rocksdb"))]
#[cfg(not(feature = "rocksdb"))]
impl From<&LibP2PConfig> for MemoryStoreConfig {
fn from(cfg: &LibP2PConfig) -> Self {
MemoryStoreConfig {
Expand All @@ -230,6 +231,7 @@ impl From<&LibP2PConfig> for MemoryStoreConfig {
}
}

#[cfg(feature = "rocksdb")]
impl From<&LibP2PConfig> for RocksDBStoreConfig {
fn from(cfg: &LibP2PConfig) -> Self {
RocksDBStoreConfig {
Expand Down
Loading

0 comments on commit b5c5996

Please sign in to comment.