From 83bbe03c2e34cf513a089375f05e06412ab9619a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleksandar=20Terenti=C4=87?= Date: Wed, 28 Jun 2023 12:43:32 +0200 Subject: [PATCH] Create task struct for light client. --- src/light_client.rs | 94 +++++++++++++++++++--------------------- src/main.rs | 14 +++--- src/network/mem_store.rs | 2 + 3 files changed, 53 insertions(+), 57 deletions(-) diff --git a/src/light_client.rs b/src/light_client.rs index 60d8fef60..49dae6870 100644 --- a/src/light_client.rs +++ b/src/light_client.rs @@ -376,63 +376,57 @@ pub async fn process_block( Ok(()) } -/// Runs light client. -/// -/// # Arguments -/// -/// * `light_client` - Light client implementation -/// * `cfg` - Light client configuration -/// * `block_tx` - Channel used to send header of verified block -/// * `pp` - Public parameters (i.e. SRS) needed for proof verification -/// * `registry` - Prometheus metrics registry -/// * `counter` - Processed block mutex counter -pub async fn run( - light_client: impl LightClient, - cfg: LightClientConfig, - block_tx: Option>, - pp: PublicParameters, - metrics: Metrics, - counter: Arc>, - mut message_rx: Receiver<(Header, Instant)>, - error_sender: Sender, -) { - info!("Starting light client..."); - - while let Some((header, received_at)) = message_rx.recv().await { - if let Some(seconds) = cfg.block_processing_delay.sleep_duration(received_at) { - info!("Sleeping for {seconds:?} seconds"); - tokio::time::sleep(seconds).await; - } +pub struct Task { + pub light_client: T, + pub cfg: LightClientConfig, + pub block_tx: Option>, + pub pp: PublicParameters, + pub metrics: Metrics, + pub counter: Arc>, + pub message_rx: Receiver<(Header, Instant)>, + pub error_sender: Sender, +} - if let Err(error) = process_block( - &light_client, - &cfg, - &pp, - &header, - received_at, - &metrics, - counter.clone(), - ) - .await - { - error!("Cannot process block: {error}"); - if let Err(error) = error_sender.send(error).await { - error!("Cannot send error message: {error}"); +impl Task { + pub async fn run(mut self) { + info!("Starting light client..."); + + while let Some((header, received_at)) = self.message_rx.recv().await { + if let Some(seconds) = self.cfg.block_processing_delay.sleep_duration(received_at) { + info!("Sleeping for {seconds:?} seconds"); + tokio::time::sleep(seconds).await; + } + + if let Err(error) = process_block( + &self.light_client, + &self.cfg, + &self.pp, + &header, + received_at, + &self.metrics, + self.counter.clone(), + ) + .await + { + error!("Cannot process block: {error}"); + if let Err(error) = self.error_sender.send(error).await { + error!("Cannot send error message: {error}"); + } + return; } - return; - } - let Ok(client_msg) = types::BlockVerified::try_from(header) else { + let Ok(client_msg) = types::BlockVerified::try_from(header) else { error!("Cannot create message from header"); continue; }; - // notify dht-based application client - // that newly mined block has been received - if let Some(ref channel) = block_tx { - if let Err(error) = channel.send(client_msg).await { - error!("Cannot send block verified message: {error}"); - continue; + // notify dht-based application client + // that newly mined block has been received + if let Some(ref channel) = self.block_tx { + if let Err(error) = channel.send(client_msg).await { + error!("Cannot send block verified message: {error}"); + continue; + } } } } diff --git a/src/main.rs b/src/main.rs index 4cf108b92..ddbb90d49 100644 --- a/src/main.rs +++ b/src/main.rs @@ -314,18 +314,18 @@ async fn run(error_sender: Sender) -> Result<()> { error_sender.clone(), )); - let light_client = light_client::new(db, network_client, rpc_client); - - tokio::task::spawn(light_client::run( - light_client, - (&cfg).into(), + let light_client_task = light_client::Task { + light_client: light_client::new(db, network_client, rpc_client), + cfg: (&cfg).into(), block_tx, pp, - lc_metrics, + metrics: lc_metrics, counter, message_rx, error_sender, - )); + }; + + tokio::task::spawn(light_client_task.run()); Ok(()) } diff --git a/src/network/mem_store.rs b/src/network/mem_store.rs index 15b955085..725d6afa6 100644 --- a/src/network/mem_store.rs +++ b/src/network/mem_store.rs @@ -72,6 +72,7 @@ impl Default for MemoryStoreConfig { } impl MemoryStore { + #[allow(dead_code)] /// Creates a new `MemoryRecordStore` with a default configuration. pub fn new(local_id: PeerId) -> Self { Self::with_config(local_id, Default::default()) @@ -88,6 +89,7 @@ impl MemoryStore { } } + #[allow(dead_code)] /// Retains the records satisfying a predicate. pub fn retain(&mut self, f: F) where