Skip to content

Commit

Permalink
use more common code for upgrade actions
Browse files Browse the repository at this point in the history
  • Loading branch information
zh-jq-b committed Jul 18, 2024
1 parent 70ffae8 commit d731fa8
Show file tree
Hide file tree
Showing 15 changed files with 252 additions and 348 deletions.
3 changes: 2 additions & 1 deletion g3keymess/src/control/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ mod bridge;
mod quit;
pub use quit::QuitActor;

pub mod upgrade;
mod upgrade;
pub use upgrade::UpgradeActor;

mod local;
pub use local::{DaemonController, UniqueController};
Expand Down
8 changes: 1 addition & 7 deletions g3keymess/src/control/quit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,9 @@ use g3_daemon::control::quit::QuitAction;

use super::local::{DaemonController, UniqueController};

#[derive(Default)]
pub struct QuitActor {}

impl QuitActor {
pub fn spawn_run() {
let actor = QuitActor {};
tokio::spawn(actor.into_running(g3_daemon::runtime::config::get_server_offline_delay()));
}
}

impl QuitAction for QuitActor {
async fn do_release_controller(&self) {
DaemonController::abort().await;
Expand Down
126 changes: 26 additions & 100 deletions g3keymess/src/control/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,122 +14,48 @@
* limitations under the License.
*/

use std::sync::Mutex;
use std::thread::JoinHandle;
use g3_daemon::control::upgrade::UpgradeAction;

use anyhow::anyhow;
use log::warn;
use tokio::sync::{mpsc, oneshot};
use capnp_rpc::rpc_twoparty_capnp::Side;
use capnp_rpc::RpcSystem;

use g3keymess_proto::proc_capnp::proc_control;
use g3keymess_proto::types_capnp::operation_result;

use g3_daemon::control::LocalController;

static MSG_CHANNEL: Mutex<Option<mpsc::Sender<Msg>>> = Mutex::new(None);
static THREAD_HANDLE: Mutex<Option<JoinHandle<()>>> = Mutex::new(None);

enum Msg {
CancelShutdown,
ReleaseController(oneshot::Sender<()>),
ConfirmShutdown,
pub struct UpgradeActor {
proc_control: proc_control::Client,
}

pub fn cancel_old_shutdown() {
let msg_channel = MSG_CHANNEL.lock().unwrap().take();
if let Some(sender) = msg_channel {
let _ = sender.try_send(Msg::CancelShutdown);
let handle = THREAD_HANDLE.lock().unwrap().take();
if let Some(handle) = handle {
let _ = handle.join();
}
impl UpgradeAction for UpgradeActor {
async fn connect_rpc() -> anyhow::Result<(RpcSystem<Side>, Self)> {
LocalController::connect_rpc::<proc_control::Client>(
crate::build::PKG_NAME,
crate::opts::daemon_group(),
)
.await
.map(|(r, proc_control)| (r, UpgradeActor { proc_control }))
}
}

pub async fn release_old_controller() {
let msg_channel = MSG_CHANNEL.lock().unwrap().clone();
if let Some(sender) = msg_channel {
let (done_sender, done_receiver) = oneshot::channel();
if sender
.send(Msg::ReleaseController(done_sender))
.await
.is_ok()
{
let _ = done_receiver.await;
}
async fn cancel_shutdown(&self) -> anyhow::Result<()> {
let req = self.proc_control.cancel_shutdown_request();
let rsp = req.send().promise.await?;
check_operation_result(rsp.get()?.get_result()?)
}
}

pub fn finish() {
let msg_channel = MSG_CHANNEL.lock().unwrap().take();
if let Some(sender) = msg_channel {
let _ = sender.try_send(Msg::ConfirmShutdown);
let handle = THREAD_HANDLE.lock().unwrap().take();
if let Some(handle) = handle {
tokio::task::spawn_blocking(move || {
let _ = handle.join();
});
}
async fn release_controller(&self) -> anyhow::Result<()> {
let req = self.proc_control.release_controller_request();
let rsp = req.send().promise.await?;
check_operation_result(rsp.get()?.get_result()?)
}
}

pub fn connect_to_old_daemon() {
let (sender, receiver) = mpsc::channel(4);
let handle = std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_io()
.build()
.unwrap();
rt.block_on(async {
if let Err(e) = connect_run(receiver).await {
warn!("upgrade channel error: {e}");
}
})
});
let mut msg_channel = MSG_CHANNEL.lock().unwrap();
*msg_channel = Some(sender);
let mut thread_handle = THREAD_HANDLE.lock().unwrap();
*thread_handle = Some(handle);
}

async fn connect_run(mut msg_receiver: mpsc::Receiver<Msg>) -> anyhow::Result<()> {
let (rpc_system, proc_control) = LocalController::connect_rpc::<proc_control::Client>(
crate::build::PKG_NAME,
crate::opts::daemon_group(),
)
.await?;
tokio::task::LocalSet::new()
.run_until(async move {
tokio::task::spawn_local(async move {
rpc_system
.await
.map_err(|e| warn!("upgrade rpc system error: {e:?}"))
});

while let Some(msg) = msg_receiver.recv().await {
match msg {
Msg::CancelShutdown => {
let req = proc_control.cancel_shutdown_request();
let rsp = req.send().promise.await?;
return check_operation_result(rsp.get()?.get_result()?);
}
Msg::ReleaseController(finish_sender) => {
let req = proc_control.release_controller_request();
let rsp = req.send().promise.await?;
check_operation_result(rsp.get()?.get_result()?)?;
let _ = finish_sender.send(());
}
Msg::ConfirmShutdown => {
let req = proc_control.offline_request();
let rsp = req.send().promise.await?;
return check_operation_result(rsp.get()?.get_result()?);
}
}
}

Ok(())
})
.await
async fn confirm_shutdown(&self) -> anyhow::Result<()> {
let req = self.proc_control.offline_request();
let rsp = req.send().promise.await?;
check_operation_result(rsp.get()?.get_result()?)
}
}

fn check_operation_result(r: operation_result::Reader<'_>) -> anyhow::Result<()> {
Expand Down
14 changes: 8 additions & 6 deletions g3keymess/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
use anyhow::{anyhow, Context};
use log::{debug, error, info, warn};

use g3_daemon::control::{QuitAction, UpgradeAction};

use g3keymess::opts::ProcArgs;

fn main() -> anyhow::Result<()> {
Expand All @@ -39,13 +41,13 @@ fn main() -> anyhow::Result<()> {
// set up process logger early, only proc args is used inside
let _log_guard = g3_daemon::log::process::setup(&proc_args.daemon_config);
if proc_args.daemon_config.need_daemon_controller() {
g3keymess::control::upgrade::connect_to_old_daemon();
g3keymess::control::UpgradeActor::connect_to_old_daemon();
}

let config_file = match g3keymess::config::load() {
Ok(c) => c,
Err(e) => {
g3keymess::control::upgrade::cancel_old_shutdown();
g3_daemon::control::upgrade::cancel_old_shutdown();
return Err(e.context("failed to load config"));
}
};
Expand Down Expand Up @@ -120,21 +122,21 @@ fn tokio_run(args: &ProcArgs) -> anyhow::Result<()> {
.start()
.context("failed to start unique controller")?;
if args.daemon_config.need_daemon_controller() {
g3keymess::control::upgrade::release_old_controller().await;
g3_daemon::control::upgrade::release_old_controller().await;
let daemon_ctl = g3keymess::control::DaemonController::start()
.context("failed to start daemon controller")?;
tokio::spawn(async move {
daemon_ctl.await;
});
}
g3keymess::control::QuitActor::spawn_run();
g3keymess::control::QuitActor::tokio_spawn_run();

g3keymess::signal::register().context("failed to setup signal handler")?;

match load_and_spawn(unique_ctl_path).await {
Ok(_) => g3keymess::control::upgrade::finish(),
Ok(_) => g3_daemon::control::upgrade::finish(),
Err(e) => {
g3keymess::control::upgrade::cancel_old_shutdown();
g3_daemon::control::upgrade::cancel_old_shutdown();
return Err(e);
}
}
Expand Down
3 changes: 2 additions & 1 deletion g3proxy/src/control/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ mod bridge;
mod quit;
pub use quit::QuitActor;

pub mod upgrade;
mod upgrade;
pub use upgrade::UpgradeActor;

mod local;
pub use local::{DaemonController, UniqueController};
Expand Down
8 changes: 1 addition & 7 deletions g3proxy/src/control/quit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,9 @@ use g3_daemon::control::quit::QuitAction;

use super::local::{DaemonController, UniqueController};

#[derive(Default)]
pub struct QuitActor {}

impl QuitActor {
pub fn spawn_run() {
let actor = QuitActor {};
tokio::spawn(actor.into_running(g3_daemon::runtime::config::get_server_offline_delay()));
}
}

impl QuitAction for QuitActor {
async fn do_release_controller(&self) {
DaemonController::abort().await;
Expand Down
Loading

0 comments on commit d731fa8

Please sign in to comment.