
Improve logging
kalabukdima committed Oct 15, 2024
1 parent e480616 commit 6e03cea
Showing 7 changed files with 83 additions and 73 deletions.
18 changes: 0 additions & 18 deletions src/cli.rs
@@ -58,14 +58,6 @@ fn default_max_buffer_size() -> usize {
100
}

fn default_default_chunk_timeout() -> Duration {
Duration::from_secs(30)
}

fn default_default_request_multiplier() -> usize {
1
}

fn default_default_retries() -> usize {
3
}
@@ -127,16 +119,6 @@ pub struct Config {
#[serde(default = "default_max_buffer_size")]
pub max_buffer_size: usize,

#[serde_as(as = "DurationSeconds")]
#[serde(
rename = "default_chunk_timeout_sec",
default = "default_default_chunk_timeout"
)]
pub default_chunk_timeout: Duration,

#[serde(default = "default_default_request_multiplier")]
pub default_request_multiplier: usize,

#[serde(default = "default_default_retries")]
pub default_retries: usize,

11 changes: 10 additions & 1 deletion src/controller/stream.rs
@@ -26,6 +26,7 @@ pub struct StreamController {
timeouts: Arc<TimeoutManager>,
stats: Arc<StreamStats>,
span: tracing::Span,
last_error: Option<String>,
}

struct Slot {
@@ -84,6 +85,7 @@ impl StreamController {
timeouts: Arc::new(TimeoutManager::new(timeout_quantile)),
stats: Arc::new(StreamStats::new()),
span: tracing::Span::current(),
last_error: None,
};
controller.try_fill_slots();
if controller.buffer.total_size() == 0 {
@@ -96,6 +98,8 @@
&mut self,
ctx: &mut Context<'_>,
) -> Poll<Option<Result<ResponseChunk, RequestError>>> {
self.last_error = None;

// extract this field to be able to pass both its values and `&mut self` to the method
let mut buffer = std::mem::take(&mut self.buffer);
let mut updated = false;
@@ -120,6 +124,10 @@

self.try_fill_slots();

if let Poll::Ready(Some(Err(e))) = &result {
self.last_error = Some(e.to_string());
}

result
}

@@ -272,6 +280,7 @@ impl StreamController {
}
Err(e) => {
tracing::debug!("Couldn't schedule request: {e:?}");
self.last_error = Some(e.to_string());
break;
}
}
@@ -356,7 +365,7 @@ impl StreamController {
impl Drop for StreamController {
fn drop(&mut self) {
let _enter = self.span.enter();
self.stats.write_summary();
self.stats.write_summary(&self.request, self.last_error.take());
}
}

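Note: the stream controller now remembers the most recent error (cleared at the start of each poll, set when a chunk fails or a request can't be scheduled) so the drop-time summary can report why the stream ended. A minimal standalone sketch of that pattern follows; every type here is a hypothetical stand-in, not the crate's real StreamController or StreamStats.

struct Stats;
impl Stats {
    fn write_summary(&self, error: Option<String>) {
        println!("stream finished, error: {}", error.unwrap_or_else(|| "-".into()));
    }
}

struct Controller {
    stats: Stats,
    last_error: Option<String>,
}

impl Controller {
    fn poll_step(&mut self, outcome: Result<(), String>) {
        // Reset at the start of each poll so a stale error isn't reported later.
        self.last_error = None;
        if let Err(e) = outcome {
            self.last_error = Some(e);
        }
    }
}

impl Drop for Controller {
    fn drop(&mut self) {
        // Mirrors the diff's `self.stats.write_summary(&self.request, self.last_error.take())`.
        self.stats.write_summary(self.last_error.take());
    }
}

fn main() {
    let mut c = Controller { stats: Stats, last_error: None };
    c.poll_step(Err("couldn't schedule request".to_string()));
    // Dropping `c` prints the summary line with the captured error.
}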
32 changes: 2 additions & 30 deletions src/http_server.rs
@@ -1,4 +1,4 @@
use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration};
use std::{collections::HashMap, net::SocketAddr, sync::Arc};

use axum::{
async_trait,
@@ -9,10 +9,10 @@ use axum::{
routing::{get, post},
Extension, RequestExt, Router,
};
use sqd_contract_client::PeerId;
use futures::StreamExt;
use itertools::Itertools;
use prometheus_client::registry::Registry;
use sqd_contract_client::PeerId;
use sqd_messages::query_result;

use crate::{
@@ -102,9 +102,7 @@ async fn execute_stream_restricted(
dataset_id: raw_request.dataset_id,
buffer_size: raw_request.buffer_size.min(config.max_buffer_size),
max_chunks: config.max_chunks_per_stream,
chunk_timeout: config.default_chunk_timeout,
timeout_quantile: config.default_timeout_quantile,
request_multiplier: config.default_request_multiplier,
retries: config.default_retries,
};
execute_stream(Extension(task_manager), request).await
@@ -211,18 +209,6 @@ where
}
None => config.default_buffer_size,
};
let chunk_timeout = match params.get("chunk_timeout") {
Some(value) => match value.parse() {
Ok(seconds) => Duration::from_secs(seconds),
Err(e) => {
return Err(RequestError::BadRequest(format!(
"Couldn't parse chunk_timeout: {e}"
))
.into_response())
}
},
None => config.default_chunk_timeout,
};
let timeout_quantile = match params.get("timeout_quantile") {
Some(value) => match value.parse() {
Ok(quantile) => quantile,
@@ -235,18 +221,6 @@
},
None => config.default_timeout_quantile,
};
let request_multiplier = match params.get("request_multiplier") {
Some(value) => match value.parse() {
Ok(value) => value,
Err(e) => {
return Err(RequestError::BadRequest(format!(
"Couldn't parse request_multiplier: {e}"
))
.into_response())
}
},
None => config.default_request_multiplier,
};
let retries = match params.get("retries") {
Some(value) => match value.parse() {
Ok(value) => value,
@@ -265,9 +239,7 @@
query,
buffer_size,
max_chunks: config.max_chunks_per_stream,
chunk_timeout,
timeout_quantile,
request_multiplier,
retries,
})
}
6 changes: 6 additions & 0 deletions src/metrics.rs
@@ -10,6 +10,7 @@ lazy_static::lazy_static! {
pub static ref VALID_PINGS: Counter = Default::default();
pub static ref IGNORED_PINGS: Counter = Default::default();
pub static ref QUERIES_SENT: Counter = Default::default();
pub static ref QUERIES_RUNNING: Gauge = Default::default();
static ref QUERY_RESULTS: Family<Vec<(String, String)>, Counter> = Default::default();
pub static ref KNOWN_WORKERS: Gauge = Default::default();
pub static ref ACTIVE_STREAMS: Gauge = Default::default();
@@ -70,6 +71,11 @@ pub fn register_metrics(registry: &mut Registry) {
"Number of sent queries",
QUERIES_SENT.clone(),
);
registry.register(
"queries_running",
"Number of sent queries",
QUERIES_RUNNING.clone(),
);
registry.register(
"queries_responded",
"Number of received responses",
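For context, a small self-contained sketch of how a prometheus_client gauge like QUERIES_RUNNING is registered and updated; the registry and metric are local variables here rather than the crate's lazy_static globals, and the help text is an assumed wording.

use prometheus_client::{metrics::gauge::Gauge, registry::Registry};

fn main() {
    // Local stand-ins for the globals defined in src/metrics.rs.
    let queries_running: Gauge = Default::default();
    let mut registry = Registry::default();
    registry.register(
        "queries_running",
        "Number of queries in progress",
        queries_running.clone(),
    );

    // The network client increments on send and adjusts when results arrive.
    queries_running.inc();
    queries_running.dec();
    assert_eq!(queries_running.get(), 0);
}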
64 changes: 44 additions & 20 deletions src/network/client.rs
@@ -6,7 +6,8 @@ use serde::Serialize;
use sqd_contract_client::{Client as ContractClient, PeerId};
use sqd_messages::{query_result, Ping, Query, QueryResult};
use sqd_network_transport::{
get_agent_info, AgentInfo, GatewayConfig, GatewayEvent, GatewayTransportHandle, P2PTransportBuilder, QueueFull, TransportArgs
get_agent_info, AgentInfo, GatewayConfig, GatewayEvent, GatewayTransportHandle,
P2PTransportBuilder, QueueFull, TransportArgs,
};
use tokio::{sync::oneshot, time::Instant};
use tokio_util::sync::CancellationToken;
@@ -23,6 +24,7 @@ use super::{NetworkState, StorageClient};
lazy_static::lazy_static! {
static ref SUPPORTED_VERSIONS: semver::VersionReq = "~1.2.0".parse().expect("Invalid version requirement");
}
const MAX_CONCURRENT_QUERIES: usize = 1000;

/// Tracks the network state and handles p2p communication
pub struct NetworkClient {
@@ -141,8 +143,14 @@ impl NetworkClient {
});
}
GatewayEvent::QueryDropped { query_id } => {
// No good way to handle this yet, just drop the response sender
if self.tasks.lock().remove_entry(&query_id).is_none() {
if let Some(task) = self.tasks.lock().remove(&query_id) {
metrics::QUERIES_RUNNING.dec();
task.result_tx
.send(query_result::Result::ServerError(
"Outbound queue overloaded".to_string(),
))
.ok();
} else {
tracing::error!("Not expecting response for query {query_id}");
}
}
@@ -173,28 +181,42 @@ impl NetworkClient {
query: String,
) -> Result<oneshot::Receiver<query_result::Result>, QueueFull> {
let query_id = generate_query_id();
tracing::trace!("Sending query {query_id} to {worker}");

self.transport_handle.send_query(
*worker,
Query {
dataset: Some(dataset.to_string()),
query_id: Some(query_id.clone()),
query: Some(query),
client_state_json: Some("{}".to_string()), // This is a placeholder field
profiling: Some(false),
..Default::default()
},
)?;
tracing::trace!("Sent query {query_id} to {worker}");
metrics::QUERIES_SENT.inc();
self.network_state.lock().lease_worker(*worker);

let (result_tx, result_rx) = oneshot::channel();

let task = QueryTask {
result_tx,
worker_id: *worker,
};
self.tasks.lock().insert(query_id, task);
let mut tasks = self.tasks.lock();
if tasks.len() >= MAX_CONCURRENT_QUERIES {
return Err(QueueFull);
}
tasks.insert(query_id.clone(), task);
drop(tasks);

self.transport_handle
.send_query(
*worker,
Query {
dataset: Some(dataset.to_string()),
query_id: Some(query_id.clone()),
query: Some(query),
client_state_json: Some("{}".to_string()), // This is a placeholder field
profiling: Some(false),
..Default::default()
},
)
.map_err(|e| {
self.tasks.lock().remove(&query_id);
e
})?;

metrics::QUERIES_RUNNING.inc();
metrics::QUERIES_SENT.inc();
Ok(result_rx)
}

@@ -221,11 +243,13 @@ impl NetworkClient {
tracing::trace!("Got result for query {query_id}");
metrics::report_query_result(&result);

let (_query_id, task) = self
.tasks
.lock()
let mut tasks = self.tasks.lock();
let (_query_id, task) = tasks
.remove_entry(&query_id)
.ok_or_else(|| anyhow::anyhow!("Not expecting response for query {query_id}"))?;
metrics::QUERIES_RUNNING.set(tasks.len() as i64);
drop(tasks);

if peer_id != task.worker_id {
self.network_state.lock().report_query_error(peer_id);
anyhow::bail!(
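The reworked query_worker registers the task (bounded by MAX_CONCURRENT_QUERIES) before handing the query to the transport, and removes it again if the send fails, so no dangling entry is left behind. A simplified sketch of that insert-then-send-with-rollback shape, using hypothetical stand-in types instead of PeerId, oneshot channels, and the real transport handle:

use std::collections::HashMap;
use std::sync::Mutex;

struct QueueFull; // stand-in for the transport's QueueFull error

#[allow(dead_code)]
struct Task {
    worker: String,
}

struct Client {
    tasks: Mutex<HashMap<String, Task>>,
    limit: usize,
}

impl Client {
    fn send_query(&self, query_id: String, worker: String) -> Result<(), QueueFull> {
        {
            let mut tasks = self.tasks.lock().unwrap();
            // Backpressure: refuse new queries instead of queueing without bound.
            if tasks.len() >= self.limit {
                return Err(QueueFull);
            }
            // Register before sending so an immediate response still finds its entry.
            tasks.insert(query_id.clone(), Task { worker: worker.clone() });
        } // lock released before the potentially slow send

        if let Err(e) = transport_send(&worker) {
            // The query never left, so no response will arrive: undo the registration.
            self.tasks.lock().unwrap().remove(&query_id);
            return Err(e);
        }
        Ok(())
    }
}

// Placeholder for `transport_handle.send_query`, which can itself report a full queue.
fn transport_send(_worker: &str) -> Result<(), QueueFull> {
    Ok(())
}

fn main() {
    let client = Client {
        tasks: Mutex::new(HashMap::new()),
        limit: 1000, // mirrors MAX_CONCURRENT_QUERIES
    };
    client.send_query("query-1".into(), "worker-1".into()).ok();
    assert_eq!(client.tasks.lock().unwrap().len(), 1);
}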
12 changes: 9 additions & 3 deletions src/types/request.rs
@@ -1,4 +1,4 @@
use std::{str::FromStr, time::Duration};
use std::str::FromStr;

use super::{BlockRange, DatasetId};

@@ -8,9 +8,7 @@ pub struct ClientRequest {
pub query: ParsedQuery,
pub buffer_size: usize,
pub max_chunks: Option<usize>,
pub chunk_timeout: Duration,
pub timeout_quantile: f32,
pub request_multiplier: usize,
pub retries: usize,
}

@@ -29,6 +27,10 @@ impl ParsedQuery {
.and_then(|v| v.as_u64())
.ok_or(anyhow::anyhow!("fromBlock is required"))?;
let last_block = json.get("toBlock").and_then(|v| v.as_u64());
anyhow::ensure!(
last_block.is_none() || last_block >= Some(first_block),
"toBlock must be greater or equal to fromBlock"
);
Ok(Self {
json,
first_block,
@@ -60,6 +62,10 @@ impl ParsedQuery {
};
(begin <= end).then_some(begin..=end)
}

pub fn to_string(&self) -> String {
serde_json::to_string(&self.json).expect("Couldn't serialize query")
}
}

impl FromStr for ParsedQuery {
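The new ensure! call rejects queries whose toBlock precedes fromBlock at parse time. A standalone sketch of the same check, with a hypothetical check_block_range helper standing in for ParsedQuery's constructor:

use serde_json::Value;

// Hypothetical helper mirroring the validation added to ParsedQuery:
// `toBlock` is optional, but when present it must not precede `fromBlock`.
fn check_block_range(query: &str) -> anyhow::Result<()> {
    let json: Value = serde_json::from_str(query)?;
    let first_block = json
        .get("fromBlock")
        .and_then(|v| v.as_u64())
        .ok_or_else(|| anyhow::anyhow!("fromBlock is required"))?;
    let last_block = json.get("toBlock").and_then(|v| v.as_u64());
    anyhow::ensure!(
        last_block.is_none() || last_block >= Some(first_block),
        "toBlock must be greater or equal to fromBlock"
    );
    Ok(())
}

fn main() {
    assert!(check_block_range(r#"{"fromBlock": 10, "toBlock": 20}"#).is_ok());
    assert!(check_block_range(r#"{"fromBlock": 10}"#).is_ok());
    // A reversed range is now rejected up front.
    assert!(check_block_range(r#"{"fromBlock": 10, "toBlock": 5}"#).is_err());
}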
13 changes: 12 additions & 1 deletion src/utils/logging.rs
@@ -5,6 +5,8 @@ use parking_lot::Mutex;
use tokio::time::{Duration, Instant};
use tracing::Instrument;

use crate::types::ClientRequest;

const LOG_INTERVAL: Duration = Duration::from_secs(5);

pub struct StreamStats {
@@ -54,13 +56,22 @@ impl StreamStats {
}
}

pub fn write_summary(&self) {
pub fn write_summary(&self, request: &ClientRequest, error: Option<String>) {
// tracing::debug!(
// dataset = %request.dataset_id,
// query = request.query.to_string(),
// "Query processed"
// );
tracing::info!(
dataset = %request.dataset_id,
first_block = request.query.first_block(),
last_block = request.query.last_block(),
queries_sent = self.queries_sent.load(Ordering::Relaxed),
chunks_downloaded = self.chunks_downloaded.load(Ordering::Relaxed),
blocks_streamed = self.response_blocks.load(Ordering::Relaxed),
bytes_streamed = self.response_bytes.load(Ordering::Relaxed),
total_time = ?self.start_time.elapsed(),
error = error.unwrap_or_else(|| "-".to_string()),
"Stream finished"
);
}
