diff --git a/src/discovery/behaviour.rs b/src/discovery/behaviour.rs index a496330..b4706b6 100644 --- a/src/discovery/behaviour.rs +++ b/src/discovery/behaviour.rs @@ -4,7 +4,7 @@ use discv5::{Discv5, Discv5ConfigBuilder, Discv5Event, Enr, QueryError}; use futures::stream::FuturesUnordered; use futures::{Future, FutureExt, StreamExt}; use libp2p::core::Endpoint; -use libp2p::swarm::dummy::{ConnectionHandler as DummyConnectionHandler, ConnectionHandler}; +use libp2p::swarm::dummy::ConnectionHandler as DummyConnectionHandler; use libp2p::swarm::{ ConnectionDenied, ConnectionId, DialError, DialFailure, FromSwarm, NetworkBehaviour, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, diff --git a/src/rpc/handler.rs b/src/rpc/handler.rs index 84f512d..b82f943 100644 --- a/src/rpc/handler.rs +++ b/src/rpc/handler.rs @@ -6,10 +6,8 @@ use crate::rpc::protocol::{ }; use futures::{FutureExt, SinkExt, StreamExt}; use libp2p::swarm::handler::{ConnectionEvent, FullyNegotiatedInbound, FullyNegotiatedOutbound}; -use libp2p::swarm::{ - ConnectionHandler, ConnectionHandlerEvent, KeepAlive, NegotiatedSubstream, SubstreamProtocol, -}; -use libp2p::PeerId; +use libp2p::swarm::{ConnectionHandler, ConnectionHandlerEvent, KeepAlive, SubstreamProtocol}; +use libp2p::{PeerId, Stream}; use lighthouse_network::rpc::methods::RPCCodedResponse; use smallvec::SmallVec; use std::collections::hash_map::Entry; @@ -48,9 +46,9 @@ pub struct SubstreamId(usize); enum InboundSubstreamState { // The underlying substream is not being used. - Idle(InboundFramed), + Idle(InboundFramed), // The underlying substream is processing responses. - Busy(Pin, String>> + Send>>), + Busy(Pin, String>> + Send>>), // Temporary state during processing Poisoned, } @@ -280,8 +278,6 @@ impl Handler { RpcRequestProtocol, lighthouse_network::rpc::outbound::OutboundRequest, >, - // stream: ::Output, - // info: Self::OutboundOpenInfo, ) { info!("[{}] on_fully_negotiated_outbound", self.peer_id,); let request = outbound.info; diff --git a/src/rpc/protocol.rs b/src/rpc/protocol.rs index dd81a1f..896fb7c 100644 --- a/src/rpc/protocol.rs +++ b/src/rpc/protocol.rs @@ -2,8 +2,7 @@ use ::types::fork_context::ForkContext; use futures::future::BoxFuture; use futures::prelude::*; use libp2p::core::UpgradeInfo; -use libp2p::swarm::NegotiatedSubstream; -use libp2p::{InboundUpgrade, OutboundUpgrade, PeerId}; +use libp2p::{InboundUpgrade, OutboundUpgrade, PeerId, Stream}; use std::fmt::{Display, Formatter}; use std::sync::Arc; use std::time::Duration; @@ -178,21 +177,15 @@ impl UpgradeInfo for RpcRequestProtocol { } } -pub(crate) type OutboundFramed = Framed< - Compat, - lighthouse_network::rpc::codec::OutboundCodec, ->; +pub(crate) type OutboundFramed = + Framed, lighthouse_network::rpc::codec::OutboundCodec>; -impl OutboundUpgrade for RpcRequestProtocol { +impl OutboundUpgrade for RpcRequestProtocol { type Output = OutboundFramed; type Error = lighthouse_network::rpc::RPCError; type Future = BoxFuture<'static, Result>; - fn upgrade_outbound( - self, - socket: NegotiatedSubstream, - protocol_id: Self::Info, - ) -> Self::Future { + fn upgrade_outbound(self, socket: Stream, protocol_id: Self::Info) -> Self::Future { info!( "[{}] RpcRequestProtocol::upgrade_outbound: request: {:?}", self.request.peer_id, self.request.request