This repository has been archived by the owner on Jan 11, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 10
Patch concurrent update error #458
Open
cryptoAtwill
wants to merge
5
commits into
fallback
Choose a base branch
from
patch-concurrent-update
base: fallback
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 3 commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -63,6 +63,17 @@ impl FinalityWithNull { | |||||
/// Clear the cache and set the committed finality to the provided value | ||||||
pub fn reset(&self, finality: IPCParentFinality) -> Stm<()> { | ||||||
self.cached_data.write(SequentialKeyCache::sequential())?; | ||||||
|
||||||
if let Some(p) = self.last_committed_finality.read_clone()? { | ||||||
if p.height > finality.height { | ||||||
tracing::warn!( | ||||||
last = p.to_string(), | ||||||
next = finality.to_string(), | ||||||
"reset to a lower finality" | ||||||
); | ||||||
} | ||||||
} | ||||||
|
||||||
self.last_committed_finality.write(Some(finality)) | ||||||
} | ||||||
|
||||||
|
@@ -71,6 +82,17 @@ impl FinalityWithNull { | |||||
height: BlockHeight, | ||||||
maybe_payload: Option<ParentViewPayload>, | ||||||
) -> StmResult<(), Error> { | ||||||
if let Some(p) = self.last_committed_finality.read_clone()? { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Here too, the value isn't returned, so you can access it through the |
||||||
if p.height > height { | ||||||
tracing::debug!( | ||||||
height, | ||||||
last_committed_height = p.height, | ||||||
"no need update parent view" | ||||||
); | ||||||
return Ok(()); | ||||||
} | ||||||
} | ||||||
|
||||||
if let Some((block_hash, validator_changes, top_down_msgs)) = maybe_payload { | ||||||
self.parent_block_filled(height, block_hash, validator_changes, top_down_msgs) | ||||||
} else { | ||||||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,8 @@ pub mod sync; | |
|
||
pub mod convert; | ||
pub mod proxy; | ||
#[cfg(test)] | ||
mod test; | ||
mod toggle; | ||
|
||
use async_stm::Stm; | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,281 @@ | ||
// Copyright 2022-2023 Protocol Labs | ||
// SPDX-License-Identifier: Apache-2.0, MIT | ||
|
||
use crate::finality::ParentViewPayload; | ||
use crate::proxy::ParentQueryProxy; | ||
use crate::sync::{LotusParentSyncer, ParentFinalityStateQuery}; | ||
use crate::{ | ||
BlockHeight, CachedFinalityProvider, Config, IPCParentFinality, ParentFinalityProvider, | ||
ParentViewProvider, SequentialKeyCache, Toggle, NULL_ROUND_ERR_MSG, | ||
}; | ||
use anyhow::anyhow; | ||
use async_stm::atomically; | ||
use async_trait::async_trait; | ||
use ipc_provider::manager::{GetBlockHashResult, TopDownQueryPayload}; | ||
use ipc_sdk::cross::CrossMsg; | ||
use ipc_sdk::staking::StakingChangeRequest; | ||
use std::sync::atomic::{AtomicBool, Ordering}; | ||
use std::sync::Arc; | ||
use std::time::Duration; | ||
use tokio::sync::Mutex; | ||
|
||
/// Test double that always reports one fixed latest committed finality.
struct TestParentFinalityStateQuery {
    // The finality value returned by every `get_latest_committed_finality` call.
    latest_finality: IPCParentFinality,
}
|
||
impl ParentFinalityStateQuery for TestParentFinalityStateQuery { | ||
fn get_latest_committed_finality(&self) -> anyhow::Result<Option<IPCParentFinality>> { | ||
Ok(Some(self.latest_finality.clone())) | ||
} | ||
} | ||
|
||
/// Creates a mock of a new parent blockchain view. The key is the height and the value is the
/// block hash. If block hash is None, it means the current height is a null block.
macro_rules! new_parent_blocks {
    // Trailing-comma variant: strip the comma and recurse into the base case.
    // (Previously this arm expanded to `hash_map!`, a macro that does not
    // exist in this file — it only compiled because the arm was never used.)
    ($($key:expr => $val:expr),* ,) => (
        new_parent_blocks!($($key => $val),*)
    );
    // Base case: append each height => payload pair into a fresh cache.
    ($($key:expr => $val:expr),*) => ({
        let mut map = SequentialKeyCache::sequential();
        $( map.append($key, $val).unwrap(); )*
        map
    });
}
|
||
/// Test double standing in for the parent-chain query proxy.
struct TestParentProxy {
    // Mocked parent chain: height => optional payload (None models a null round).
    blocks: SequentialKeyCache<BlockHeight, Option<ParentViewPayload>>,
    // While true, `get_top_down_msgs` spins instead of returning, so the test
    // can hold the syncer inside a "slow" query.
    should_wait: Arc<AtomicBool>,
    // Used to signal the test each time the proxy is blocked in the wait loop.
    tx: tokio::sync::mpsc::Sender<u8>,
}
|
||
#[async_trait]
impl ParentQueryProxy for TestParentProxy {
    /// Chain head is the highest height present in the mocked block map.
    async fn get_chain_head_height(&self) -> anyhow::Result<BlockHeight> {
        Ok(self.blocks.upper_bound().unwrap())
    }

    /// Genesis epoch is one below the lowest mocked height.
    async fn get_genesis_epoch(&self) -> anyhow::Result<BlockHeight> {
        Ok(self.blocks.lower_bound().unwrap() - 1)
    }

    /// Returns the block hash at `height` together with the hash of the nearest
    /// non-null block below it. Errors with `NULL_ROUND_ERR_MSG` on null rounds.
    async fn get_block_hash(&self, height: BlockHeight) -> anyhow::Result<GetBlockHashResult> {
        let r = self.blocks.get_value(height).unwrap();
        if r.is_none() {
            // A `None` payload models a null round on the parent chain.
            return Err(anyhow!(NULL_ROUND_ERR_MSG));
        }

        // Walk downwards from `height` to find the closest non-null parent block.
        for h in (self.blocks.lower_bound().unwrap()..height).rev() {
            let v = self.blocks.get_value(h).unwrap();
            if v.is_none() {
                continue;
            }
            return Ok(GetBlockHashResult {
                parent_block_hash: v.clone().unwrap().0,
                block_hash: r.clone().unwrap().0,
            });
        }
        // Only reachable when the fixture has no non-null block below `height`.
        panic!("invalid testing data")
    }

    /// Returns the top-down messages recorded at `height`.
    ///
    /// While `should_wait` is set this spins, pinging the test through `tx`
    /// every 10ms so the test can detect that the syncer is blocked here —
    /// this simulates a slow real-world IO call.
    async fn get_top_down_msgs(
        &self,
        height: BlockHeight,
    ) -> anyhow::Result<TopDownQueryPayload<Vec<CrossMsg>>> {
        while self.should_wait.load(Ordering::SeqCst) {
            self.tx.send(0).await?;
            tokio::time::sleep(Duration::from_millis(10)).await;
        }

        let r = self.blocks.get_value(height).cloned().unwrap();
        if r.is_none() {
            return Err(anyhow!(NULL_ROUND_ERR_MSG));
        }
        let r = r.unwrap();
        Ok(TopDownQueryPayload {
            // Payload tuple layout: (block_hash, validator_changes, top_down_msgs).
            value: r.2,
            block_hash: r.0,
        })
    }

    /// Returns the validator change requests recorded at `height`.
    async fn get_validator_changes(
        &self,
        height: BlockHeight,
    ) -> anyhow::Result<TopDownQueryPayload<Vec<StakingChangeRequest>>> {
        let r = self.blocks.get_value(height).cloned().unwrap();
        if r.is_none() {
            return Err(anyhow!(NULL_ROUND_ERR_MSG));
        }
        let r = r.unwrap();
        Ok(TopDownQueryPayload {
            value: r.1,
            block_hash: r.0,
        })
    }
}
|
||
/// Builds a (provider, syncer) pair wired to the mocked parent chain `blocks`.
///
/// `should_wait` and `tx` are handed to the proxy so the test can pause
/// `get_top_down_msgs` and observe when the syncer is blocked inside it.
async fn new_setup(
    blocks: SequentialKeyCache<BlockHeight, Option<ParentViewPayload>>,
    should_wait: Arc<AtomicBool>,
    tx: tokio::sync::mpsc::Sender<u8>,
) -> (
    Arc<Toggle<CachedFinalityProvider<TestParentProxy>>>,
    LotusParentSyncer<TestParentFinalityStateQuery, TestParentProxy>,
) {
    let config = Config {
        chain_head_delay: 2,
        polling_interval: Default::default(),
        exponential_back_off: Default::default(),
        // No retries: failures surface immediately in tests.
        exponential_retry_limit: 0,
        max_proposal_range: Some(10),
        max_cache_blocks: None,
        proposal_delay: None,
    };
    // The initial committed finality is placed at the lowest mocked height.
    let genesis_epoch = blocks.lower_bound().unwrap();
    let proxy = Arc::new(TestParentProxy {
        blocks,
        should_wait,
        tx,
    });
    let committed_finality = IPCParentFinality {
        height: genesis_epoch,
        block_hash: vec![0; 32],
    };

    let provider = CachedFinalityProvider::new(
        config.clone(),
        genesis_epoch,
        Some(committed_finality.clone()),
        proxy.clone(),
    );
    let provider = Arc::new(Toggle::enabled(provider));

    let syncer = LotusParentSyncer::new(
        config,
        proxy,
        provider.clone(),
        Arc::new(TestParentFinalityStateQuery {
            latest_finality: committed_finality,
        }),
    );

    (provider, syncer)
}
|
||
/// This test case is for when pulling the next block, there is a new commit finality request | ||
/// and all cache is purged. | ||
#[tokio::test] | ||
async fn while_syncing_cache_purged() { | ||
let parent_blocks = new_parent_blocks!( | ||
100 => Some((vec![0; 32], vec![], vec![])), // genesis block | ||
101 => Some((vec![1; 32], vec![], vec![])), | ||
102 => Some((vec![2; 32], vec![], vec![])), | ||
103 => Some((vec![3; 32], vec![], vec![])), | ||
104 => Some((vec![4; 32], vec![], vec![])), | ||
105 => Some((vec![5; 32], vec![], vec![])), | ||
106 => Some((vec![6; 32], vec![], vec![])), | ||
107 => Some((vec![6; 32], vec![], vec![])), | ||
108 => Some((vec![6; 32], vec![], vec![])), | ||
109 => Some((vec![6; 32], vec![], vec![])), | ||
110 => Some((vec![6; 32], vec![], vec![])) | ||
); | ||
let should_wait = Arc::new(AtomicBool::new(false)); | ||
let (tx, mut rx) = tokio::sync::mpsc::channel(10); | ||
|
||
let (provider, mut syncer) = new_setup(parent_blocks, should_wait.clone(), tx).await; | ||
syncer.sync().await.unwrap(); | ||
syncer.sync().await.unwrap(); | ||
syncer.sync().await.unwrap(); | ||
assert_eq!( | ||
atomically(|| provider.next_proposal()).await, | ||
Some(IPCParentFinality { | ||
height: 101, | ||
block_hash: vec![1; 32] | ||
}), | ||
"sanity check, make sure there is data in cache" | ||
); | ||
|
||
let syncer = Arc::new(Mutex::new(syncer)); | ||
// now make sure we are "waiting" for polling top down messages, like in real io | ||
should_wait.store(true, Ordering::SeqCst); | ||
let cloned_syncer = syncer.clone(); | ||
let handle = tokio::spawn(async move { | ||
let mut syncer = cloned_syncer.lock().await; | ||
syncer.sync().await.unwrap(); | ||
}); | ||
|
||
loop { | ||
if (rx.recv().await).is_some() { | ||
// syncer.sync is waiting, we mock a new proposal from peer | ||
atomically(|| { | ||
provider.set_new_finality( | ||
IPCParentFinality { | ||
height: 105, | ||
block_hash: vec![5; 32], | ||
}, | ||
Some(IPCParentFinality { | ||
height: 100, | ||
block_hash: vec![0; 32], | ||
}), | ||
) | ||
}) | ||
.await; | ||
should_wait.store(false, Ordering::SeqCst); | ||
break; | ||
} | ||
} | ||
handle.await.unwrap(); | ||
assert_eq!( | ||
atomically(|| provider.next_proposal()).await, | ||
None, | ||
"cache should be evicted, so proposal should be made" | ||
); | ||
|
||
atomically(|| { | ||
assert_eq!(Some(105), provider.latest_height()?); | ||
assert_eq!( | ||
Some(IPCParentFinality { | ||
height: 105, | ||
block_hash: vec![5; 32] | ||
}), | ||
provider.last_committed_finality()? | ||
); | ||
Ok(()) | ||
}) | ||
.await; | ||
|
||
assert_eq!( | ||
provider | ||
.validator_changes_from(105, 105) | ||
.await | ||
.unwrap() | ||
.len(), | ||
0 | ||
); | ||
|
||
// make sure syncer still works | ||
let mut syncer = syncer.lock().await; | ||
syncer.sync().await.unwrap(); | ||
|
||
atomically(|| { | ||
assert_eq!(Some(106), provider.latest_height()?); | ||
assert_eq!( | ||
Some(IPCParentFinality { | ||
height: 105, | ||
block_hash: vec![5; 32] | ||
}), | ||
provider.last_committed_finality()? | ||
); | ||
Ok(()) | ||
}) | ||
.await; | ||
|
||
syncer.sync().await.unwrap(); | ||
syncer.sync().await.unwrap(); | ||
assert_eq!( | ||
atomically(|| provider.next_proposal()).await, | ||
Some(IPCParentFinality { | ||
height: 106, | ||
block_hash: vec![6; 32] | ||
}), | ||
); | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think
.read()?
should suffice here.