From b854452265637948e186c6b83ec7ca4864360ecf Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Wed, 22 Jan 2025 16:58:42 +0100 Subject: [PATCH 01/11] Add possibility to submit a list of changes to rocksdb --- crates/fuel-core/src/database.rs | 17 ++- crates/fuel-core/src/service.rs | 10 +- .../service/genesis/importer/import_task.rs | 4 +- crates/fuel-core/src/state.rs | 6 +- .../fuel-core/src/state/historical_rocksdb.rs | 106 ++++++++++++++---- .../historical_rocksdb/view_at_height.rs | 21 +++- .../src/state/in_memory/memory_store.rs | 49 +++++--- crates/fuel-core/src/state/rocks_db.rs | 52 ++++++--- crates/storage/src/transactional.rs | 8 ++ 9 files changed, 199 insertions(+), 74 deletions(-) diff --git a/crates/fuel-core/src/database.rs b/crates/fuel-core/src/database.rs index 37b5f2b097e..2265ebbbf24 100644 --- a/crates/fuel-core/src/database.rs +++ b/crates/fuel-core/src/database.rs @@ -43,6 +43,7 @@ use fuel_core_storage::{ ConflictPolicy, HistoricalView, Modifiable, + StorageChanges, StorageTransaction, }, Error as StorageError, @@ -413,19 +414,25 @@ impl Modifiable for Database { impl Modifiable for GenesisDatabase { fn commit_changes(&mut self, changes: Changes) -> StorageResult<()> { - self.data.as_ref().commit_changes(None, changes) + self.data + .as_ref() + .commit_changes(None, StorageChanges::Changes(changes)) } } impl Modifiable for GenesisDatabase { fn commit_changes(&mut self, changes: Changes) -> StorageResult<()> { - self.data.as_ref().commit_changes(None, changes) + self.data + .as_ref() + .commit_changes(None, StorageChanges::Changes(changes)) } } impl Modifiable for GenesisDatabase { fn commit_changes(&mut self, changes: Changes) -> StorageResult<()> { - self.data.as_ref().commit_changes(None, changes) + self.data + .as_ref() + .commit_changes(None, StorageChanges::Changes(changes)) } } @@ -518,7 +525,9 @@ where // Atomically commit the changes to the database, and to the mutex-protected field. let mut guard = database.stage.height.lock(); - database.data.commit_changes(new_height, updated_changes)?; + database + .data + .commit_changes(new_height, StorageChanges::Changes(updated_changes))?; // Update the block height if let Some(new_height) = new_height { diff --git a/crates/fuel-core/src/service.rs b/crates/fuel-core/src/service.rs index 628a3a9d6e4..3cb4505884a 100644 --- a/crates/fuel-core/src/service.rs +++ b/crates/fuel-core/src/service.rs @@ -32,6 +32,7 @@ use fuel_core_storage::{ transactional::{ AtomicView, ReadTransaction, + StorageChanges, }, IsNotFound, StorageAsMut, @@ -311,11 +312,10 @@ impl FuelService { &Consensus::Genesis(initialized_genesis), )?; - self.shared - .database - .on_chain() - .data - .commit_changes(Some(genesis_block_height), database_tx.into_changes())?; + self.shared.database.on_chain().data.commit_changes( + Some(genesis_block_height), + StorageChanges::Changes(database_tx.into_changes()), + )?; } Ok(()) diff --git a/crates/fuel-core/src/service/genesis/importer/import_task.rs b/crates/fuel-core/src/service/genesis/importer/import_task.rs index 5cdcf636c8c..4c0f76a1fb0 100644 --- a/crates/fuel-core/src/service/genesis/importer/import_task.rs +++ b/crates/fuel-core/src/service/genesis/importer/import_task.rs @@ -178,7 +178,7 @@ mod tests { }, tables::Coins, transactional::{ - Changes, + StorageChanges, StorageTransaction, }, Result as StorageResult, @@ -570,7 +570,7 @@ mod tests { fn commit_changes( &self, _: Option, - _: Changes, + _: StorageChanges, ) -> StorageResult<()> { Err(anyhow::anyhow!("I refuse to work!").into()) } diff --git a/crates/fuel-core/src/state.rs b/crates/fuel-core/src/state.rs index 06ee176c652..18ca7577896 100644 --- a/crates/fuel-core/src/state.rs +++ b/crates/fuel-core/src/state.rs @@ -12,7 +12,7 @@ use fuel_core_storage::{ IterableStore, }, kv_store::StorageColumn, - transactional::Changes, + transactional::StorageChanges, Result as StorageResult, }; use std::fmt::Debug; @@ -55,7 +55,7 @@ pub trait TransactableStorage: IterableStore + Debug + Send + Sync { fn commit_changes( &self, height: Option, - changes: Changes, + changes: StorageChanges, ) -> StorageResult<()>; fn view_at_height( @@ -75,7 +75,7 @@ impl TransactableStorage where S: IterableStore + Debug + Send + Sync, { - fn commit_changes(&self, _: Option, _: Changes) -> StorageResult<()> { + fn commit_changes(&self, _: Option, _: StorageChanges) -> StorageResult<()> { unimplemented!() } diff --git a/crates/fuel-core/src/state/historical_rocksdb.rs b/crates/fuel-core/src/state/historical_rocksdb.rs index 45ce0d797de..76178d8688a 100644 --- a/crates/fuel-core/src/state/historical_rocksdb.rs +++ b/crates/fuel-core/src/state/historical_rocksdb.rs @@ -43,6 +43,7 @@ use fuel_core_storage::{ Changes, ConflictPolicy, ReadTransaction, + StorageChanges, StorageTransaction, }, Error as StorageError, @@ -353,8 +354,9 @@ where ) .commit()?; - self.db - .commit_changes(&storage_transaction.into_changes())?; + self.db.commit_changes(&StorageChanges::Changes( + storage_transaction.into_changes(), + ))?; Ok(()) } @@ -577,17 +579,33 @@ where fn commit_changes( &self, height: Option, - changes: Changes, + changes: StorageChanges, ) -> StorageResult<()> { - let mut storage_transaction = - StorageTransaction::transaction(&self.db, ConflictPolicy::Overwrite, changes); + let mut storage_transaction = StorageTransaction::transaction( + &self.db, + ConflictPolicy::Overwrite, + Default::default(), + ); if let Some(height) = height { self.store_modifications_history(&mut storage_transaction, &height)?; } + let history_changes = storage_transaction.into_changes(); - self.db - .commit_changes(&storage_transaction.into_changes())?; + let new_changes = match changes { + StorageChanges::ChangesList(mut changes_list) => { + changes_list.push(history_changes); + StorageChanges::ChangesList(changes_list) + } + StorageChanges::Changes(changes) => { + let mut changes_list = Vec::with_capacity(2); + changes_list.push(changes); + changes_list.push(history_changes); + StorageChanges::ChangesList(changes_list) + } + }; + + self.db.commit_changes(&new_changes)?; Ok(()) } @@ -681,7 +699,10 @@ mod tests { .insert(&key(), &123) .unwrap(); historical_rocks_db - .commit_changes(Some(1u32.into()), transaction.into_changes()) + .commit_changes( + Some(1u32.into()), + StorageChanges::Changes(transaction.into_changes()), + ) .unwrap(); // Set the value at height 2 to be 321. @@ -691,7 +712,10 @@ mod tests { .insert(&key(), &321) .unwrap(); historical_rocks_db - .commit_changes(Some(2u32.into()), transaction.into_changes()) + .commit_changes( + Some(2u32.into()), + StorageChanges::Changes(transaction.into_changes()), + ) .unwrap(); // When @@ -721,7 +745,10 @@ mod tests { .insert(&key(), &123) .unwrap(); historical_rocks_db - .commit_changes(Some(1u32.into()), transaction.into_changes()) + .commit_changes( + Some(1u32.into()), + StorageChanges::Changes(transaction.into_changes()), + ) .unwrap(); // Set the value at height 2 to be 321. @@ -731,7 +758,10 @@ mod tests { .insert(&key(), &321) .unwrap(); historical_rocks_db - .commit_changes(Some(2u32.into()), transaction.into_changes()) + .commit_changes( + Some(2u32.into()), + StorageChanges::Changes(transaction.into_changes()), + ) .unwrap(); // When @@ -760,7 +790,10 @@ mod tests { .insert(&key(), &123) .unwrap(); historical_rocks_db - .commit_changes(Some(1u32.into()), transaction.into_changes()) + .commit_changes( + Some(1u32.into()), + StorageChanges::Changes(transaction.into_changes()), + ) .unwrap(); // When @@ -787,7 +820,10 @@ mod tests { .insert(&key(), &123) .unwrap(); historical_rocks_db - .commit_changes(Some(1u32.into()), transaction.into_changes()) + .commit_changes( + Some(1u32.into()), + StorageChanges::Changes(transaction.into_changes()), + ) .unwrap(); // When @@ -815,7 +851,10 @@ mod tests { .insert(&key(), &123) .unwrap(); historical_rocks_db - .commit_changes(Some(1u32.into()), transaction.into_changes()) + .commit_changes( + Some(1u32.into()), + StorageChanges::Changes(transaction.into_changes()), + ) .unwrap(); // When @@ -825,7 +864,10 @@ mod tests { .insert(&key(), &321) .unwrap(); historical_rocks_db - .commit_changes(Some(2u32.into()), transaction.into_changes()) + .commit_changes( + Some(2u32.into()), + StorageChanges::Changes(transaction.into_changes()), + ) .unwrap(); // Then @@ -862,7 +904,10 @@ mod tests { .insert(&key(), &123) .unwrap(); historical_rocks_db - .commit_changes(Some(1u32.into()), transaction.into_changes()) + .commit_changes( + Some(1u32.into()), + StorageChanges::Changes(transaction.into_changes()), + ) .unwrap(); let entries = historical_rocks_db .db @@ -901,7 +946,10 @@ mod tests { .insert(&key(), &123) .unwrap(); historical_rocks_db - .commit_changes(Some(1u32.into()), transaction.into_changes()) + .commit_changes( + Some(1u32.into()), + StorageChanges::Changes(transaction.into_changes()), + ) .unwrap(); let v2_entries = historical_rocks_db .db @@ -936,7 +984,10 @@ mod tests { .insert(&key(), &123) .unwrap(); historical_rocks_db - .commit_changes(Some(1u32.into()), transaction.into_changes()) + .commit_changes( + Some(1u32.into()), + StorageChanges::Changes(transaction.into_changes()), + ) .unwrap(); // Migrate the changes from V2 to V1. @@ -959,7 +1010,9 @@ mod tests { historical_rocks_db .db - .commit_changes(&migration_transaction.into_changes()) + .commit_changes(&StorageChanges::Changes( + migration_transaction.into_changes(), + )) .unwrap(); // Check that the history has indeed been written to V1 @@ -1008,7 +1061,10 @@ mod tests { .insert(&key(), &(123 + i as u64)) .unwrap(); historical_rocks_db - .commit_changes(Some(i.into()), transaction.into_changes()) + .commit_changes( + Some(i.into()), + StorageChanges::Changes(transaction.into_changes()), + ) .unwrap(); } // We can now rollback the last block 1000 times. @@ -1042,7 +1098,10 @@ mod tests { .insert(&key(), &123) .unwrap(); historical_rocks_db - .commit_changes(Some(1u32.into()), transaction.into_changes()) + .commit_changes( + Some(1u32.into()), + StorageChanges::Changes(transaction.into_changes()), + ) .unwrap(); historical_rocks_db.rollback_last_block().unwrap(); @@ -1080,7 +1139,10 @@ mod tests { .insert(&key, &123) .unwrap(); historical_rocks_db - .commit_changes(Some(height.into()), transaction.into_changes()) + .commit_changes( + Some(height.into()), + StorageChanges::Changes(transaction.into_changes()), + ) .unwrap(); } diff --git a/crates/fuel-core/src/state/historical_rocksdb/view_at_height.rs b/crates/fuel-core/src/state/historical_rocksdb/view_at_height.rs index d03703610d3..0a2041ca37f 100644 --- a/crates/fuel-core/src/state/historical_rocksdb/view_at_height.rs +++ b/crates/fuel-core/src/state/historical_rocksdb/view_at_height.rs @@ -102,6 +102,7 @@ mod tests { transactional::{ IntoTransaction, ReadTransaction, + StorageChanges, }, ContractsAssetKey, StorageAsMut, @@ -126,7 +127,10 @@ mod tests { .insert(&key(), &123) .unwrap(); historical_rocks_db - .commit_changes(Some(1u32.into()), transaction.into_changes()) + .commit_changes( + Some(1u32.into()), + StorageChanges::Changes(transaction.into_changes()), + ) .unwrap(); // Set the value at height 2 to be 321. @@ -136,7 +140,10 @@ mod tests { .insert(&key(), &321) .unwrap(); historical_rocks_db - .commit_changes(Some(2u32.into()), transaction.into_changes()) + .commit_changes( + Some(2u32.into()), + StorageChanges::Changes(transaction.into_changes()), + ) .unwrap(); // When @@ -204,7 +211,10 @@ mod tests { .insert(&Default::default(), &123456) .unwrap(); historical_rocks_db - .commit_changes(Some(1u32.into()), transaction.into_changes()) + .commit_changes( + Some(1u32.into()), + StorageChanges::Changes(transaction.into_changes()), + ) .unwrap(); // Set the value at height 2 to be 321. @@ -218,7 +228,10 @@ mod tests { .insert(&Default::default(), &654321) .unwrap(); historical_rocks_db - .commit_changes(Some(2u32.into()), transaction.into_changes()) + .commit_changes( + Some(2u32.into()), + StorageChanges::Changes(transaction.into_changes()), + ) .unwrap(); // When diff --git a/crates/fuel-core/src/state/in_memory/memory_store.rs b/crates/fuel-core/src/state/in_memory/memory_store.rs index 17ee7f9ea5b..4a8f8a56e1d 100644 --- a/crates/fuel-core/src/state/in_memory/memory_store.rs +++ b/crates/fuel-core/src/state/in_memory/memory_store.rs @@ -31,6 +31,7 @@ use fuel_core_storage::{ transactional::{ Changes, ReferenceBytesKey, + StorageChanges, }, Result as StorageResult, }; @@ -117,6 +118,26 @@ where collection.into_iter().map(Ok) } + + fn _insert_changes(&self, changes: Changes) { + for (column, btree) in changes.into_iter() { + let mut lock = self.inner[column as usize] + .lock() + .map_err(|e| anyhow::anyhow!("The lock is poisoned: {}", e)) + .unwrap(); + + for (key, operation) in btree.into_iter() { + match operation { + WriteOperation::Insert(value) => { + lock.insert(key, value); + } + WriteOperation::Remove => { + lock.remove(&key); + } + } + } + } + } } impl KeyValueInspect for MemoryStore @@ -167,24 +188,18 @@ where fn commit_changes( &self, _: Option, - changes: Changes, + changes: StorageChanges, ) -> StorageResult<()> { - for (column, btree) in changes.into_iter() { - let mut lock = self.inner[column as usize] - .lock() - .map_err(|e| anyhow::anyhow!("The lock is poisoned: {}", e))?; - - for (key, operation) in btree.into_iter() { - match operation { - WriteOperation::Insert(value) => { - lock.insert(key, value); - } - WriteOperation::Remove => { - lock.remove(&key); - } + match changes { + StorageChanges::ChangesList(changes) => { + for changes in changes.into_iter() { + self._insert_changes(changes); } } - } + StorageChanges::Changes(changes) => { + self._insert_changes(changes); + } + }; Ok(()) } @@ -237,7 +252,7 @@ mod tests { let mut transaction = self.read_transaction(); let len = transaction.write(key, column, buf)?; let changes = transaction.into_changes(); - self.commit_changes(None, changes)?; + self.commit_changes(None, StorageChanges::Changes(changes))?; Ok(len) } @@ -245,7 +260,7 @@ mod tests { let mut transaction = self.read_transaction(); transaction.delete(key, column)?; let changes = transaction.into_changes(); - self.commit_changes(None, changes)?; + self.commit_changes(None, StorageChanges::Changes(changes))?; Ok(()) } } diff --git a/crates/fuel-core/src/state/rocks_db.rs b/crates/fuel-core/src/state/rocks_db.rs index a41547976ac..59cb671f1d6 100644 --- a/crates/fuel-core/src/state/rocks_db.rs +++ b/crates/fuel-core/src/state/rocks_db.rs @@ -27,7 +27,10 @@ use fuel_core_storage::{ Value, WriteOperation, }, - transactional::Changes, + transactional::{ + Changes, + StorageChanges, + }, Error as StorageError, Result as StorageResult, }; @@ -968,10 +971,34 @@ impl RocksDb where Description: DatabaseDescription, { - pub fn commit_changes(&self, changes: &Changes) -> StorageResult<()> { + pub fn commit_changes(&self, changes: &StorageChanges) -> StorageResult<()> { let instant = std::time::Instant::now(); let mut batch = WriteBatch::default(); + match changes { + StorageChanges::Changes(changes) => { + self._populate_batch(&mut batch, changes); + } + StorageChanges::ChangesList(changes_list) => { + for changes in changes_list { + self._populate_batch(&mut batch, changes); + } + } + } + + self.db + .write(batch) + .map_err(|e| DatabaseError::Other(e.into()))?; + // TODO: Use `u128` when `AtomicU128` is stable. + self.metrics.database_commit_time.inc_by( + u64::try_from(instant.elapsed().as_nanos()) + .expect("The commit shouldn't take longer than `u64`"), + ); + + Ok(()) + } + + fn _populate_batch(&self, batch: &mut WriteBatch, changes: &Changes) { for (column, ops) in changes { let cf = self.cf_u32(*column); let column_metrics = self.metrics.columns_write_statistic.get(column); @@ -989,17 +1016,6 @@ where } } } - - self.db - .write(batch) - .map_err(|e| DatabaseError::Other(e.into()))?; - // TODO: Use `u128` when `AtomicU128` is stable. - self.metrics.database_commit_time.inc_by( - u64::try_from(instant.elapsed().as_nanos()) - .expect("The commit shouldn't take longer than `u64`"), - ); - - Ok(()) } } @@ -1035,7 +1051,7 @@ pub mod test_helpers { let mut transaction = self.read_transaction(); let len = transaction.write(key, column, buf)?; let changes = transaction.into_changes(); - self.commit_changes(&changes)?; + self.commit_changes(&StorageChanges::Changes(changes))?; Ok(len) } @@ -1044,7 +1060,7 @@ pub mod test_helpers { let mut transaction = self.read_transaction(); transaction.delete(key, column)?; let changes = transaction.into_changes(); - self.commit_changes(&changes)?; + self.commit_changes(&StorageChanges::Changes(changes))?; Ok(()) } } @@ -1166,7 +1182,8 @@ mod tests { )]), )]; - db.commit_changes(&HashMap::from_iter(ops)).unwrap(); + db.commit_changes(&StorageChanges::Changes(HashMap::from_iter(ops))) + .unwrap(); assert_eq!(db.get(&key, Column::Metadata).unwrap().unwrap(), value) } @@ -1182,7 +1199,8 @@ mod tests { Column::Metadata.id(), BTreeMap::from_iter(vec![(key.clone().into(), WriteOperation::Remove)]), )]; - db.commit_changes(&HashMap::from_iter(ops)).unwrap(); + db.commit_changes(&StorageChanges::Changes(HashMap::from_iter(ops))) + .unwrap(); assert_eq!(db.get(&key, Column::Metadata).unwrap(), None); } diff --git a/crates/storage/src/transactional.rs b/crates/storage/src/transactional.rs index e5e568dd3ea..f2fd93d454c 100644 --- a/crates/storage/src/transactional.rs +++ b/crates/storage/src/transactional.rs @@ -222,6 +222,14 @@ impl From> for Changes { } } +/// The type describing the list of changes to the storage. +pub enum StorageChanges { + /// A single batch of changes. + Changes(Changes), + /// A list of changes. + ChangesList(Vec), +} + /// The trait to convert the type into the storage transaction. pub trait IntoTransaction: Sized { /// Converts the type into the storage transaction consuming it. From 3134fb07f1d4dba72b4c146540205c4494dad0ec Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Thu, 23 Jan 2025 14:32:22 +0100 Subject: [PATCH 02/11] Special case history rocksdb and changelog --- CHANGELOG.md | 1 + .../fuel-core/src/state/historical_rocksdb.rs | 38 ++++++++----------- 2 files changed, 17 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4d8efebffc0..f9e59ac61a0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Added - [2551](https://github.com/FuelLabs/fuel-core/pull/2551): Enhanced the DA compressed block header to include block id. +- [2619](https://github.com/FuelLabs/fuel-core/pull/2691): Add possibility to submit list of changes to rocksdb. ### Changed - [2603](https://github.com/FuelLabs/fuel-core/pull/2603): Sets the latest recorded height on initialization, not just when DA costs are received diff --git a/crates/fuel-core/src/state/historical_rocksdb.rs b/crates/fuel-core/src/state/historical_rocksdb.rs index 76178d8688a..9d9634d772a 100644 --- a/crates/fuel-core/src/state/historical_rocksdb.rs +++ b/crates/fuel-core/src/state/historical_rocksdb.rs @@ -579,33 +579,27 @@ where fn commit_changes( &self, height: Option, - changes: StorageChanges, + mut changes: StorageChanges, ) -> StorageResult<()> { - let mut storage_transaction = StorageTransaction::transaction( - &self.db, - ConflictPolicy::Overwrite, - Default::default(), - ); - + // When the history need to be process we need to have all the changes in one + // transaction to be able to write their reverse changes. if let Some(height) = height { + let all_changes = match changes { + StorageChanges::Changes(changes) => changes, + StorageChanges::ChangesList(list) => { + list.into_iter().flat_map(|changes| changes).collect() + } + }; + let mut storage_transaction = StorageTransaction::transaction( + &self.db, + ConflictPolicy::Overwrite, + all_changes, + ); self.store_modifications_history(&mut storage_transaction, &height)?; + changes = StorageChanges::Changes(storage_transaction.into_changes()); } - let history_changes = storage_transaction.into_changes(); - - let new_changes = match changes { - StorageChanges::ChangesList(mut changes_list) => { - changes_list.push(history_changes); - StorageChanges::ChangesList(changes_list) - } - StorageChanges::Changes(changes) => { - let mut changes_list = Vec::with_capacity(2); - changes_list.push(changes); - changes_list.push(history_changes); - StorageChanges::ChangesList(changes_list) - } - }; - self.db.commit_changes(&new_changes)?; + self.db.commit_changes(&changes)?; Ok(()) } From 7b06b0d1cd8239597b387e4cb62c634858ae063e Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Thu, 23 Jan 2025 15:00:40 +0100 Subject: [PATCH 03/11] Make sure we don't create a transaction if not needed & Clippy --- .../fuel-core/src/state/historical_rocksdb.rs | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/crates/fuel-core/src/state/historical_rocksdb.rs b/crates/fuel-core/src/state/historical_rocksdb.rs index 9d9634d772a..c6f92223c72 100644 --- a/crates/fuel-core/src/state/historical_rocksdb.rs +++ b/crates/fuel-core/src/state/historical_rocksdb.rs @@ -584,19 +584,21 @@ where // When the history need to be process we need to have all the changes in one // transaction to be able to write their reverse changes. if let Some(height) = height { - let all_changes = match changes { - StorageChanges::Changes(changes) => changes, - StorageChanges::ChangesList(list) => { - list.into_iter().flat_map(|changes| changes).collect() - } - }; - let mut storage_transaction = StorageTransaction::transaction( - &self.db, - ConflictPolicy::Overwrite, - all_changes, - ); - self.store_modifications_history(&mut storage_transaction, &height)?; - changes = StorageChanges::Changes(storage_transaction.into_changes()); + if self.state_rewind_policy != StateRewindPolicy::NoRewind { + let all_changes = match changes { + StorageChanges::Changes(changes) => changes, + StorageChanges::ChangesList(list) => { + list.into_iter().flatten().collect() + } + }; + let mut storage_transaction = StorageTransaction::transaction( + &self.db, + ConflictPolicy::Overwrite, + all_changes, + ); + self.store_modifications_history(&mut storage_transaction, &height)?; + changes = StorageChanges::Changes(storage_transaction.into_changes()); + } } self.db.commit_changes(&changes)?; From f8e001f0c703d6b4e16fc698a8f690f0eb255ca2 Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Fri, 24 Jan 2025 14:43:15 +0100 Subject: [PATCH 04/11] Change `commit_changes_with_height_update` to work with StorageChanges --- crates/fuel-core/src/database.rs | 81 ++++++---- crates/storage/src/iter/changes_iterator.rs | 154 +++++++++++++++----- 2 files changed, 171 insertions(+), 64 deletions(-) diff --git a/crates/fuel-core/src/database.rs b/crates/fuel-core/src/database.rs index 37bee36a6b2..7c7b1699d2c 100644 --- a/crates/fuel-core/src/database.rs +++ b/crates/fuel-core/src/database.rs @@ -365,41 +365,57 @@ where impl Modifiable for Database { fn commit_changes(&mut self, changes: Changes) -> StorageResult<()> { - commit_changes_with_height_update(self, changes, |iter| { - iter.iter_all_keys::(Some(IterDirection::Reverse)) - .try_collect() - }) + commit_changes_with_height_update( + self, + StorageChanges::Changes(changes), + |iter| { + iter.iter_all_keys::(Some(IterDirection::Reverse)) + .try_collect() + }, + ) } } impl Modifiable for Database { fn commit_changes(&mut self, changes: Changes) -> StorageResult<()> { - commit_changes_with_height_update(self, changes, |iter| { - iter.iter_all::(Some(IterDirection::Reverse)) - .map(|result| result.map(|(_, height)| height)) - .try_collect() - }) + commit_changes_with_height_update( + self, + StorageChanges::Changes(changes), + |iter| { + iter.iter_all::(Some(IterDirection::Reverse)) + .map(|result| result.map(|(_, height)| height)) + .try_collect() + }, + ) } } impl Modifiable for Database { fn commit_changes(&mut self, changes: Changes) -> StorageResult<()> { - commit_changes_with_height_update(self, changes, |iter| { - iter.iter_all_keys::(Some(IterDirection::Reverse)) - .try_collect() - }) + commit_changes_with_height_update( + self, + StorageChanges::Changes(changes), + |iter| { + iter.iter_all_keys::(Some(IterDirection::Reverse)) + .try_collect() + }, + ) } } #[cfg(feature = "relayer")] impl Modifiable for Database { fn commit_changes(&mut self, changes: Changes) -> StorageResult<()> { - commit_changes_with_height_update(self, changes, |iter| { - iter.iter_all_keys::(Some( - IterDirection::Reverse, - )) - .try_collect() - }) + commit_changes_with_height_update( + self, + StorageChanges::Changes(changes), + |iter| { + iter.iter_all_keys::(Some( + IterDirection::Reverse, + )) + .try_collect() + }, + ) } } @@ -436,7 +452,7 @@ impl Modifiable for GenesisDatabase { pub fn commit_changes_with_height_update( database: &mut Database, - changes: Changes, + mut changes: StorageChanges, heights_lookup: impl Fn( &ChangesIterator, ) -> StorageResult>, @@ -499,14 +515,14 @@ where } }; - let updated_changes = if let Some(new_height) = new_height { + if let Some(new_height) = new_height { // We want to update the metadata table to include a new height. - // For that, we are building a new storage transaction around `changes`. - // Modifying this transaction will include all required updates into the `changes`. + // For that, we are building a new storage transaction. + // We get the changes from the database and add to our list of changes. let mut transaction = StorageTransaction::transaction( &database, ConflictPolicy::Overwrite, - changes, + Default::default(), ); let maybe_current_metadata = transaction .storage_as_mut::>() @@ -516,16 +532,21 @@ where .storage_as_mut::>() .insert(&(), &metadata)?; - transaction.into_changes() - } else { - changes + changes = match changes { + StorageChanges::Changes(c) => { + StorageChanges::ChangesList(vec![c, transaction.into_changes()]) + } + StorageChanges::ChangesList(mut list) => { + let mut changes = core::mem::take(&mut list); + changes.push(transaction.into_changes()); + StorageChanges::ChangesList(changes) + } + } }; // Atomically commit the changes to the database, and to the mutex-protected field. let mut guard = database.stage.height.lock(); - database - .data - .commit_changes(new_height, StorageChanges::Changes(updated_changes))?; + database.data.commit_changes(new_height, changes)?; // Update the block height if let Some(new_height) = new_height { diff --git a/crates/storage/src/iter/changes_iterator.rs b/crates/storage/src/iter/changes_iterator.rs index 9a3e566bdf3..914b0ef494c 100644 --- a/crates/storage/src/iter/changes_iterator.rs +++ b/crates/storage/src/iter/changes_iterator.rs @@ -14,18 +14,18 @@ use crate::{ Value, WriteOperation, }, - transactional::Changes, + transactional::StorageChanges, }; -/// A type that allows to iterate over the `Changes`. +/// A type that allows to iterate over the `StorageChanges`. pub struct ChangesIterator<'a, Column> { - changes: &'a Changes, + changes: &'a StorageChanges, _marker: core::marker::PhantomData, } impl<'a, Description> ChangesIterator<'a, Description> { /// Creates a new instance of the `ChangesIterator`. - pub fn new(changes: &'a Changes) -> Self { + pub fn new(changes: &'a StorageChanges) -> Self { Self { changes, _marker: Default::default(), @@ -40,14 +40,30 @@ where type Column = Column; fn get(&self, key: &[u8], column: Self::Column) -> crate::Result> { - Ok(self - .changes - .get(&column.id()) - .and_then(|tree| tree.get(key)) - .and_then(|operation| match operation { - WriteOperation::Insert(value) => Some(value.clone()), - WriteOperation::Remove => None, - })) + match self.changes { + StorageChanges::Changes(changes) => Ok(changes + .get(&column.id()) + .and_then(|tree| tree.get(key)) + .and_then(|operation| match operation { + WriteOperation::Insert(value) => Some(value.clone()), + WriteOperation::Remove => None, + })), + StorageChanges::ChangesList(changes_list) => { + for changes in changes_list.iter() { + if let Some(value) = changes + .get(&column.id()) + .and_then(|tree| tree.get(key)) + .and_then(|operation| match operation { + WriteOperation::Insert(value) => Some(value.clone()), + WriteOperation::Remove => None, + }) + { + return Ok(Some(value)); + } + } + Ok(None) + } + } } } @@ -62,18 +78,54 @@ where start: Option<&[u8]>, direction: IterDirection, ) -> BoxedIter { - if let Some(tree) = self.changes.get(&column.id()) { - crate::iter::iterator(tree, prefix, start, direction) - .filter_map(|(key, value)| match value { - WriteOperation::Insert(value) => { - Some((key.clone().into(), value.clone())) - } - WriteOperation::Remove => None, - }) - .map(Ok) - .into_boxed() - } else { - core::iter::empty().into_boxed() + match self.changes { + StorageChanges::Changes(changes) => { + if let Some(tree) = changes.get(&column.id()) { + crate::iter::iterator(tree, prefix, start, direction) + .filter_map(|(key, value)| match value { + WriteOperation::Insert(value) => { + Some((key.clone().into(), value.clone())) + } + WriteOperation::Remove => None, + }) + .map(Ok) + .into_boxed() + } else { + core::iter::empty().into_boxed() + } + } + StorageChanges::ChangesList(changes_list) => { + // We have to clone the prefix and start, because we need to pass them to the iterator + // if someone finds a solution without making it a vec, feel free to contribute :) + let column = column.id(); + let prefix = prefix.map(|prefix| prefix.to_vec()); + let start = start.map(|start| start.to_vec()); + changes_list + .iter() + .filter_map(move |changes| { + if let Some(tree) = changes.get(&column) { + Some( + crate::iter::iterator( + tree, + prefix.as_ref().map(|p| p.as_slice()), + start.as_ref().map(|p| p.as_slice()), + direction, + ) + .filter_map(|(key, value)| match value { + WriteOperation::Insert(value) => { + Some((key.clone().into(), value.clone())) + } + WriteOperation::Remove => None, + }) + .map(Ok), + ) + } else { + None + } + }) + .flatten() + .into_boxed() + } } } @@ -88,16 +140,50 @@ where // because we have to filter out the keys that were removed, which are // marked as `WriteOperation::Remove` in the value // copied as-is from the above function, but only to return keys - if let Some(tree) = self.changes.get(&column.id()) { - crate::iter::iterator(tree, prefix, start, direction) - .filter_map(|(key, value)| match value { - WriteOperation::Insert(_) => Some(key.clone().into()), - WriteOperation::Remove => None, - }) - .map(Ok) - .into_boxed() - } else { - core::iter::empty().into_boxed() + match self.changes { + StorageChanges::Changes(changes) => { + if let Some(tree) = changes.get(&column.id()) { + crate::iter::iterator(tree, prefix, start, direction) + .filter_map(|(key, value)| match value { + WriteOperation::Insert(_) => Some(key.clone().into()), + WriteOperation::Remove => None, + }) + .map(Ok) + .into_boxed() + } else { + core::iter::empty().into_boxed() + } + } + StorageChanges::ChangesList(changes_list) => { + // We have to clone the prefix and start, because we need to pass them to the iterator + // if someone finds a solution without making it a vec, feel free to contribute :) + let column = column.id(); + let prefix = prefix.map(|prefix| prefix.to_vec()); + let start = start.map(|start| start.to_vec()); + changes_list + .iter() + .filter_map(move |changes| { + if let Some(tree) = changes.get(&column) { + Some( + crate::iter::iterator( + tree, + prefix.as_ref().map(|p| p.as_slice()), + start.as_ref().map(|p| p.as_slice()), + direction, + ) + .filter_map(|(key, value)| match value { + WriteOperation::Insert(_) => Some(key.clone().into()), + WriteOperation::Remove => None, + }) + .map(Ok), + ) + } else { + None + } + }) + .flatten() + .into_boxed() + } } } } From b92cbd8dd55c8535a8f9add860266c975b1dda6b Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Fri, 24 Jan 2025 14:49:32 +0100 Subject: [PATCH 05/11] fix compilation relayer feature --- crates/fuel-core/src/database.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/fuel-core/src/database.rs b/crates/fuel-core/src/database.rs index 7c7b1699d2c..fc9682bef10 100644 --- a/crates/fuel-core/src/database.rs +++ b/crates/fuel-core/src/database.rs @@ -422,7 +422,9 @@ impl Modifiable for Database { #[cfg(not(feature = "relayer"))] impl Modifiable for Database { fn commit_changes(&mut self, changes: Changes) -> StorageResult<()> { - commit_changes_with_height_update(self, changes, |_| Ok(vec![])) + commit_changes_with_height_update(self, StorageChanges::Changes(changes), |_| { + Ok(vec![]) + }) } } From 81e7c12030d7ac0e1b8ac42d1e68bd7a8eb6a42c Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Fri, 24 Jan 2025 14:53:05 +0100 Subject: [PATCH 06/11] Fix changelog --- CHANGELOG.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c950d997367..021491818ae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Version 0.41.3] +### Added +- [2619](https://github.com/FuelLabs/fuel-core/pull/2691): Add possibility to submit list of changes to rocksdb. + ### Fixed - [2626](https://github.com/FuelLabs/fuel-core/pull/2626): Avoid needs of RocksDB features in tests modules. @@ -21,7 +24,6 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Added - [2551](https://github.com/FuelLabs/fuel-core/pull/2551): Enhanced the DA compressed block header to include block id. -- [2619](https://github.com/FuelLabs/fuel-core/pull/2691): Add possibility to submit list of changes to rocksdb. - [2595](https://github.com/FuelLabs/fuel-core/pull/2595): Added `indexation` field to the `nodeInfo` GraphQL endpoint to allow checking if a specific indexation is enabled. ### Changed From 46c83e493599333c798253116d6ac92f59b837d6 Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Fri, 24 Jan 2025 14:53:41 +0100 Subject: [PATCH 07/11] changelog --- CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 021491818ae..f94203a17a2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,11 +6,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] -## [Version 0.41.3] - ### Added - [2619](https://github.com/FuelLabs/fuel-core/pull/2691): Add possibility to submit list of changes to rocksdb. +## [Version 0.41.3] + ### Fixed - [2626](https://github.com/FuelLabs/fuel-core/pull/2626): Avoid needs of RocksDB features in tests modules. From 3a67be51d14f90e6c62640ceb80a62fe796e3d16 Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Fri, 24 Jan 2025 15:09:56 +0100 Subject: [PATCH 08/11] Fix test compilation and clippy --- crates/fuel-core/src/executor.rs | 10 ++++---- crates/storage/src/iter/changes_iterator.rs | 28 ++++++--------------- 2 files changed, 13 insertions(+), 25 deletions(-) diff --git a/crates/fuel-core/src/executor.rs b/crates/fuel-core/src/executor.rs index b0439aaaad5..33d662b2817 100644 --- a/crates/fuel-core/src/executor.rs +++ b/crates/fuel-core/src/executor.rs @@ -3175,13 +3175,10 @@ mod tests { }; use fuel_core_relayer::storage::EventsHistory; use fuel_core_storage::{ - column::Column, - iter::{ + column::Column, iter::{ changes_iterator::ChangesIterator, IteratorOverTable, - }, - tables::FuelBlocks, - StorageAsMut, + }, tables::FuelBlocks, transactional::StorageChanges, StorageAsMut }; use fuel_core_types::{ entities::RelayedTransaction, @@ -3324,6 +3321,7 @@ mod tests { let (result, changes) = producer.produce_without_commit(block.into())?.into(); // Then + let changes = StorageChanges::Changes(changes); let view = ChangesIterator::::new(&changes); assert_eq!( view.iter_all::(None).count() as u64, @@ -3933,6 +3931,7 @@ mod tests { .into(); // Then + let changes = StorageChanges::Changes(changes); let view = ChangesIterator::::new(&changes); assert!(result.skipped_transactions.is_empty()); assert_eq!(view.iter_all::(None).count() as u64, 0); @@ -3975,6 +3974,7 @@ mod tests { .into(); // Then + let changes = StorageChanges::Changes(changes); let view = ChangesIterator::::new(&changes); assert!(result.skipped_transactions.is_empty()); assert_eq!(view.iter_all::(None).count() as u64, 0); diff --git a/crates/storage/src/iter/changes_iterator.rs b/crates/storage/src/iter/changes_iterator.rs index 914b0ef494c..b46664d21ef 100644 --- a/crates/storage/src/iter/changes_iterator.rs +++ b/crates/storage/src/iter/changes_iterator.rs @@ -103,12 +103,10 @@ where changes_list .iter() .filter_map(move |changes| { - if let Some(tree) = changes.get(&column) { - Some( - crate::iter::iterator( + changes.get(&column).map(|tree| crate::iter::iterator( tree, - prefix.as_ref().map(|p| p.as_slice()), - start.as_ref().map(|p| p.as_slice()), + prefix.as_deref(), + start.as_deref(), direction, ) .filter_map(|(key, value)| match value { @@ -117,11 +115,7 @@ where } WriteOperation::Remove => None, }) - .map(Ok), - ) - } else { - None - } + .map(Ok)) }) .flatten() .into_boxed() @@ -163,23 +157,17 @@ where changes_list .iter() .filter_map(move |changes| { - if let Some(tree) = changes.get(&column) { - Some( - crate::iter::iterator( + changes.get(&column).map(|tree| crate::iter::iterator( tree, - prefix.as_ref().map(|p| p.as_slice()), - start.as_ref().map(|p| p.as_slice()), + prefix.as_deref(), + start.as_deref(), direction, ) .filter_map(|(key, value)| match value { WriteOperation::Insert(_) => Some(key.clone().into()), WriteOperation::Remove => None, }) - .map(Ok), - ) - } else { - None - } + .map(Ok)) }) .flatten() .into_boxed() From 9171473e61671ca43ae765eb560c913f3d5e5429 Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Fri, 24 Jan 2025 15:14:57 +0100 Subject: [PATCH 09/11] fmt --- crates/fuel-core/src/executor.rs | 8 +++- crates/storage/src/iter/changes_iterator.rs | 52 +++++++++++---------- 2 files changed, 34 insertions(+), 26 deletions(-) diff --git a/crates/fuel-core/src/executor.rs b/crates/fuel-core/src/executor.rs index 33d662b2817..f78a3932de1 100644 --- a/crates/fuel-core/src/executor.rs +++ b/crates/fuel-core/src/executor.rs @@ -3175,10 +3175,14 @@ mod tests { }; use fuel_core_relayer::storage::EventsHistory; use fuel_core_storage::{ - column::Column, iter::{ + column::Column, + iter::{ changes_iterator::ChangesIterator, IteratorOverTable, - }, tables::FuelBlocks, transactional::StorageChanges, StorageAsMut + }, + tables::FuelBlocks, + transactional::StorageChanges, + StorageAsMut, }; use fuel_core_types::{ entities::RelayedTransaction, diff --git a/crates/storage/src/iter/changes_iterator.rs b/crates/storage/src/iter/changes_iterator.rs index b46664d21ef..17ccd480985 100644 --- a/crates/storage/src/iter/changes_iterator.rs +++ b/crates/storage/src/iter/changes_iterator.rs @@ -103,19 +103,21 @@ where changes_list .iter() .filter_map(move |changes| { - changes.get(&column).map(|tree| crate::iter::iterator( - tree, - prefix.as_deref(), - start.as_deref(), - direction, - ) - .filter_map(|(key, value)| match value { - WriteOperation::Insert(value) => { - Some((key.clone().into(), value.clone())) - } - WriteOperation::Remove => None, - }) - .map(Ok)) + changes.get(&column).map(|tree| { + crate::iter::iterator( + tree, + prefix.as_deref(), + start.as_deref(), + direction, + ) + .filter_map(|(key, value)| match value { + WriteOperation::Insert(value) => { + Some((key.clone().into(), value.clone())) + } + WriteOperation::Remove => None, + }) + .map(Ok) + }) }) .flatten() .into_boxed() @@ -157,17 +159,19 @@ where changes_list .iter() .filter_map(move |changes| { - changes.get(&column).map(|tree| crate::iter::iterator( - tree, - prefix.as_deref(), - start.as_deref(), - direction, - ) - .filter_map(|(key, value)| match value { - WriteOperation::Insert(_) => Some(key.clone().into()), - WriteOperation::Remove => None, - }) - .map(Ok)) + changes.get(&column).map(|tree| { + crate::iter::iterator( + tree, + prefix.as_deref(), + start.as_deref(), + direction, + ) + .filter_map(|(key, value)| match value { + WriteOperation::Insert(_) => Some(key.clone().into()), + WriteOperation::Remove => None, + }) + .map(Ok) + }) }) .flatten() .into_boxed() From 1dfaa47a346a24a4ff1264d53505a469a9079472 Mon Sep 17 00:00:00 2001 From: Mitchell Turner Date: Wed, 29 Jan 2025 16:47:33 -0700 Subject: [PATCH 10/11] Refactor `iter_store_keys` (#2651) ## Description Was looking into: ```rs // We have to clone the prefix and start, because we need to pass them to the iterator // if someone finds a solution without making it a vec, feel free to contribute :) let column = column.id(); let prefix = prefix.map(|prefix| prefix.to_vec()); let start = start.map(|start| start.to_vec()); ``` and did some refactoring to get to the bottom of the problem. I don't know if there is a solution unless we decide to change the signature of `iter_store_keys` to something like: ```rs fn iter_store_keys<'a>( &'a self, column: Self::Column, prefix: Option<&'a [u8]>, start: Option<&'a [u8]>, direction: IterDirection, ) -> BoxedIter; ``` But that's not realistic in this PR. Might as well keep the refactoring because I feel like it's cleaner. --- crates/storage/src/iter/changes_iterator.rs | 79 +++++++++++++-------- 1 file changed, 50 insertions(+), 29 deletions(-) diff --git a/crates/storage/src/iter/changes_iterator.rs b/crates/storage/src/iter/changes_iterator.rs index 17ccd480985..9a982783e85 100644 --- a/crates/storage/src/iter/changes_iterator.rs +++ b/crates/storage/src/iter/changes_iterator.rs @@ -14,8 +14,13 @@ use crate::{ Value, WriteOperation, }, - transactional::StorageChanges, + transactional::{ + Changes, + ReferenceBytesKey, + StorageChanges, + }, }; +use alloc::collections::BTreeMap; /// A type that allows to iterate over the `StorageChanges`. pub struct ChangesIterator<'a, Column> { @@ -137,19 +142,13 @@ where // marked as `WriteOperation::Remove` in the value // copied as-is from the above function, but only to return keys match self.changes { - StorageChanges::Changes(changes) => { - if let Some(tree) = changes.get(&column.id()) { - crate::iter::iterator(tree, prefix, start, direction) - .filter_map(|(key, value)| match value { - WriteOperation::Insert(_) => Some(key.clone().into()), - WriteOperation::Remove => None, - }) - .map(Ok) - .into_boxed() - } else { - core::iter::empty().into_boxed() - } - } + StorageChanges::Changes(changes) => get_insert_keys_from_changes( + changes, + column.id(), + prefix, + start, + direction, + ), StorageChanges::ChangesList(changes_list) => { // We have to clone the prefix and start, because we need to pass them to the iterator // if someone finds a solution without making it a vec, feel free to contribute :) @@ -158,24 +157,46 @@ where let start = start.map(|start| start.to_vec()); changes_list .iter() - .filter_map(move |changes| { - changes.get(&column).map(|tree| { - crate::iter::iterator( - tree, - prefix.as_deref(), - start.as_deref(), - direction, - ) - .filter_map(|(key, value)| match value { - WriteOperation::Insert(_) => Some(key.clone().into()), - WriteOperation::Remove => None, - }) - .map(Ok) - }) + .flat_map(move |changes| { + get_insert_keys_from_changes( + changes, + column, + prefix.as_deref(), + start.as_deref(), + direction, + ) }) - .flatten() .into_boxed() } } } } + +fn get_insert_keys_from_changes<'a>( + changes: &'a Changes, + column_id: u32, + prefix: Option<&[u8]>, + start: Option<&[u8]>, + direction: IterDirection, +) -> BoxedIter<'a, crate::kv_store::KeyItem> { + changes + .get(&column_id) + .map_or(core::iter::empty().into_boxed(), |tree| { + boxed_insert_iter(tree, prefix, start, direction) + }) +} + +fn boxed_insert_iter<'a>( + tree: &'a BTreeMap, + prefix: Option<&[u8]>, + start: Option<&[u8]>, + direction: IterDirection, +) -> BoxedIter<'a, crate::kv_store::KeyItem> { + crate::iter::iterator(tree, prefix, start, direction) + .filter_map(|(key, value)| match value { + WriteOperation::Insert(_) => Some(key.clone().into()), + WriteOperation::Remove => None, + }) + .map(Ok) + .into_boxed() +} From fe267c5f8b5f1a48f262f91a093a98b6d30b660b Mon Sep 17 00:00:00 2001 From: AurelienFT <32803821+AurelienFT@users.noreply.github.com> Date: Tue, 11 Feb 2025 11:01:10 +0100 Subject: [PATCH 11/11] Update CHANGELOG.md --- CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index da0442d29ea..4ef35be08fe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,6 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Added - [2619](https://github.com/FuelLabs/fuel-core/pull/2691): Add possibility to submit list of changes to rocksdb. -- [2635](https://github.com/FuelLabs/fuel-core/pull/2635): Add metrics to gas price service - [2150](https://github.com/FuelLabs/fuel-core/pull/2150): Upgraded `libp2p` to `0.54.1` and introduced `ConnectionLimiter` to limit pending incoming/outgoing connections. ### Fixed