Skip to content

Commit

Permalink
Enable memdb instead of rocks with feature flags
Browse files Browse the repository at this point in the history
  • Loading branch information
aterentic-ethernal committed Aug 19, 2024
1 parent 5d33920 commit 00f5357
Show file tree
Hide file tree
Showing 16 changed files with 93 additions and 85 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ avail-core = { version = "0.6", git = "https://github.com/availproject/avail-cor
avail-subxt = { version = "0.5", git = "https://github.com/availproject/avail.git", tag = "v2.1.0.0-rc1" }
dusk-plonk = { git = "https://github.com/availproject/plonk.git", tag = "v0.12.0-polygon-2" }
kate-recovery = { version = "0.9", git = "https://github.com/availproject/avail-core", tag = "node-v2100-rc1" }
avail-light-core = { version = "0.0.0", path = "./core" }
avail-light-core = { version = "0.0.0", path = "./core", default-features = false }

anyhow = "1.0.71"
async-std = { version = "1.12.0", features = ["attributes"] }
Expand Down
5 changes: 3 additions & 2 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,6 @@ test-case = "3.2.1"

[features]
network-analysis = ["avail-light-core/network-analysis"]
kademlia-rocksdb = ["avail-light-core/kademlia-rocksdb"]
default = ["kademlia-rocksdb"]
crawl = ["avail-light-core/crawl"]
rocksdb = ["avail-light-core/rocksdb"]
default = ["rocksdb"]
48 changes: 33 additions & 15 deletions client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,22 @@

use crate::cli::CliOpts;
use avail_core::AppId;
#[cfg(not(feature = "rocksdb"))]
use avail_light_core::data::MemoryDB as DB;
#[cfg(feature = "rocksdb")]
use avail_light_core::data::RocksDB as DB;
use avail_light_core::{
api,
consts::EXPECTED_SYSTEM_VERSION,
data::{IsFinalitySyncedKey, IsSyncedKey},
network,
sync_client::SyncClient,
sync_finality::SyncFinality,
};
use avail_light_core::{
data::{ClientIdKey, Database, LatestHeaderKey, RocksDB},
data::{ClientIdKey, Database, IsFinalitySyncedKey, IsSyncedKey, LatestHeaderKey},
network::{
p2p::{self, BOOTSTRAP_LIST_EMPTY_MESSAGE},
self,
p2p::{self, Store, BOOTSTRAP_LIST_EMPTY_MESSAGE},
rpc, Network,
},
shutdown::Controller,
sync_client::SyncClient,
sync_finality::SyncFinality,
telemetry::{self, MetricCounter, Metrics},
types::{
load_or_init_suri, IdentityConfig, KademliaMode, MaintenanceConfig, MultiaddrConfig,
Expand Down Expand Up @@ -51,7 +52,8 @@ static GLOBAL: Jemalloc = Jemalloc;
async fn run(
cfg: RuntimeConfig,
identity_cfg: IdentityConfig,
db: RocksDB,

db: DB,
shutdown: Controller<String>,
client_id: Uuid,
execution_id: Uuid,
Expand Down Expand Up @@ -99,15 +101,21 @@ async fn run(
// Create sender channel for P2P event loop commands
let (p2p_event_loop_sender, p2p_event_loop_receiver) = mpsc::unbounded_channel();

let store = Store::with_config(
peer_id,
(&cfg.libp2p).into(),
#[cfg(feature = "rocksdb")]
db.inner(),
);

let p2p_event_loop = p2p::EventLoop::new(
cfg.libp2p.clone(),
version,
&cfg.genesis_hash,
&id_keys,
cfg.is_fat_client(),
shutdown.clone(),
#[cfg(feature = "kademlia-rocksdb")]
db.inner(),
store,
);

spawn_in_span(
Expand Down Expand Up @@ -338,7 +346,7 @@ async fn run(
async fn run_fat(
cfg: RuntimeConfig,
identity_cfg: IdentityConfig,
db: RocksDB,
db: DB,
shutdown: Controller<String>,
client_id: Uuid,
execution_id: Uuid,
Expand Down Expand Up @@ -384,15 +392,21 @@ async fn run_fat(
// Create sender channel for P2P event loop commands
let (p2p_event_loop_sender, p2p_event_loop_receiver) = mpsc::unbounded_channel();

let store = Store::with_config(
peer_id,
(&cfg.libp2p).into(),
#[cfg(feature = "rocksdb")]
db.inner(),
);

let p2p_event_loop = p2p::EventLoop::new(
cfg.libp2p.clone(),
version,
&cfg.genesis_hash,
&id_keys,
cfg.is_fat_client(),
shutdown.clone(),
#[cfg(feature = "kademlia-rocksdb")]
db.inner(),
store,
);

spawn_in_span(
Expand Down Expand Up @@ -613,7 +627,11 @@ pub async fn main() -> Result<()> {
fs::remove_dir_all(&cfg.avail_path).wrap_err("Failed to remove local state directory")?;
}

let db = RocksDB::open(&cfg.avail_path).expect("Avail Light could not initialize database");
#[cfg(feature = "rocksdb")]
let db = DB::open(&cfg.avail_path).expect("Avail Light could not initialize database");

#[cfg(not(feature = "rocksdb"))]
let db = DB::default();

let client_id = db.get(ClientIdKey).unwrap_or_else(|| {
let client_id = Uuid::new_v4();
Expand Down
6 changes: 3 additions & 3 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ num_cpus = "1.13.0"
pcap = "1.1.0"
rand = "0.8.4"
rand_chacha = "0.3"
rocksdb = { version = "0.21.0", features = ["snappy", "multi-threaded-cf"] }
rocksdb = { version = "0.21.0", features = ["snappy", "multi-threaded-cf"], optional = true }
semver = { workspace = true }
serde = { workspace = true }
serde_json = "1.0.68"
Expand Down Expand Up @@ -74,6 +74,6 @@ test-case = "3.2.1"

[features]
network-analysis = []
kademlia-rocksdb = []
rocksdb = ["dep:rocksdb"]
crawl = []
default = []
default = ["rocksdb"]
28 changes: 8 additions & 20 deletions core/src/data.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,30 @@
use self::rocks_db::RocksDBKey;
use crate::{
network::rpc::Node as RpcNode,
types::{BlockRange, Uuid},
};
use avail_subxt::primitives::Header;
use codec::{Decode, Encode};
#[cfg(test)]
use mem_db::HashMapKey;
use serde::{Deserialize, Serialize};
use sp_core::ed25519;

mod keys;
#[cfg(test)]

#[cfg(not(feature = "rocksdb"))]
mod mem_db;
mod rocks_db;
#[cfg(not(feature = "rocksdb"))]
pub use mem_db::*;

#[cfg(test)]
pub use mem_db::MemoryDB;
pub use rocks_db::RocksDB;
#[cfg(feature = "rocksdb")]
mod rocks_db;
#[cfg(feature = "rocksdb")]
pub use rocks_db::*;

/// Column family for application state
pub const APP_STATE_CF: &str = "app_state_cf";

/// Column family for Kademlia store
pub const KADEMLIA_STORE_CF: &str = "kademlia_store_cf";

#[cfg(not(test))]
/// Type of the database key which we can get from the custom key.
pub trait RecordKey: Into<RocksDBKey> {
type Type: Serialize + for<'a> Deserialize<'a> + Encode + Decode;
}

#[cfg(test)]
/// Type of the database key which we can get from the custom key.
pub trait RecordKey: Into<RocksDBKey> + Into<HashMapKey> {
type Type: Serialize + for<'a> Deserialize<'a> + Encode + Decode;
}

pub trait Database {
/// Puts value for given key into database.
/// Key is serialized into database key, value is serialized into type supported by database.
Expand Down
5 changes: 5 additions & 0 deletions core/src/data/mem_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ pub struct MemoryDB {
#[derive(Eq, Hash, PartialEq)]
pub struct HashMapKey(pub String);

/// Type of the database key which we can get from the custom key.
pub trait RecordKey: Into<HashMapKey> {
type Type: Serialize + for<'a> Deserialize<'a> + Encode + Decode;
}

impl Default for MemoryDB {
fn default() -> Self {
MemoryDB {
Expand Down
5 changes: 5 additions & 0 deletions core/src/data/rocks_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ pub struct RocksDB {
#[derive(Eq, Hash, PartialEq)]
pub struct RocksDBKey(Option<&'static str>, Vec<u8>);

/// Type of the database key which we can get from the custom key.
pub trait RecordKey: Into<RocksDBKey> {
type Type: Serialize + for<'a> Deserialize<'a> + Encode + Decode;
}

impl RocksDBKey {
fn app_state(key: &str) -> Self {
Self(Some(APP_STATE_CF), key.as_bytes().to_vec())
Expand Down
4 changes: 2 additions & 2 deletions core/src/maintenance.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use color_eyre::{eyre::WrapErr, Result};
use std::sync::Arc;
#[cfg(not(feature = "kademlia-rocksdb"))]
#[cfg(not(feature = "rocksdb"))]
use std::time::Instant;
use tokio::sync::broadcast;
use tracing::{debug, error, info};
Expand All @@ -18,7 +18,7 @@ pub async fn process_block(
maintenance_config: MaintenanceConfig,
metrics: &Arc<impl Metrics>,
) -> Result<()> {
#[cfg(not(feature = "kademlia-rocksdb"))]
#[cfg(not(feature = "rocksdb"))]
if block_number % maintenance_config.pruning_interval == 0 {
info!(block_number, "Pruning...");
match p2p_client.prune_expired_records(Instant::now()).await {
Expand Down
32 changes: 19 additions & 13 deletions core/src/network/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,31 @@ mod client;
pub mod configuration;
mod event_loop;
mod kad_mem_providers;
#[cfg(not(feature = "kademlia-rocksdb"))]

#[cfg(not(feature = "rocksdb"))]
mod kad_mem_store;
#[cfg(feature = "rocksdb")]
mod kad_rocksdb_store;

#[cfg(not(feature = "rocksdb"))]
pub use kad_mem_store::MemoryStoreConfig;
#[cfg(feature = "rocksdb")]
pub use kad_rocksdb_store::ExpirationCompactionFilterFactory;
#[cfg(feature = "rocksdb")]
pub use kad_rocksdb_store::RocksDBStoreConfig;

#[cfg(feature = "rocksdb")]
pub type Store = kad_rocksdb_store::RocksDBStore;
#[cfg(not(feature = "rocksdb"))]
pub type Store = kad_mem_store::MemoryStore;

use crate::{
data::{Database, P2PKeypairKey, RocksDB},
data::{Database, P2PKeypairKey},
types::SecretKey,
};
pub use client::Client;
pub use event_loop::EventLoop;
pub use kad_mem_providers::ProvidersConfig;
#[cfg(not(feature = "kademlia-rocksdb"))]
pub use kad_mem_store::MemoryStoreConfig;
pub use kad_rocksdb_store::ExpirationCompactionFilterFactory;
pub use kad_rocksdb_store::RocksDBStoreConfig;
use libp2p_allow_block_list as allow_block_list;

const MINIMUM_SUPPORTED_BOOTSTRAP_VERSION: &str = "0.1.1";
Expand Down Expand Up @@ -120,11 +131,6 @@ pub enum QueryChannel {

type Command = Box<dyn FnOnce(&mut EventLoop) -> Result<(), Report> + Send>;

#[cfg(not(feature = "kademlia-rocksdb"))]
type Store = kad_mem_store::MemoryStore;
#[cfg(feature = "kademlia-rocksdb")]
type Store = kad_rocksdb_store::RocksDBStore;

// Behaviour struct is used to derive delegated Libp2p behaviour implementation
#[derive(NetworkBehaviour)]
#[behaviour(event_process = false)]
Expand Down Expand Up @@ -302,7 +308,7 @@ pub fn is_multiaddr_global(address: &Multiaddr) -> bool {
.any(|protocol| matches!(protocol, libp2p::multiaddr::Protocol::Ip4(ip) if is_global(ip)))
}

fn get_or_init_keypair(cfg: &LibP2PConfig, db: RocksDB) -> Result<identity::Keypair> {
fn get_or_init_keypair(cfg: &LibP2PConfig, db: impl Database) -> Result<identity::Keypair> {
if let Some(secret_key) = cfg.secret_key.as_ref() {
return keypair(secret_key);
};
Expand All @@ -317,7 +323,7 @@ fn get_or_init_keypair(cfg: &LibP2PConfig, db: RocksDB) -> Result<identity::Keyp
Ok(id_keys)
}

pub fn identity(cfg: &LibP2PConfig, db: RocksDB) -> Result<(identity::Keypair, PeerId)> {
pub fn identity(cfg: &LibP2PConfig, db: impl Database) -> Result<(identity::Keypair, PeerId)> {
let keypair = get_or_init_keypair(cfg, db)?;
let peer_id = PeerId::from(keypair.public());
Ok((keypair, peer_id))
Expand Down
2 changes: 1 addition & 1 deletion core/src/network/p2p/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ impl Client {

pub async fn prune_expired_records(&self, now: Instant) -> Result<usize> {
self.execute_sync(|response_sender| {
if cfg!(feature = "kademlia-rocksdb") {
if cfg!(feature = "rocksdb") {
Box::new(move |_| {
response_sender.send(Ok(0)).map_err(|e| {
eyre!(
Expand Down
10 changes: 6 additions & 4 deletions core/src/network/p2p/configuration.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
#[cfg(not(feature = "kademlia-rocksdb"))]
use super::ProvidersConfig;
#[cfg(feature = "rocksdb")]
use super::RocksDBStoreConfig;
#[cfg(not(feature = "rocksdb"))]
use crate::network::p2p::MemoryStoreConfig;
use crate::types::{duration_seconds_format, KademliaMode, MultiaddrConfig, SecretKey};
use libp2p::{kad, multiaddr::Protocol, Multiaddr};
Expand All @@ -10,8 +13,6 @@ use std::{
time::Duration,
};

use super::{ProvidersConfig, RocksDBStoreConfig};

/// Libp2p AutoNAT configuration (see [RuntimeConfig] for details)
#[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(default)]
Expand Down Expand Up @@ -206,7 +207,7 @@ impl From<&LibP2PConfig> for kad::Config {
}
}

#[cfg(not(feature = "kademlia-rocksdb"))]
#[cfg(not(feature = "rocksdb"))]
impl From<&LibP2PConfig> for MemoryStoreConfig {
fn from(cfg: &LibP2PConfig) -> Self {
MemoryStoreConfig {
Expand All @@ -220,6 +221,7 @@ impl From<&LibP2PConfig> for MemoryStoreConfig {
}
}

#[cfg(feature = "rocksdb")]
impl From<&LibP2PConfig> for RocksDBStoreConfig {
fn from(cfg: &LibP2PConfig) -> Self {
RocksDBStoreConfig {
Expand Down
17 changes: 2 additions & 15 deletions core/src/network/p2p/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use tracing::{debug, error, info, trace, warn};

use super::{
build_swarm, client::BlockStat, configuration::LibP2PConfig, Behaviour, BehaviourEvent,
Command, QueryChannel,
Command, QueryChannel, Store,
};
use crate::{
network::p2p::{is_multiaddr_global, AgentVersion},
Expand Down Expand Up @@ -163,30 +163,17 @@ impl TryFrom<RecordKey> for DHTKey {
}
}

#[cfg(not(feature = "kademlia-rocksdb"))]
type Store = super::kad_mem_store::MemoryStore;
#[cfg(feature = "kademlia-rocksdb")]
type Store = super::kad_rocksdb_store::RocksDBStore;

impl EventLoop {
#[allow(clippy::too_many_arguments)]
pub async fn new(
cfg: LibP2PConfig,
version: &str,
genesis_hash: &str,
id_keys: &Keypair,
is_fat_client: bool,
shutdown: Controller<String>,
#[cfg(feature = "kademlia-rocksdb")] db: Arc<rocksdb::DB>,
store: Store,
) -> Self {
let bootstrap_interval = cfg.bootstrap_period;
let peer_id = id_keys.public().to_peer_id();
let store = Store::with_config(
peer_id,
(&cfg).into(),
#[cfg(feature = "kademlia-rocksdb")]
db,
);

let swarm = build_swarm(&cfg, version, genesis_hash, id_keys, store)
.await
Expand Down
Loading

0 comments on commit 00f5357

Please sign in to comment.