Move fat client to separate binary
aterentic-ethernal committed Aug 16, 2024
1 parent 56ba388 commit 13ac594
Showing 18 changed files with 485 additions and 378 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -8,3 +8,4 @@
- [`relay` CHANGELOG](relay/CHANGELOG.md)
- [`client` CHANGELOG](client/CHANGELOG.md)
- [`crawler` CHANGELOG](crawler/CHANGELOG.md)
- [`fat` CHANGELOG](fat/CHANGELOG.md)
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -5,6 +5,7 @@ members = [
"compatibility-tests",
"core",
"crawler",
"fat",
"relay",
]
default-members = ["client"]
1 change: 1 addition & 0 deletions README.md
@@ -20,4 +20,5 @@ The main components of this repository are structured as follows:
- `core/`: This library forms the core of Avail Light Client, providing essential clients and logic for the system.
- `compatibility-test/`: This folder contains compatibility tests.
- `crawler/` : Diagnostic tool that crawls Light Client's P2P network.
- `fat/` : Pushes grid data to the P2P network to ensure high hit rates.
- `relay/`: Implements all necessary components to utilize the Circuit Relay transport protocol.
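
The new `fat/` crate's job, per the README line added above, is to fetch its assigned slice of the extended data matrix and push those cells to the P2P network so that sampling light clients get high DHT hit rates. Below is a hypothetical sketch of how a `number/fraction` partition (e.g. `2/20`, the second 1/20 of the grid) could map to a contiguous range of cell indices; the real mapping is defined in `kate_recovery::matrix` and may differ.

```rust
// Hypothetical illustration only: the actual cell layout and iteration order
// are defined by `kate_recovery::matrix`, not by this contiguous split.
fn partition_range(total_cells: usize, number: usize, fraction: usize) -> std::ops::Range<usize> {
    // "2/20" means the second 1/20 slice of the grid's cells.
    let slice_len = (total_cells + fraction - 1) / fraction; // ceiling division
    let start = (number - 1) * slice_len;
    start..usize::min(start + slice_len, total_cells)
}

fn main() {
    // e.g. a 256x256 extended matrix, partition 2/20
    println!("{:?}", partition_range(256 * 256, 2, 20));
}
```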
6 changes: 1 addition & 5 deletions client/src/cli.rs
@@ -1,6 +1,5 @@
use avail_light_core::{network::Network, types::block_matrix_partition_format};
use avail_light_core::network::Network;
use clap::{command, Parser};
use kate_recovery::matrix::Partition;
use tracing::Level;

#[derive(Parser)]
@@ -51,9 +50,6 @@ pub struct CliOpts {
/// ed25519 private key for libp2p keypair generation
#[arg(long)]
pub private_key: Option<String>,
/// fraction and number of the block matrix part to fetch (e.g. 2/20 means second 1/20 part of a matrix) (default: None)
#[arg(long, value_parser = block_matrix_partition_format::parse)]
pub block_matrix_partition: Option<Partition>,
/// Set logs format to JSON
#[arg(long)]
pub logs_json: bool,
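
The `--block-matrix-partition` flag removed here (it moves to the new fat binary) documents the `number/fraction` format, e.g. `2/20` for the second 1/20 part of the matrix. The real parser is `avail_light_core::types::block_matrix_partition_format::parse`; the following is only a rough, self-contained sketch of that format, with a local `Partition` stand-in for `kate_recovery::matrix::Partition`.

```rust
// Stand-in for kate_recovery::matrix::Partition; field types are assumptions.
#[derive(Debug, Clone, Copy)]
struct Partition {
    number: u8,
    fraction: u8,
}

fn parse_partition(value: &str) -> Result<Partition, String> {
    let (number, fraction) = value
        .split_once('/')
        .ok_or_else(|| format!("invalid partition format: {value}"))?;
    let number: u8 = number.trim().parse().map_err(|e| format!("bad number: {e}"))?;
    let fraction: u8 = fraction.trim().parse().map_err(|e| format!("bad fraction: {e}"))?;
    if number == 0 || fraction == 0 || number > fraction {
        return Err(format!("partition {number}/{fraction} is out of range"));
    }
    Ok(Partition { number, fraction })
}

fn main() {
    // "2/20" -> Partition { number: 2, fraction: 20 }
    println!("{:?}", parse_partition("2/20"));
}
```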
264 changes: 19 additions & 245 deletions client/src/main.rs
@@ -19,8 +19,8 @@ use avail_light_core::{
shutdown::Controller,
telemetry::{self, MetricCounter, Metrics},
types::{
load_or_init_suri, IdentityConfig, KademliaMode, MaintenanceConfig, MultiaddrConfig,
RuntimeConfig, SecretKey, Uuid,
load_or_init_suri, IdentityConfig, MaintenanceConfig, MultiaddrConfig, RuntimeConfig,
SecretKey, Uuid,
},
utils::{default_subscriber, install_panic_hooks, json_subscriber, spawn_in_span},
};
@@ -30,7 +30,6 @@ use color_eyre::{
Result,
};
use kate_recovery::com::AppData;
use kate_recovery::matrix::Partition;
use std::{fs, path::Path, sync::Arc};
use tokio::sync::{broadcast, mpsc};
use tracing::trace;
@@ -71,12 +70,6 @@ async fn run(
("role", "lightnode".to_string()),
("peerID", peer_id.to_string()),
("avail_address", identity_cfg.avail_public_key.clone()),
(
"partition_size",
cfg.block_matrix_partition
.map(|Partition { number, fraction }| format!("{number}/{fraction}"))
.unwrap_or("n/a".to_string()),
),
("network", Network::name(&cfg.genesis_hash)),
("client_id", client_id.to_string()),
("execution_id", execution_id.to_string()),
@@ -104,7 +97,7 @@
version,
&cfg.genesis_hash,
&id_keys,
cfg.is_fat_client(),
false,
shutdown.clone(),
#[cfg(feature = "kademlia-rocksdb")]
db.inner(),
@@ -305,221 +298,17 @@ async fn run(
rpc_event_receiver: client_rpc_event_receiver,
};

if let Some(partition) = cfg.block_matrix_partition {
let fat_client = avail_light_core::fat_client::new(p2p_client.clone(), rpc_client.clone());

spawn_in_span(shutdown.with_cancel(avail_light_core::fat_client::run(
fat_client,
db.clone(),
(&cfg).into(),
ot_metrics.clone(),
channels,
partition,
shutdown.clone(),
)));
} else {
let light_network_client = network::new(p2p_client, rpc_client, pp, cfg.disable_rpc);

spawn_in_span(shutdown.with_cancel(avail_light_core::light_client::run(
db.clone(),
light_network_client,
(&cfg).into(),
ot_metrics.clone(),
channels,
shutdown.clone(),
)));
}

ot_metrics.count(MetricCounter::Starts).await;

Ok(())
}

async fn run_fat(
cfg: RuntimeConfig,
identity_cfg: IdentityConfig,
db: RocksDB,
shutdown: Controller<String>,
client_id: Uuid,
execution_id: Uuid,
) -> Result<()> {
info!("Fat client mode");

let version = clap::crate_version!();
info!("Running Avail Light Fat Client version: {version}.");
info!("Using config: {cfg:?}");

let (id_keys, peer_id) = p2p::identity(&cfg.libp2p, db.clone())?;

let metric_attributes = vec![
("version", version.to_string()),
("role", "fatnode".to_string()),
("peerID", peer_id.to_string()),
("avail_address", identity_cfg.avail_public_key.clone()),
(
"partition_size",
cfg.block_matrix_partition
.map(|Partition { number, fraction }| format!("{number}/{fraction}"))
.unwrap_or("n/a".to_string()),
),
("network", Network::name(&cfg.genesis_hash)),
("client_id", client_id.to_string()),
("execution_id", execution_id.to_string()),
(
"client_alias",
cfg.client_alias.clone().unwrap_or("".to_string()),
),
];

let ot_metrics = Arc::new(
telemetry::otlp::initialize(
metric_attributes,
&cfg.origin,
&KademliaMode::Client.into(),
cfg.otel.clone(),
)
.wrap_err("Unable to initialize OpenTelemetry service")?,
);

// Create sender channel for P2P event loop commands
let (p2p_event_loop_sender, p2p_event_loop_receiver) = mpsc::unbounded_channel();
let light_network_client = network::new(p2p_client, rpc_client, pp, cfg.disable_rpc);

let p2p_event_loop = p2p::EventLoop::new(
cfg.libp2p.clone(),
version,
&cfg.genesis_hash,
&id_keys,
cfg.is_fat_client(),
shutdown.clone(),
#[cfg(feature = "kademlia-rocksdb")]
db.inner(),
);

spawn_in_span(
shutdown.with_cancel(
p2p_event_loop
.await
.run(ot_metrics.clone(), p2p_event_loop_receiver),
),
);

let p2p_client = p2p::Client::new(
p2p_event_loop_sender,
cfg.libp2p.dht_parallelization_limit,
cfg.libp2p.kademlia.kad_record_ttl,
);

// Start listening on provided port
p2p_client
.start_listening(cfg.libp2p.multiaddress())
.await
.wrap_err("Listening on TCP not to fail.")?;
info!("TCP listener started on port {}", cfg.libp2p.port);

let p2p_clone = p2p_client.to_owned();
let cfg_clone = cfg.to_owned();
spawn_in_span(shutdown.with_cancel(async move {
info!("Bootstraping the DHT with bootstrap nodes...");
let bs_result = p2p_clone
.bootstrap_on_startup(&cfg_clone.libp2p.bootstraps)
.await;
match bs_result {
Ok(_) => {
info!("Bootstrap done.");
},
Err(e) => {
warn!("Bootstrap process: {e:?}.");
},
}
}));

let (rpc_client, rpc_events, rpc_subscriptions) =
rpc::init(db.clone(), &cfg.genesis_hash, &cfg.rpc, shutdown.clone()).await?;

// Subscribing to RPC events before first event is published
let first_header_rpc_event_receiver = rpc_events.subscribe();
let client_rpc_event_receiver = rpc_events.subscribe();

// spawn the RPC Network task for Event Loop to run in the background
// and shut it down, without delays
let rpc_subscriptions_handle = spawn_in_span(shutdown.with_cancel(shutdown.with_trigger(
"Subscription loop failure triggered shutdown".to_string(),
async {
let result = rpc_subscriptions.run().await;
if let Err(ref err) = result {
error!(%err, "Subscription loop ended with error");
};
result
},
)));

info!("Waiting for first finalized header...");
let block_header = match shutdown
.with_cancel(rpc::wait_for_finalized_header(
first_header_rpc_event_receiver,
360,
))
.await
.map_err(|shutdown_reason| eyre!(shutdown_reason))
.and_then(|inner| inner)
{
Err(report) => {
if !rpc_subscriptions_handle.is_finished() {
return Err(report);
}
let Ok(Ok(Err(subscriptions_error))) = rpc_subscriptions_handle.await else {
return Err(report);
};
return Err(eyre!(subscriptions_error));
},
Ok(num) => num,
};

db.put(LatestHeaderKey, block_header.number);

let (block_tx, block_rx) = broadcast::channel::<avail_light_core::types::BlockVerified>(1 << 7);

if cfg.sync_finality_enable {
let sync_finality = SyncFinality::new(db.clone(), rpc_client.clone());
spawn_in_span(shutdown.with_cancel(avail_light_core::sync_finality::run(
sync_finality,
shutdown.clone(),
block_header.clone(),
)));
} else {
warn!("Finality sync is disabled! Implicitly, blocks before LC startup will be considered verified as final");
// set the flag in the db, signaling across that we don't need to sync
db.put(IsFinalitySyncedKey, true);
}

let static_config_params: MaintenanceConfig = (&cfg).into();
spawn_in_span(shutdown.with_cancel(avail_light_core::maintenance::run(
p2p_client.clone(),
spawn_in_span(shutdown.with_cancel(avail_light_core::light_client::run(
db.clone(),
light_network_client,
(&cfg).into(),
ot_metrics.clone(),
block_rx,
static_config_params,
channels,
shutdown.clone(),
)));

let channels = avail_light_core::types::ClientChannels {
block_sender: block_tx,
rpc_event_receiver: client_rpc_event_receiver,
};

if let Some(partition) = cfg.block_matrix_partition {
let fat_client = avail_light_core::fat_client::new(p2p_client.clone(), rpc_client.clone());

spawn_in_span(shutdown.with_cancel(avail_light_core::fat_client::run(
fat_client,
db.clone(),
(&cfg).into(),
ot_metrics.clone(),
channels,
partition,
shutdown.clone(),
)));
}

ot_metrics.count(MetricCounter::Starts).await;

Ok(())
@@ -572,10 +361,6 @@ pub fn load_runtime_config(opts: &CliOpts) -> Result<RuntimeConfig> {
})
}

if let Some(partition) = &opts.block_matrix_partition {
cfg.block_matrix_partition = Some(*partition)
}

if let Some(client_alias) = &opts.client_alias {
cfg.client_alias = Some(client_alias.clone())
}
@@ -640,27 +425,16 @@ pub async fn main() -> Result<()> {
// spawn a task to watch for ctrl-c signals from user to trigger the shutdown
spawn_in_span(shutdown.on_user_signal("User signaled shutdown".to_string()));

if let Err(error) = if cfg.is_fat_client() {
run_fat(
cfg,
identity_cfg,
db,
shutdown.clone(),
client_id,
execution_id,
)
.await
} else {
run(
cfg,
identity_cfg,
db,
shutdown.clone(),
client_id,
execution_id,
)
.await
} {
if let Err(error) = run(
cfg,
identity_cfg,
db,
shutdown.clone(),
client_id,
execution_id,
)
.await
{
error!("{error:#}");
return Err(error.wrap_err("Starting Light Client failed"));
};
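
With the fat-client branch gone, `main` always calls `run`, and every long-running piece is still started through `spawn_in_span(shutdown.with_cancel(...))`. Below is a rough, self-contained sketch of that spawn-with-cancellation pattern using `tokio` and `tokio_util::sync::CancellationToken`; the crate's own `Controller` and `spawn_in_span` helpers are not shown in this diff and likely differ in detail.

```rust
use std::time::Duration;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let shutdown = CancellationToken::new();

    // Roughly what `spawn_in_span(shutdown.with_cancel(task))` achieves:
    // the task runs until the shutdown controller is triggered.
    let token = shutdown.clone();
    let worker = tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = token.cancelled() => {
                    println!("shutdown triggered, stopping task");
                    break;
                }
                _ = tokio::time::sleep(Duration::from_millis(100)) => {
                    println!("task tick");
                }
            }
        }
    });

    tokio::time::sleep(Duration::from_millis(350)).await;
    shutdown.cancel(); // e.g. user signaled shutdown
    let _ = worker.await;
}
```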