diff --git a/src/cli.rs b/src/cli.rs
index 25d1e0e..307bb9e 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -58,14 +58,6 @@ fn default_max_buffer_size() -> usize {
     100
 }
 
-fn default_default_chunk_timeout() -> Duration {
-    Duration::from_secs(30)
-}
-
-fn default_default_request_multiplier() -> usize {
-    1
-}
-
 fn default_default_retries() -> usize {
     3
 }
@@ -127,16 +119,6 @@ pub struct Config {
     #[serde(default = "default_max_buffer_size")]
     pub max_buffer_size: usize,
 
-    #[serde_as(as = "DurationSeconds")]
-    #[serde(
-        rename = "default_chunk_timeout_sec",
-        default = "default_default_chunk_timeout"
-    )]
-    pub default_chunk_timeout: Duration,
-
-    #[serde(default = "default_default_request_multiplier")]
-    pub default_request_multiplier: usize,
-
     #[serde(default = "default_default_retries")]
     pub default_retries: usize,
 
diff --git a/src/controller/stream.rs b/src/controller/stream.rs
index cf7bf0b..698879e 100644
--- a/src/controller/stream.rs
+++ b/src/controller/stream.rs
@@ -26,6 +26,7 @@ pub struct StreamController {
     timeouts: Arc<TimeoutManager>,
     stats: Arc<StreamStats>,
     span: tracing::Span,
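+    /// Message of the last error encountered, reported in the end-of-stream summary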
"Couldn't parse chunk_timeout: {e}" - )) - .into_response()) - } - }, - None => config.default_chunk_timeout, - }; let timeout_quantile = match params.get("timeout_quantile") { Some(value) => match value.parse() { Ok(quantile) => quantile, @@ -235,18 +221,6 @@ where }, None => config.default_timeout_quantile, }; - let request_multiplier = match params.get("request_multiplier") { - Some(value) => match value.parse() { - Ok(value) => value, - Err(e) => { - return Err(RequestError::BadRequest(format!( - "Couldn't parse request_multiplier: {e}" - )) - .into_response()) - } - }, - None => config.default_request_multiplier, - }; let retries = match params.get("retries") { Some(value) => match value.parse() { Ok(value) => value, @@ -265,9 +239,7 @@ where query, buffer_size, max_chunks: config.max_chunks_per_stream, - chunk_timeout, timeout_quantile, - request_multiplier, retries, }) } diff --git a/src/metrics.rs b/src/metrics.rs index 6206809..2d13907 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -10,6 +10,7 @@ lazy_static::lazy_static! { pub static ref VALID_PINGS: Counter = Default::default(); pub static ref IGNORED_PINGS: Counter = Default::default(); pub static ref QUERIES_SENT: Counter = Default::default(); + pub static ref QUERIES_RUNNING: Gauge = Default::default(); static ref QUERY_RESULTS: Family, Counter> = Default::default(); pub static ref KNOWN_WORKERS: Gauge = Default::default(); pub static ref ACTIVE_STREAMS: Gauge = Default::default(); @@ -70,6 +71,11 @@ pub fn register_metrics(registry: &mut Registry) { "Number of sent queries", QUERIES_SENT.clone(), ); + registry.register( + "queries_running", + "Number of sent queries", + QUERIES_RUNNING.clone(), + ); registry.register( "queries_responded", "Number of received responses", diff --git a/src/network/client.rs b/src/network/client.rs index adaf1cd..dc43a35 100644 --- a/src/network/client.rs +++ b/src/network/client.rs @@ -6,7 +6,8 @@ use serde::Serialize; use sqd_contract_client::{Client as ContractClient, PeerId}; use sqd_messages::{query_result, Ping, Query, QueryResult}; use sqd_network_transport::{ - get_agent_info, AgentInfo, GatewayConfig, GatewayEvent, GatewayTransportHandle, P2PTransportBuilder, QueueFull, TransportArgs + get_agent_info, AgentInfo, GatewayConfig, GatewayEvent, GatewayTransportHandle, + P2PTransportBuilder, QueueFull, TransportArgs, }; use tokio::{sync::oneshot, time::Instant}; use tokio_util::sync::CancellationToken; @@ -23,6 +24,7 @@ use super::{NetworkState, StorageClient}; lazy_static::lazy_static! 
+const MAX_CONCURRENT_QUERIES: usize = 1000;
 
 /// Tracks the network state and handles p2p communication
 pub struct NetworkClient {
@@ -141,8 +143,14 @@
                 });
             }
             GatewayEvent::QueryDropped { query_id } => {
-                // No good way to handle this yet, just drop the response sender
-                if self.tasks.lock().remove_entry(&query_id).is_none() {
+                if let Some(task) = self.tasks.lock().remove(&query_id) {
+                    metrics::QUERIES_RUNNING.dec();
+                    task.result_tx
+                        .send(query_result::Result::ServerError(
+                            "Outbound queue overloaded".to_string(),
+                        ))
+                        .ok();
+                } else {
                     tracing::error!("Not expecting response for query {query_id}");
                 }
             }
@@ -173,28 +181,42 @@
         query: String,
     ) -> Result<oneshot::Receiver<query_result::Result>, QueueFull> {
         let query_id = generate_query_id();
+        tracing::trace!("Sending query {query_id} to {worker}");
 
-        self.transport_handle.send_query(
-            *worker,
-            Query {
-                dataset: Some(dataset.to_string()),
-                query_id: Some(query_id.clone()),
-                query: Some(query),
-                client_state_json: Some("{}".to_string()), // This is a placeholder field
-                profiling: Some(false),
-                ..Default::default()
-            },
-        )?;
-        tracing::trace!("Sent query {query_id} to {worker}");
-        metrics::QUERIES_SENT.inc();
         self.network_state.lock().lease_worker(*worker);
 
         let (result_tx, result_rx) = oneshot::channel();
+
         let task = QueryTask {
             result_tx,
             worker_id: *worker,
         };
-        self.tasks.lock().insert(query_id, task);
+        let mut tasks = self.tasks.lock();
+        if tasks.len() >= MAX_CONCURRENT_QUERIES {
+            return Err(QueueFull);
+        }
+        tasks.insert(query_id.clone(), task);
+        drop(tasks);
+
+        self.transport_handle
+            .send_query(
+                *worker,
+                Query {
+                    dataset: Some(dataset.to_string()),
+                    query_id: Some(query_id.clone()),
+                    query: Some(query),
+                    client_state_json: Some("{}".to_string()), // This is a placeholder field
+                    profiling: Some(false),
+                    ..Default::default()
+                },
+            )
+            .map_err(|e| {
+                self.tasks.lock().remove(&query_id);
+                e
+            })?;
+
+        metrics::QUERIES_RUNNING.inc();
+        metrics::QUERIES_SENT.inc();
 
         Ok(result_rx)
     }
@@ -221,11 +243,13 @@
         tracing::trace!("Got result for query {query_id}");
         metrics::report_query_result(&result);
 
-        let (_query_id, task) = self
-            .tasks
-            .lock()
+        let mut tasks = self.tasks.lock();
+        let (_query_id, task) = tasks
             .remove_entry(&query_id)
             .ok_or_else(|| anyhow::anyhow!("Not expecting response for query {query_id}"))?;
+        metrics::QUERIES_RUNNING.set(tasks.len() as i64);
+        drop(tasks);
+
         if peer_id != task.worker_id {
             self.network_state.lock().report_query_error(peer_id);
             anyhow::bail!(
diff --git a/src/types/request.rs b/src/types/request.rs
index f8b7dda..028d5b0 100644
--- a/src/types/request.rs
+++ b/src/types/request.rs
@@ -1,4 +1,4 @@
-use std::{str::FromStr, time::Duration};
+use std::str::FromStr;
 
 use super::{BlockRange, DatasetId};
 
@@ -8,9 +8,7 @@ pub struct ClientRequest {
     pub query: ParsedQuery,
     pub buffer_size: usize,
     pub max_chunks: Option<usize>,
-    pub chunk_timeout: Duration,
     pub timeout_quantile: f32,
-    pub request_multiplier: usize,
     pub retries: usize,
 }
 
@@ -29,6 +27,10 @@ impl ParsedQuery {
             .and_then(|v| v.as_u64())
             .ok_or(anyhow::anyhow!("fromBlock is required"))?;
         let last_block = json.get("toBlock").and_then(|v| v.as_u64());
+        anyhow::ensure!(
+            last_block.is_none() || last_block >= Some(first_block),
+            "toBlock must be greater than or equal to fromBlock"
+        );
         Ok(Self {
             json,
             first_block,
@@ -60,6 +62,10 @@ impl ParsedQuery {
         };
         (begin <= end).then_some(begin..=end)
     }
+
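+    /// Returns the query serialized back into a JSON string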
+    pub fn to_string(&self) -> String {
+        serde_json::to_string(&self.json).expect("Couldn't serialize query")
+    }
 }
 
 impl FromStr for ParsedQuery {
diff --git a/src/utils/logging.rs b/src/utils/logging.rs
index 39c3749..4b0431a 100644
--- a/src/utils/logging.rs
+++ b/src/utils/logging.rs
@@ -5,6 +5,8 @@ use parking_lot::Mutex;
 use tokio::time::{Duration, Instant};
 use tracing::Instrument;
 
+use crate::types::ClientRequest;
+
 const LOG_INTERVAL: Duration = Duration::from_secs(5);
 
 pub struct StreamStats {
@@ -54,13 +56,22 @@
         }
     }
 
-    pub fn write_summary(&self) {
+    pub fn write_summary(&self, request: &ClientRequest, error: Option<String>) {
+        tracing::debug!(
+            dataset = %request.dataset_id,
+            query = request.query.to_string(),
+            "Query processed"
+        );
         tracing::info!(
+            dataset = %request.dataset_id,
+            first_block = request.query.first_block(),
+            last_block = request.query.last_block(),
             queries_sent = self.queries_sent.load(Ordering::Relaxed),
             chunks_downloaded = self.chunks_downloaded.load(Ordering::Relaxed),
             blocks_streamed = self.response_blocks.load(Ordering::Relaxed),
             bytes_streamed = self.response_bytes.load(Ordering::Relaxed),
             total_time = ?self.start_time.elapsed(),
+            error = error.unwrap_or_else(|| "-".to_string()),
             "Stream finished"
         );
     }