Skip to content

Commit

Permalink
Use chunk_id from query
Browse files Browse the repository at this point in the history
  • Loading branch information
kalabukdima committed Nov 14, 2024
1 parent 635431f commit 10c226e
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 123 deletions.
3 changes: 2 additions & 1 deletion src/controller/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,9 +399,10 @@ impl<EventStream: Stream<Item = WorkerEvent>> P2PController<EventStream> {
let result = self
.worker
.run_query(
query.query.clone(),
&query.query,
query.dataset.clone(),
block_range,
&query.chunk_id,
Some(peer_id),
)
.await;
Expand Down
31 changes: 15 additions & 16 deletions src/controller/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ impl Worker {

pub async fn run_query(
&self,
query_str: String,
query_str: &str,
dataset: Dataset,
block_range: Option<(u64, u64)>,
chunk_id: &str,
client_id: Option<PeerId>,
) -> QueryResult {
let before = self.queries_running.fetch_add(1, Ordering::SeqCst);
Expand All @@ -88,13 +89,15 @@ impl Worker {
if before >= self.max_parallel_queries {
return Err(QueryError::ServiceOverloaded);
}

tracing::debug!(
"Running query from {}",
client_id
.map(|id| id.to_string())
.unwrap_or("{unknown}".to_string())
);
self.execute_query(query_str, dataset, block_range).await
self.execute_query(query_str, dataset, block_range, chunk_id)
.await
}

pub async fn run(&self, cancellation_token: CancellationToken) {
Expand All @@ -105,28 +108,24 @@ impl Worker {

async fn execute_query(
&self,
query_str: String,
dataset: String,
query_str: &str,
dataset: Dataset,
block_range: Option<(u64, u64)>,
chunk_id: &str,
) -> QueryResult {
let Ok(chunk) = chunk_id.parse() else {
return Err(QueryError::BadRequest(format!(
"Can't parse chunk id '{chunk_id}'"
)));
};
let mut query = sqd_query::Query::from_json_bytes(query_str.as_bytes())
.map_err(|e| QueryError::BadRequest(format!("Couldn't parse query: {e:?}")))?;
if let Some((from_block, to_block)) = block_range {
query.set_first_block(Some(from_block));
query.set_last_block(Some(to_block));
}

// First block may be either set by the `block_range` arg or defined in the query.
let Some(first_block) = query.first_block() else {
return Err(QueryError::BadRequest(
"Query without first_block".to_owned(),
));
};
let chunk_guard = self
.state_manager
.clone()
.find_chunk(&dataset, first_block.into())?;
if chunk_guard.is_none() {
let Some(chunk_guard) = self.state_manager.clone().get_chunk(dataset, chunk) else {
return Err(QueryError::NotFound);
};

Expand All @@ -135,7 +134,7 @@ impl Worker {
let result = (move || {
let start_time = std::time::Instant::now();

let chunk = ParquetChunk::new(chunk_guard.as_ref().unwrap().as_str());
let chunk = ParquetChunk::new(chunk_guard.as_str());
let plan = query.compile();
let data = Vec::with_capacity(1024 * 1024);
let mut writer = sqd_query::JsonLinesWriter::new(data);
Expand Down
5 changes: 4 additions & 1 deletion src/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ async fn run_query(
Path(dataset): Path<Dataset>,
query_str: String,
) -> Response {
let result = worker.run_query(query_str, dataset, None, None).await;
// TODO: remove HTTP transport
let result = worker
.run_query(&query_str, dataset, None, "<unimplemented>", None)
.await;
metrics::query_executed(&result);
result.map(|result| result.data).into_response()
}
Expand Down
25 changes: 11 additions & 14 deletions src/storage/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ use tracing::{debug, info, instrument, warn};
use crate::{
metrics,
types::{
dataset,
dataset::{self, Dataset},
state::{to_ranges, ChunkRef, ChunkSet, Ranges},
},
};

use super::{
datasets_index::DatasetsIndex,
downloader::ChunkDownloader,
layout::{self, BlockNumber, DataChunk},
layout::{self, DataChunk},
local_fs::{add_temp_prefix, LocalFs},
state::{State, UpdateStatus},
Filesystem,
Expand Down Expand Up @@ -162,22 +162,19 @@ impl StateManager {
}
}

pub fn find_chunk(
pub fn get_chunk(
self: Arc<Self>,
encoded_dataset: &str,
block_number: BlockNumber,
) -> Result<scopeguard::ScopeGuard<Option<PathBuf>, impl FnOnce(Option<PathBuf>)>> {
let dataset = dataset::decode_dataset(encoded_dataset)
.with_context(|| format!("Couldn't decode dataset: {encoded_dataset}"))?;
dataset: Dataset,
chunk: DataChunk,
) -> Option<scopeguard::ScopeGuard<PathBuf, impl FnOnce(PathBuf)>> {
let encoded_dataset = dataset::encode_dataset(&dataset);
let chunk = self
.state
.lock()
.find_and_lock_chunk(Arc::new(dataset), block_number);
let path = chunk
.as_ref()
.map(|chunk| self.fs.root.join(encoded_dataset).join(chunk.chunk.path()));
let guard = scopeguard::guard(path, move |_| self.state.lock().release_chunks(chunk));
Ok(guard)
.get_and_lock_chunk(Arc::new(dataset), chunk)?;
let path = self.fs.root.join(encoded_dataset).join(chunk.chunk.path());
let guard = scopeguard::guard(path, move |_| self.state.lock().unlock_chunk(&chunk));
Some(guard)
}

#[instrument(err, skip(self))]
Expand Down
100 changes: 9 additions & 91 deletions src/storage/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use itertools::Itertools;
use std::{collections::BTreeMap, sync::Arc};
use tracing::{info, instrument};

use super::layout::{BlockNumber, DataChunk};
use super::layout::DataChunk;
use crate::{
metrics,
types::{
Expand Down Expand Up @@ -120,29 +120,18 @@ impl State {
}
}

pub fn find_and_lock_chunk(
pub fn get_and_lock_chunk(
&mut self,
dataset: Arc<Dataset>,
block_number: BlockNumber,
chunk: DataChunk,
) -> Option<ChunkRef> {
let from = ChunkRef {
dataset: dataset.clone(),
chunk: DataChunk {
last_block: block_number,
first_block: BlockNumber::from(0),
..Default::default()
},
};
let chunk_ref = self.available.range(from..).next()?.clone();
let chunk_ref = self.available.get(&ChunkRef { dataset, chunk }).cloned();

if chunk_ref.dataset != dataset || chunk_ref.chunk.first_block > block_number {
return None;
if let Some(chunk_ref) = chunk_ref.as_ref() {
self.lock_chunk(chunk_ref);
}
assert!(chunk_ref.chunk.last_block >= block_number);

self.lock_chunk(&chunk_ref);

Some(chunk_ref)
chunk_ref
}

pub fn release_chunks(&mut self, chunks: impl IntoIterator<Item = ChunkRef>) {
Expand All @@ -159,7 +148,7 @@ impl State {
}
}

fn unlock_chunk(&mut self, chunk: &ChunkRef) {
pub fn unlock_chunk(&mut self, chunk: &ChunkRef) {
let remove = self
.locks
.get_mut(chunk)
Expand Down Expand Up @@ -200,10 +189,7 @@ mod tests {

use itertools::Itertools;

use crate::{
storage::layout::{BlockNumber, DataChunk},
types::state::ChunkRef,
};
use crate::{storage::layout::DataChunk, types::state::ChunkRef};

use super::State;

Expand Down Expand Up @@ -246,72 +232,4 @@ mod tests {
);
assert_eq!(state.status().downloading.into_iter().collect_vec(), &[]);
}

#[test]
fn test_data_chunk_comparison() {
    // Chunk lookup relies on `DataChunk` ordering following `last_block`:
    // the chunk below with the larger `first_block` but the smaller
    // `last_block` must still compare as the lesser of the two.
    let ends_earlier = DataChunk {
        first_block: 1.into(),
        last_block: 2.into(),
        ..Default::default()
    };
    let ends_later = DataChunk {
        first_block: 0.into(),
        last_block: 3.into(),
        ..Default::default()
    };

    assert!(ends_earlier < ends_later)
}

#[test]
fn test_search() {
    let ds0 = Arc::new("ds0".to_owned());
    let ds1 = Arc::new("ds1".to_owned());

    // Builds a chunk reference for `dataset` from a chunk path string.
    let make_ref = |dataset: &Arc<String>, path| ChunkRef {
        dataset: Arc::clone(dataset),
        chunk: DataChunk::from_path(path).unwrap(),
    };

    let a = make_ref(&ds0, "0000000000/0000000000-0000000009-00000000");
    let b = make_ref(&ds0, "0000000000/0000000010-0000000019-00000000");
    let c = make_ref(&ds0, "0000000000/0000000100-0000000109-00000000");
    // Same path as `a`, but registered under a different dataset.
    let d = make_ref(&ds1, "0000000000/0000000000-0000000009-00000000");

    let mut state = State::new([a.clone(), b.clone(), c.clone(), d].into_iter().collect());

    // (block number to look up in `ds0`, chunk expected to be returned).
    // `None` marks block numbers for which no chunk is expected.
    let expectations = [
        (0, Some(&a)),
        (8, Some(&a)),
        (9, Some(&a)),
        (10, Some(&b)),
        (19, Some(&b)),
        (99, None),
        (100, Some(&c)),
        (110, None),
    ];

    // Every successful lookup also locks the chunk it returns, so the probes
    // are issued one by one in the order listed above.
    for (block, expected) in expectations {
        assert_eq!(
            state.find_and_lock_chunk(ds0.clone(), BlockNumber::from(block)),
            expected.cloned()
        );
    }
}
}

0 comments on commit 10c226e

Please sign in to comment.