Skip to content

Commit

Permalink
Support both cells and rows crawling.
Browse files Browse the repository at this point in the history
  • Loading branch information
aterentic-ethernal committed Jul 25, 2023
1 parent 1cde5c4 commit 79622c9
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 18 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
3. **Fat-Client Mode**: The client retrieves larger contiguous chunks of the matrix on each block via RPC calls to an Avail node, and stores them on the DHT. This mode is activated when the `block_matrix_partition` parameter is set in the config file, and is mainly used with the `disable_proof_verification` flag because of the resource cost of cell validation.
**IMPORTANT**: disabling proof verification introduces a trust assumption towards the node, that the data provided is correct.

4. **Crawl-Client Mode**: The client crawls cells from the DHT for the entire block and calculates the success rate. Processing of every block is delayed by the `crawl_block_delay` parameter. The delay should be long enough to accommodate crawling of large blocks. This mode is enabled by setting the `crawl_block` parameter to `true`. The success rate is emitted in logs and metrics.
4. **Crawl-Client Mode**: The client crawls cells from the DHT for the entire block and calculates the success rate. Processing of every block is delayed by the `crawl_block_delay` parameter. The delay should be long enough to accommodate crawling of large blocks. This mode is enabled by setting the `crawl_block` parameter to `true`. The success rate is emitted in logs and metrics. The crawler can be run in three modes: `cells`, `rows` and `both`. The default mode is `cells`, and it can be changed with the `crawl_block_mode` parameter.

## Installation

Expand Down
59 changes: 42 additions & 17 deletions src/crawl_client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
network::Client,
telemetry::metrics::{MetricEvent, Metrics},
types::{self, delay_for},
types::{self, delay_for, CrawlMode},
};
use avail_subxt::primitives::Header;
use kate_recovery::matrix::Partition;
Expand All @@ -19,6 +19,7 @@ pub async fn run(
network_client: Client,
delay: u64,
metrics: Metrics,
mode: CrawlMode,
) {
info!("Starting crawl client...");

Expand All @@ -28,7 +29,7 @@ pub async fn run(
tokio::time::sleep(seconds).await;
}
let block_number = header.number;
info!("Crawling block {block_number}...");
info!(block_number, "Crawling block...");

let start = SystemTime::now();

Expand All @@ -37,27 +38,51 @@ pub async fn run(
continue;
};

let positions = block
.dimensions
.iter_extended_partition_positions(&ENTIRE_BLOCK)
.collect::<Vec<_>>();
if mode == CrawlMode::Cells || mode == CrawlMode::Both {
let positions = block
.dimensions
.iter_extended_partition_positions(&ENTIRE_BLOCK)
.collect::<Vec<_>>();

let total = positions.len();
let (fetched, _) = network_client
.fetch_cells_from_dht(block_number, &positions)
.await;
let total = positions.len();
let (fetched, _) = network_client
.fetch_cells_from_dht(block_number, &positions)
.await;

let success_rate = fetched.len() as f64 / total as f64;
info!(
block_number,
"Fetched {fetched} cells of {total}, success rate: {success_rate}",
fetched = fetched.len(),
);
metrics.record(MetricEvent::CrawlCellsSuccessRate(success_rate));
}

// Rows crawl: executed when the configured mode is `Rows` or `Both`.
if mode == CrawlMode::Rows || mode == CrawlMode::Both {
let dimensions = block.dimensions;
// Request every second row index of the extended matrix (0, 2, 4, ...).
let rows: Vec<u32> = (0..dimensions.extended_rows()).step_by(2).collect();
let total = rows.len();
// Count fetched rows: visit every second entry of the returned collection
// and count the items that `flatten` yields from those entries.
// NOTE(review): the second `step_by(2)` presumably relies on
// `fetch_rows_from_dht` returning one (optional) slot per extended row
// rather than one per requested row — confirm; otherwise half of the
// results are skipped and the success rate is under-reported.
let fetched = network_client
.fetch_rows_from_dht(block_number, &dimensions, &rows)
.await
.iter()
.step_by(2)
.flatten()
.count();

// Ratio of counted rows to requested rows.
// NOTE(review): if `extended_rows()` is 0, `total` is 0 and this is NaN.
let success_rate = fetched as f64 / total as f64;
info!(
block_number,
"Fetched {fetched} rows of {total}, success rate: {success_rate}"
);
// Publish the ratio as the `CrawlRowsSuccessRate` metric event.
metrics.record(MetricEvent::CrawlRowsSuccessRate(success_rate));
}

let elapsed = start
.elapsed()
.map(|elapsed| format!("{elapsed:?}"))
.unwrap_or("unknown".to_string());

let success_rate = fetched.len() as f64 / total as f64;
info!(
"Block {block_number}, fetched {fetched} of {total}, success rate: {success_rate}, elapsed: {elapsed}",
fetched = fetched.len(),
);

metrics.record(MetricEvent::CrawlCellsSuccessRate(success_rate));
info!(block_number, "Crawling block finished in {elapsed}")
}
}
2 changes: 2 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,12 +314,14 @@ async fn run(error_sender: Sender<anyhow::Error>) -> Result<()> {

if cfg.crawl_block {
let delay = cfg.crawl_block_delay;
let mode = cfg.crawl_block_mode.clone();
let message_rx = message_tx.subscribe();
tokio::task::spawn(crawl_client::run(
message_rx,
network_client.clone(),
delay,
lc_metrics.clone(),
mode,
));
}

Expand Down
13 changes: 13 additions & 0 deletions src/telemetry/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub struct Metrics {
dht_put_rows_success: Gauge<f64, AtomicU64>,
kad_routing_table_peer_num: Gauge,
crawl_cells_success_rate: Gauge<f64, AtomicU64>,
crawl_rows_success_rate: Gauge<f64, AtomicU64>,
}

pub enum MetricEvent {
Expand All @@ -37,6 +38,7 @@ pub enum MetricEvent {
DHTPutRowsSuccess(f64),
KadRoutingTablePeerNum(u32),
CrawlCellsSuccessRate(f64),
CrawlRowsSuccessRate(f64),
}

impl Metrics {
Expand Down Expand Up @@ -133,6 +135,13 @@ impl Metrics {
crawl_cells_success_rate.clone(),
);

// Gauge for the success rate of crawling rows from the DHT; it is updated
// when a `MetricEvent::CrawlRowsSuccessRate` event is recorded.
let crawl_rows_success_rate = Gauge::default();
sub_reg.register(
    // Metric name fixed from the misspelled "crawl_rows_sucess_rate" so that
    // it matches the field and event naming.
    "crawl_rows_success_rate",
    "Success rate of the crawling DHT rows operation",
    crawl_rows_success_rate.clone(),
);

Self {
session_block_counter,
total_block_number,
Expand All @@ -147,6 +156,7 @@ impl Metrics {
dht_put_rows_success,
kad_routing_table_peer_num,
crawl_cells_success_rate,
crawl_rows_success_rate,
}
}

Expand Down Expand Up @@ -191,6 +201,9 @@ impl Metrics {
MetricEvent::CrawlCellsSuccessRate(num) => {
self.crawl_cells_success_rate.set(num.into());
},
MetricEvent::CrawlRowsSuccessRate(num) => {
self.crawl_rows_success_rate.set(num.into());
},
}
}
}
11 changes: 11 additions & 0 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,8 @@ pub struct RuntimeConfig {
pub crawl_block: bool,
/// Crawl block delay. Increment to ensure large block crawling (default: 20)
pub crawl_block_delay: u64,
/// Crawl block mode. Available modes are "cells", "rows" and "both" (default: "cells")
pub crawl_block_mode: CrawlMode,
}

pub struct Delay(Option<Duration>);
Expand Down Expand Up @@ -487,6 +489,14 @@ impl From<&RuntimeConfig> for AppClientConfig {
}
}

/// Selects what the crawl client fetches from the DHT for each block.
///
/// Variants are (de)serialized in kebab-case, i.e. as `"rows"`, `"cells"`
/// and `"both"`.
///
/// The enum is fieldless, so it derives `Copy` and `Eq` in addition to
/// `Clone` and `PartialEq`; existing `.clone()` calls keep compiling.
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Copy, Debug)]
#[serde(rename_all = "kebab-case")]
pub enum CrawlMode {
    /// Crawl rows only.
    Rows,
    /// Crawl cells only.
    Cells,
    /// Crawl both cells and rows.
    Both,
}

impl Default for RuntimeConfig {
fn default() -> Self {
RuntimeConfig {
Expand Down Expand Up @@ -535,6 +545,7 @@ impl Default for RuntimeConfig {
max_kad_provided_keys: 1024,
crawl_block: false,
crawl_block_delay: 20,
crawl_block_mode: CrawlMode::Cells,
}
}
}
Expand Down

0 comments on commit 79622c9

Please sign in to comment.