Skip to content

Commit 10b55c9

Browse files
committed
feat: implement a resource pool
1 parent a3cd906 commit 10b55c9

File tree

3 files changed

+310
-11
lines changed

3 files changed

+310
-11
lines changed

mithril-aggregator/src/services/prover.rs

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ use std::{
44
collections::{BTreeMap, BTreeSet, HashMap},
55
sync::Arc,
66
};
7-
use tokio::sync::Mutex;
87

98
use mithril_common::{
109
crypto_helper::{MKMap, MKMapNode, MKTree},
1110
entities::{
1211
BlockRange, CardanoDbBeacon, CardanoTransaction, CardanoTransactionsSetProof,
1312
TransactionHash,
1413
},
14+
resource_pool::ResourcePool,
1515
signable_builder::BlockRangeRootRetriever,
1616
StdResult,
1717
};
@@ -62,7 +62,7 @@ pub trait TransactionsRetriever: Sync + Send {
6262
pub struct MithrilProverService {
6363
transaction_retriever: Arc<dyn TransactionsRetriever>,
6464
block_range_root_retriever: Arc<dyn BlockRangeRootRetriever>,
65-
mk_map_cache: Mutex<Option<MKMap<BlockRange, MKMapNode<BlockRange>>>>,
65+
mk_map_cache: ResourcePool<MKMap<BlockRange, MKMapNode<BlockRange>>>,
6666
}
6767

6868
impl MithrilProverService {
@@ -74,7 +74,7 @@ impl MithrilProverService {
7474
Self {
7575
transaction_retriever,
7676
block_range_root_retriever,
77-
mk_map_cache: Mutex::new(None),
77+
mk_map_cache: ResourcePool::default(),
7878
}
7979
}
8080

@@ -139,9 +139,9 @@ impl ProverService for MithrilProverService {
139139
let mk_trees = BTreeMap::from_iter(mk_trees?);
140140

141141
// 3 - Compute block range roots Merkle map
142+
// TODO: the cache computation should be done in the state machine only when new artifact is produced and at node startup
142143
self.compute_cache(up_to).await?;
143-
let mut mk_map = self.mk_map_cache.lock().await;
144-
let mk_map = mk_map.as_mut().unwrap();
144+
let mut mk_map = self.mk_map_cache.acquire_resource();
145145

146146
// 4 - Enrich the Merkle map with the block ranges Merkle trees
147147
for (block_range, mk_tree) in mk_trees {
@@ -150,6 +150,9 @@ impl ProverService for MithrilProverService {
150150

151151
// 5 - Compute the proof for all transactions
152152
if let Ok(mk_proof) = mk_map.compute_proof(transaction_hashes) {
153+
self.mk_map_cache
154+
.return_resource(mk_map.into_inner(), mk_map.discriminant());
155+
153156
let transaction_hashes_certified: Vec<TransactionHash> = transaction_hashes
154157
.iter()
155158
.filter(|hash| mk_proof.contains(&hash.as_str().into()).is_ok())
@@ -166,22 +169,30 @@ impl ProverService for MithrilProverService {
166169
}
167170

168171
async fn compute_cache(&self, up_to: &CardanoDbBeacon) -> StdResult<()> {
169-
let mut mk_map = self.mk_map_cache.lock().await;
170-
if mk_map.is_none() {
171-
println!("Computing Merkle map from block range roots");
172+
if self.mk_map_cache.count() == 0 {
173+
println!("Computing Merkle map cache from block range roots");
174+
172175
let mk_map_cache = self
173176
.block_range_root_retriever
174177
.compute_merkle_map_from_block_range_roots(up_to.immutable_file_number)
175178
.await?;
176-
mk_map.replace(mk_map_cache);
179+
let discriminant_new = self.mk_map_cache.discriminant() + 1;
180+
self.mk_map_cache.set_discriminant(discriminant_new);
181+
for i in 0..10 {
182+
println!("Computing Merkle map cache from block range roots: {}", i);
183+
self.mk_map_cache
184+
.return_resource(mk_map_cache.clone(), discriminant_new);
185+
}
186+
self.mk_map_cache
187+
.return_resource(mk_map_cache, discriminant_new);
188+
println!("Done computing Merkle map cache from block range roots");
177189
}
178190

179191
Ok(())
180192
}
181193

182194
async fn clear_cache(&self) -> StdResult<()> {
183-
let mut mk_map = self.mk_map_cache.lock().await;
184-
mk_map.take();
195+
self.mk_map_cache.drain();
185196

186197
Ok(())
187198
}

mithril-common/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ pub mod entities;
5959
pub mod era;
6060
pub mod messages;
6161
pub mod protocol;
62+
pub mod resource_pool;
6263
pub mod signable_builder;
6364

6465
cfg_test_tools! {

mithril-common/src/resource_pool.rs

Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,287 @@
1+
//! Resource pool implementation
2+
// TODO: fix tests
3+
use anyhow::anyhow;
4+
use std::{
5+
collections::VecDeque,
6+
ops::{Deref, DerefMut},
7+
sync::{Condvar, Mutex},
8+
time::Duration,
9+
};
10+
11+
use crate::StdResult;
12+
13+
/// Resource pool FIFO implementation
14+
pub struct ResourcePool<T: Send + Sync> {
15+
/// The size of the pool
16+
size: usize,
17+
18+
/// Discriminant for the resource pool to check if a returned resource is stale
19+
discriminant: Mutex<u64>,
20+
21+
/// Resources in the pool
22+
resources: Mutex<VecDeque<T>>,
23+
24+
/// Condition variable to notify when a resource is available
25+
not_empty: Condvar,
26+
}
27+
28+
impl<T: Send + Sync> ResourcePool<T> {
29+
/// Create a new resource pool
30+
pub fn new(pool_size: usize, resources: Vec<T>) -> Self {
31+
Self {
32+
size: pool_size,
33+
discriminant: Mutex::new(0),
34+
resources: Mutex::new(resources.into()),
35+
not_empty: Condvar::new(),
36+
}
37+
}
38+
39+
/// Acquire a resource from the pool
40+
pub fn acquire_resource(&self) -> ResourcePoolItem<'_, T> {
41+
self.acquire_resource_with_timeout(Duration::MAX).unwrap()
42+
}
43+
44+
/// Acquire a resource from the pool with a timeout
45+
pub fn acquire_resource_with_timeout(
46+
&self,
47+
duration: Duration,
48+
) -> StdResult<ResourcePoolItem<'_, T>> {
49+
let mut resources = self.resources.lock().unwrap();
50+
while resources.is_empty() {
51+
let (resources_locked, timeout) =
52+
self.not_empty.wait_timeout(resources, duration).unwrap();
53+
if timeout.timed_out() {
54+
return Err(anyhow!("Acquire resource has timed out"));
55+
}
56+
resources = resources_locked;
57+
}
58+
59+
Ok(ResourcePoolItem::new(self, resources.pop_front().unwrap()))
60+
}
61+
62+
/// Return a resource to the pool
63+
/// A resource is returned to the pool only if the discriminant matches
64+
/// and if the pool is not already full
65+
pub fn return_resource(&self, resource: T, discriminant: u64) {
66+
if self.count() == self.size {
67+
return;
68+
}
69+
let mut resources = self.resources.lock().unwrap();
70+
if self.discriminant() != discriminant {
71+
return;
72+
}
73+
resources.push_back(resource);
74+
self.not_empty.notify_one();
75+
}
76+
77+
/// Drain the pool
78+
pub fn drain(&self) {
79+
let mut resources = self.resources.lock().unwrap();
80+
let _ = resources.drain(..).collect::<Vec<_>>();
81+
}
82+
83+
/// Get the discriminant of the resource pool item
84+
pub fn discriminant(&self) -> u64 {
85+
*self.discriminant.lock().unwrap()
86+
}
87+
88+
/// Set the discriminant of the resource pool item
89+
pub fn set_discriminant(&self, discriminant: u64) {
90+
*self.discriminant.lock().unwrap() = discriminant;
91+
}
92+
93+
/// Count the resources in the pool
94+
pub fn count(&self) -> usize {
95+
self.resources.lock().unwrap().len()
96+
}
97+
98+
/// Size of the resource pool
99+
pub fn size(&self) -> usize {
100+
self.size
101+
}
102+
}
103+
104+
impl<T: Send + Sync> Default for ResourcePool<T> {
105+
fn default() -> Self {
106+
Self::new(10, vec![])
107+
}
108+
}
109+
110+
/// Resource pool item which will return the resource to the pool when dropped
111+
pub struct ResourcePoolItem<'a, T: Send + Sync> {
112+
resource_pool: &'a ResourcePool<T>,
113+
discriminant: u64,
114+
resource: Option<T>,
115+
}
116+
117+
impl<'a, T: Send + Sync> ResourcePoolItem<'a, T> {
118+
/// Create a new resource pool item
119+
pub fn new(resource_pool: &'a ResourcePool<T>, resource: T) -> Self {
120+
let discriminant = *resource_pool.discriminant.lock().unwrap();
121+
Self {
122+
resource_pool,
123+
discriminant,
124+
resource: Some(resource),
125+
}
126+
}
127+
128+
/// Get the discriminant of the resource pool item
129+
pub fn discriminant(&self) -> u64 {
130+
self.discriminant
131+
}
132+
133+
/// Get a reference to the inner resource
134+
pub fn resource(&self) -> &T {
135+
self.resource.as_ref().unwrap()
136+
}
137+
138+
/// Take the inner resource
139+
pub fn into_inner(&mut self) -> T {
140+
self.resource.take().unwrap()
141+
}
142+
}
143+
144+
impl<T: Send + Sync> Deref for ResourcePoolItem<'_, T> {
145+
type Target = T;
146+
147+
fn deref(&self) -> &T {
148+
self.resource.as_ref().unwrap()
149+
}
150+
}
151+
152+
impl<T: Send + Sync> DerefMut for ResourcePoolItem<'_, T> {
153+
fn deref_mut(&mut self) -> &mut T {
154+
self.resource.as_mut().unwrap()
155+
}
156+
}
157+
158+
impl<T: Send + Sync> Drop for ResourcePoolItem<'_, T> {
159+
fn drop(&mut self) {
160+
let resource = self.into_inner();
161+
self.resource_pool
162+
.return_resource(resource, self.discriminant);
163+
}
164+
}
165+
166+
#[cfg(test)]
167+
mod tests {
168+
use std::time::Duration;
169+
170+
use super::*;
171+
172+
#[test]
173+
fn test_resource_pool_acquire_returns_resource_when_available() {
174+
let pool_size = 10;
175+
let resources_expected: Vec<String> = (0..pool_size).map(|i| i.to_string()).collect();
176+
let pool = ResourcePool::<String>::new(pool_size, resources_expected.clone());
177+
178+
let mut resources_items = vec![];
179+
for _ in 0..pool_size {
180+
let resource_item = pool
181+
.acquire_resource_with_timeout(Duration::from_millis(1000))
182+
.unwrap();
183+
resources_items.push(resource_item);
184+
}
185+
let resources_result = resources_items
186+
.iter_mut()
187+
.map(|resource_item| resource_item.resource().to_owned())
188+
.collect::<Vec<_>>();
189+
190+
assert_eq!(resources_expected, resources_result);
191+
assert_eq!(pool.count(), 0);
192+
}
193+
194+
#[tokio::test]
195+
async fn test_resource_pool_acquire_locks_when_no_resource_available() {
196+
let pool_size = 10;
197+
let resources_expected: Vec<String> = (0..pool_size).map(|i| i.to_string()).collect();
198+
let pool = ResourcePool::<String>::new(pool_size, resources_expected.clone());
199+
200+
let mut resources_items = vec![];
201+
for _ in 0..pool_size {
202+
let resource_item = pool
203+
.acquire_resource_with_timeout(Duration::from_millis(1000))
204+
.unwrap();
205+
resources_items.push(resource_item);
206+
}
207+
208+
assert!(pool
209+
.acquire_resource_with_timeout(Duration::from_millis(1000))
210+
.is_err());
211+
}
212+
213+
#[tokio::test]
214+
async fn test_resource_pool_drains_successfully() {
215+
let pool_size = 10;
216+
let resources_expected: Vec<String> = (0..pool_size).map(|i| i.to_string()).collect();
217+
let pool = ResourcePool::<String>::new(pool_size, resources_expected.clone());
218+
assert_eq!(pool.count(), pool_size);
219+
220+
pool.drain();
221+
222+
assert_eq!(pool.count(), 0);
223+
}
224+
225+
#[tokio::test]
226+
async fn test_resource_pool_returns_fresh_resource() {
227+
let pool_size = 10;
228+
let resources_expected: Vec<String> = (0..pool_size).map(|i| i.to_string()).collect();
229+
let pool = ResourcePool::<String>::new(pool_size, resources_expected.clone());
230+
assert_eq!(pool.count(), pool_size);
231+
232+
let mut resource_item = pool
233+
.acquire_resource_with_timeout(Duration::from_millis(1000))
234+
.unwrap();
235+
assert_eq!(pool.count(), pool_size - 1);
236+
pool.return_resource(resource_item.into_inner(), pool.discriminant());
237+
238+
assert_eq!(pool.count(), pool_size);
239+
}
240+
241+
#[tokio::test]
242+
async fn test_resource_pool_returns_automatically_resource() {
243+
let pool_size = 10;
244+
let resources_expected: Vec<String> = (0..pool_size).map(|i| i.to_string()).collect();
245+
let pool = ResourcePool::<String>::new(pool_size, resources_expected.clone());
246+
assert_eq!(pool.count(), pool_size);
247+
248+
{
249+
let _resource_item = pool
250+
.acquire_resource_with_timeout(Duration::from_millis(1000))
251+
.unwrap();
252+
assert_eq!(pool.count(), pool_size - 1);
253+
}
254+
255+
assert_eq!(pool.count(), pool_size);
256+
}
257+
258+
#[tokio::test]
259+
async fn test_resource_pool_does_not_return_resource_when_pool_is_full() {
260+
let pool_size = 10;
261+
let resources_expected: Vec<String> = (0..pool_size).map(|i| i.to_string()).collect();
262+
let pool = ResourcePool::<String>::new(pool_size, resources_expected.clone());
263+
assert_eq!(pool.count(), pool_size);
264+
265+
pool.return_resource("resource".to_string(), pool.discriminant());
266+
267+
assert_eq!(pool.count(), pool_size);
268+
}
269+
270+
#[tokio::test]
271+
async fn test_resource_pool_does_not_return_stale_resource() {
272+
let pool_size = 10;
273+
let resources_expected: Vec<String> = (0..pool_size).map(|i| i.to_string()).collect();
274+
let pool = ResourcePool::<String>::new(pool_size, resources_expected.clone());
275+
assert_eq!(pool.count(), pool_size);
276+
277+
let mut resource_item = pool
278+
.acquire_resource_with_timeout(Duration::from_millis(1000))
279+
.unwrap();
280+
assert_eq!(pool.count(), pool_size - 1);
281+
let discriminant_stale = pool.discriminant();
282+
pool.set_discriminant(pool.discriminant() + 1);
283+
pool.return_resource(resource_item.into_inner(), discriminant_stale);
284+
285+
assert_eq!(pool.count(), pool_size - 1);
286+
}
287+
}

0 commit comments

Comments
 (0)