Skip to content

Commit

Permalink
itf runner: initialize actual state using expected state
Browse files Browse the repository at this point in the history
* adde spec to impl type converters

* extract dummy data generators to utils

* delete unnecessary comments

* rename test
  • Loading branch information
stojanovic00 committed Feb 4, 2025
1 parent 2dc9e42 commit 096457e
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 40 deletions.
2 changes: 1 addition & 1 deletion code/crates/starknet/host/src/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl<T> Default for MinHeap<T> {
}

impl<T> MinHeap<T> {
fn push(&mut self, msg: StreamMessage<T>) {
pub fn push(&mut self, msg: StreamMessage<T>) {
self.0.push(MinSeq(msg));
}

Expand Down
3 changes: 1 addition & 2 deletions code/crates/test/mbt/src/deserializers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ where
}
}

// I don't know if this is the right way
// Quint specification has its own Option type that is treated as enum in rust
// so I had to extract message from it and convert it to rust's Option type
// so message has to be extracted from it and be converted to rust's Option type
#[derive(Clone, Debug, PartialEq, Eq, Deserialize)]
#[serde(tag = "tag", content = "value")]
enum MessageOption {
Expand Down
2 changes: 1 addition & 1 deletion code/crates/test/mbt/src/tests/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub mod utils;
const SHA2_256: u64 = 0x12;

#[test]
fn test_itf() {
fn test_mbt_part_streaming() {
let temp_dir = tempfile::TempDir::with_prefix("informalsystems-malachitebft-part-streaming")
.expect("Failed to create temp dir");
let temp_path = temp_dir.path().to_owned();
Expand Down
54 changes: 20 additions & 34 deletions code/crates/test/mbt/src/tests/streaming/runner.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use super::utils;
use crate::streaming::{MessageType, State as SpecificationState};
use itf::Runner as ItfRunner;
use malachitebft_core_types::Round;
use malachitebft_engine::util::streaming::{StreamContent, StreamMessage};
use malachitebft_peer::PeerId;
use malachitebft_starknet_host::{
streaming::{PartStreamsMap, StreamState as StreamStateImpl},
types::{Address, Height, ProposalInit, ProposalPart, Transaction, Transactions},
types::ProposalPart,
};

pub struct StreamingRunner {
Expand All @@ -33,9 +32,25 @@ impl ItfRunner for StreamingRunner {
fn init(&mut self, expected: &Self::ExpectedState) -> Result<Self::ActualState, Self::Error> {
println!("🔵 init: expected state={:?}", expected.state);
let mut streams_map = PartStreamsMap::default();

let initial_state: StreamStateImpl<ProposalPart> = StreamStateImpl {
buffer: utils::spec_to_impl_buffer(&expected.state.buffer, self.stream_id),
init_info: utils::init_message_to_proposal_init(&expected.incoming_message),
seen_sequences: expected
.state
.received
.iter()
.map(|msg| msg.sequence as u64)
.collect(),
next_sequence: expected.state.next_sequence as u64,
total_messages: expected.state.total_messages as usize,
fin_received: expected.state.fin_received,
emitted_messages: expected.state.emitted.len(),
};

streams_map
.streams
.insert((self.peer_id, self.stream_id), StreamStateImpl::default());
.insert((self.peer_id, self.stream_id), initial_state);
Ok(streams_map)
}

Expand All @@ -53,44 +68,15 @@ impl ItfRunner for StreamingRunner {
let message = match &expected.incoming_message {
Some(msg) => match &msg.msg_type {
MessageType::Init => {
// Dummy proposer address
let bytes: [u8; 32] = [
0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C,
0x0D, 0x0E, 0x0F, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18,
0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E, 0x1F, 0x20,
];
let proposer_addr = Address::new(bytes);

let height = Height {
block_number: 1,
fork_id: 1,
};

let round = Round::new(2);
let valid_round = Round::new(1);

let proposal_init = ProposalInit {
height: height,
proposal_round: round,
valid_round: valid_round,
proposer: proposer_addr,
};

let proposal_init = utils::generate_dummy_proposal_init();
StreamMessage::<ProposalPart>::new(
self.stream_id,
msg.sequence as u64,
StreamContent::Data(ProposalPart::Init(proposal_init)),
)
}
MessageType::Data => {
// Dummy transactions
let tx1 = Transaction::new(vec![0x01, 0x02, 0x03]);
let tx2 = Transaction::new(vec![0x04, 0x05, 0x06]);
let tx3 = Transaction::new(vec![0x07, 0x08, 0x09]);

let tx_vec = vec![tx1, tx2, tx3];

let transactions = Transactions::new(tx_vec);
let transactions = utils::generate_dummy_transactions();
StreamMessage::<ProposalPart>::new(
self.stream_id,
msg.sequence as u64,
Expand Down
85 changes: 83 additions & 2 deletions code/crates/test/mbt/src/tests/streaming/utils.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
use std::collections::HashSet;

use crate::streaming::{Buffer, Message};
use malachitebft_engine::util::streaming::Sequence;
use malachitebft_starknet_host::{streaming::MinHeap, types::ProposalPart};
use malachitebft_core_types::Round;
use malachitebft_engine::util::streaming::{Sequence, StreamId};
use malachitebft_engine::util::streaming::{StreamContent, StreamMessage};
use malachitebft_starknet_host::{
streaming::MinHeap,
types::{Address, Height, ProposalInit, ProposalPart, Transaction, Transactions},
};

pub fn messages_equal_sequences(
sequences: &HashSet<Sequence>,
Expand All @@ -27,3 +32,79 @@ pub fn compare_buffers(actual_buffer: &MinHeap<ProposalPart>, expected_buffer: &

actual_set == expected_set
}

pub fn spec_to_impl_buffer(spec_buffer: &Buffer, stream_id: StreamId) -> MinHeap<ProposalPart> {
let mut impl_buffer = MinHeap::default();

for rec in &spec_buffer.0 {
let message = match rec.1.msg_type {
crate::streaming::MessageType::Init => {
let proposal_init = generate_dummy_proposal_init();
StreamMessage::<ProposalPart>::new(
stream_id,
rec.0 as u64,
StreamContent::Data(ProposalPart::Init(proposal_init)),
)
}
crate::streaming::MessageType::Data => {
let transactions = generate_dummy_transactions();
StreamMessage::<ProposalPart>::new(
stream_id,
rec.0 as u64,
StreamContent::Data(ProposalPart::Transactions(transactions)),
)
}
crate::streaming::MessageType::Fin => StreamMessage::<ProposalPart>::new(
stream_id,
rec.0 as u64,
StreamContent::Fin(true),
),
};
impl_buffer.push(message);
}

impl_buffer
}

// Specifications init messages is just string, so no useful data can be extracted from it
pub fn init_message_to_proposal_init(message: &Option<Message>) -> Option<ProposalInit> {
match message {
Some(_) => Some(generate_dummy_proposal_init()),
None => None,
}
}

pub fn generate_dummy_proposal_init() -> ProposalInit {
let bytes: [u8; 32] = [
0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F,
0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E,
0x1F, 0x20,
];
let proposer_addr = Address::new(bytes);

let height = Height {
block_number: 1,
fork_id: 1,
};

let round = Round::new(2);
let valid_round = Round::new(1);

ProposalInit {
height: height,
proposal_round: round,
valid_round: valid_round,
proposer: proposer_addr,
}
}

pub fn generate_dummy_transactions() -> Transactions {
let tx1 = Transaction::new(vec![0x01, 0x02, 0x03]);
let tx2 = Transaction::new(vec![0x04, 0x05, 0x06]);
let tx3 = Transaction::new(vec![0x07, 0x08, 0x09]);

let tx_vec = vec![tx1, tx2, tx3];

let transactions = Transactions::new(tx_vec);
transactions
}

0 comments on commit 096457e

Please sign in to comment.