Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Unix domain sockets #35

Merged
merged 6 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 19 additions & 15 deletions examples/kvstore_34/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,9 @@ use std::{
use bytes::Bytes;
use futures::future::FutureExt;
use structopt::StructOpt;
use tower::{Service, ServiceBuilder};

use tendermint::abci::{Event, EventAttributeIndexExt};

use tendermint::v0_34::abci::response;
use tendermint::v0_34::abci::{Request, Response};
use tendermint::v0_34::abci::{response, Request, Response};
use tower::{Service, ServiceBuilder};

use tower_abci::{
v034::{split, Server},
Expand Down Expand Up @@ -141,6 +138,10 @@ struct Opt {
/// Bind the TCP server to this port.
#[structopt(short, long, default_value = "26658")]
port: u16,

/// Bind the UDS server to this path
#[structopt(long)]
uds: Option<String>,
}

#[tokio::main]
Expand All @@ -157,7 +158,7 @@ async fn main() {
// Hand those components to the ABCI server, but customize request behavior
// for each category -- for instance, apply load-shedding only to mempool
// and info requests, but not to consensus requests.
let server = Server::builder()
let server_builder = Server::builder()
.consensus(consensus)
.snapshot(snapshot)
.mempool(
Expand All @@ -172,13 +173,16 @@ async fn main() {
.buffer(100)
.rate_limit(50, std::time::Duration::from_secs(1))
.service(info),
)
.finish()
.unwrap();

// Run the ABCI server.
server
.listen(format!("{}:{}", opt.host, opt.port))
.await
.unwrap();
);

let server = server_builder.finish().unwrap();

if let Some(uds_path) = opt.uds {
server.listen_unix(uds_path).await.unwrap();
} else {
server
.listen_tcp(format!("{}:{}", opt.host, opt.port))
.await
.unwrap();
}
}
27 changes: 17 additions & 10 deletions examples/kvstore_37/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ struct Opt {
/// Bind the TCP server to this port.
#[structopt(short, long, default_value = "26658")]
port: u16,

/// Bind the UDS server to this path
#[structopt(long)]
uds: Option<String>,
}

#[tokio::main]
Expand All @@ -161,7 +165,7 @@ async fn main() {
// Hand those components to the ABCI server, but customize request behavior
// for each category -- for instance, apply load-shedding only to mempool
// and info requests, but not to consensus requests.
let server = Server::builder()
let server_builder = Server::builder()
.consensus(consensus)
.snapshot(snapshot)
.mempool(
Expand All @@ -176,13 +180,16 @@ async fn main() {
.buffer(100)
.rate_limit(50, std::time::Duration::from_secs(1))
.service(info),
)
.finish()
.unwrap();

// Run the ABCI server.
server
.listen(format!("{}:{}", opt.host, opt.port))
.await
.unwrap();
);

let server = server_builder.finish().unwrap();

if let Some(uds_path) = opt.uds {
server.listen_unix(uds_path).await.unwrap();
} else {
server
.listen_tcp(format!("{}:{}", opt.host, opt.port))
.await
.unwrap();
}
}
60 changes: 46 additions & 14 deletions src/v034/server.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
use std::convert::{TryFrom, TryInto};
use std::path::Path;

use futures::future::{FutureExt, TryFutureExt};
use futures::sink::SinkExt;
use futures::stream::{FuturesOrdered, StreamExt};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::{
net::{TcpListener, TcpStream, ToSocketAddrs},
net::{TcpListener, ToSocketAddrs, UnixListener},
select,
};
use tokio_util::codec::{FramedRead, FramedWrite};
use tower::{Service, ServiceExt};

use crate::BoxError;
use tendermint::abci::MethodKind;

use tendermint::v0_34::abci::{
ConsensusRequest, ConsensusResponse, InfoRequest, InfoResponse, MempoolRequest,
MempoolResponse, Request, Response, SnapshotRequest, SnapshotResponse,
};

use crate::BoxError;
/// An ABCI server which listens for connections and forwards requests to four
/// component ABCI [`Service`]s.
pub struct Server<C, M, I, S> {
Expand Down Expand Up @@ -123,29 +126,54 @@ where
ServerBuilder::default()
}

pub async fn listen<A: ToSocketAddrs + std::fmt::Debug>(self, addr: A) -> Result<(), BoxError> {
tracing::info!(?addr, "starting ABCI server");
pub async fn listen_unix(self, path: impl AsRef<Path>) -> Result<(), BoxError> {
let listener = UnixListener::bind(path)?;
let addr = listener.local_addr()?;
tracing::info!(?addr, "ABCI server starting on uds");

loop {
match listener.accept().await {
Ok((socket, _addr)) => {
tracing::debug!(?_addr, "accepted new connection");
let conn = Connection {
consensus: self.consensus.clone(),
mempool: self.mempool.clone(),
info: self.info.clone(),
snapshot: self.snapshot.clone(),
};
let (read, write) = socket.into_split();
tokio::spawn(async move { conn.run(read, write).await.unwrap() });
}
Err(e) => {
tracing::error!({ %e }, "error accepting new connection");
}
}
}
}

pub async fn listen_tcp<A: ToSocketAddrs + std::fmt::Debug>(
self,
addr: A,
) -> Result<(), BoxError> {
let listener = TcpListener::bind(addr).await?;
let local_addr = listener.local_addr()?;
tracing::info!(?local_addr, "bound tcp listener");
let addr = listener.local_addr()?;
tracing::info!(?addr, "ABCI server starting on tcp socket");

loop {
match listener.accept().await {
Ok((socket, _addr)) => {
// set parent: None for the connection span, as it should
// exist independently of the listener's spans.
//let span = tracing::span!(parent: None, Level::ERROR, "abci", ?addr);
tracing::debug!(?_addr, "accepted new connection");
let conn = Connection {
consensus: self.consensus.clone(),
mempool: self.mempool.clone(),
info: self.info.clone(),
snapshot: self.snapshot.clone(),
};
//tokio::spawn(async move { conn.run(socket).await.unwrap() }.instrument(span));
tokio::spawn(async move { conn.run(socket).await.unwrap() });
let (read, write) = socket.into_split();
tokio::spawn(async move { conn.run(read, write).await.unwrap() });
}
Err(e) => {
tracing::warn!({ %e }, "error accepting new tcp connection");
tracing::error!({ %e }, "error accepting new connection");
}
}
}
Expand All @@ -172,14 +200,18 @@ where
{
// XXX handle errors gracefully
// figure out how / if to return errors to tendermint
async fn run(mut self, mut socket: TcpStream) -> Result<(), BoxError> {
async fn run(
mut self,
stream: impl AsyncReadExt + std::marker::Unpin,
sink: impl AsyncWriteExt + std::marker::Unpin,
) -> Result<(), BoxError> {
tracing::info!("listening for requests");

use tendermint_proto::v0_34::abci as pb;

let (mut request_stream, mut response_sink) = {
use crate::v034::codec::{Decode, Encode};
let (read, write) = socket.split();
let (read, write) = (stream, sink);
(
FramedRead::new(read, Decode::<pb::Request>::default()),
FramedWrite::new(write, Encode::<pb::Response>::default()),
Expand Down
60 changes: 45 additions & 15 deletions src/v037/server.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
use std::convert::{TryFrom, TryInto};
use std::path::Path;

use futures::future::{FutureExt, TryFutureExt};
use futures::sink::SinkExt;
use futures::stream::{FuturesOrdered, StreamExt};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::{
net::{TcpListener, TcpStream, ToSocketAddrs},
net::{TcpListener, ToSocketAddrs, UnixListener},
select,
};
use tokio_util::codec::{FramedRead, FramedWrite};
use tower::{Service, ServiceExt};

use tendermint::abci::MethodKind;

use crate::BoxError;
use tendermint::abci::MethodKind;

use tendermint::v0_37::abci::{
ConsensusRequest, ConsensusResponse, InfoRequest, InfoResponse, MempoolRequest,
Expand Down Expand Up @@ -125,29 +126,54 @@ where
ServerBuilder::default()
}

pub async fn listen<A: ToSocketAddrs + std::fmt::Debug>(self, addr: A) -> Result<(), BoxError> {
tracing::info!(?addr, "starting ABCI server");
pub async fn listen_unix(self, path: impl AsRef<Path>) -> Result<(), BoxError> {
let listener = UnixListener::bind(path)?;
let addr = listener.local_addr()?;
tracing::info!(?addr, "ABCI server starting on uds");

loop {
match listener.accept().await {
Ok((socket, _addr)) => {
tracing::debug!(?_addr, "accepted new connection");
let conn = Connection {
consensus: self.consensus.clone(),
mempool: self.mempool.clone(),
info: self.info.clone(),
snapshot: self.snapshot.clone(),
};
let (read, write) = socket.into_split();
tokio::spawn(async move { conn.run(read, write).await.unwrap() });
}
Err(e) => {
tracing::error!({ %e }, "error accepting new connection");
}
}
}
}

pub async fn listen_tcp<A: ToSocketAddrs + std::fmt::Debug>(
self,
addr: A,
) -> Result<(), BoxError> {
let listener = TcpListener::bind(addr).await?;
let local_addr = listener.local_addr()?;
tracing::info!(?local_addr, "bound tcp listener");
let addr = listener.local_addr()?;
tracing::info!(?addr, "ABCI server starting on tcp socket");

loop {
match listener.accept().await {
Ok((socket, _addr)) => {
// set parent: None for the connection span, as it should
// exist independently of the listener's spans.
//let span = tracing::span!(parent: None, Level::ERROR, "abci", ?addr);
tracing::debug!(?_addr, "accepted new connection");
let conn = Connection {
consensus: self.consensus.clone(),
mempool: self.mempool.clone(),
info: self.info.clone(),
snapshot: self.snapshot.clone(),
};
//tokio::spawn(async move { conn.run(socket).await.unwrap() }.instrument(span));
tokio::spawn(async move { conn.run(socket).await.unwrap() });
let (read, write) = socket.into_split();
tokio::spawn(async move { conn.run(read, write).await.unwrap() });
}
Err(e) => {
tracing::warn!({ %e }, "error accepting new tcp connection");
tracing::error!({ %e }, "error accepting new connection");
}
}
}
Expand All @@ -174,14 +200,18 @@ where
{
// XXX handle errors gracefully
// figure out how / if to return errors to tendermint
async fn run(mut self, mut socket: TcpStream) -> Result<(), BoxError> {
async fn run(
mut self,
stream: impl AsyncReadExt + std::marker::Unpin,
sink: impl AsyncWriteExt + std::marker::Unpin,
) -> Result<(), BoxError> {
tracing::info!("listening for requests");

use tendermint_proto::v0_37::abci as pb;

let (mut request_stream, mut response_sink) = {
use crate::v037::codec::{Decode, Encode};
let (read, write) = socket.split();
let (read, write) = (stream, sink);
(
FramedRead::new(read, Decode::<pb::Request>::default()),
FramedWrite::new(write, Encode::<pb::Response>::default()),
Expand Down
Loading