Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow StreamHandler to use a generic StreamId #1714

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions crates/papyrus_node/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use papyrus_network::{network_manager, NetworkConfig};
use papyrus_p2p_sync::client::{P2PSyncClient, P2PSyncClientChannels};
use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels};
use papyrus_p2p_sync::{Protocol, BUFFER_SIZE};
use papyrus_protobuf::consensus::{ProposalPart, StreamMessage};
use papyrus_protobuf::consensus::{HeightAndRound, ProposalPart, StreamMessage};
#[cfg(feature = "rpc")]
use papyrus_rpc::run_server;
use papyrus_storage::storage_metrics::update_storage_metrics;
Expand Down Expand Up @@ -192,8 +192,9 @@ fn spawn_consensus(

let network_channels = network_manager
.register_broadcast_topic(Topic::new(config.network_topic.clone()), BUFFER_SIZE)?;
let proposal_network_channels: BroadcastTopicChannels<StreamMessage<ProposalPart>> =
network_manager.register_broadcast_topic(Topic::new(NETWORK_TOPIC), BUFFER_SIZE)?;
let proposal_network_channels: BroadcastTopicChannels<
StreamMessage<ProposalPart, HeightAndRound>,
> = network_manager.register_broadcast_topic(Topic::new(NETWORK_TOPIC), BUFFER_SIZE)?;
let BroadcastTopicChannels {
broadcasted_messages_receiver: inbound_network_receiver,
broadcast_topic_client: outbound_network_sender,
Expand Down
1 change: 1 addition & 0 deletions crates/papyrus_protobuf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ license-file.workspace = true
testing = ["papyrus_test_utils", "rand", "rand_chacha"]

[dependencies]
bytes.workspace = true
indexmap.workspace = true
lazy_static.workspace = true
primitive-types.workspace = true
Expand Down
48 changes: 44 additions & 4 deletions crates/papyrus_protobuf/src/consensus.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
use std::fmt::Display;

use bytes::{Buf, BufMut};
use prost::DecodeError;
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::core::ContractAddress;
use starknet_api::transaction::Transaction;
Expand Down Expand Up @@ -26,10 +30,13 @@ pub enum StreamMessageBody<T> {
Fin,
}

#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct StreamMessage<T: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>> {
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct StreamMessage<
T: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>,
StreamId: Into<Vec<u8>> + Clone,
> {
pub message: StreamMessageBody<T>,
pub stream_id: u64,
pub stream_id: StreamId,
pub message_id: u64,
}

Expand Down Expand Up @@ -108,7 +115,7 @@ impl From<ProposalInit> for ProposalPart {
}
}

impl<T> std::fmt::Display for StreamMessage<T>
impl<T, StreamId: Into<Vec<u8>> + Clone + Display> std::fmt::Display for StreamMessage<T, StreamId>
where
T: Clone + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>,
{
Expand All @@ -131,3 +138,36 @@ where
}
}
}

/// HeighAndRound is a tuple struct used as the StreamId for consensus and context.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct HeightAndRound(pub u64, pub u32);

impl TryFrom<Vec<u8>> for HeightAndRound {
type Error = ProtobufConversionError;

fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
if value.len() != 12 {
return Err(ProtobufConversionError::DecodeError(DecodeError::new("Invalid length")));
}
let mut bytes = value.as_slice();
let height = bytes.get_u64();
let round = bytes.get_u32();
Ok(HeightAndRound(height, round))
}
}

impl From<HeightAndRound> for Vec<u8> {
fn from(value: HeightAndRound) -> Vec<u8> {
let mut bytes = Vec::with_capacity(12);
bytes.put_u64(value.0);
bytes.put_u32(value.1);
bytes
}
}

impl std::fmt::Display for HeightAndRound {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "(height: {}, round: {})", self.0, self.1)
}
}
33 changes: 21 additions & 12 deletions crates/papyrus_protobuf/src/converters/consensus.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#[cfg(test)]
#[path = "consensus_test.rs"]
mod consensus_test;

use std::convert::{TryFrom, TryInto};

use prost::Message;
Expand Down Expand Up @@ -79,8 +80,10 @@ impl From<Vote> for protobuf::Vote {

auto_impl_into_and_try_from_vec_u8!(Vote, protobuf::Vote);

impl<T: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>>
TryFrom<protobuf::StreamMessage> for StreamMessage<T>
impl<
T: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>,
StreamId: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError> + Clone,
> TryFrom<protobuf::StreamMessage> for StreamMessage<T, StreamId>
{
type Error = ProtobufConversionError;

Expand All @@ -101,16 +104,18 @@ impl<T: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>>
StreamMessageBody::Fin
}
},
stream_id: value.stream_id,
stream_id: value.stream_id.try_into()?,
message_id: value.message_id,
})
}
}

impl<T: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>> From<StreamMessage<T>>
for protobuf::StreamMessage
impl<
T: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>,
StreamId: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError> + Clone,
> From<StreamMessage<T, StreamId>> for protobuf::StreamMessage
{
fn from(value: StreamMessage<T>) -> Self {
fn from(value: StreamMessage<T, StreamId>) -> Self {
Self {
message: match value {
StreamMessage {
Expand All @@ -122,7 +127,7 @@ impl<T: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>> From<
Some(protobuf::stream_message::Message::Fin(protobuf::Fin {}))
}
},
stream_id: value.stream_id,
stream_id: value.stream_id.into(),
message_id: value.message_id,
}
}
Expand All @@ -131,17 +136,21 @@ impl<T: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>> From<
// Can't use auto_impl_into_and_try_from_vec_u8!(StreamMessage, protobuf::StreamMessage);
// because it doesn't seem to work with generics.
// TODO(guyn): consider expanding the macro to support generics
impl<T: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>> From<StreamMessage<T>>
for Vec<u8>
impl<
T: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>,
StreamId: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError> + Clone,
> From<StreamMessage<T, StreamId>> for Vec<u8>
{
fn from(value: StreamMessage<T>) -> Self {
fn from(value: StreamMessage<T, StreamId>) -> Self {
let protobuf_value = <protobuf::StreamMessage>::from(value);
protobuf_value.encode_to_vec()
}
}

impl<T: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>> TryFrom<Vec<u8>>
for StreamMessage<T>
impl<
T: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>,
StreamId: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError> + Clone,
> TryFrom<Vec<u8>> for StreamMessage<T, StreamId>
{
type Error = ProtobufConversionError;
fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
Expand Down
5 changes: 3 additions & 2 deletions crates/papyrus_protobuf/src/converters/consensus_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::consensus::{
TransactionBatch,
Vote,
};
use crate::converters::test_instances::TestStreamId;

// If all the fields of `AllResources` are 0 upon serialization,
// then the deserialized value will be interpreted as the `L1Gas` variant.
Expand Down Expand Up @@ -50,7 +51,7 @@ fn convert_stream_message_to_vec_u8_and_back() {
let mut rng = get_rng();

// Test that we can convert a StreamMessage with a ProposalPart message to bytes and back.
let mut stream_message: StreamMessage<ProposalPart> =
let mut stream_message: StreamMessage<ProposalPart, TestStreamId> =
StreamMessage::get_test_instance(&mut rng);

if let StreamMessageBody::Content(ProposalPart::Transactions(proposal)) =
Expand Down Expand Up @@ -128,7 +129,7 @@ fn convert_proposal_part_to_vec_u8_and_back() {
#[test]
fn stream_message_display() {
let mut rng = get_rng();
let stream_id = 42;
let stream_id = TestStreamId(42);
let message_id = 127;
let proposal = ProposalPart::get_test_instance(&mut rng);
let proposal_bytes: Vec<u8> = proposal.clone().into();
Expand Down
35 changes: 33 additions & 2 deletions crates/papyrus_protobuf/src/converters/test_instances.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use std::fmt::Display;

use papyrus_test_utils::{auto_impl_get_test_instance, get_number_of_variants, GetTestInstance};
use prost::DecodeError;
use rand::Rng;
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::core::ContractAddress;
use starknet_api::transaction::Transaction;

use super::ProtobufConversionError;
use crate::consensus::{
ProposalFin,
ProposalInit,
Expand Down Expand Up @@ -47,9 +51,36 @@ auto_impl_get_test_instance! {

}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TestStreamId(pub u64);

impl From<TestStreamId> for Vec<u8> {
fn from(value: TestStreamId) -> Self {
value.0.to_be_bytes().to_vec()
}
}

impl TryFrom<Vec<u8>> for TestStreamId {
type Error = ProtobufConversionError;
fn try_from(bytes: Vec<u8>) -> Result<Self, Self::Error> {
if bytes.len() != 8 {
return Err(ProtobufConversionError::DecodeError(DecodeError::new("Invalid length")));
};
let mut array = [0; 8];
array.copy_from_slice(&bytes);
Ok(TestStreamId(u64::from_be_bytes(array)))
}
}

impl Display for TestStreamId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "TestStreamId({})", self.0)
}
}

// The auto_impl_get_test_instance macro does not work for StreamMessage because it has
// a generic type. TODO(guyn): try to make the macro work with generic types.
impl GetTestInstance for StreamMessage<ProposalPart> {
impl GetTestInstance for StreamMessage<ProposalPart, TestStreamId> {
fn get_test_instance(rng: &mut rand_chacha::ChaCha8Rng) -> Self {
let message = if rng.gen_bool(0.5) {
StreamMessageBody::Content(ProposalPart::Transactions(TransactionBatch {
Expand All @@ -58,6 +89,6 @@ impl GetTestInstance for StreamMessage<ProposalPart> {
} else {
StreamMessageBody::Fin
};
Self { message, stream_id: 12, message_id: 47 }
Self { message, stream_id: TestStreamId(12), message_id: 47 }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ message StreamMessage {
bytes content = 1;
Fin fin = 2;
}
uint64 stream_id = 3;
bytes stream_id = 3;
uint64 message_id = 4;
}

Expand Down
1 change: 1 addition & 0 deletions crates/sequencing/papyrus_consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ papyrus_config.workspace = true
papyrus_network.workspace = true
papyrus_network_types.workspace = true
papyrus_protobuf.workspace = true
prost.workspace = true
serde = { workspace = true, features = ["derive"] }
starknet-types-core.workspace = true
starknet_api.workspace = true
Expand Down
Loading
Loading