From a43c4a6fe8ba1780a522131714b5051c0b359ac1 Mon Sep 17 00:00:00 2001 From: Adam Wierzbicki Date: Mon, 2 Oct 2023 17:12:42 +0200 Subject: [PATCH] Dataset summary printing --- Cargo.lock | 44 ++++++++++++ crates/query-gateway/Cargo.toml | 3 + crates/query-gateway/src/client.rs | 84 ++++++++++++++++------ crates/query-gateway/src/main.rs | 4 ++ crates/router-controller/src/controller.rs | 10 ++- 5 files changed, 123 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ad0db60..3679e23 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1230,6 +1230,12 @@ version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3ac9f8b63eca6fd385229b3675f6cc0dc5c8a5c8a54a59d4f52ffd670d87b0c" +[[package]] +name = "bytecount" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad152d03a2c813c80bb94fedbf3a3f02b28f793e39e7c214c8a0bcc196343de7" + [[package]] name = "byteorder" version = "1.4.3" @@ -4284,6 +4290,17 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "papergrid" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2ccbe15f2b6db62f9a9871642746427e297b0ceb85f9a7f1ee5ff47d184d0c8" +dependencies = [ + "bytecount", + "fnv", + "unicode-width", +] + [[package]] name = "parity-scale-codec" version = "3.6.5" @@ -4850,6 +4867,7 @@ dependencies = [ "duration-string", "env_logger", "flate2", + "futures", "log", "prost 0.11.9", "rand", @@ -4857,7 +4875,9 @@ dependencies = [ "serde", "serde_yaml", "subsquid-network-transport", + "tabled", "tokio", + "tokio-stream", "uuid 1.4.1", ] @@ -6007,6 +6027,30 @@ dependencies = [ "libc", ] +[[package]] +name = "tabled" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfe9c3632da101aba5131ed63f9eed38665f8b3c68703a6bb18124835c1a5d22" +dependencies = [ + "papergrid", + "tabled_derive", + "unicode-width", +] + +[[package]] +name = "tabled_derive" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99f688a08b54f4f02f0a3c382aefdb7884d3d69609f785bd253dc033243e3fe4" +dependencies = [ + "heck", + "proc-macro-error", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "tap" version = "1.0.1" diff --git a/crates/query-gateway/Cargo.toml b/crates/query-gateway/Cargo.toml index 6508230..d431559 100644 --- a/crates/query-gateway/Cargo.toml +++ b/crates/query-gateway/Cargo.toml @@ -13,12 +13,15 @@ derivative = "2" duration-string = { version = "0.3", features = ["serde"] } env_logger = "0.10" flate2 = "1" +futures = "0.3" log = "0.4" prost = "0.11" rand = "0.8" serde = { version = "1", features = ["derive"] } serde_yaml = "0.9" +tabled = "0.14" tokio = { version = "1", features = ["macros", "rt-multi-thread", "sync"] } +tokio-stream = { version = "0.1", features = ["sync"] } uuid = { version = "1", features = ["v4", "fast-rng"] } subsquid-network-transport = { version = "0.1", path = "../../subsquid-network/transport" } diff --git a/crates/query-gateway/src/client.rs b/crates/query-gateway/src/client.rs index 96fb60c..e85386a 100644 --- a/crates/query-gateway/src/client.rs +++ b/crates/query-gateway/src/client.rs @@ -5,10 +5,15 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use derivative::Derivative; +use futures::stream::StreamExt; use prost::Message as ProstMsg; use rand::prelude::IteratorRandom; +use tabled::settings::Style; +use tabled::{Table, Tabled}; use tokio::sync::{mpsc, oneshot, RwLock}; use tokio::task::JoinHandle; +use tokio::time::interval; +use tokio_stream::wrappers::IntervalStream; use contract_client::Worker; use router_controller::messages::{ @@ -18,10 +23,11 @@ use router_controller::messages::{ use subsquid_network_transport::{MsgContent, PeerId}; use crate::config::{Config, DatasetId}; +use crate::PING_TOPIC; type Message = subsquid_network_transport::Message>; -const WORKER_INACTIVE_THRESHOLD: Duration = Duration::from_secs(30); +const WORKER_INACTIVE_THRESHOLD: Duration = Duration::from_secs(120); const WORKER_GREYLIST_TIME: Duration = Duration::from_secs(600); const DEFAULT_QUERY_TIMEOUT: Duration = Duration::from_secs(60); @@ -113,6 +119,46 @@ impl DatasetState { } self.worker_ranges.insert(peer_id, state); } + + pub fn highest_indexable_block(&self) -> u32 { + let range_set: RangeSet = self + .worker_ranges + .values() + .cloned() + .flat_map(|r| r.ranges) + .into(); + match range_set.ranges.get(0) { + Some(range) if range.begin == 0 => range.end, + _ => 0, + } + } +} + +#[derive(Tabled)] +struct DatasetSummary<'a> { + #[tabled(rename = "dataset")] + name: &'a String, + #[tabled(rename = "highest indexable block")] + highest_indexable_block: u32, + #[tabled(rename = "highest seen block")] + highest_seen_block: u32, +} + +impl<'a> DatasetSummary<'a> { + pub fn new(name: &'a String, state: Option<&DatasetState>) -> Self { + match state { + None => Self { + name, + highest_indexable_block: 0, + highest_seen_block: 0, + }, + Some(state) => Self { + name, + highest_indexable_block: state.highest_indexable_block(), + highest_seen_block: state.height, + }, + } + } } #[derive(Default)] @@ -194,6 +240,12 @@ impl NetworkState { .get(dataset_id) .map(|state| state.height) } + + pub fn summary(&self) -> impl Iterator { + self.available_datasets + .iter() + .map(|(name, id)| DatasetSummary::new(name, self.dataset_states.get(id))) + } } struct QueryHandler { @@ -212,8 +264,10 @@ struct QueryHandler { impl QueryHandler { async fn run(mut self) { + let mut summary_timer = IntervalStream::new(interval(Duration::from_secs(30))).fuse(); loop { let _ = tokio::select! { + _ = summary_timer.select_next_some() => self.print_summary().await, Some(query) = self.query_receiver.recv() => self.handle_query(query) .await .map_err(|e| log::error!("Error handling query: {e:?}")), @@ -229,6 +283,13 @@ impl QueryHandler { } } + async fn print_summary(&self) -> Result<(), ()> { + let mut summary = Table::new(self.network_state.read().await.summary()); + summary.with(Style::sharp()); + log::info!("Datasets summary:\n{summary}"); + Ok(()) + } + fn generate_query_id() -> String { uuid::Uuid::new_v4().to_string() } @@ -337,12 +398,8 @@ impl QueryHandler { let Envelope { msg } = Envelope::decode(content.as_slice())?; match msg { Some(Msg::QueryResult(result)) => self.query_result(peer_id, result).await?, - Some(Msg::Ping(ping)) => self.ping(peer_id, ping).await, - Some(Msg::DatasetState(state)) => { - // TODO: This is a legacy message. Remove it. - let dataset_id = topic.ok_or_else(|| anyhow::anyhow!("Message topic missing"))?; - self.update_dataset_state(peer_id, DatasetId(dataset_id), state) - .await; + Some(Msg::Ping(ping)) if topic.is_some_and(|t| t == PING_TOPIC) => { + self.ping(peer_id, ping).await } _ => log::warn!("Unexpected message received: {msg:?}"), } @@ -361,19 +418,6 @@ impl QueryHandler { } } - async fn update_dataset_state( - &mut self, - peer_id: PeerId, - dataset_id: DatasetId, - state: RangeSet, - ) { - log::debug!("Updating dataset state. worker_id={peer_id} dataset_id={dataset_id}"); - self.network_state - .write() - .await - .update_dataset_state(peer_id, dataset_id, state) - } - async fn query_result( &mut self, peer_id: PeerId, diff --git a/crates/query-gateway/src/main.rs b/crates/query-gateway/src/main.rs index ed3f45e..9599176 100644 --- a/crates/query-gateway/src/main.rs +++ b/crates/query-gateway/src/main.rs @@ -1,4 +1,5 @@ use std::path::PathBuf; +use std::time::Duration; use clap::Parser; use env_logger::Env; @@ -86,6 +87,9 @@ async fn main() -> anyhow::Result<()> { ) .await?; + // Wait one worker ping cycle before starting to serve + tokio::time::sleep(Duration::from_secs(20)).await; + // Start HTTP server http_server::run_server(query_client, &http_listen_addr).await } diff --git a/crates/router-controller/src/controller.rs b/crates/router-controller/src/controller.rs index d247a56..b8fd0a8 100644 --- a/crates/router-controller/src/controller.rs +++ b/crates/router-controller/src/controller.rs @@ -263,10 +263,10 @@ impl Controller { }; if next_block > c.first_block() { log::error!("Received overlapping chunks: {} and {}", p, c); - return false + return false; } else { log::error!("There is a gap between {} and {}", p, c); - return false + return false; } } next_block = c.last_block() + 1 @@ -533,6 +533,8 @@ mod tests { worker_url: w.to_string(), state: Some(Default::default()), pause: false, + stored_bytes: 0, + version: "".to_string(), }); } @@ -545,6 +547,8 @@ mod tests { worker_url: w.to_string(), state: Some(Default::default()), pause: false, + stored_bytes: 0, + version: "".to_string(), }) }) .collect(); @@ -557,6 +561,8 @@ mod tests { worker_url: w.to_string(), state: Some(state.deref().clone()), pause: false, + stored_bytes: 0, + version: "".to_string(), }); }