Skip to content

Commit

Permalink
Create task struct for light client.
Browse files Browse the repository at this point in the history
  • Loading branch information
aterentic-ethernal committed Jun 29, 2023
1 parent 41e10b8 commit 83bbe03
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 57 deletions.
94 changes: 44 additions & 50 deletions src/light_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,63 +376,57 @@ pub async fn process_block(
Ok(())
}

/// Runs light client.
///
/// # Arguments
///
/// * `light_client` - Light client implementation
/// * `cfg` - Light client configuration
/// * `block_tx` - Channel used to send header of verified block
/// * `pp` - Public parameters (i.e. SRS) needed for proof verification
/// * `registry` - Prometheus metrics registry
/// * `counter` - Processed block mutex counter
pub async fn run(
light_client: impl LightClient,
cfg: LightClientConfig,
block_tx: Option<Sender<BlockVerified>>,
pp: PublicParameters,
metrics: Metrics,
counter: Arc<Mutex<u32>>,
mut message_rx: Receiver<(Header, Instant)>,
error_sender: Sender<anyhow::Error>,
) {
info!("Starting light client...");

while let Some((header, received_at)) = message_rx.recv().await {
if let Some(seconds) = cfg.block_processing_delay.sleep_duration(received_at) {
info!("Sleeping for {seconds:?} seconds");
tokio::time::sleep(seconds).await;
}
pub struct Task<T: LightClient> {
pub light_client: T,
pub cfg: LightClientConfig,
pub block_tx: Option<Sender<BlockVerified>>,
pub pp: PublicParameters,
pub metrics: Metrics,
pub counter: Arc<Mutex<u32>>,
pub message_rx: Receiver<(Header, Instant)>,
pub error_sender: Sender<anyhow::Error>,
}

if let Err(error) = process_block(
&light_client,
&cfg,
&pp,
&header,
received_at,
&metrics,
counter.clone(),
)
.await
{
error!("Cannot process block: {error}");
if let Err(error) = error_sender.send(error).await {
error!("Cannot send error message: {error}");
impl<T: LightClient> Task<T> {
pub async fn run(mut self) {
info!("Starting light client...");

while let Some((header, received_at)) = self.message_rx.recv().await {
if let Some(seconds) = self.cfg.block_processing_delay.sleep_duration(received_at) {
info!("Sleeping for {seconds:?} seconds");
tokio::time::sleep(seconds).await;
}

if let Err(error) = process_block(
&self.light_client,
&self.cfg,
&self.pp,
&header,
received_at,
&self.metrics,
self.counter.clone(),
)
.await
{
error!("Cannot process block: {error}");
if let Err(error) = self.error_sender.send(error).await {
error!("Cannot send error message: {error}");
}
return;
}
return;
}

let Ok(client_msg) = types::BlockVerified::try_from(header) else {
let Ok(client_msg) = types::BlockVerified::try_from(header) else {
error!("Cannot create message from header");
continue;
};

// notify dht-based application client
// that newly mined block has been received
if let Some(ref channel) = block_tx {
if let Err(error) = channel.send(client_msg).await {
error!("Cannot send block verified message: {error}");
continue;
// notify dht-based application client
// that newly mined block has been received
if let Some(ref channel) = self.block_tx {
if let Err(error) = channel.send(client_msg).await {
error!("Cannot send block verified message: {error}");
continue;
}
}
}
}
Expand Down
14 changes: 7 additions & 7 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,18 +314,18 @@ async fn run(error_sender: Sender<anyhow::Error>) -> Result<()> {
error_sender.clone(),
));

let light_client = light_client::new(db, network_client, rpc_client);

tokio::task::spawn(light_client::run(
light_client,
(&cfg).into(),
let light_client_task = light_client::Task {
light_client: light_client::new(db, network_client, rpc_client),
cfg: (&cfg).into(),
block_tx,
pp,
lc_metrics,
metrics: lc_metrics,
counter,
message_rx,
error_sender,
));
};

tokio::task::spawn(light_client_task.run());
Ok(())
}

Expand Down
2 changes: 2 additions & 0 deletions src/network/mem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ impl Default for MemoryStoreConfig {
}

impl MemoryStore {
#[allow(dead_code)]
/// Creates a new `MemoryRecordStore` with a default configuration.
pub fn new(local_id: PeerId) -> Self {
Self::with_config(local_id, Default::default())
Expand All @@ -88,6 +89,7 @@ impl MemoryStore {
}
}

#[allow(dead_code)]
/// Retains the records satisfying a predicate.
pub fn retain<F>(&mut self, f: F)
where
Expand Down

0 comments on commit 83bbe03

Please sign in to comment.