Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kona): Block GossipSub #15

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,252 changes: 1,173 additions & 79 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions kona/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ reth-node-ethereum.workspace = true
alloy = { version = "0.2", features = ["full"] }
alloy-rlp = "0.3.4"

# p2p
snap = "1.1.1"
discv5 = "0.6.0"
tokio.workspace = true
openssl = { version = "0.10.66", features = ["vendored"] }
libp2p-identity = { version = "0.2.9", features = [ "secp256k1" ] }
libp2p = { version = "0.54.0", features = ["macros", "tokio", "tcp", "noise", "gossipsub", "ping"] }

# other
clap = "4"
tracing = "0.1.40"
Expand All @@ -31,6 +39,7 @@ reqwest = "0.12.5"
serde_json = "1.0.120"
parking_lot = "0.12.3"
eyre.workspace = true
lazy_static = "1.5.0"

[dev-dependencies]
reth-exex-test-utils.workspace = true
Expand Down
2 changes: 2 additions & 0 deletions kona/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use reth_node_ethereum::EthereumNode;
use superchain_registry::ROLLUP_CONFIGS;
use tracing::{debug, error, info, warn};

mod p2p;

mod blobs;
use blobs::ExExBlobProvider;

Expand Down
49 changes: 49 additions & 0 deletions kona/src/p2p/behaviour.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
//! Block Handler

use eyre::Result;
use libp2p::swarm::NetworkBehaviour;
use libp2p::gossipsub::Config;
use libp2p::gossipsub::MessageAuthenticity;
use libp2p::gossipsub::IdentTopic;

use crate::p2p::event::Event;
use crate::p2p::handler::Handler;

/// Specifies the [NetworkBehaviour] of the node
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "Event")]
pub struct Behaviour {
/// Responds to inbound pings and send outbound pings.
ping: libp2p::ping::Behaviour,
/// Enables gossipsub as the routing layer.
gossipsub: libp2p::gossipsub::Behaviour,
}

impl Behaviour {
/// Configures the swarm behaviors, subscribes to the gossip topics, and returns a new [Behaviour].
pub fn new(cfg: Config, handlers: &[Box<dyn Handler>]) -> Result<Self> {
let ping = libp2p::ping::Behaviour::default();

let mut gossipsub =
libp2p::gossipsub::Behaviour::new(MessageAuthenticity::Anonymous, cfg)
.map_err(|_| eyre::eyre!("gossipsub behaviour creation failed"))?;

handlers
.iter()
.flat_map(|handler| {
handler
.topics()
.iter()
.map(|topic| {
let topic = IdentTopic::new(topic.to_string());
gossipsub
.subscribe(&topic)
.map_err(|_| eyre::eyre!("subscription failed"))
})
.collect::<Vec<_>>()
})
.collect::<Result<Vec<bool>>>()?;

Ok(Self { ping, gossipsub })
}
}
91 changes: 91 additions & 0 deletions kona/src/p2p/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
//! Builder for an OP Stack P2P network.

use eyre::Result;
use std::net::SocketAddr;
use alloy::primitives::Address;
use libp2p::gossipsub::ConfigBuilder;
use libp2p_identity::Keypair;
use tokio::sync::watch::channel;

use crate::p2p::config;
use crate::p2p::handler::BlockHandler;
use crate::p2p::behaviour::Behaviour;
use crate::p2p::driver::GossipDriver;

/// Constructs a [GossipDriver] for the OP Stack P2P network.
#[derive(Default)]
pub struct GossipDriverBuilder {
/// The chain ID of the network.
chain_id: Option<u64>,
/// The unsafe block signer.
unsafe_block_signer: Option<Address>,
/// The socket address that the service is listening on.
socket: Option<SocketAddr>,
/// The [ConfigBuilder] constructs the [Config] for `gossipsub`.
inner: Option<ConfigBuilder>,
/// The [Keypair] for the node.
keypair: Option<Keypair>,
}

impl GossipDriverBuilder {
/// Creates a new [GossipDriverBuilder].
pub fn new() -> Self {
Self::default()
}

/// Specifies the chain ID of the network.
pub fn with_chain_id(&mut self, chain_id: u64) -> &mut Self {
self.chain_id = Some(chain_id);
self
}

/// Specifies the unsafe block signer.
pub fn with_unsafe_block_signer(&mut self, unsafe_block_signer: Address) -> &mut Self {
self.unsafe_block_signer = Some(unsafe_block_signer);
self
}

/// Specifies the socket address that the service is listening on.
pub fn with_socket(&mut self, socket: SocketAddr) -> &mut Self {
self.socket = Some(socket);
self
}

/// Specifies the keypair for the node.
pub fn with_keypair(&mut self, keypair: Keypair) -> &mut Self {
self.keypair = Some(keypair);
self
}

// TODO: extend builder with [ConfigBuilder] methods.

/// Specifies the [ConfigBuilder] for the `gossipsub` configuration.
pub fn with_gossip_config(&mut self, inner: ConfigBuilder) -> &mut Self {
self.inner = Some(inner);
self
}

/// Builds the [GossipDriver].
pub fn build(self) -> Result<GossipDriver> {
// Build the config for gossipsub.
let config = self.inner.unwrap_or(config::default_config_builder()).build()?;
let unsafe_block_signer = self.unsafe_block_signer.ok_or_else(|| eyre::eyre!("unsafe block signer not set"))?;
let chain_id = self.chain_id.ok_or_else(|| eyre::eyre!("chain ID not set"))?;

// Create the block handler.
let (unsafe_block_signer_sender, unsafe_block_signer_recv) = channel(unsafe_block_signer);
let (block_handler, unsafe_block_recv) = BlockHandler::new(chain_id, unsafe_block_signer_recv);

// Construct the gossipsub behaviour.
let behaviour = Behaviour::new(config, &[Box::new(block_handler)])?;

Ok(GossipDriver {
behaviour,
unsafe_block_recv,
unsafe_block_signer_sender,
chain_id,
addr: self.socket.ok_or_else(|| eyre::eyre!("socket address not set"))?,
keypair: self.keypair.unwrap_or(Keypair::generate_secp256k1()),
})
}
}
119 changes: 119 additions & 0 deletions kona/src/p2p/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
//! P2P Configuration

use std::time::Duration;
use lazy_static::lazy_static;
use openssl::sha::sha256;
use snap::raw::Decoder;
use libp2p::gossipsub::{ConfigBuilder, Message, MessageId};

////////////////////////////////////////////////////////////////////////////////////////////////
// GossipSub Constants
////////////////////////////////////////////////////////////////////////////////////////////////

/// The maximum gossip size.
/// Limits the total size of gossip RPC containers as well as decompressed individual messages.
pub const MAX_GOSSIP_SIZE: usize = 10 * (1 << 20);

/// The minimum gossip size.
/// Used to make sure that there is at least some data to validate the signature against.
pub const MIN_GOSSIP_SIZE: usize = 66;

/// The maximum outbound queue.
pub const MAX_OUTBOUND_QUEUE: usize = 256;

/// The maximum validate queue.
pub const MAX_VALIDATE_QUEUE: usize = 256;

/// The global validate throttle.
pub const GLOBAL_VALIDATE_THROTTLE: usize = 512;

/// The default mesh D.
pub const DEFAULT_MESH_D: usize = 8;

/// The default mesh D low.
pub const DEFAULT_MESH_DLO: usize = 6;

/// The default mesh D high.
pub const DEFAULT_MESH_DHI: usize = 12;

/// The default mesh D lazy.
pub const DEFAULT_MESH_DLAZY: usize = 6;

////////////////////////////////////////////////////////////////////////////////////////////////
// Duration Constants
////////////////////////////////////////////////////////////////////////////////////////////////

lazy_static! {
/// The gossip heartbeat.
pub static ref GOSSIP_HEARTBEAT: Duration = Duration::from_millis(500);

/// The seen messages TTL.
/// Limits the duration that message IDs are remembered for gossip deduplication purposes.
pub static ref SEEN_MESSAGES_TTL: Duration = 130 * *GOSSIP_HEARTBEAT;

/// The pper score inspect frequency.
/// The frequency at which peer scores are inspected.
pub static ref PEER_SCORE_INSPECT_FREQUENCY: Duration = 15 * Duration::from_secs(1);
}

////////////////////////////////////////////////////////////////////////////////////////////////
// Config Building
////////////////////////////////////////////////////////////////////////////////////////////////

/// Builds the default gossipsub configuration.
///
/// Notable defaults:
/// - flood_publish: false (call `.flood_publish(true)` on the [ConfigBuilder] to enable)
/// - backoff_slack: 1
/// - peer exchange is disabled
/// - maximum byte size for gossip messages: 2048 bytes
///
/// # Returns
///
/// A [`ConfigBuilder`] with the default gossipsub configuration already set.
/// Call `.build()` on the returned builder to get the final [libp2p::gossipsub::Config].
pub fn default_config_builder() -> ConfigBuilder {
let mut builder = ConfigBuilder::default();
builder
.mesh_n(DEFAULT_MESH_D)
.mesh_n_low(DEFAULT_MESH_DLO)
.mesh_n_high(DEFAULT_MESH_DHI)
.gossip_lazy(DEFAULT_MESH_DLAZY)
.heartbeat_interval(*GOSSIP_HEARTBEAT)
.fanout_ttl(Duration::from_secs(24))
.history_length(12)
.history_gossip(3)
.duplicate_cache_time(Duration::from_secs(65))
.validation_mode(libp2p::gossipsub::ValidationMode::None)
.validate_messages()
.message_id_fn(compute_message_id);

builder
}

/// Computes the [MessageId] of a `gossipsub` message.
fn compute_message_id(msg: &Message) -> MessageId {
let mut decoder = Decoder::new();
let id = match decoder.decompress_vec(&msg.data) {
Ok(data) => {
let domain_valid_snappy: Vec<u8> = vec![0x1, 0x0, 0x0, 0x0];
sha256(
[domain_valid_snappy.as_slice(), data.as_slice()]
.concat()
.as_slice(),
)[..20]
.to_vec()
}
Err(_) => {
let domain_invalid_snappy: Vec<u8> = vec![0x0, 0x0, 0x0, 0x0];
sha256(
[domain_invalid_snappy.as_slice(), msg.data.as_slice()]
.concat()
.as_slice(),
)[..20]
.to_vec()
}
};

MessageId(id)
}
89 changes: 89 additions & 0 deletions kona/src/p2p/driver.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
//! Driver for the P2P Service.

use anyhow::Result;
use std::net::SocketAddr;
use alloy::primitives::Address;
use libp2p_identity::Keypair;
use tokio::select;
use tokio::sync::watch::{Receiver, Sender};
use libp2p::{Multiaddr, SwarmBuilder, PeerId, Transport};
use libp2p::swarm::SwarmEvent;

use crate::p2p::event::Event;
use crate::p2p::behaviour::Behaviour;
use crate::p2p::types::ExecutionPayloadEnvelope;

/// Driver contains the logic for the P2P service.
pub struct GossipDriver {
/// The [Behaviour] of the node.
pub behaviour: Behaviour,
/// Channel to receive unsafe blocks.
pub unsafe_block_recv: Receiver<ExecutionPayloadEnvelope>,
/// Channel to send unsafe signer updates.
pub unsafe_block_signer_sender: Sender<Address>,
/// The socket address that the service is listening on.
pub addr: SocketAddr,
/// The chain ID of the network.
pub chain_id: u64,
/// A unique keypair to validate the node's identity
pub keypair: Keypair,
}

impl GossipDriver {
/// Starts the Discv5 peer discovery & libp2p services
/// and continually listens for new peers and messages to handle
pub fn start(mut self) -> Result<()> {

// TODO: pull this swarm building out into the builder
let addr = NetworkAddress::try_from(self.addr)?;
let tcp_cfg = libp2p::tcp::Config::default();
let auth_cfg = libp2p::noise::Config::new(&self.keypair)?;
let transport = libp2p::tcp::tokio::Transport::new(tcp_cfg)
.upgrade(libp2p::core::upgrade::Version::V1Lazy)
.authenticate(auth_cfg)
.boxed();
let mut swarm = SwarmBuilder::with_tokio_executor(transport, self.behaviour, PeerId::from(self.keypair.public()))
.build();
let mut peer_recv = discovery::start(addr, self.chain_id)?;
let multiaddr = Multiaddr::from(addr);
swarm
.listen_on(multiaddr)
.map_err(|_| eyre::eyre!("swarm listen failed"))?;

let mut handlers = Vec::new();
handlers.append(&mut self.handlers);

tokio::spawn(async move {
loop {
select! {
peer = peer_recv.recv().fuse() => {
if let Some(peer) = peer {
let peer = Multiaddr::from(peer);
_ = swarm.dial(peer);
}
},
event = swarm.select_next_some() => {
if let SwarmEvent::Behaviour(Event::Gossipsub(libp2p::gossipsub::Event::Message {
propagation_source: peer_id,
message_id: id,
message,
})) = event {
let handler = self.handlers
.iter()
.find(|h| h.topics().contains(&message.topic));
if let Some(handler) = handler {
let status = handler.handle(message);
_ = swarm
.behaviour_mut()
.gossipsub
.report_message_validation_result(&message_id, &propagation_source, status);
}
}
},
}
}
});

Ok(())
}
}
Loading
Loading