Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Unix domain sockets #35

Merged
merged 6 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions examples/kvstore_37/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ struct Opt {
/// Bind the TCP server to this port.
#[structopt(short, long, default_value = "26658")]
port: u16,

/// Bind the UDS server to this path
#[structopt(long)]
uds: Option<String>,
}

#[tokio::main]
Expand All @@ -161,7 +165,7 @@ async fn main() {
// Hand those components to the ABCI server, but customize request behavior
// for each category -- for instance, apply load-shedding only to mempool
// and info requests, but not to consensus requests.
let server = Server::builder()
let mut server_builder = Server::builder()
.consensus(consensus)
.snapshot(snapshot)
.mempool(
Expand All @@ -177,12 +181,14 @@ async fn main() {
.rate_limit(50, std::time::Duration::from_secs(1))
.service(info),
)
.finish()
.with_tcp_addr(format!("{}:{}", opt.host, opt.port))
.await
.unwrap();
if let Some(uds) = &opt.uds {
server_builder = server_builder.with_uds_path(uds).unwrap();
}
let server = server_builder.finish().unwrap();

// Run the ABCI server.
server
.listen(format!("{}:{}", opt.host, opt.port))
.await
.unwrap();
server.start().await.unwrap();
}
182 changes: 156 additions & 26 deletions src/v037/server.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
use std::convert::{TryFrom, TryInto};
use std::io;
use std::path::Path;

use futures::future::{FutureExt, TryFutureExt};
use futures::sink::SinkExt;
use futures::stream::{FuturesOrdered, StreamExt};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::{
net::{TcpListener, TcpStream, ToSocketAddrs},
net::{TcpListener, TcpStream, ToSocketAddrs, UnixListener, UnixStream},
select,
};
use tokio_util::codec::{FramedRead, FramedWrite};
use tower::{Service, ServiceExt};

use tendermint::abci::MethodKind;
use tracing::warn;

use crate::BoxError;

Expand All @@ -19,20 +23,61 @@ use tendermint::v0_37::abci::{
MempoolResponse, Request, Response, SnapshotRequest, SnapshotResponse,
};

enum SocketKind {
Tcp(TcpStream),
Uds(UnixStream),
}

impl SocketKind {
fn split<'a>(
&'a mut self,
) -> (
Box<dyn 'a + AsyncRead + Send + Unpin>,
Box<dyn 'a + AsyncWrite + Send + Unpin>,
) {
match self {
Self::Tcp(tcp) => {
let (read, write) = tcp.split();
(Box::new(read), Box::new(write))
}
Self::Uds(uds) => {
let (read, write) = uds.split();
(Box::new(read), Box::new(write))
}
erwanor marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

impl From<TcpStream> for SocketKind {
fn from(value: TcpStream) -> Self {
Self::Tcp(value)
}
}

impl From<UnixStream> for SocketKind {
fn from(value: UnixStream) -> Self {
Self::Uds(value)
}
}

/// An ABCI server which listens for connections and forwards requests to four
/// component ABCI [`Service`]s.
pub struct Server<C, M, I, S> {
consensus: C,
mempool: M,
info: I,
snapshot: S,
tcp_listener: Option<TcpListener>,
uds_listener: Option<UnixListener>,
}

pub struct ServerBuilder<C, M, I, S> {
consensus: Option<C>,
mempool: Option<M>,
info: Option<I>,
snapshot: Option<S>,
tcp_listener: Option<TcpListener>,
uds_listener: Option<UnixListener>,
}

impl<C, M, I, S> Default for ServerBuilder<C, M, I, S> {
Expand All @@ -42,6 +87,8 @@ impl<C, M, I, S> Default for ServerBuilder<C, M, I, S> {
mempool: None,
info: None,
snapshot: None,
tcp_listener: None,
uds_listener: None,
}
}
}
Expand Down Expand Up @@ -86,17 +133,44 @@ where
self
}

pub fn tcp_listener(mut self, tcp_listener: TcpListener) -> Self {
self.tcp_listener = Some(tcp_listener);
self
}

pub async fn with_tcp_addr(mut self, addr: impl ToSocketAddrs) -> io::Result<Self> {
self.tcp_listener = Some(TcpListener::bind(addr).await?);
Ok(self)
}

pub fn uds_listener(mut self, uds_listener: UnixListener) -> Self {
self.uds_listener = Some(uds_listener);
self
}

pub fn with_uds_path(mut self, path: impl AsRef<Path>) -> io::Result<Self> {
self.uds_listener = Some(UnixListener::bind(path)?);
Ok(self)
}

pub fn finish(self) -> Option<Server<C, M, I, S>> {
let consensus = self.consensus?;
let mempool = self.mempool?;
let info = self.info?;
let snapshot = self.snapshot?;
let tcp_listener = self.tcp_listener;
let uds_listener = self.uds_listener;
if tcp_listener.is_none() && uds_listener.is_none() {
return None;
}

Some(Server {
consensus,
mempool,
info,
snapshot,
tcp_listener,
uds_listener,
})
}
}
Expand Down Expand Up @@ -125,31 +199,77 @@ where
ServerBuilder::default()
}

pub async fn listen<A: ToSocketAddrs + std::fmt::Debug>(self, addr: A) -> Result<(), BoxError> {
tracing::info!(?addr, "starting ABCI server");
let listener = TcpListener::bind(addr).await?;
let local_addr = listener.local_addr()?;
tracing::info!(?local_addr, "bound tcp listener");
fn handle_new_connection(&self, socket: impl Into<SocketKind>) {
erwanor marked this conversation as resolved.
Show resolved Hide resolved
let socket = socket.into();
// set parent: None for the connection span, as it should
// exist independently of the listener's spans.
//let span = tracing::span!(parent: None, Level::ERROR, "abci", ?addr);
let conn = Connection {
consensus: self.consensus.clone(),
mempool: self.mempool.clone(),
info: self.info.clone(),
snapshot: self.snapshot.clone(),
};
//tokio::spawn(async move { conn.run(socket).await.unwrap() }.instrument(span));
tokio::spawn(async move { conn.run(socket).await.unwrap() });
}

pub async fn start(self) -> Result<(), BoxError> {
tracing::info!("starting ABCI server");

let tcp_addr = self
.tcp_listener
.as_ref()
.map(TcpListener::local_addr)
.transpose()
.map_err(|e| {
warn!(err.msg = %e, err.cause_chain = ?e, "failed getting tcp local addr");
e
})
.ok()
.flatten();
let uds_addr = self
.uds_listener
.as_ref()
.map(UnixListener::local_addr)
.transpose()
.map_err(|e| {
warn!(err.msg = %e, err.cause_chain = ?e, "failed getting uds local addr");
e
})
.ok()
.flatten();

tracing::info!(
addr.tcp = tcp_addr.map(debug),
addr.uds = uds_addr.map(debug),
"listening on local addresses"
);

loop {
match listener.accept().await {
Ok((socket, _addr)) => {
// set parent: None for the connection span, as it should
// exist independently of the listener's spans.
//let span = tracing::span!(parent: None, Level::ERROR, "abci", ?addr);
let conn = Connection {
consensus: self.consensus.clone(),
mempool: self.mempool.clone(),
info: self.info.clone(),
snapshot: self.snapshot.clone(),
};
//tokio::spawn(async move { conn.run(socket).await.unwrap() }.instrument(span));
tokio::spawn(async move { conn.run(socket).await.unwrap() });
select!(
tcp = async { self.tcp_listener.as_ref().unwrap().accept().await }, if self.tcp_listener.is_some() => {
match tcp {
Ok((socket, _addr)) => {
self.handle_new_connection(socket);
}
Err(e) => {
tracing::warn!({ %e }, "error accepting new tcp connection");
}
}
}
Err(e) => {
tracing::warn!({ %e }, "error accepting new tcp connection");

uds = async { self.uds_listener.as_ref().unwrap().accept().await }, if self.uds_listener.is_some() => {
erwanor marked this conversation as resolved.
Show resolved Hide resolved
match uds {
Ok((socket, _addr)) => {
self.handle_new_connection(socket);
}
Err(e) => {
tracing::warn!({ %e }, "error accepting new uds connection");
}
}
}
}
)
}
}
}
Expand All @@ -174,14 +294,24 @@ where
{
// XXX handle errors gracefully
// figure out how / if to return errors to tendermint
async fn run(mut self, mut socket: TcpStream) -> Result<(), BoxError> {
tracing::info!("listening for requests");

async fn run(mut self, mut socket: SocketKind) -> Result<(), BoxError> {
use tendermint_proto::v0_37::abci as pb;

match &socket {
SocketKind::Tcp(tcp) => {
tracing::info!(
addr = tcp.local_addr().ok().map(debug),
"listening for tcp requests"
);
}
SocketKind::Uds(uds) => {
tracing::info!(addr = ?uds.local_addr().ok().map(debug), "listening for uds requests");
}
}

let (mut request_stream, mut response_sink) = {
use crate::v037::codec::{Decode, Encode};
let (read, write) = socket.split();
let (read, write) = SocketKind::split(&mut socket);
(
FramedRead::new(read, Decode::<pb::Request>::default()),
FramedWrite::new(write, Encode::<pb::Response>::default()),
Expand Down
Loading