Skip to content

Commit

Permalink
graceful api shutdown (#3677)
Browse files Browse the repository at this point in the history
  • Loading branch information
deevope authored Jan 7, 2022
1 parent 2237f42 commit c92d2e9
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 97 deletions.
91 changes: 27 additions & 64 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ ctrlc = { version = "3.1", features = ["termination"] }
cursive_table_view = "0.13.2"
humansize = "1.1.0"
serde = "1"
futures = "0.3.19"
serde_json = "1"
log = "0.4"
term = "0.6"
Expand Down
21 changes: 20 additions & 1 deletion api/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,15 @@ use crate::router::ResponseFuture;
use crate::router::Router;
use crate::util::to_base64;
use crate::util::RwLock;
use crate::util::StopState;
use crate::web::*;
use easy_jsonrpc_mw::{Handler, MaybeReply};
use futures::channel::oneshot;
use hyper::{Body, Request, Response, StatusCode};
use serde::Serialize;
use std::net::SocketAddr;
use std::sync::{Arc, Weak};
use std::thread;

/// Listener version, providing same API but listening for requests on a
/// port and wrapping the calls
Expand All @@ -56,6 +59,8 @@ pub fn node_apis<B, P>(
api_secret: Option<String>,
foreign_api_secret: Option<String>,
tls_config: Option<TLSConfig>,
api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>),
stop_state: Arc<StopState>,
) -> Result<(), Error>
where
B: BlockChain + 'static,
Expand Down Expand Up @@ -104,10 +109,24 @@ where
let mut apis = ApiServer::new();
warn!("Starting HTTP Node APIs server at {}.", addr);
let socket_addr: SocketAddr = addr.parse().expect("unable to parse socket address");
let api_thread = apis.start(socket_addr, router, tls_config);
let api_thread = apis.start(socket_addr, router, tls_config, api_chan);

warn!("HTTP Node listener started.");

thread::Builder::new()
.name("api_monitor".to_string())
.spawn(move || {
// monitor for stop state is_stopped
loop {
std::thread::sleep(std::time::Duration::from_millis(100));
if stop_state.is_stopped() {
apis.stop();
break;
}
}
})
.ok();

match api_thread {
Ok(_) => Ok(()),
Err(e) => {
Expand Down
54 changes: 38 additions & 16 deletions api/src/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,11 @@ impl ApiServer {
addr: SocketAddr,
router: Router,
conf: Option<TLSConfig>,
api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>),
) -> Result<thread::JoinHandle<()>, Error> {
match conf {
Some(conf) => self.start_tls(addr, router, conf),
None => self.start_no_tls(addr, router),
Some(conf) => self.start_tls(addr, router, conf, api_chan),
None => self.start_no_tls(addr, router, api_chan),
}
}

Expand All @@ -198,25 +199,34 @@ impl ApiServer {
&mut self,
addr: SocketAddr,
router: Router,
api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>),
) -> Result<thread::JoinHandle<()>, Error> {
if self.shutdown_sender.is_some() {
return Err(ErrorKind::Internal(
"Can't start HTTP API server, it's running already".to_string(),
)
.into());
}
let (tx, _rx) = oneshot::channel::<()>();
let rx = &mut api_chan.1;
let tx = &mut api_chan.0;

// Jones's trick to update memory
let m = oneshot::channel::<()>();
let tx = std::mem::replace(tx, m.0);
self.shutdown_sender = Some(tx);

thread::Builder::new()
.name("apis".to_string())
.spawn(move || {
let server = async move {
let server = Server::bind(&addr).serve(make_service_fn(move |_| {
let router = router.clone();
async move { Ok::<_, Infallible>(router) }
}));
// TODO graceful shutdown is unstable, investigate
//.with_graceful_shutdown(rx)
let server = Server::bind(&addr)
.serve(make_service_fn(move |_| {
let router = router.clone();
async move { Ok::<_, Infallible>(router) }
}))
.with_graceful_shutdown(async {
rx.await.ok();
});

server.await
};
Expand All @@ -238,6 +248,7 @@ impl ApiServer {
addr: SocketAddr,
router: Router,
conf: TLSConfig,
api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>),
) -> Result<thread::JoinHandle<()>, Error> {
if self.shutdown_sender.is_some() {
return Err(ErrorKind::Internal(
Expand All @@ -246,6 +257,14 @@ impl ApiServer {
.into());
}

let rx = &mut api_chan.1;
let tx = &mut api_chan.0;

// Jones's trick to update memory
let m = oneshot::channel::<()>();
let tx = std::mem::replace(tx, m.0);
self.shutdown_sender = Some(tx);

let acceptor = TlsAcceptor::from(conf.build_server_config()?);

thread::Builder::new()
Expand All @@ -258,12 +277,14 @@ impl ApiServer {
.and_then(move |s| acceptor.accept(s))
.filter(|r| r.is_ok());

let server = Server::builder(accept::from_stream(listener)).serve(
make_service_fn(move |_| {
let server = Server::builder(accept::from_stream(listener))
.serve(make_service_fn(move |_| {
let router = router.clone();
async move { Ok::<_, Infallible>(router) }
}),
);
}))
.with_graceful_shutdown(async {
rx.await.ok();
});

server.await
};
Expand All @@ -281,9 +302,10 @@ impl ApiServer {
/// Stops the API server, it panics in case of error
pub fn stop(&mut self) -> bool {
if self.shutdown_sender.is_some() {
// TODO re-enable stop after investigation
//let tx = mem::replace(&mut self.shutdown_sender, None).unwrap();
//tx.send(()).expect("Failed to stop API server");
let tx = self.shutdown_sender.as_mut().unwrap();
let m = oneshot::channel::<()>();
let tx = std::mem::replace(tx, m.0);
tx.send(()).expect("Failed to stop API server");
info!("API server has been stopped");
true
} else {
Expand Down
Loading

0 comments on commit c92d2e9

Please sign in to comment.