Skip to content

Commit

Permalink
return 404 if requested block is lower than dataset start block
Browse files Browse the repository at this point in the history
  • Loading branch information
tmcgroul committed Sep 13, 2024
1 parent 6a11ba4 commit f0880c0
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 11 deletions.
24 changes: 15 additions & 9 deletions crates/router-controller/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub struct Controller {
workers_rate: WorkersRate,
request_cache: RequestCache,
datasets_height: HashMap<Dataset, AtomicI64>,
managed_datasets: HashMap<String, Dataset>,
managed_datasets: HashMap<String, (Dataset, Option<u32>)>,
managed_workers: HashSet<WorkerId>,
data_replication: usize,
data_management_unit: usize
Expand All @@ -84,13 +84,19 @@ unsafe impl Sync for Controller {}
impl Controller {
#[tracing::instrument(level="debug", skip(self))]
pub fn get_worker(&self, dataset_name: &str, first_block: u32) -> Result<Option<Url>, String> {
let dataset = match self.managed_datasets.get(dataset_name) {
let (dataset, start_block) = match self.managed_datasets.get(dataset_name) {
Some(ds) => ds,
None => {
return Err("unknown dataset".to_string())
}
};

if let Some(start_block) = start_block {
if first_block < *start_block {
return Err(format!("{} dataset starts from {}", dataset_name, first_block))
}
}

let now = SystemTime::now();

let select_candidate = |w: &Worker| {
Expand Down Expand Up @@ -176,7 +182,7 @@ impl Controller {
}

pub fn get_height(&self, dataset_name: &str) -> Result<Option<i32>, String> {
let dataset = match self.managed_datasets.get(dataset_name) {
let (dataset, _) = match self.managed_datasets.get(dataset_name) {
Some(ds) => ds,
None => return Err("unknown dataset".to_string())
};
Expand Down Expand Up @@ -468,7 +474,7 @@ impl Controller {


pub struct ControllerBuilder {
managed_datasets: HashMap<String, Dataset>,
managed_datasets: HashMap<String, (Dataset, Option<u32>)>,
managed_workers: HashSet<WorkerId>,
replication: usize,
data_management_unit: usize,
Expand Down Expand Up @@ -508,13 +514,13 @@ impl ControllerBuilder {
self
}

pub fn add_dataset(&mut self, name: String, dataset: Dataset) -> &mut Self {
pub fn add_dataset(&mut self, name: String, dataset: (Dataset, Option<u32>)) -> &mut Self {
self.managed_datasets.insert(name, dataset);
self
}

pub fn set_datasets<I>(&mut self, datasets: I) -> &mut Self
where I: IntoIterator<Item = (String, Dataset)>
where I: IntoIterator<Item = (String, (Dataset, Option<u32>))>
{
self.managed_datasets.clear();
self.managed_datasets.extend(datasets);
Expand All @@ -525,12 +531,12 @@ impl ControllerBuilder {
Controller {
schedule: parking_lot::Mutex::new(Schedule {
datasets: self.managed_datasets.iter()
.map(|(_name, ds)| (ds.clone(), Vec::new()))
.map(|(_name, (ds, _))| (ds.clone(), Vec::new()))
.collect(),
assignment: HashMap::new()
}),
datasets_height: self.managed_datasets.values()
.map(|name| (name.clone(), AtomicI64::new(INITIAL_VALUE)))
.map(|(name, _)| (name.clone(), AtomicI64::new(INITIAL_VALUE)))
.collect(),
workers: Atom::new(Arc::new(Vec::new())),
workers_rate: WorkersRate::new(),
Expand Down Expand Up @@ -558,7 +564,7 @@ mod tests {
.set_data_management_unit(1)
.set_data_replication(2)
.set_workers((0..8).map(|i| i.to_string()))
.set_datasets((0..2).map(|i| (i.to_string(), i.to_string())))
.set_datasets((0..2).map(|i| (i.to_string(), (i.to_string(), None))))
.build();

let chunks = vec![
Expand Down
4 changes: 2 additions & 2 deletions crates/router/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ impl Dataset {
}
}

impl From<&Dataset> for (String, String) {
impl From<&Dataset> for (String, (String, Option<u32>)) {
fn from(value: &Dataset) -> Self {
(value.name.clone(), value.url.clone())
(value.name.clone(), (value.url.clone(), value.start_block))
}
}

0 comments on commit f0880c0

Please sign in to comment.