
Commit

add timeout for requests
tmcgroul committed Aug 30, 2024
1 parent bcc3765 commit ba1eae9
Showing 2 changed files with 35 additions and 14 deletions.
5 changes: 5 additions & 0 deletions crates/router-controller/src/controller.rs
@@ -258,6 +258,11 @@ impl Controller {

         let managed_workers: Vec<_> = workers.iter().filter(|w| w.is_managed).cloned().collect();
         if managed_workers.len() < self.managed_workers.len() {
+            log::warn!(
+                "{} out of {} workers available. Skipping scheduling",
+                managed_workers.len(),
+                self.managed_workers.len()
+            );
             self.workers.set(Arc::new(workers));
             return;
         }
44 changes: 30 additions & 14 deletions crates/router/src/scheduler.rs
@@ -1,7 +1,7 @@
 use std::sync::Arc;
 use std::time::{Duration, Instant};

-use tracing::{error, info};
+use tracing::{error, info, debug};

 use router_controller::controller::Controller;

@@ -19,20 +19,35 @@ async fn update_datasets(controller: &Arc<Controller>, datasets: &Vec<Dataset>)
             .and_then(|height| u32::try_from(height).ok().map(|height| height + 1))
             .unwrap_or(dataset.start_block().unwrap_or(0));

-        let chunks = match dataset.storage().get_chunks(next_block).await {
-            Ok(chunks) => {
-                info!("found new chunks in {}: {:?}", dataset.url(), chunks);
-                if let Some(chunk) = chunks.last() {
-                    DATASET_HEIGHT
-                        .with_label_values(&[&dataset.url()])
-                        .set(chunk.last_block().into())
-                }
-                chunks
-            },
-            Err(err) => {
-                error!("failed to download new chunks for {}: {:?}", dataset.url(), err);
-                DATASET_SYNC_ERRORS.with_label_values(&[dataset.url()]).inc();
-                return
-            }
+        debug!("getting new chunks for {}", dataset.url());
+        let mut attempt = 0;
+        let chunks = loop {
+            let future = dataset.storage().get_chunks(next_block);
+            let duration = Duration::from_secs(300 + attempt * 60);
+            match tokio::time::timeout(duration, future).await {
+                Ok(Ok(chunks)) => {
+                    info!("found new chunks in {}: {:?}", dataset.url(), chunks);
+                    if let Some(chunk) = chunks.last() {
+                        DATASET_HEIGHT
+                            .with_label_values(&[&dataset.url()])
+                            .set(chunk.last_block().into())
+                    }
+                    break chunks
+                },
+                Ok(Err(err)) => {
+                    error!("failed to download new chunks for {}: {:?}", dataset.url(), err);
+                    DATASET_SYNC_ERRORS.with_label_values(&[dataset.url()]).inc();
+                    return
+                },
+                Err(_) => {
+                    if attempt == 5 {
+                        error!("failed to download new chunks for {} within {:?}", dataset.url(), duration);
+                        DATASET_SYNC_ERRORS.with_label_values(&[dataset.url()]).inc();
+                        return
+                    }
+                    attempt += 1;
+                    info!("{}/5 retry to download {}", attempt, dataset.url());
+                }
+            }
         };
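For reference, tokio::time::timeout wraps a future and resolves to a nested result: the outer Result reports whether the deadline elapsed, while the inner one is the wrapped future's own output, which is why the new code matches on Ok(Ok(..)), Ok(Err(..)), and Err(_). Below is a minimal, self-contained sketch of the same retry-with-growing-deadline pattern; fetch_chunks and the shortened durations are illustrative stand-ins, not code from this repository.

use std::time::Duration;

// Illustrative stand-in for `dataset.storage().get_chunks(next_block)`.
async fn fetch_chunks(next_block: u32) -> Result<Vec<u32>, String> {
    // Pretend we fetched chunk heights starting at `next_block`.
    Ok(vec![next_block, next_block + 1])
}

#[tokio::main]
async fn main() {
    let next_block = 100;
    let mut attempt = 0u64;
    let chunks = loop {
        // Deadline grows linearly per attempt, as in the commit (300s, 360s, ...);
        // shortened here so the demo finishes quickly.
        let duration = Duration::from_secs(3 + attempt);
        match tokio::time::timeout(duration, fetch_chunks(next_block)).await {
            // Future finished within the deadline and succeeded.
            Ok(Ok(chunks)) => break chunks,
            // Future finished within the deadline but the request itself failed: give up.
            Ok(Err(err)) => {
                eprintln!("request failed: {err}");
                return;
            }
            // Deadline elapsed: retry up to 5 more times, each with a longer timeout.
            Err(_) => {
                if attempt == 5 {
                    eprintln!("timed out after {duration:?}");
                    return;
                }
                attempt += 1;
                println!("{attempt}/5 retry");
            }
        }
    };
    println!("got chunks: {chunks:?}");
}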

@@ -60,6 +75,7 @@ pub fn start(

             let now = Instant::now();
             if let Some(duration) = schedule_time.checked_duration_since(now) {
+                debug!("waiting {} seconds before scheduling", duration.as_secs());
                 tokio::time::sleep(duration).await;
             }
             controller.schedule();
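The scheduling loop above sleeps only while the next schedule time is still in the future: Instant::checked_duration_since returns None once the deadline has passed, so the sleep is skipped and scheduling runs immediately. A small sketch of that pattern under an assumed fixed two-second interval (the interval value and print statements are illustrative, not the router's actual configuration):

use std::time::{Duration, Instant};

#[tokio::main]
async fn main() {
    let interval = Duration::from_secs(2);
    let mut schedule_time = Instant::now() + interval;

    for round in 0..3 {
        // checked_duration_since yields None if schedule_time is already in
        // the past, in which case the sleep is skipped entirely.
        let now = Instant::now();
        if let Some(duration) = schedule_time.checked_duration_since(now) {
            println!("waiting {} seconds before scheduling", duration.as_secs());
            tokio::time::sleep(duration).await;
        }
        println!("scheduling round {round}");
        schedule_time += interval;
    }
}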
