Skip to content

Commit

Permalink
fix(scheduler): add mutex to avoid job overrun (#1396)
Browse files Browse the repository at this point in the history
  • Loading branch information
wsxiaoys authored Feb 6, 2024
1 parent 2495018 commit f0f9bd7
Showing 1 changed file with 66 additions and 52 deletions.
118 changes: 66 additions & 52 deletions crates/tabby-scheduler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use anyhow::Result;
use tabby_common::config::{RepositoryAccess, RepositoryConfig};
use tokio::io::AsyncBufReadExt;
use tokio_cron_scheduler::{Job, JobScheduler};
use tracing::{error, info};
use tracing::{error, info, warn};

pub async fn scheduler<T: RepositoryAccess + 'static>(
now: bool,
Expand All @@ -26,63 +26,77 @@ pub async fn scheduler<T: RepositoryAccess + 'static>(
let args = args.to_owned();
let access = Arc::new(access);
let scheduler = JobScheduler::new().await?;
let scheduler_mutex = Arc::new(tokio::sync::Mutex::new(()));

// Every 10 minutes
scheduler
.add(Job::new_async("* 1/10 * * * * *", move |_, _| {
let access = access.clone();
let args = args.clone();
Box::pin(async move {
info!("Running scheduler job...");
let exe = std::env::current_exe().unwrap();
let job_id = access
.create_job_run("scheduler".to_owned())
.await
.unwrap_or_default();
.add(Job::new_async(
"* 1/10 * * * * *",
move |uuid, mut scheduler| {
let access = access.clone();
let args = args.clone();
let scheduler_mutex = scheduler_mutex.clone();
Box::pin(async move {
let Ok(_guard) = scheduler_mutex.try_lock() else {
warn!("Scheduler job overlapped, skipping...");
return;
};

let mut child = tokio::process::Command::new(exe)
.arg("scheduler")
.arg("--now")
.args(args)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.unwrap();
info!("Running scheduler job...");
let exe = std::env::current_exe().unwrap();
let job_id = access
.create_job_run("scheduler".to_owned())
.await
.unwrap_or_default();

{
// Pipe stdout
let access = access.clone();
let stdout = child.stdout.take().unwrap();
tokio::spawn(async move {
let stdout = tokio::io::BufReader::new(stdout);
let mut stdout = stdout.lines();
while let Ok(Some(line)) = stdout.next_line().await {
println!("{line}");
let _ = access.update_job_stdout(job_id, line + "\n").await;
}
});
}
let mut child = tokio::process::Command::new(exe)
.arg("scheduler")
.arg("--now")
.args(args)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.unwrap();

{
// Pipe stderr
let access = access.clone();
let stderr = child.stderr.take().unwrap();
tokio::spawn(async move {
let stderr = tokio::io::BufReader::new(stderr);
let mut stdout = stderr.lines();
while let Ok(Some(line)) = stdout.next_line().await {
eprintln!("{line}");
let _ = access.update_job_stderr(job_id, line + "\n").await;
}
});
}
if let Some(exit_code) = child.wait().await.ok().and_then(|s| s.code()) {
let _ = access.complete_job_run(job_id, exit_code).await;
} else {
let _ = access.complete_job_run(job_id, -1).await;
}
})
})?)
{
// Pipe stdout
let access = access.clone();
let stdout = child.stdout.take().unwrap();
tokio::spawn(async move {
let stdout = tokio::io::BufReader::new(stdout);
let mut stdout = stdout.lines();
while let Ok(Some(line)) = stdout.next_line().await {
println!("{line}");
let _ = access.update_job_stdout(job_id, line + "\n").await;
}
});
}

{
// Pipe stderr
let access = access.clone();
let stderr = child.stderr.take().unwrap();
tokio::spawn(async move {
let stderr = tokio::io::BufReader::new(stderr);
let mut stdout = stderr.lines();
while let Ok(Some(line)) = stdout.next_line().await {
eprintln!("{line}");
let _ = access.update_job_stderr(job_id, line + "\n").await;
}
});
}
if let Some(exit_code) = child.wait().await.ok().and_then(|s| s.code()) {
let _ = access.complete_job_run(job_id, exit_code).await;
} else {
let _ = access.complete_job_run(job_id, -1).await;
}

if let Ok(Some(next_tick)) = scheduler.next_tick_for_job(uuid).await {
info!("Next time for scheduler job is {:?}", next_tick);
}
})
},
)?)
.await?;

info!("Scheduler activated...");
Expand Down

0 comments on commit f0f9bd7

Please sign in to comment.