Skip to content

Commit

Permalink
Dataset summary printing
Browse files Browse the repository at this point in the history
  • Loading branch information
Wiezzel committed Oct 2, 2023
1 parent 9fa6c92 commit a43c4a6
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 22 deletions.
44 changes: 44 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/query-gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@ derivative = "2"
duration-string = { version = "0.3", features = ["serde"] }
env_logger = "0.10"
flate2 = "1"
futures = "0.3"
log = "0.4"
prost = "0.11"
rand = "0.8"
serde = { version = "1", features = ["derive"] }
serde_yaml = "0.9"
tabled = "0.14"
tokio = { version = "1", features = ["macros", "rt-multi-thread", "sync"] }
tokio-stream = { version = "0.1", features = ["sync"] }
uuid = { version = "1", features = ["v4", "fast-rng"] }

subsquid-network-transport = { version = "0.1", path = "../../subsquid-network/transport" }
Expand Down
84 changes: 64 additions & 20 deletions crates/query-gateway/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@ use std::sync::Arc;
use std::time::{Duration, Instant};

use derivative::Derivative;
use futures::stream::StreamExt;
use prost::Message as ProstMsg;
use rand::prelude::IteratorRandom;
use tabled::settings::Style;
use tabled::{Table, Tabled};
use tokio::sync::{mpsc, oneshot, RwLock};
use tokio::task::JoinHandle;
use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;

use contract_client::Worker;
use router_controller::messages::{
Expand All @@ -18,10 +23,11 @@ use router_controller::messages::{
use subsquid_network_transport::{MsgContent, PeerId};

use crate::config::{Config, DatasetId};
use crate::PING_TOPIC;

type Message = subsquid_network_transport::Message<Box<[u8]>>;

const WORKER_INACTIVE_THRESHOLD: Duration = Duration::from_secs(30);
const WORKER_INACTIVE_THRESHOLD: Duration = Duration::from_secs(120);
const WORKER_GREYLIST_TIME: Duration = Duration::from_secs(600);
const DEFAULT_QUERY_TIMEOUT: Duration = Duration::from_secs(60);

Expand Down Expand Up @@ -113,6 +119,46 @@ impl DatasetState {
}
self.worker_ranges.insert(peer_id, state);
}

pub fn highest_indexable_block(&self) -> u32 {
let range_set: RangeSet = self
.worker_ranges
.values()
.cloned()
.flat_map(|r| r.ranges)
.into();
match range_set.ranges.get(0) {
Some(range) if range.begin == 0 => range.end,
_ => 0,
}
}
}

#[derive(Tabled)]
struct DatasetSummary<'a> {
#[tabled(rename = "dataset")]
name: &'a String,
#[tabled(rename = "highest indexable block")]
highest_indexable_block: u32,
#[tabled(rename = "highest seen block")]
highest_seen_block: u32,
}

impl<'a> DatasetSummary<'a> {
pub fn new(name: &'a String, state: Option<&DatasetState>) -> Self {
match state {
None => Self {
name,
highest_indexable_block: 0,
highest_seen_block: 0,
},
Some(state) => Self {
name,
highest_indexable_block: state.highest_indexable_block(),
highest_seen_block: state.height,
},
}
}
}

#[derive(Default)]
Expand Down Expand Up @@ -194,6 +240,12 @@ impl NetworkState {
.get(dataset_id)
.map(|state| state.height)
}

pub fn summary(&self) -> impl Iterator<Item = DatasetSummary> {
self.available_datasets
.iter()
.map(|(name, id)| DatasetSummary::new(name, self.dataset_states.get(id)))
}
}

struct QueryHandler {
Expand All @@ -212,8 +264,10 @@ struct QueryHandler {

impl QueryHandler {
async fn run(mut self) {
let mut summary_timer = IntervalStream::new(interval(Duration::from_secs(30))).fuse();
loop {
let _ = tokio::select! {
_ = summary_timer.select_next_some() => self.print_summary().await,
Some(query) = self.query_receiver.recv() => self.handle_query(query)
.await
.map_err(|e| log::error!("Error handling query: {e:?}")),
Expand All @@ -229,6 +283,13 @@ impl QueryHandler {
}
}

async fn print_summary(&self) -> Result<(), ()> {
let mut summary = Table::new(self.network_state.read().await.summary());
summary.with(Style::sharp());
log::info!("Datasets summary:\n{summary}");
Ok(())
}

fn generate_query_id() -> String {
uuid::Uuid::new_v4().to_string()
}
Expand Down Expand Up @@ -337,12 +398,8 @@ impl QueryHandler {
let Envelope { msg } = Envelope::decode(content.as_slice())?;
match msg {
Some(Msg::QueryResult(result)) => self.query_result(peer_id, result).await?,
Some(Msg::Ping(ping)) => self.ping(peer_id, ping).await,
Some(Msg::DatasetState(state)) => {
// TODO: This is a legacy message. Remove it.
let dataset_id = topic.ok_or_else(|| anyhow::anyhow!("Message topic missing"))?;
self.update_dataset_state(peer_id, DatasetId(dataset_id), state)
.await;
Some(Msg::Ping(ping)) if topic.is_some_and(|t| t == PING_TOPIC) => {
self.ping(peer_id, ping).await
}
_ => log::warn!("Unexpected message received: {msg:?}"),
}
Expand All @@ -361,19 +418,6 @@ impl QueryHandler {
}
}

async fn update_dataset_state(
&mut self,
peer_id: PeerId,
dataset_id: DatasetId,
state: RangeSet,
) {
log::debug!("Updating dataset state. worker_id={peer_id} dataset_id={dataset_id}");
self.network_state
.write()
.await
.update_dataset_state(peer_id, dataset_id, state)
}

async fn query_result(
&mut self,
peer_id: PeerId,
Expand Down
4 changes: 4 additions & 0 deletions crates/query-gateway/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::path::PathBuf;
use std::time::Duration;

use clap::Parser;
use env_logger::Env;
Expand Down Expand Up @@ -86,6 +87,9 @@ async fn main() -> anyhow::Result<()> {
)
.await?;

// Wait one worker ping cycle before starting to serve
tokio::time::sleep(Duration::from_secs(20)).await;

// Start HTTP server
http_server::run_server(query_client, &http_listen_addr).await
}
10 changes: 8 additions & 2 deletions crates/router-controller/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,10 @@ impl Controller {
};
if next_block > c.first_block() {
log::error!("Received overlapping chunks: {} and {}", p, c);
return false
return false;
} else {
log::error!("There is a gap between {} and {}", p, c);
return false
return false;
}
}
next_block = c.last_block() + 1
Expand Down Expand Up @@ -533,6 +533,8 @@ mod tests {
worker_url: w.to_string(),
state: Some(Default::default()),
pause: false,
stored_bytes: 0,
version: "".to_string(),
});
}

Expand All @@ -545,6 +547,8 @@ mod tests {
worker_url: w.to_string(),
state: Some(Default::default()),
pause: false,
stored_bytes: 0,
version: "".to_string(),
})
})
.collect();
Expand All @@ -557,6 +561,8 @@ mod tests {
worker_url: w.to_string(),
state: Some(state.deref().clone()),
pause: false,
stored_bytes: 0,
version: "".to_string(),
});
}

Expand Down

0 comments on commit a43c4a6

Please sign in to comment.