Skip to content

Commit

Permalink
feat(core): handling CTRL-C signal with graceful shutdown (#2213)
Browse files Browse the repository at this point in the history
  • Loading branch information
onur-ozkan authored Sep 19, 2024
1 parent 3b27172 commit baa72a7
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 24 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ opt-level = 3
strip = true
codegen-units = 1
# lto = true
panic = "abort"
panic = 'unwind'

[profile.dev]
opt-level = 0
Expand Down
2 changes: 1 addition & 1 deletion mm2src/mm2_bin_lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,5 +99,5 @@ fn prepare_for_mm2_stop() -> PrepareForStopResult {

async fn finalize_mm2_stop(ctx: MmArc) {
dispatch_lp_event(ctx.clone(), StopCtxEvent.into()).await;
let _ = ctx.stop();
let _ = ctx.stop().await;
}
15 changes: 14 additions & 1 deletion mm2src/mm2_core/src/mm_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,10 @@ lazy_static! {
impl MmArc {
pub fn new(ctx: MmCtx) -> MmArc { MmArc(SharedRc::new(ctx)) }

pub fn stop(&self) -> Result<(), String> {
pub async fn stop(&self) -> Result<(), String> {
#[cfg(not(target_arch = "wasm32"))]
try_s!(self.close_async_connection().await);

try_s!(self.stop.pin(true));

// Notify shutdown listeners.
Expand All @@ -517,6 +520,16 @@ impl MmArc {
Ok(())
}

#[cfg(not(target_arch = "wasm32"))]
async fn close_async_connection(&self) -> Result<(), db_common::async_sql_conn::AsyncConnError> {
if let Some(async_conn) = self.async_sqlite_connection.as_option() {
let mut conn = async_conn.lock().await;
conn.close().await?;
}

Ok(())
}

#[cfg(feature = "track-ctx-pointer")]
fn track_ctx_pointer(&self) {
let ctx_weak = self.weak();
Expand Down
2 changes: 1 addition & 1 deletion mm2src/mm2_main/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ hyper = { version = "0.14.26", features = ["client", "http2", "server", "tcp"] }
rcgen = "0.10"
rustls = { version = "0.21", default-features = false }
rustls-pemfile = "1.0.2"
tokio = { version = "1.20", features = ["io-util", "rt-multi-thread", "net"] }
tokio = { version = "1.20", features = ["io-util", "rt-multi-thread", "net", "signal"] }

[target.'cfg(windows)'.dependencies]
winapi = "0.3"
Expand Down
27 changes: 25 additions & 2 deletions mm2src/mm2_main/src/mm2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

#[cfg(not(target_arch = "wasm32"))] use common::block_on;
use common::crash_reports::init_crash_reports;
use common::log;
use common::log::LogLevel;
use common::password_policy::password_policy;
use mm2_core::mm_ctx::MmCtxBuilder;
Expand All @@ -53,7 +54,6 @@ use lp_swap::PAYMENT_LOCKTIME;
use std::sync::atomic::Ordering;

use gstuff::slurp;

use serde::ser::Serialize;
use serde_json::{self as json, Value as Json};

Expand All @@ -63,7 +63,6 @@ use std::process::exit;
use std::ptr::null;
use std::str;

mod lp_native_dex;
pub use self::lp_native_dex::init_hw;
pub use self::lp_native_dex::lp_init;
use coins::update_coins_config;
Expand All @@ -74,6 +73,7 @@ use mm2_err_handle::prelude::*;
pub mod heartbeat_event;
pub mod lp_dispatcher;
pub mod lp_message_service;
mod lp_native_dex;
pub mod lp_network;
pub mod lp_ordermatch;
pub mod lp_stats;
Expand Down Expand Up @@ -159,10 +159,33 @@ pub async fn lp_main(
.with_datetime(datetime.clone())
.into_mm_arc();
ctx_cb(try_s!(ctx.ffi_handle()));

#[cfg(not(target_arch = "wasm32"))]
spawn_ctrl_c_handler(ctx.clone());

try_s!(lp_init(ctx, version, datetime).await);
Ok(())
}

/// Handles CTRL-C signals and shutdowns the KDF runtime gracefully.
///
/// It's important to spawn this task as soon as `Ctx` is in the correct state.
#[cfg(not(target_arch = "wasm32"))]
fn spawn_ctrl_c_handler(ctx: mm2_core::mm_ctx::MmArc) {
use crate::lp_dispatcher::{dispatch_lp_event, StopCtxEvent};

common::executor::spawn(async move {
tokio::signal::ctrl_c()
.await
.expect("Couldn't listen for the CTRL-C signal.");

log::info!("Wrapping things up and shutting down...");

dispatch_lp_event(ctx.clone(), StopCtxEvent.into()).await;
ctx.stop().await.expect("Couldn't stop the KDF runtime.");
});
}

fn help() {
const HELP_MSG: &str = r#"Command-line options.
The first command-line argument is special and designates the mode.
Expand Down
19 changes: 1 addition & 18 deletions mm2src/mm2_main/src/rpc/lp_commands/lp_commands_legacy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

use coins::{lp_coinfind, lp_coinfind_any, lp_coininit, CoinsContext, MmCoinEnum};
use common::executor::Timer;
use common::log::error;
use common::{rpc_err_response, rpc_response, HyRes};
use futures::compat::Future01CompatExt;
use http::Response;
Expand Down Expand Up @@ -242,29 +241,13 @@ pub async fn my_balance(ctx: MmArc, req: Json) -> Result<Response<Vec<u8>>, Stri
Ok(try_s!(Response::builder().body(res)))
}

#[cfg(not(target_arch = "wasm32"))]
async fn close_async_connection(ctx: &MmArc) {
if let Some(async_conn) = ctx.async_sqlite_connection.as_option() {
let mut conn = async_conn.lock().await;
if let Err(e) = conn.close().await {
error!("Error stopping AsyncConnection: {}", e);
}
}
}

pub async fn stop(ctx: MmArc) -> Result<Response<Vec<u8>>, String> {
dispatch_lp_event(ctx.clone(), StopCtxEvent.into()).await;
// Should delay the shutdown a bit in order not to trip the "stop" RPC call in unit tests.
// Stopping immediately leads to the "stop" RPC call failing with the "errno 10054" sometimes.
let fut = async move {
Timer::sleep(0.05).await;

#[cfg(not(target_arch = "wasm32"))]
close_async_connection(&ctx).await;

if let Err(e) = ctx.stop() {
error!("Error stopping MmCtx: {}", e);
}
ctx.stop().await.expect("Couldn't stop the KDF runtime.");
};

// Please note we shouldn't use `MmCtx::spawner` to spawn this future,
Expand Down

0 comments on commit baa72a7

Please sign in to comment.