Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
try fifo query queue
Browse files Browse the repository at this point in the history
  • Loading branch information
aidan-smith committed May 2, 2024
1 parent 5dafdf5 commit 8936b1e
Showing 1 changed file with 26 additions and 19 deletions.
45 changes: 26 additions & 19 deletions src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ use crate::task::{
};
use dashmap::DashMap;
use datafusion::physical_plan::ExecutionPlan;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use std::time::SystemTime;
use tokio::sync::{Mutex, Notify, RwLock};

// Must implement here since generated TaskId does not derive Hash.
Expand All @@ -30,9 +30,9 @@ impl Copy for TaskId {}

#[derive(Debug)]
pub struct State {
// queue: Mutex<VecDeque<QueryKey>>,
// The queue used to order queries by executor usage.
queue: Mutex<BTreeMap<Duration, u64>>,
// queue: Mutex<BTreeMap<Duration, u64>>,
queue: Mutex<VecDeque<u64>>,
start_ts: SystemTime,

query_id_counter: AtomicU64,
Expand All @@ -47,7 +47,8 @@ pub struct State {
impl State {
pub fn new(notify: Arc<Notify>) -> Self {
Self {
queue: Mutex::new(BTreeMap::new()),
// queue: Mutex::new(BTreeMap::new()),
queue: Mutex::new(VecDeque::new()),
start_ts: SystemTime::now(),
query_id_counter: AtomicU64::new(0),
table: DashMap::new(),
Expand All @@ -67,7 +68,8 @@ impl State {
query.time = time;

self.table.insert(id, RwLock::new(query));
self.queue.lock().await.insert(time, id);
// self.queue.lock().await.insert(time, id);
self.queue.lock().await.push_back(id);

self.notify.notify_waiters();
id
Expand All @@ -93,7 +95,8 @@ impl State {
}

pub async fn next_task(&self) -> Option<(TaskId, Arc<dyn ExecutionPlan>)> {
let Some((duration, query_id)) = self.queue.lock().await.pop_first() else {
// let Some((duration, query_id)) = self.queue.lock().await.pop_first() else {
let Some(query_id) = self.queue.lock().await.pop_front() else {
return None;
};
let query = self.table.get(&query_id).unwrap();
Expand All @@ -109,7 +112,8 @@ impl State {
.update_stage_status(task.task_id.stage_id, StageStatus::Running(0))
.unwrap();
if let QueryQueueStatus::Available = guard.get_queue_status() {
self.queue.lock().await.insert(duration, query_id);
// self.queue.lock().await.insert(duration, query_id);
self.queue.lock().await.push_back(query_id);
self.notify.notify_waiters();
}

Expand All @@ -120,7 +124,7 @@ impl State {
pub async fn report_task(&self, task_id: TaskId, status: TaskStatus) {
if let Some((_, task)) = self.running_tasks.remove(&task_id) {
println!("Updating {:?} status to {:?}", task_id, status);
let TaskStatus::Running(ts) = task.status else {
let TaskStatus::Running(_ts) = task.status else {
println!("Task removed with status {:?}", task.status);
panic!("Task removed but is not running.");
};
Expand All @@ -136,14 +140,19 @@ impl State {
_ => unreachable!(),
}

let new_time = guard.time + SystemTime::now().duration_since(ts).unwrap();
let mut queue = self.queue.lock().await;
let _ = queue.remove(&guard.time);
if let QueryQueueStatus::Available = guard.get_queue_status() {
queue.insert(new_time, task_id.query_id);
self.notify.notify_waiters();
// let new_time = guard.time + SystemTime::now().duration_since(ts).unwrap();
// let _ = queue.remove(&guard.time);
// if let QueryQueueStatus::Available = guard.get_queue_status() {
// queue.insert(new_time, task_id.query_id);
// self.notify.notify_waiters();
// }
// guard.time = new_time;
if QueryQueueStatus::Available == guard.get_queue_status()
&& !queue.contains(&task_id.query_id)
{
queue.push_back(task_id.query_id);
}
guard.time = new_time;
}
}

Expand All @@ -159,9 +168,9 @@ mod tests {
use std::{fs, time::{Duration, SystemTime}};
use tokio::{sync::Notify, time::sleep};

use crate::{parser::ExecutionPlanParser, query_graph::QueryGraph};
use crate::queue::State;
use crate::task::TaskStatus;
use crate::{parser::ExecutionPlanParser, query_graph::QueryGraph};
use std::{cmp::min, sync::Arc};

// Deprecated, use test_queue_conc instead
Expand Down Expand Up @@ -290,9 +299,7 @@ mod tests {
// Add a bunch of queries with staggered submission time
let start_enqueue = SystemTime::now();
for plan in long_plans {
queue
.add_query(Arc::clone(&plan))
.await;
queue.add_query(Arc::clone(&plan)).await;
sleep(Duration::from_millis(10)).await;
}
let enq_time = SystemTime::now().duration_since(start_enqueue).unwrap();
Expand Down

0 comments on commit 8936b1e

Please sign in to comment.