Skip to content

Commit

Permalink
fix: update StreamHandler to use new StreamHashMap interface
Browse files Browse the repository at this point in the history
  • Loading branch information
guy-starkware committed Dec 25, 2024
1 parent 64ef9cb commit 69d1bac
Show file tree
Hide file tree
Showing 20 changed files with 315 additions and 106 deletions.
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions crates/papyrus_node/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use papyrus_network::{network_manager, NetworkConfig};
use papyrus_p2p_sync::client::{P2PSyncClient, P2PSyncClientChannels};
use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels};
use papyrus_p2p_sync::{Protocol, BUFFER_SIZE};
use papyrus_protobuf::consensus::{ProposalPart, StreamMessage};
use papyrus_protobuf::consensus::{HeightAndRound, ProposalPart, StreamMessage};
#[cfg(feature = "rpc")]
use papyrus_rpc::run_server;
use papyrus_storage::storage_metrics::update_storage_metrics;
Expand Down Expand Up @@ -192,8 +192,9 @@ fn spawn_consensus(

let network_channels = network_manager
.register_broadcast_topic(Topic::new(config.network_topic.clone()), BUFFER_SIZE)?;
let proposal_network_channels: BroadcastTopicChannels<StreamMessage<ProposalPart>> =
network_manager.register_broadcast_topic(Topic::new(NETWORK_TOPIC), BUFFER_SIZE)?;
let proposal_network_channels: BroadcastTopicChannels<
StreamMessage<ProposalPart, HeightAndRound>,
> = network_manager.register_broadcast_topic(Topic::new(NETWORK_TOPIC), BUFFER_SIZE)?;
let BroadcastTopicChannels {
broadcasted_messages_receiver: inbound_network_receiver,
broadcast_topic_client: outbound_network_sender,
Expand Down
1 change: 1 addition & 0 deletions crates/papyrus_protobuf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ license-file.workspace = true
testing = ["papyrus_test_utils", "rand", "rand_chacha"]

[dependencies]
bytes.workspace = true
indexmap.workspace = true
lazy_static.workspace = true
primitive-types.workspace = true
Expand Down
48 changes: 44 additions & 4 deletions crates/papyrus_protobuf/src/consensus.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
use std::fmt::Display;

use bytes::{Buf, BufMut};
use prost::DecodeError;
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::core::ContractAddress;
use starknet_api::transaction::Transaction;
Expand Down Expand Up @@ -26,10 +30,13 @@ pub enum StreamMessageBody<T> {
Fin,
}

#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct StreamMessage<T: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>> {
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct StreamMessage<
T: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>,
StreamId: Into<Vec<u8>> + Clone,
> {
pub message: StreamMessageBody<T>,
pub stream_id: u64,
pub stream_id: StreamId,
pub message_id: u64,
}

Expand Down Expand Up @@ -108,7 +115,7 @@ impl From<ProposalInit> for ProposalPart {
}
}

impl<T> std::fmt::Display for StreamMessage<T>
impl<T, StreamId: Into<Vec<u8>> + Clone + Display> std::fmt::Display for StreamMessage<T, StreamId>
where
T: Clone + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>,
{
Expand All @@ -131,3 +138,36 @@ where
}
}
}

/// HeighAndRound is a tuple struct used as the StreamId for consensus and context.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct HeightAndRound(pub u64, pub u32);

impl TryFrom<Vec<u8>> for HeightAndRound {
type Error = ProtobufConversionError;

fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
if value.len() != 12 {
return Err(ProtobufConversionError::DecodeError(DecodeError::new("Invalid length")));
}
let mut bytes = value.as_slice();
let height = bytes.get_u64();
let round = bytes.get_u32();
Ok(HeightAndRound(height, round))
}
}

impl From<HeightAndRound> for Vec<u8> {
fn from(value: HeightAndRound) -> Vec<u8> {
let mut bytes = Vec::with_capacity(12);
bytes.put_u64(value.0);
bytes.put_u32(value.1);
bytes
}
}

impl std::fmt::Display for HeightAndRound {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "(height: {}, round: {})", self.0, self.1)
}
}
33 changes: 21 additions & 12 deletions crates/papyrus_protobuf/src/converters/consensus.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#[cfg(test)]
#[path = "consensus_test.rs"]
mod consensus_test;

use std::convert::{TryFrom, TryInto};

use prost::Message;
Expand Down Expand Up @@ -79,8 +80,10 @@ impl From<Vote> for protobuf::Vote {

auto_impl_into_and_try_from_vec_u8!(Vote, protobuf::Vote);

impl<T: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>>
TryFrom<protobuf::StreamMessage> for StreamMessage<T>
impl<
T: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>,
StreamId: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError> + Clone,
> TryFrom<protobuf::StreamMessage> for StreamMessage<T, StreamId>
{
type Error = ProtobufConversionError;

Expand All @@ -101,16 +104,18 @@ impl<T: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>>
StreamMessageBody::Fin
}
},
stream_id: value.stream_id,
stream_id: value.stream_id.try_into()?,
message_id: value.message_id,
})
}
}

impl<T: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>> From<StreamMessage<T>>
for protobuf::StreamMessage
impl<
T: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>,
StreamId: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError> + Clone,
> From<StreamMessage<T, StreamId>> for protobuf::StreamMessage
{
fn from(value: StreamMessage<T>) -> Self {
fn from(value: StreamMessage<T, StreamId>) -> Self {
Self {
message: match value {
StreamMessage {
Expand All @@ -122,7 +127,7 @@ impl<T: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>> From<
Some(protobuf::stream_message::Message::Fin(protobuf::Fin {}))
}
},
stream_id: value.stream_id,
stream_id: value.stream_id.into(),
message_id: value.message_id,
}
}
Expand All @@ -131,17 +136,21 @@ impl<T: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>> From<
// Can't use auto_impl_into_and_try_from_vec_u8!(StreamMessage, protobuf::StreamMessage);
// because it doesn't seem to work with generics.
// TODO(guyn): consider expanding the macro to support generics
impl<T: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>> From<StreamMessage<T>>
for Vec<u8>
impl<
T: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>,
StreamId: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError> + Clone,
> From<StreamMessage<T, StreamId>> for Vec<u8>
{
fn from(value: StreamMessage<T>) -> Self {
fn from(value: StreamMessage<T, StreamId>) -> Self {
let protobuf_value = <protobuf::StreamMessage>::from(value);
protobuf_value.encode_to_vec()
}
}

impl<T: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>> TryFrom<Vec<u8>>
for StreamMessage<T>
impl<
T: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>,
StreamId: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError> + Clone,
> TryFrom<Vec<u8>> for StreamMessage<T, StreamId>
{
type Error = ProtobufConversionError;
fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
Expand Down
5 changes: 3 additions & 2 deletions crates/papyrus_protobuf/src/converters/consensus_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::consensus::{
TransactionBatch,
Vote,
};
use crate::converters::test_instances::TestStreamId;

// If all the fields of `AllResources` are 0 upon serialization,
// then the deserialized value will be interpreted as the `L1Gas` variant.
Expand Down Expand Up @@ -50,7 +51,7 @@ fn convert_stream_message_to_vec_u8_and_back() {
let mut rng = get_rng();

// Test that we can convert a StreamMessage with a ProposalPart message to bytes and back.
let mut stream_message: StreamMessage<ProposalPart> =
let mut stream_message: StreamMessage<ProposalPart, TestStreamId> =
StreamMessage::get_test_instance(&mut rng);

if let StreamMessageBody::Content(ProposalPart::Transactions(proposal)) =
Expand Down Expand Up @@ -128,7 +129,7 @@ fn convert_proposal_part_to_vec_u8_and_back() {
#[test]
fn stream_message_display() {
let mut rng = get_rng();
let stream_id = 42;
let stream_id = TestStreamId(42);
let message_id = 127;
let proposal = ProposalPart::get_test_instance(&mut rng);
let proposal_bytes: Vec<u8> = proposal.clone().into();
Expand Down
35 changes: 33 additions & 2 deletions crates/papyrus_protobuf/src/converters/test_instances.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use std::fmt::Display;

use papyrus_test_utils::{auto_impl_get_test_instance, get_number_of_variants, GetTestInstance};
use prost::DecodeError;
use rand::Rng;
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::core::ContractAddress;
use starknet_api::transaction::Transaction;

use super::ProtobufConversionError;
use crate::consensus::{
ProposalFin,
ProposalInit,
Expand Down Expand Up @@ -47,9 +51,36 @@ auto_impl_get_test_instance! {

}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TestStreamId(pub u64);

impl From<TestStreamId> for Vec<u8> {
fn from(value: TestStreamId) -> Self {
value.0.to_be_bytes().to_vec()
}
}

impl TryFrom<Vec<u8>> for TestStreamId {
type Error = ProtobufConversionError;
fn try_from(bytes: Vec<u8>) -> Result<Self, Self::Error> {
if bytes.len() != 8 {
return Err(ProtobufConversionError::DecodeError(DecodeError::new("Invalid length")));
};
let mut array = [0; 8];
array.copy_from_slice(&bytes);
Ok(TestStreamId(u64::from_be_bytes(array)))
}
}

impl Display for TestStreamId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "TestStreamId({})", self.0)
}
}

// The auto_impl_get_test_instance macro does not work for StreamMessage because it has
// a generic type. TODO(guyn): try to make the macro work with generic types.
impl GetTestInstance for StreamMessage<ProposalPart> {
impl GetTestInstance for StreamMessage<ProposalPart, TestStreamId> {
fn get_test_instance(rng: &mut rand_chacha::ChaCha8Rng) -> Self {
let message = if rng.gen_bool(0.5) {
StreamMessageBody::Content(ProposalPart::Transactions(TransactionBatch {
Expand All @@ -58,6 +89,6 @@ impl GetTestInstance for StreamMessage<ProposalPart> {
} else {
StreamMessageBody::Fin
};
Self { message, stream_id: 12, message_id: 47 }
Self { message, stream_id: TestStreamId(12), message_id: 47 }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ message StreamMessage {
bytes content = 1;
Fin fin = 2;
}
uint64 stream_id = 3;
bytes stream_id = 3;
uint64 message_id = 4;
}

Expand Down
1 change: 1 addition & 0 deletions crates/sequencing/papyrus_consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ papyrus_config.workspace = true
papyrus_network.workspace = true
papyrus_network_types.workspace = true
papyrus_protobuf.workspace = true
prost.workspace = true
serde = { workspace = true, features = ["derive"] }
starknet-types-core.workspace = true
starknet_api.workspace = true
Expand Down
Loading

0 comments on commit 69d1bac

Please sign in to comment.