Skip to content

Commit

Permalink
fix: disk cache deduped get_ranges
Browse files Browse the repository at this point in the history
  • Loading branch information
jiacai2050 committed Sep 15, 2023
1 parent c3d17bf commit 8d5d761
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 2 deletions.
5 changes: 3 additions & 2 deletions components/object_store/src/disk_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -940,6 +940,7 @@ mod test {
use upstream::local::LocalFileSystem;

use super::*;
use crate::test_util::MemoryStore;

struct StoreWithCacheDir {
inner: DiskCacheStore,
Expand All @@ -951,8 +952,7 @@ mod test {
cap: usize,
partition_bits: usize,
) -> StoreWithCacheDir {
let local_path = tempdir().unwrap();
let local_store = Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());
let local_store = Arc::new(MemoryStore::default());

let cache_dir = tempdir().unwrap();
let store = DiskCacheStore::try_new(
Expand Down Expand Up @@ -1103,6 +1103,7 @@ mod test {
}

let actual = futures::future::join_all(tasks).await;
println!("get_counts, {}", store.inner.underlying_store);
for (actual, (_, expected)) in actual.into_iter().zip(testcases.into_iter()) {
assert_eq!(actual.unwrap(), Bytes::from(expected))
}
Expand Down
2 changes: 2 additions & 0 deletions components/object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,7 @@ pub mod multipart;
pub mod obkv;
pub mod prefix;
pub mod s3;
#[cfg(test)]
pub mod test_util;

pub type ObjectStoreRef = Arc<dyn ObjectStore>;
172 changes: 172 additions & 0 deletions components/object_store/src/test_util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
// Copyright 2023 The CeresDB Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{collections::HashMap, fmt::Display, ops::Range, sync::RwLock};

use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::{self, BoxStream};
use tokio::io::AsyncWrite;
use upstream::{path::Path, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result};

/// Minimal error type used by [`MemoryStore`] to report failures
/// (currently only "not found") for a specific object path.
#[derive(Debug)]
struct StoreError {
    // The object path the failed operation targeted.
    path: Path,
    // Short human-readable description of the failure, e.g. "not found".
    msg: String,
}

impl Display for StoreError {
    /// Renders the error in the same struct-like shape as its `Debug`
    /// derivation, e.g. `StoreError { path: ..., msg: "not found" }`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("StoreError");
        builder.field("path", &self.path);
        builder.field("msg", &self.msg);
        builder.finish()
    }
}

// Marker impl so `StoreError` can be boxed into `upstream::Error::Generic`.
impl std::error::Error for StoreError {}

/// A memory based object store implementation, mainly used for testing.
#[derive(Debug, Default)]
pub struct MemoryStore {
    // Backing storage: full object payload keyed by path.
    files: RwLock<HashMap<Path, Bytes>>,
    // Number of `get_range` calls observed per path; tests read this via
    // `get_counts` to verify that caching layers deduplicate range reads.
    get_range_counts: RwLock<HashMap<Path, usize>>,
}

impl Display for MemoryStore {
    /// Renders the store as `MemoryStore { counts: {...} }`, exposing the
    /// per-path `get_range` call counts for test diagnostics.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let counts = self.get_counts();
        let mut builder = f.debug_struct("MemoryStore");
        builder.field("counts", &counts);
        builder.finish()
    }
}

impl MemoryStore {
    /// Returns a snapshot of how many times `get_range` has been called
    /// for each path.
    ///
    /// # Panics
    /// Panics if the internal counter lock is poisoned.
    pub fn get_counts(&self) -> HashMap<Path, usize> {
        // `clone()` of the guarded map is all that's needed; the original
        // `clone().into_iter().collect()` rebuilt the map a second time
        // for no benefit.
        self.get_range_counts.read().unwrap().clone()
    }
}

#[async_trait]
impl ObjectStore for MemoryStore {
    /// Stores `bytes` under `location`, replacing any existing object.
    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
        let mut files = self.files.write().unwrap();
        files.insert(location.clone(), bytes);
        Ok(())
    }

    /// Returns the whole object as a single-chunk stream, or a `Generic`
    /// "not found" error when the path is absent.
    async fn get(&self, location: &Path) -> Result<GetResult> {
        let files = self.files.read().unwrap();
        if let Some(bs) = files.get(location) {
            let bs = bs.clone();
            Ok(GetResult::Stream(Box::pin(stream::once(
                async move { Ok(bs) },
            ))))
        } else {
            let source = Box::new(StoreError {
                msg: "not found".to_string(),
                path: location.clone(),
            });
            Err(upstream::Error::Generic {
                store: "get",
                source,
            })
        }
    }

    /// Serves a byte range of the object and records the call in
    /// `get_range_counts` so tests can assert on read deduplication.
    async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
        {
            // Count the call first; the inner scope releases the write lock
            // before the read lock below is taken.
            let mut counts = self.get_range_counts.write().unwrap();
            // Entry API with `or_default` replaces the original
            // `and_modify(...).or_insert(1)` pair: one lookup, same result.
            *counts.entry(location.clone()).or_default() += 1;
        }

        let files = self.files.read().unwrap();
        if let Some(bs) = files.get(location) {
            // NOTE: `Bytes::slice` panics if `range` is out of bounds,
            // which is acceptable for a test-only store.
            Ok(bs.slice(range))
        } else {
            let source = Box::new(StoreError {
                msg: "not found".to_string(),
                path: location.clone(),
            });
            Err(upstream::Error::Generic {
                store: "get_range",
                source,
            })
        }
    }

    /// Returns metadata for the object; `last_modified` is always the
    /// default timestamp since this store does not track modification time.
    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
        let files = self.files.read().unwrap();

        if let Some(bs) = files.get(location) {
            Ok(ObjectMeta {
                location: location.clone(),
                size: bs.len(),
                last_modified: Default::default(),
            })
        } else {
            let source = Box::new(StoreError {
                msg: "not found".to_string(),
                path: location.clone(),
            });
            Err(upstream::Error::Generic {
                store: "head",
                source,
            })
        }
    }

    // The operations below are not needed by the tests this store backs,
    // so they intentionally panic if ever called.

    async fn put_multipart(
        &self,
        _location: &Path,
    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
        unimplemented!()
    }

    async fn abort_multipart(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<()> {
        unimplemented!()
    }

    async fn delete(&self, _location: &Path) -> Result<()> {
        unimplemented!()
    }

    async fn list(&self, _prefix: Option<&Path>) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
        unimplemented!()
    }

    async fn list_with_delimiter(&self, _prefix: Option<&Path>) -> Result<ListResult> {
        unimplemented!()
    }

    async fn copy(&self, _from: &Path, _to: &Path) -> Result<()> {
        unimplemented!()
    }

    async fn rename(&self, _from: &Path, _to: &Path) -> Result<()> {
        unimplemented!()
    }

    async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()> {
        unimplemented!()
    }

    async fn rename_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()> {
        unimplemented!()
    }
}

0 comments on commit 8d5d761

Please sign in to comment.