Skip to content
This repository has been archived by the owner on Jan 11, 2024. It is now read-only.

Patch concurrent update error #458

Open
wants to merge 5 commits into
base: fallback
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions fendermint/vm/topdown/src/finality/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,17 @@ impl FinalityWithNull {
/// Clear the cache and set the committed finality to the provided value
pub fn reset(&self, finality: IPCParentFinality) -> Stm<()> {
self.cached_data.write(SequentialKeyCache::sequential())?;

if let Some(p) = self.last_committed_finality.read_clone()? {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think .read()? should suffice here.

if p.height > finality.height {
tracing::warn!(
last = p.to_string(),
next = finality.to_string(),
"reset to a lower finality"
);
}
}

self.last_committed_finality.write(Some(finality))
}

Expand All @@ -71,6 +82,17 @@ impl FinalityWithNull {
height: BlockHeight,
maybe_payload: Option<ParentViewPayload>,
) -> StmResult<(), Error> {
if let Some(p) = self.last_committed_finality.read_clone()? {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if let Some(p) = self.last_committed_finality.read_clone()? {
if let Some(p) = self.last_committed_finality.read()? {

Here too, the value isn't returned, so you can access it through the Arc

if p.height > height {
tracing::debug!(
height,
last_committed_height = p.height,
"no need update parent view"
);
return Ok(());
}
}

if let Some((block_hash, validator_changes, top_down_msgs)) = maybe_payload {
self.parent_block_filled(height, block_hash, validator_changes, top_down_msgs)
} else {
Expand Down
2 changes: 2 additions & 0 deletions fendermint/vm/topdown/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ pub mod sync;

pub mod convert;
pub mod proxy;
#[cfg(test)]
mod test;
mod toggle;

use async_stm::Stm;
Expand Down
2 changes: 1 addition & 1 deletion fendermint/vm/topdown/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod syncer;
mod tendermint;

use crate::proxy::ParentQueryProxy;
use crate::sync::syncer::LotusParentSyncer;
pub(crate) use crate::sync::syncer::LotusParentSyncer;
use crate::sync::tendermint::TendermintAwareSyncer;
use crate::{CachedFinalityProvider, Config, IPCParentFinality, ParentFinalityProvider, Toggle};
use anyhow::anyhow;
Expand Down
2 changes: 1 addition & 1 deletion fendermint/vm/topdown/src/sync/syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::sync::Arc;

/// Parent syncer that constantly poll parent. This struct handles lotus null blocks and deferred
/// execution. For ETH based parent, it should work out of the box as well.
pub(crate) struct LotusParentSyncer<T, P> {
pub struct LotusParentSyncer<T, P> {
config: Config,
parent_proxy: Arc<P>,
provider: Arc<Toggle<CachedFinalityProvider<P>>>,
Expand Down
281 changes: 281 additions & 0 deletions fendermint/vm/topdown/src/test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
// Copyright 2022-2023 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT

use crate::finality::ParentViewPayload;
use crate::proxy::ParentQueryProxy;
use crate::sync::{LotusParentSyncer, ParentFinalityStateQuery};
use crate::{
BlockHeight, CachedFinalityProvider, Config, IPCParentFinality, ParentFinalityProvider,
ParentViewProvider, SequentialKeyCache, Toggle, NULL_ROUND_ERR_MSG,
};
use anyhow::anyhow;
use async_stm::atomically;
use async_trait::async_trait;
use ipc_provider::manager::{GetBlockHashResult, TopDownQueryPayload};
use ipc_sdk::cross::CrossMsg;
use ipc_sdk::staking::StakingChangeRequest;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;

struct TestParentFinalityStateQuery {
latest_finality: IPCParentFinality,
}

impl ParentFinalityStateQuery for TestParentFinalityStateQuery {
fn get_latest_committed_finality(&self) -> anyhow::Result<Option<IPCParentFinality>> {
Ok(Some(self.latest_finality.clone()))
}
}

/// Creates a mock of a new parent blockchain view. The key is the height and the value is the
/// block hash. If block hash is None, it means the current height is a null block.
macro_rules! new_parent_blocks {
($($key:expr => $val:expr),* ,) => (
hash_map!($($key => $val),*)
);
($($key:expr => $val:expr),*) => ({
let mut map = SequentialKeyCache::sequential();
$( map.append($key, $val).unwrap(); )*
map
});
}

struct TestParentProxy {
blocks: SequentialKeyCache<BlockHeight, Option<ParentViewPayload>>,
should_wait: Arc<AtomicBool>,
tx: tokio::sync::mpsc::Sender<u8>,
}

#[async_trait]
impl ParentQueryProxy for TestParentProxy {
async fn get_chain_head_height(&self) -> anyhow::Result<BlockHeight> {
Ok(self.blocks.upper_bound().unwrap())
}

async fn get_genesis_epoch(&self) -> anyhow::Result<BlockHeight> {
Ok(self.blocks.lower_bound().unwrap() - 1)
}

async fn get_block_hash(&self, height: BlockHeight) -> anyhow::Result<GetBlockHashResult> {
let r = self.blocks.get_value(height).unwrap();
if r.is_none() {
return Err(anyhow!(NULL_ROUND_ERR_MSG));
}

for h in (self.blocks.lower_bound().unwrap()..height).rev() {
let v = self.blocks.get_value(h).unwrap();
if v.is_none() {
continue;
}
return Ok(GetBlockHashResult {
parent_block_hash: v.clone().unwrap().0,
block_hash: r.clone().unwrap().0,
});
}
panic!("invalid testing data")
}

async fn get_top_down_msgs(
&self,
height: BlockHeight,
) -> anyhow::Result<TopDownQueryPayload<Vec<CrossMsg>>> {
while self.should_wait.load(Ordering::SeqCst) {
self.tx.send(0).await?;
tokio::time::sleep(Duration::from_millis(10)).await;
}

let r = self.blocks.get_value(height).cloned().unwrap();
if r.is_none() {
return Err(anyhow!(NULL_ROUND_ERR_MSG));
}
let r = r.unwrap();
Ok(TopDownQueryPayload {
value: r.2,
block_hash: r.0,
})
}

async fn get_validator_changes(
&self,
height: BlockHeight,
) -> anyhow::Result<TopDownQueryPayload<Vec<StakingChangeRequest>>> {
let r = self.blocks.get_value(height).cloned().unwrap();
if r.is_none() {
return Err(anyhow!(NULL_ROUND_ERR_MSG));
}
let r = r.unwrap();
Ok(TopDownQueryPayload {
value: r.1,
block_hash: r.0,
})
}
}

async fn new_setup(
blocks: SequentialKeyCache<BlockHeight, Option<ParentViewPayload>>,
should_wait: Arc<AtomicBool>,
tx: tokio::sync::mpsc::Sender<u8>,
) -> (
Arc<Toggle<CachedFinalityProvider<TestParentProxy>>>,
LotusParentSyncer<TestParentFinalityStateQuery, TestParentProxy>,
) {
let config = Config {
chain_head_delay: 2,
polling_interval: Default::default(),
exponential_back_off: Default::default(),
exponential_retry_limit: 0,
max_proposal_range: Some(10),
max_cache_blocks: None,
proposal_delay: None,
};
let genesis_epoch = blocks.lower_bound().unwrap();
let proxy = Arc::new(TestParentProxy {
blocks,
should_wait,
tx,
});
let committed_finality = IPCParentFinality {
height: genesis_epoch,
block_hash: vec![0; 32],
};

let provider = CachedFinalityProvider::new(
config.clone(),
genesis_epoch,
Some(committed_finality.clone()),
proxy.clone(),
);
let provider = Arc::new(Toggle::enabled(provider));

let syncer = LotusParentSyncer::new(
config,
proxy,
provider.clone(),
Arc::new(TestParentFinalityStateQuery {
latest_finality: committed_finality,
}),
);

(provider, syncer)
}

/// This test case is for when pulling the next block, there is a new commit finality request
/// and all cache is purged.
#[tokio::test]
async fn while_syncing_cache_purged() {
let parent_blocks = new_parent_blocks!(
100 => Some((vec![0; 32], vec![], vec![])), // genesis block
101 => Some((vec![1; 32], vec![], vec![])),
102 => Some((vec![2; 32], vec![], vec![])),
103 => Some((vec![3; 32], vec![], vec![])),
104 => Some((vec![4; 32], vec![], vec![])),
105 => Some((vec![5; 32], vec![], vec![])),
106 => Some((vec![6; 32], vec![], vec![])),
107 => Some((vec![6; 32], vec![], vec![])),
108 => Some((vec![6; 32], vec![], vec![])),
109 => Some((vec![6; 32], vec![], vec![])),
110 => Some((vec![6; 32], vec![], vec![]))
);
let should_wait = Arc::new(AtomicBool::new(false));
let (tx, mut rx) = tokio::sync::mpsc::channel(10);

let (provider, mut syncer) = new_setup(parent_blocks, should_wait.clone(), tx).await;
syncer.sync().await.unwrap();
syncer.sync().await.unwrap();
syncer.sync().await.unwrap();
assert_eq!(
atomically(|| provider.next_proposal()).await,
Some(IPCParentFinality {
height: 101,
block_hash: vec![1; 32]
}),
"sanity check, make sure there is data in cache"
);

let syncer = Arc::new(Mutex::new(syncer));
// now make sure we are "waiting" for polling top down messages, like in real io
should_wait.store(true, Ordering::SeqCst);
let cloned_syncer = syncer.clone();
let handle = tokio::spawn(async move {
let mut syncer = cloned_syncer.lock().await;
syncer.sync().await.unwrap();
});

loop {
if (rx.recv().await).is_some() {
// syncer.sync is waiting, we mock a new proposal from peer
atomically(|| {
provider.set_new_finality(
IPCParentFinality {
height: 105,
block_hash: vec![5; 32],
},
Some(IPCParentFinality {
height: 100,
block_hash: vec![0; 32],
}),
)
})
.await;
should_wait.store(false, Ordering::SeqCst);
break;
}
}
handle.await.unwrap();
assert_eq!(
atomically(|| provider.next_proposal()).await,
None,
"cache should be evicted, so proposal should be made"
);

atomically(|| {
assert_eq!(Some(105), provider.latest_height()?);
assert_eq!(
Some(IPCParentFinality {
height: 105,
block_hash: vec![5; 32]
}),
provider.last_committed_finality()?
);
Ok(())
})
.await;

assert_eq!(
provider
.validator_changes_from(105, 105)
.await
.unwrap()
.len(),
0
);

// make sure syncer still works
let mut syncer = syncer.lock().await;
syncer.sync().await.unwrap();

atomically(|| {
assert_eq!(Some(106), provider.latest_height()?);
assert_eq!(
Some(IPCParentFinality {
height: 105,
block_hash: vec![5; 32]
}),
provider.last_committed_finality()?
);
Ok(())
})
.await;

syncer.sync().await.unwrap();
syncer.sync().await.unwrap();
assert_eq!(
atomically(|| provider.next_proposal()).await,
Some(IPCParentFinality {
height: 106,
block_hash: vec![6; 32]
}),
);
}
Loading