diff --git a/examples/kvstore_34/main.rs b/examples/kvstore_34/main.rs index 6bf5f23..6c3fac6 100644 --- a/examples/kvstore_34/main.rs +++ b/examples/kvstore_34/main.rs @@ -10,12 +10,9 @@ use std::{ use bytes::Bytes; use futures::future::FutureExt; use structopt::StructOpt; -use tower::{Service, ServiceBuilder}; - use tendermint::abci::{Event, EventAttributeIndexExt}; - -use tendermint::v0_34::abci::response; -use tendermint::v0_34::abci::{Request, Response}; +use tendermint::v0_34::abci::{response, Request, Response}; +use tower::{Service, ServiceBuilder}; use tower_abci::{ v034::{split, Server}, @@ -141,6 +138,10 @@ struct Opt { /// Bind the TCP server to this port. #[structopt(short, long, default_value = "26658")] port: u16, + + /// Bind the UDS server to this path + #[structopt(long)] + uds: Option, } #[tokio::main] @@ -157,7 +158,7 @@ async fn main() { // Hand those components to the ABCI server, but customize request behavior // for each category -- for instance, apply load-shedding only to mempool // and info requests, but not to consensus requests. - let server = Server::builder() + let server_builder = Server::builder() .consensus(consensus) .snapshot(snapshot) .mempool( @@ -172,13 +173,16 @@ async fn main() { .buffer(100) .rate_limit(50, std::time::Duration::from_secs(1)) .service(info), - ) - .finish() - .unwrap(); - - // Run the ABCI server. - server - .listen(format!("{}:{}", opt.host, opt.port)) - .await - .unwrap(); + ); + + let server = server_builder.finish().unwrap(); + + if let Some(uds_path) = opt.uds { + server.listen_unix(uds_path).await.unwrap(); + } else { + server + .listen_tcp(format!("{}:{}", opt.host, opt.port)) + .await + .unwrap(); + } } diff --git a/examples/kvstore_37/main.rs b/examples/kvstore_37/main.rs index 9ca6598..3ad5fbb 100644 --- a/examples/kvstore_37/main.rs +++ b/examples/kvstore_37/main.rs @@ -185,12 +185,11 @@ async fn main() { let server = server_builder.finish().unwrap(); if let Some(uds_path) = opt.uds { - // Run the ABCI server. server.listen_unix(uds_path).await.unwrap(); } else { - let tcp_addr = format!("{}:{}", opt.host, opt.port); - - // Run the ABCI server. - server.listen_tcp(tcp_addr).await.unwrap(); + server + .listen_tcp(format!("{}:{}", opt.host, opt.port)) + .await + .unwrap(); } } diff --git a/src/v034/server.rs b/src/v034/server.rs index 1d8afd1..1577149 100644 --- a/src/v034/server.rs +++ b/src/v034/server.rs @@ -1,22 +1,25 @@ use std::convert::{TryFrom, TryInto}; +use std::path::Path; use futures::future::{FutureExt, TryFutureExt}; use futures::sink::SinkExt; use futures::stream::{FuturesOrdered, StreamExt}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::{ - net::{TcpListener, TcpStream, ToSocketAddrs}, + net::{TcpListener, ToSocketAddrs, UnixListener}, select, }; use tokio_util::codec::{FramedRead, FramedWrite}; use tower::{Service, ServiceExt}; +use crate::BoxError; use tendermint::abci::MethodKind; + use tendermint::v0_34::abci::{ ConsensusRequest, ConsensusResponse, InfoRequest, InfoResponse, MempoolRequest, MempoolResponse, Request, Response, SnapshotRequest, SnapshotResponse, }; -use crate::BoxError; /// An ABCI server which listens for connections and forwards requests to four /// component ABCI [`Service`]s. pub struct Server { @@ -123,29 +126,54 @@ where ServerBuilder::default() } - pub async fn listen(self, addr: A) -> Result<(), BoxError> { - tracing::info!(?addr, "starting ABCI server"); + pub async fn listen_unix(self, path: impl AsRef) -> Result<(), BoxError> { + let listener = UnixListener::bind(path)?; + let addr = listener.local_addr()?; + tracing::info!(?addr, "ABCI server starting on uds"); + + loop { + match listener.accept().await { + Ok((socket, _addr)) => { + tracing::debug!(?_addr, "accepted new connection"); + let conn = Connection { + consensus: self.consensus.clone(), + mempool: self.mempool.clone(), + info: self.info.clone(), + snapshot: self.snapshot.clone(), + }; + let (read, write) = socket.into_split(); + tokio::spawn(async move { conn.run(read, write).await.unwrap() }); + } + Err(e) => { + tracing::error!({ %e }, "error accepting new connection"); + } + } + } + } + + pub async fn listen_tcp( + self, + addr: A, + ) -> Result<(), BoxError> { let listener = TcpListener::bind(addr).await?; - let local_addr = listener.local_addr()?; - tracing::info!(?local_addr, "bound tcp listener"); + let addr = listener.local_addr()?; + tracing::info!(?addr, "ABCI server starting on tcp socket"); loop { match listener.accept().await { Ok((socket, _addr)) => { - // set parent: None for the connection span, as it should - // exist independently of the listener's spans. - //let span = tracing::span!(parent: None, Level::ERROR, "abci", ?addr); + tracing::debug!(?_addr, "accepted new connection"); let conn = Connection { consensus: self.consensus.clone(), mempool: self.mempool.clone(), info: self.info.clone(), snapshot: self.snapshot.clone(), }; - //tokio::spawn(async move { conn.run(socket).await.unwrap() }.instrument(span)); - tokio::spawn(async move { conn.run(socket).await.unwrap() }); + let (read, write) = socket.into_split(); + tokio::spawn(async move { conn.run(read, write).await.unwrap() }); } Err(e) => { - tracing::warn!({ %e }, "error accepting new tcp connection"); + tracing::error!({ %e }, "error accepting new connection"); } } } @@ -172,14 +200,18 @@ where { // XXX handle errors gracefully // figure out how / if to return errors to tendermint - async fn run(mut self, mut socket: TcpStream) -> Result<(), BoxError> { + async fn run( + mut self, + stream: impl AsyncReadExt + std::marker::Unpin, + sink: impl AsyncWriteExt + std::marker::Unpin, + ) -> Result<(), BoxError> { tracing::info!("listening for requests"); use tendermint_proto::v0_34::abci as pb; let (mut request_stream, mut response_sink) = { use crate::v034::codec::{Decode, Encode}; - let (read, write) = socket.split(); + let (read, write) = (stream, sink); ( FramedRead::new(read, Decode::::default()), FramedWrite::new(write, Encode::::default()),