Skip to content

Commit

Permalink
chore: update deps (#474)
Browse files Browse the repository at this point in the history
  • Loading branch information
Freyskeyd committed Mar 12, 2024
1 parent 213b8d4 commit 264c569
Show file tree
Hide file tree
Showing 23 changed files with 492 additions and 499 deletions.
747 changes: 390 additions & 357 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 9 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,21 @@ tiny-keccak = {version = "1.5"}
ethers = {version = "2.0.9", features = ["legacy", "abigen-online"]}

# Log, Tracing & telemetry
opentelemetry = { version = "0.19", features = ["rt-tokio", "metrics"] }
opentelemetry-otlp = { version = "0.12", features = ["grpc-tonic", "metrics", "tls-roots"] }
opentelemetry = { version = "0.22", features = ["metrics"] }
opentelemetry-otlp = { version = "0.15", features = ["grpc-tonic", "metrics", "tls-roots"] }
opentelemetry_sdk = { version = "0.22" }

prometheus = "0.13.3"
prometheus-client = "0.21"
prometheus-client = "0.22"
tracing = { version = "0.1", default-features = false }
tracing-attributes = "0.1"
tracing-opentelemetry = "0.19"
tracing-opentelemetry = "0.23"
tracing-subscriber = { version = "0.3", default-features = false }

# gRPC
prost = {version = "0.12"}
tonic = { version = "0.10", default-features = false }
tonic-build = { version = "0.10", default-features = false, features = [
tonic = { version = "0.11", default-features = false }
tonic-build = { version = "0.11", default-features = false, features = [
"prost", "transport"
] }

Expand All @@ -67,7 +69,7 @@ http = "0.2.9"
tower-http = { version = "0.4", features = ["cors"] }

# P2P related
libp2p = { version = "0.52", default-features = false, features = ["noise"]}
libp2p = { version = "0.53", default-features = false, features = ["noise"]}

# Serialization & Deserialization
bincode = { version = "1.3", default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion crates/topos-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ tracing.workspace = true
uuid.workspace = true

[build-dependencies]
tonic-build = { version = "0.10", default-features = false, features = [
tonic-build = { version = "0.11", default-features = false, features = [
"prost", "transport"
] }

Expand Down
22 changes: 6 additions & 16 deletions crates/topos-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::process::ExitStatus;

use futures::stream::FuturesUnordered;
use futures::StreamExt;
use opentelemetry::{global, sdk::metrics::controllers::BasicController};
use opentelemetry::global;
use process::Errors;
use tokio::{
signal::{self, unix::SignalKind},
Expand All @@ -19,7 +19,6 @@ use topos_config::{
use topos_telemetry::tracing::setup_tracing;
use topos_wallet::SecretManager;
use tracing::{debug, error, info};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use tracing_subscriber::util::TryInitError;

mod process;
Expand Down Expand Up @@ -57,7 +56,7 @@ pub async fn start(
) -> Result<(), Error> {
// Setup instrumentation if both otlp agent and otlp service name
// are provided as arguments
let basic_controller = setup_tracing(
setup_tracing(
verbose,
no_color,
otlp_agent,
Expand Down Expand Up @@ -112,14 +111,14 @@ pub async fn start(
tokio::select! {
_ = sigterm_stream.recv() => {
info!("Received SIGTERM, shutting down application...");
shutdown(basic_controller, shutdown_trigger, shutdown_receiver).await;
shutdown(shutdown_trigger, shutdown_receiver).await;
}
_ = signal::ctrl_c() => {
info!("Received ctrl_c, shutting down application...");
shutdown(basic_controller, shutdown_trigger, shutdown_receiver).await;
shutdown( shutdown_trigger, shutdown_receiver).await;
}
Some(result) = processes.next() => {
shutdown(basic_controller, shutdown_trigger, shutdown_receiver).await;
shutdown(shutdown_trigger, shutdown_receiver).await;
processes.clear();
match result {
Ok(Ok(status)) => {
Expand Down Expand Up @@ -214,21 +213,12 @@ fn spawn_processes(
Ok(processes)
}

async fn shutdown(
basic_controller: Option<BasicController>,
trigger: CancellationToken,
mut termination: mpsc::Receiver<()>,
) {
async fn shutdown(trigger: CancellationToken, mut termination: mpsc::Receiver<()>) {
trigger.cancel();
// Wait that all sender get dropped
info!("Waiting that all components dropped");
let _ = termination.recv().await;
info!("Shutdown procedure finished, exiting...");
// Shutdown tracing
global::shutdown_tracer_provider();
if let Some(basic_controller) = basic_controller {
if let Err(e) = basic_controller.stop(&tracing::Span::current().context()) {
error!("Error stopping tracing: {e}");
}
}
}
2 changes: 1 addition & 1 deletion crates/topos-p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ http.workspace = true
lazy_static.workspace = true
libp2p = { workspace = true, features = ["macros", "gossipsub", "tcp", "dns", "tokio", "request-response", "identify", "kad", "serde", "yamux", "secp256k1"] }
pin-project = "1.1.3"
libp2p-swarm-test = "0.2.0"
prometheus-client.workspace = true
rand.workspace = true
serde = { workspace = true, features = ["derive"] }
Expand All @@ -39,6 +38,7 @@ prost.workspace = true
topos-core = { path = "../topos-core/" }

[dev-dependencies]
libp2p-swarm-test = "0.3.0"
test-log.workspace = true
env_logger.workspace = true
rstest = { workspace = true, features = ["async-timeout"] }
Expand Down
5 changes: 2 additions & 3 deletions crates/topos-p2p/src/behaviour/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
.handle_established_outbound_connection(connection_id, peer, addr, role_override)
}

fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm<Self::ConnectionHandler>) {
fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm) {
self.inner.on_swarm_event(event)
}

Expand All @@ -157,7 +157,6 @@ impl NetworkBehaviour for DiscoveryBehaviour {
fn poll(
&mut self,
cx: &mut std::task::Context<'_>,
params: &mut impl libp2p::swarm::PollParameters,
) -> Poll<libp2p::swarm::ToSwarm<Self::ToSwarm, libp2p::swarm::THandlerInEvent<Self>>> {
// Poll the kademlia bootstrap interval future in order to define if we need to call the
// `bootstrap`
Expand All @@ -169,7 +168,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
}
}

if let Poll::Ready(event) = self.inner.poll(cx, params) {
if let Poll::Ready(event) = self.inner.poll(cx) {
match event {
// When a Bootstrap query ends, we reset the `query_id`
ToSwarm::GenerateEvent(KademliaEvent::OutboundQueryProgressed {
Expand Down
11 changes: 7 additions & 4 deletions crates/topos-p2p/src/behaviour/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use libp2p::{
use prost::Message as ProstMessage;
use topos_core::api::grpc::tce::v1::Batch;
use topos_metrics::{P2P_DUPLICATE_MESSAGE_ID_RECEIVED_TOTAL, P2P_GOSSIP_BATCH_SIZE};
use tracing::{debug, error};
use tracing::{debug, error, warn};

use crate::{constants, event::ComposedEvent, TOPOS_ECHO, TOPOS_GOSSIP, TOPOS_READY};

Expand Down Expand Up @@ -147,7 +147,7 @@ impl NetworkBehaviour for Behaviour {
)
}

fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm<Self::ConnectionHandler>) {
fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm) {
self.gossipsub.on_swarm_event(event)
}

Expand All @@ -164,7 +164,6 @@ impl NetworkBehaviour for Behaviour {
fn poll(
&mut self,
cx: &mut std::task::Context<'_>,
params: &mut impl libp2p::swarm::PollParameters,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
if self.tick.poll_tick(cx).is_ready() {
// Publish batch
Expand All @@ -186,7 +185,7 @@ impl NetworkBehaviour for Behaviour {
}
}

let event = match self.gossipsub.poll(cx, params) {
let event = match self.gossipsub.poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(ToSwarm::GenerateEvent(event)) => Some(event),
Poll::Ready(ToSwarm::ListenOn { opts }) => {
Expand Down Expand Up @@ -225,6 +224,10 @@ impl NetworkBehaviour for Behaviour {
Poll::Ready(ToSwarm::NewExternalAddrCandidate(addr)) => {
return Poll::Ready(ToSwarm::NewExternalAddrCandidate(addr))
}
Poll::Ready(event) => {
warn!("Unhandled event in gossip behaviour: {:?}", event);
None
}
};

if let Some(gossipsub::Event::Message { ref message_id, .. }) = event {
Expand Down
7 changes: 3 additions & 4 deletions crates/topos-p2p/src/behaviour/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,9 +313,8 @@ impl Behaviour {
peer_id,
connection_id,
endpoint,
handler,
remaining_established,
}: ConnectionClosed<<Self as NetworkBehaviour>::ConnectionHandler>,
}: ConnectionClosed,
) {
debug!("Connection {connection_id} closed with peer {peer_id}");
if let Some(connections) = self.connected.get_mut(&peer_id) {
Expand Down Expand Up @@ -514,7 +513,7 @@ impl NetworkBehaviour for Behaviour {
}
}

fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
fn on_swarm_event(&mut self, event: FromSwarm) {
match event {
FromSwarm::ConnectionEstablished(connection_established) => {
self.on_connection_established(connection_established)
Expand All @@ -535,13 +534,13 @@ impl NetworkBehaviour for Behaviour {
| FromSwarm::ListenerError(_)
| FromSwarm::NewExternalAddrCandidate(_)
| FromSwarm::NewListenAddr(_) => (),
event => debug!("Unhandled event from swarm (grpc): {:?}", event),
}
}

fn poll(
&mut self,
cx: &mut Context<'_>,
params: &mut impl libp2p::swarm::PollParameters,
) -> Poll<libp2p::swarm::ToSwarm<Self::ToSwarm, libp2p::swarm::THandlerInEvent<Self>>> {
// Sending event to both `Swarm` and `ConnectionHandler`
if let Some(ev) = self.pending_events.pop_front() {
Expand Down
16 changes: 7 additions & 9 deletions crates/topos-p2p/src/behaviour/grpc/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{

use libp2p::swarm::{
handler::{ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound},
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, SubstreamProtocol,
ConnectionHandler, ConnectionHandlerEvent, SubstreamProtocol,
};
use tracing::{debug, warn};

Expand All @@ -36,7 +36,7 @@ pub struct Handler {
/// Optional outbound request id
outbound_request_id: Option<ProtocolRequest>,
protocols: HashSet<String>,
keep_alive: KeepAlive,
keep_alive: bool,
}

impl Handler {
Expand All @@ -46,7 +46,7 @@ impl Handler {
pending_events: VecDeque::new(),
outbound_request_id: None,
protocols,
keep_alive: KeepAlive::Yes,
keep_alive: true,
}
}
}
Expand All @@ -56,8 +56,6 @@ impl ConnectionHandler for Handler {

type ToBehaviour = event::Event;

type Error = void::Void;

type InboundProtocol = GrpcUpgradeProtocol;

type OutboundProtocol = GrpcUpgradeProtocol;
Expand All @@ -77,7 +75,7 @@ impl ConnectionHandler for Handler {
)
}

fn connection_keep_alive(&self) -> libp2p::swarm::KeepAlive {
fn connection_keep_alive(&self) -> bool {
self.keep_alive
}

Expand Down Expand Up @@ -127,7 +125,7 @@ impl ConnectionHandler for Handler {
self.pending_events.push_back(Event::OutboundTimeout(info));

// Closing the connection handler
self.keep_alive = KeepAlive::No;
self.keep_alive = false;
}
ConnectionEvent::DialUpgradeError(DialUpgradeError {
info,
Expand All @@ -137,13 +135,14 @@ impl ConnectionHandler for Handler {
.push_back(Event::UnsupportedProtocol(info.request_id, info.protocol));

// Closing the connection handler
self.keep_alive = KeepAlive::No;
self.keep_alive = false;
}
ConnectionEvent::DialUpgradeError(_)
| ConnectionEvent::AddressChange(_)
| ConnectionEvent::ListenUpgradeError(_)
| ConnectionEvent::LocalProtocolsChange(_)
| ConnectionEvent::RemoteProtocolsChange(_) => (),
event => warn!("Unhandled connection event: {:?}", event),
}
}
#[allow(deprecated)]
Expand All @@ -155,7 +154,6 @@ impl ConnectionHandler for Handler {
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::ToBehaviour,
Self::Error,
>,
> {
if let Some(event) = self.pending_events.pop_front() {
Expand Down
2 changes: 1 addition & 1 deletion crates/topos-p2p/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use tokio::sync::{
mpsc::{self, error::SendError},
oneshot,
};
use tonic::transport::NamedService;
use tonic::server::NamedService;
use topos_core::api::grpc::GrpcClient;

use crate::{
Expand Down
1 change: 1 addition & 0 deletions crates/topos-p2p/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub enum ComposedEvent {
Grpc(grpc::Event),
Void,
}

impl From<grpc::Event> for ComposedEvent {
fn from(event: grpc::Event) -> Self {
ComposedEvent::Grpc(event)
Expand Down
4 changes: 2 additions & 2 deletions crates/topos-p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ pub use runtime::Runtime;

use hyper::Body;
use tonic::body::BoxBody;
use tonic::server::NamedService;
use tonic::transport::server::Router;
use tonic::transport::NamedService;
use topos_core::api::grpc::p2p::info_service_server::InfoService;
use topos_core::api::grpc::p2p::info_service_server::InfoServiceServer;
use tower::Service;
Expand Down Expand Up @@ -89,7 +89,7 @@ pub mod utils {

use libp2p::{identity, PeerId};
use tokio::{sync::mpsc, sync::oneshot};
use tonic::transport::NamedService;
use tonic::server::NamedService;
use topos_core::api::grpc::GrpcClient;

use tracing::debug;
Expand Down
4 changes: 1 addition & 3 deletions crates/topos-p2p/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,7 @@ impl<'a> NetworkBuilder<'a> {
dns_tcp.or_transport(tcp)
};

let mut multiplex_config = libp2p::yamux::Config::default();
multiplex_config.set_window_update_mode(libp2p::yamux::WindowUpdateMode::on_read());
multiplex_config.set_max_buffer_size(1024 * 1024 * 16);
let multiplex_config = libp2p::yamux::Config::default();

let transport = transport
.upgrade(upgrade::Version::V1)
Expand Down
4 changes: 2 additions & 2 deletions crates/topos-p2p/src/runtime/handle_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
error::{CommandExecutionError, P2PError},
protocol_name, Command, Runtime,
};
use libp2p::{kad::record::Key, PeerId};
use libp2p::{kad::RecordKey, PeerId};
use rand::{thread_rng, Rng};
use topos_metrics::P2P_MESSAGE_SENT_ON_GOSSIPSUB_TOTAL;
use tracing::{debug, error, info, warn};
Expand Down Expand Up @@ -101,7 +101,7 @@ impl Runtime {
let query_id = behaviour
.discovery
.inner
.get_record(Key::new(&to.to_string()));
.get_record(RecordKey::new(&to.to_string()));

debug!("Created a get_record query {query_id:?} for discovering {to}");
if let Some(id) = self.pending_record_requests.insert(query_id, sender) {
Expand Down
Loading

0 comments on commit 264c569

Please sign in to comment.