Skip to content

Commit

Permalink
refactor(control): simplify endpoint definition
Browse files Browse the repository at this point in the history
  • Loading branch information
Rexagon committed Aug 1, 2024
1 parent 0da2e28 commit b0f30ce
Show file tree
Hide file tree
Showing 9 changed files with 170 additions and 209 deletions.
22 changes: 3 additions & 19 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,31 +72,15 @@ impl Cmd {
#[derive(Subcommand)]
enum NodeCmd {
Run(node::CmdRun),
Ping(node::control::CmdPing),
FindArchive(node::control::CmdFindArchive),
DumpArchive(node::control::CmdDumpArchive),
DumpBlock(node::control::CmdDumpBlock),
DumpProof(node::control::CmdDumpProof),
GcArchives(node::control::CmdGcArchives),
GcBlocks(node::control::CmdGcBlocks),
GcStates(node::control::CmdGcStates),
#[clap(subcommand)]
MemProfiler(node::control::CmdMemProfiler),
#[clap(flatten)]
Control(node::CmdControl),
}

impl NodeCmd {
fn run(self) -> Result<()> {
match self {
Self::Run(cmd) => cmd.run(),
Self::Ping(cmd) => cmd.run(),
Self::FindArchive(cmd) => cmd.run(),
Self::DumpArchive(cmd) => cmd.run(),
Self::DumpBlock(cmd) => cmd.run(),
Self::DumpProof(cmd) => cmd.run(),
Self::GcArchives(cmd) => cmd.run(),
Self::GcBlocks(cmd) => cmd.run(),
Self::GcStates(cmd) => cmd.run(),
Self::MemProfiler(cmd) => cmd.run(),
Self::Control(cmd) => cmd.run(),
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions cli/src/node/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use everscale_types::cell::HashBytes;
use serde::{Deserialize, Serialize};
use tycho_collator::types::CollationConfig;
use tycho_collator::validator::ValidatorStdImplConfig;
use tycho_control::ControlServerStdImplConfig;
use tycho_control::ControlServerConfig;
use tycho_core::block_strider::BlockchainBlockProviderConfig;
use tycho_core::blockchain_rpc::BlockchainRpcServiceConfig;
use tycho_core::overlay_client::PublicOverlayClientConfig;
Expand Down Expand Up @@ -69,7 +69,7 @@ pub struct NodeConfig {

pub rpc: Option<RpcConfig>,

pub control: Option<ControlServerStdImplConfig>,
pub control: Option<ControlServerConfig>,

pub metrics: Option<MetricsConfig>,

Expand Down
30 changes: 30 additions & 0 deletions cli/src/node/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,36 @@ use tycho_util::futures::JoinTask;

use crate::util::signal;

#[derive(Subcommand)]
pub enum CmdControl {
Ping(CmdPing),
FindArchive(CmdFindArchive),
DumpArchive(CmdDumpArchive),
DumpBlock(CmdDumpBlock),
DumpProof(CmdDumpProof),
GcArchives(CmdGcArchives),
GcBlocks(CmdGcBlocks),
GcStates(CmdGcStates),
#[clap(subcommand)]
MemProfiler(CmdMemProfiler),
}

impl CmdControl {
pub fn run(self) -> Result<()> {
match self {
Self::Ping(cmd) => cmd.run(),
Self::FindArchive(cmd) => cmd.run(),
Self::DumpArchive(cmd) => cmd.run(),
Self::DumpBlock(cmd) => cmd.run(),
Self::DumpProof(cmd) => cmd.run(),
Self::GcArchives(cmd) => cmd.run(),
Self::GcBlocks(cmd) => cmd.run(),
Self::GcStates(cmd) => cmd.run(),
Self::MemProfiler(cmd) => cmd.run(),
}
}
}

/// Ping the control server.
#[derive(Parser)]
pub struct CmdPing {
Expand Down
13 changes: 7 additions & 6 deletions cli/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use tycho_collator::types::CollationConfig;
use tycho_collator::validator::{
Validator, ValidatorNetworkContext, ValidatorStdImpl, ValidatorStdImplConfig,
};
use tycho_control::{ControlEndpoint, ControlServerStdImpl, ControlServerStdImplConfig};
use tycho_control::{ControlEndpoint, ControlServer, ControlServerConfig};
use tycho_core::block_strider::{
ArchiveBlockProvider, BlockProvider, BlockStrider, BlockSubscriber, BlockSubscriberExt,
BlockchainBlockProvider, BlockchainBlockProviderConfig, FileZerostateProvider, GcSubscriber,
Expand All @@ -44,13 +44,14 @@ use tycho_util::cli::{LoggerConfig, LoggerTargets};
use tycho_util::futures::JoinTask;

use self::config::{MetricsConfig, NodeConfig, NodeKeys};
pub use self::control::CmdControl;
#[cfg(feature = "jemalloc")]
use crate::util::alloc::{spawn_allocator_metrics_loop, JemallocMemoryProfiler};
use crate::util::error::ResultExt;
use crate::util::signal;

mod config;
pub mod control;
mod control;

const SERVICE_NAME: &str = "tycho-node";

Expand Down Expand Up @@ -292,7 +293,7 @@ pub struct Node {
state_tracker: MinRefMcStateTracker,

rpc_config: Option<RpcConfig>,
control_config: Option<ControlServerStdImplConfig>,
control_config: Option<ControlServerConfig>,
blockchain_block_provider_config: BlockchainBlockProviderConfig,

collation_config: CollationConfig,
Expand Down Expand Up @@ -557,10 +558,10 @@ impl Node {
let gc_subscriber = GcSubscriber::new(self.storage.clone());

// Create RPC
// NOTE: This variable is used as a guard to abort the server future on drop.
let _control_state = if let Some(config) = &self.control_config {
// TODO: Add memory profiler
let server = {
let mut builder = ControlServerStdImpl::builder()
let mut builder = ControlServer::builder()
.with_gc_subscriber(gc_subscriber.clone())
.with_storage(self.storage.clone());

Expand All @@ -572,7 +573,7 @@ impl Node {
builder.build()
};

let endpoint = ControlEndpoint::bind(&config.socket_path, server)
let endpoint = ControlEndpoint::bind(config, server)
.await
.wrap_err("failed to setup control server endpoint")?;

Expand Down
2 changes: 1 addition & 1 deletion control/src/client/mod.rs → control/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tycho_core::block_strider::ManualGcTrigger;
use tycho_util::futures::JoinTask;

use crate::error::{ClientError, ClientResult};
use crate::server::*;
use crate::proto::*;

pub struct ControlClient {
inner: ControlServerClient,
Expand Down
98 changes: 4 additions & 94 deletions control/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,103 +1,13 @@
use std::future::Future;
use std::path::Path;

use futures_util::future::BoxFuture;
use futures_util::{FutureExt, StreamExt};

pub use self::client::ControlClient;
pub use self::error::{ClientError, ServerResult};
pub use self::server::impls::*;
pub use self::server::{
ArchiveInfo, ArchiveInfoRequest, ArchiveInfoResponse, ArchiveSliceRequest,
ArchiveSliceResponse, BlockProofRequest, BlockProofResponse, BlockRequest, BlockResponse,
ControlServer,
};

pub type Context = tarpc::context::Context;
pub use self::profiler::{MemoryProfiler, StubMemoryProfiler};
pub use self::server::{ControlEndpoint, ControlServer, ControlServerBuilder, ControlServerConfig};

mod client;
mod error;
mod profiler;
mod proto;
mod server;

// TODO: Change the path to a more general setup.
pub const DEFAULT_SOCKET_PATH: &str = "/var/venom/data/tycho.sock";

pub struct ControlEndpoint {
inner: BoxFuture<'static, ()>,
}

impl ControlEndpoint {
pub async fn bind<P, S>(path: P, server: S) -> std::io::Result<Self>
where
P: AsRef<Path>,
S: ControlServerExt,
{
use tarpc::tokio_serde::formats::Bincode;

let mut listener = tarpc::serde_transport::unix::listen(path, Bincode::default).await?;
listener.config_mut().max_frame_length(usize::MAX);

let inner = listener
// Ignore accept errors.
.filter_map(|r| futures_util::future::ready(r.ok()))
.map(tarpc::server::BaseChannel::with_defaults)
.map(move |channel| server.clone().execute_all(channel))
// Max 1 channel.
.buffer_unordered(1)
.for_each(|_| async {})
.boxed();

Ok(Self { inner })
}

pub async fn serve(self) {
self.inner.await;
}
}

// FIXME: Remove when https://github.com/google/tarpc/pull/448 is merged.
#[macro_export]
macro_rules! impl_serve {
($ident:ident) => {
impl $crate::ControlServerExt for $ident {
fn execute_all<T>(
self,
channel: $crate::__internal::RawChannel<T>,
) -> impl ::std::future::Future<Output = ()> + Send
where
T: tarpc::Transport<
$crate::__internal::tarpc::Response<$crate::__internal::RawResponse>,
$crate::__internal::tarpc::ClientMessage<$crate::__internal::RawRequest>,
> + Send
+ 'static,
{
use $crate::__internal::futures_util::{future, StreamExt};
use $crate::__internal::tarpc::server::Channel;

channel.execute(self.serve()).for_each(|t| {
$crate::__internal::tokio::spawn(t);
future::ready(())
})
}
}
};
}

pub trait ControlServerExt: ControlServer + Clone + Send + 'static {
fn execute_all<T>(self, channel: __internal::RawChannel<T>) -> impl Future<Output = ()> + Send
where
T: tarpc::Transport<
tarpc::Response<__internal::RawResponse>,
tarpc::ClientMessage<__internal::RawRequest>,
> + Send
+ 'static;
}

#[doc(hidden)]
pub mod __internal {
pub use {futures_util, tarpc, tokio};

pub type RawChannel<T> = tarpc::server::BaseChannel<RawRequest, RawResponse, T>;
pub type RawRequest = crate::server::ControlServerRequest;
pub type RawResponse = crate::server::ControlServerResponse;
}
34 changes: 34 additions & 0 deletions control/src/profiler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use std::sync::Arc;

use anyhow::Result;

#[async_trait::async_trait]
pub trait MemoryProfiler: Send + Sync + 'static {
async fn set_enabled(&self, enabled: bool) -> bool;
async fn dump(&self) -> Result<Vec<u8>>;
}

#[async_trait::async_trait]
impl<T: MemoryProfiler> MemoryProfiler for Arc<T> {
async fn set_enabled(&self, enabled: bool) -> bool {
T::set_enabled(self, enabled).await
}

async fn dump(&self) -> Result<Vec<u8>> {
T::dump(self).await
}
}

#[derive(Debug, Clone, Copy)]
pub struct StubMemoryProfiler;

#[async_trait::async_trait]
impl MemoryProfiler for StubMemoryProfiler {
async fn set_enabled(&self, _: bool) -> bool {
false
}

async fn dump(&self) -> Result<Vec<u8>> {
anyhow::bail!("stub memory profiler does not support dumping data")
}
}
9 changes: 0 additions & 9 deletions control/src/server/mod.rs → control/src/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,6 @@ use tycho_core::block_strider::ManualGcTrigger;

use crate::error::ServerResult;

pub mod impls {
pub use self::std_impl::{
ControlServerStdBuilder, ControlServerStdImpl, ControlServerStdImplConfig, MemoryProfiler,
StubMemoryProfiler,
};

mod std_impl;
}

#[tarpc::service]
pub trait ControlServer {
/// Ping a node. Returns node timestamp in milliseconds.
Expand Down
Loading

0 comments on commit b0f30ce

Please sign in to comment.