diff --git a/components/object_store/src/disk_cache.rs b/components/object_store/src/disk_cache.rs
index d868823df7..89723f6b47 100644
--- a/components/object_store/src/disk_cache.rs
+++ b/components/object_store/src/disk_cache.rs
@@ -27,7 +27,7 @@ use chrono::{DateTime, Utc};
 use crc::{Crc, CRC_32_ISCSI};
 use futures::stream::BoxStream;
 use hash_ext::SeaHasherBuilder;
-use log::{debug, error, info, warn};
+use log::{debug, info, warn};
 use lru::LruCache;
 use notifier::notifier::{ExecutionGuard, RequestNotifiers};
 use partitioned_lock::PartitionedMutex;
@@ -167,36 +167,83 @@ impl Manifest {
     }
 }
 
-/// The encoder of the page file in the disk cache.
+/// The writer of the page file in the disk cache.
 ///
-/// Following the payload, a footer [`PageFileEncoder::MAGIC_FOOTER`] is
+/// Following the payload, a footer [`PageFileWriter::MAGIC_FOOTER`] is
 /// appended.
-struct PageFileEncoder {
-    payload: Bytes,
+struct PageFileWriter {
+    output: String,
+    tmp_file: String,
+    need_clean_tmpfile: bool,
 }
 
-impl PageFileEncoder {
+impl Drop for PageFileWriter {
+    fn drop(&mut self) {
+        if self.need_clean_tmpfile {
+            if let Err(e) = std::fs::remove_file(&self.tmp_file) {
+                warn!(
+                    "Disk cache remove page tmp file failed, file:{}, err:{e}",
+                    &self.tmp_file
+                );
+            }
+        }
+    }
+}
+
+impl PageFileWriter {
     const MAGIC_FOOTER: [u8; 8] = [0, 0, 0, 0, b'c', b'e', b'r', b'e'];
 
-    async fn encode_and_persist<W>(&self, writer: &mut W, name: &str) -> Result<()>
-    where
-        W: AsyncWrite + std::marker::Unpin,
-    {
+    fn new(output: String) -> Self {
+        let tmp_file = Self::tmp_file(&output);
+
+        Self {
+            output,
+            tmp_file,
+            need_clean_tmpfile: true,
+        }
+    }
+
+    fn tmp_file(input: &str) -> String {
+        format!("{}.tmp", input)
+    }
+
+    async fn write_inner(&self, bytes: Bytes) -> Result<()> {
+        let tmp_file = &self.tmp_file;
+        let mut writer = File::create(tmp_file)
+            .await
+            .context(Io { file: tmp_file })?;
         writer
-            .write_all(&self.payload[..])
+            .write_all(&bytes)
            .await
-            .context(Io { file: name })?;
+            .context(Io { file: tmp_file })?;
 
         writer
             .write_all(&Self::MAGIC_FOOTER)
             .await
-            .context(Io { file: name })?;
+            .context(Io { file: tmp_file })?;
+
+        writer.flush().await.context(Io { file: tmp_file })?;
 
-        writer.flush().await.context(Io { file: name })?;
+        tokio::fs::rename(tmp_file, &self.output)
+            .await
+            .context(Io { file: &self.output })?;
 
         Ok(())
     }
 
+    // The cache lock is released while the bytes are written to the file, so
+    // one thread may read a page while another thread is updating it. We
+    // write to a tmp file first and then rename it to the expected filename,
+    // so other threads never observe partial content.
+    async fn write_and_flush(mut self, bytes: Bytes) -> Result<()> {
+        let write_result = self.write_inner(bytes).await;
+        if write_result.is_ok() {
+            self.need_clean_tmpfile = false;
+        }
+
+        write_result
+    }
+
     #[inline]
     fn encoded_size(payload_len: usize) -> usize {
         payload_len + Self::MAGIC_FOOTER.len()
@@ -262,7 +309,7 @@ impl DiskCache {
     async fn insert_data(&self, filename: String, value: Bytes) {
         let page_meta = {
-            let file_size = PageFileEncoder::encoded_size(value.len());
+            let file_size = PageFileWriter::encoded_size(value.len());
             PageMeta { file_size }
         };
         let evicted_file = self.insert_page_meta(filename.clone(), page_meta);
@@ -357,18 +404,16 @@ impl DiskCache {
     }
 
     async fn persist_bytes(&self, filename: &str, payload: Bytes) -> Result<()> {
-        let file_path = std::path::Path::new(&self.root_dir)
+        let dest_filepath = std::path::Path::new(&self.root_dir)
             .join(filename)
             .into_os_string()
             .into_string()
             .unwrap();
 
-        let mut file = File::create(&file_path)
-            .await
-            .context(Io { file: &file_path })?;
+        let writer = PageFileWriter::new(dest_filepath);
+        writer.write_and_flush(payload).await?;
 
-        let encoding = PageFileEncoder { payload };
-        encoding.encode_and_persist(&mut file, filename).await
+        Ok(())
     }
 
     /// Read the bytes from the cached file.
@@ -381,7 +426,7 @@ impl DiskCache {
         range: &Range<usize>,
         expect_file_size: usize,
     ) -> std::io::Result<ReadBytesResult> {
-        if PageFileEncoder::encoded_size(range.len()) > expect_file_size {
+        if PageFileWriter::encoded_size(range.len()) > expect_file_size {
            return Ok(ReadBytesResult::OutOfRange);
         }
 
@@ -681,7 +726,7 @@ impl DiskCacheStore {
                 }
                 .fail(),
             ) {
-                error!("Failed to send notifier error result, err:{e:?}.");
+                warn!("Failed to send notifier error result, err:{e:?}.");
             }
         }
     }
@@ -698,8 +743,10 @@ impl DiskCacheStore {
         {
             self.cache.insert_data(cache_key, bytes.clone()).await;
             for notifier in notifiers {
-                if let Err(e) = notifier.send(Ok(bytes.clone())) {
-                    error!("Failed to send notifier success result, err:{e:?}.");
+                if notifier.send(Ok(bytes.clone())).is_err() {
+                    // The error contains the sent bytes, which may be very
+                    // large, so we don't log it.
+                    warn!("Failed to send notifier success result");
                 }
             }
         }
@@ -940,6 +987,7 @@ mod test {
     use upstream::local::LocalFileSystem;
 
     use super::*;
+    use crate::test_util::MemoryStore;
 
     struct StoreWithCacheDir {
         inner: DiskCacheStore,
@@ -951,8 +999,7 @@
         cap: usize,
         partition_bits: usize,
     ) -> StoreWithCacheDir {
-        let local_path = tempdir().unwrap();
-        let local_store = Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());
+        let local_store = Arc::new(MemoryStore::default());
 
        let cache_dir = tempdir().unwrap();
        let store = DiskCacheStore::try_new(
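A note on the pattern above: the write-then-rename trick relies on `rename` being atomic within a single filesystem, so a concurrent reader sees either the old complete page file or the new one, never a half-written page. A minimal standalone sketch of the same idea, assuming tokio for async I/O; the `write_atomically` helper is illustrative and is not code from this PR:

```rust
use std::io;

use bytes::Bytes;
use tokio::{fs::File, io::AsyncWriteExt};

/// Persist `payload` to `dest` without exposing partial content: write
/// everything to a sibling tmp file, flush it, then rename over `dest`.
async fn write_atomically(dest: &str, payload: Bytes) -> io::Result<()> {
    let tmp = format!("{dest}.tmp");
    let mut file = File::create(&tmp).await?;
    file.write_all(&payload).await?;
    file.flush().await?;
    // The rename is atomic on the same filesystem and replaces `dest` if it
    // already exists, so readers of `dest` always see a complete file.
    tokio::fs::rename(&tmp, dest).await?;
    Ok(())
}
```

The PR goes one step further: `PageFileWriter` removes the tmp file in `Drop` when the write fails, so errored writes don't leak `.tmp` files into the cache directory.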
diff --git a/components/object_store/src/lib.rs b/components/object_store/src/lib.rs
index 46bf5e50bf..d3114846a5 100644
--- a/components/object_store/src/lib.rs
+++ b/components/object_store/src/lib.rs
@@ -30,5 +30,7 @@ pub mod multipart;
 pub mod obkv;
 pub mod prefix;
 pub mod s3;
+#[cfg(test)]
+pub mod test_util;
 
 pub type ObjectStoreRef = Arc<dyn ObjectStore>;
diff --git a/components/object_store/src/test_util.rs b/components/object_store/src/test_util.rs
new file mode 100644
index 0000000000..1f0eed3dbb
--- /dev/null
+++ b/components/object_store/src/test_util.rs
@@ -0,0 +1,172 @@
+// Copyright 2023 The CeresDB Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::{collections::HashMap, fmt::Display, ops::Range, sync::RwLock};
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::stream::{self, BoxStream};
+use tokio::io::AsyncWrite;
+use upstream::{path::Path, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result};
+
+#[derive(Debug)]
+struct StoreError {
+    path: Path,
+    msg: String,
+}
+
+impl Display for StoreError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("StoreError")
+            .field("path", &self.path)
+            .field("msg", &self.msg)
+            .finish()
+    }
+}
+
+impl std::error::Error for StoreError {}
+
+/// A memory-based object store implementation, mainly used for testing.
+#[derive(Debug, Default)]
+pub struct MemoryStore {
+    files: RwLock<HashMap<Path, Bytes>>,
+    get_range_counts: RwLock<HashMap<Path, usize>>,
+}
+
+impl Display for MemoryStore {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("MemoryStore")
+            .field("counts", &self.get_counts())
+            .finish()
+    }
+}
+
+impl MemoryStore {
+    pub fn get_counts(&self) -> HashMap<Path, usize> {
+        let counts = self.get_range_counts.read().unwrap();
+        counts.clone().into_iter().collect()
+    }
+}
+
+#[async_trait]
+impl ObjectStore for MemoryStore {
+    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+        let mut files = self.files.write().unwrap();
+        files.insert(location.clone(), bytes);
+        Ok(())
+    }
+
+    async fn get(&self, location: &Path) -> Result<GetResult> {
+        let files = self.files.read().unwrap();
+        if let Some(bs) = files.get(location) {
+            let bs = bs.clone();
+            Ok(GetResult::Stream(Box::pin(stream::once(
+                async move { Ok(bs) },
+            ))))
+        } else {
+            let source = Box::new(StoreError {
+                msg: "not found".to_string(),
+                path: location.clone(),
+            });
+            Err(upstream::Error::Generic {
+                store: "get",
+                source,
+            })
+        }
+    }
+
+    async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
+        {
+            let mut counts = self.get_range_counts.write().unwrap();
+            counts
+                .entry(location.clone())
+                .and_modify(|c| *c += 1)
+                .or_insert(1);
+        }
+
+        let files = self.files.read().unwrap();
+        if let Some(bs) = files.get(location) {
+            Ok(bs.slice(range))
+        } else {
+            let source = Box::new(StoreError {
+                msg: "not found".to_string(),
+                path: location.clone(),
+            });
+            Err(upstream::Error::Generic {
+                store: "get_range",
+                source,
+            })
+        }
+    }
+
+    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
+        let files = self.files.read().unwrap();
+
+        if let Some(bs) = files.get(location) {
+            Ok(ObjectMeta {
+                location: location.clone(),
+                size: bs.len(),
+                last_modified: Default::default(),
+            })
+        } else {
+            let source = Box::new(StoreError {
+                msg: "not found".to_string(),
+                path: location.clone(),
+            });
+            Err(upstream::Error::Generic {
+                store: "head",
+                source,
+            })
+        }
+    }
+
+    async fn put_multipart(
+        &self,
+        _location: &Path,
+    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+        unimplemented!()
+    }
+
+    async fn abort_multipart(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<()> {
+        unimplemented!()
+    }
+
+    async fn delete(&self, _location: &Path) -> Result<()> {
+        unimplemented!()
+    }
+
+    async fn list(&self, _prefix: Option<&Path>) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+        unimplemented!()
+    }
+
+    async fn list_with_delimiter(&self, _prefix: Option<&Path>) -> Result<ListResult> {
+        unimplemented!()
+    }
+
+    async fn copy(&self, _from: &Path, _to: &Path) -> Result<()> {
+        unimplemented!()
+    }
+
+    async fn rename(&self, _from: &Path, _to: &Path) -> Result<()> {
+        unimplemented!()
+    }
+
+    async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()> {
+        unimplemented!()
+    }
+
+    async fn rename_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()> {
+        unimplemented!()
+    }
+}
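To show how the new `MemoryStore` is meant to be consumed, here is a hedged sketch of a test against it. The test name, the payload, and the `#[tokio::test]` harness are assumptions for illustration, not part of this PR:

```rust
#[cfg(test)]
mod memory_store_usage {
    use bytes::Bytes;
    use upstream::{path::Path, ObjectStore};

    use crate::test_util::MemoryStore;

    #[tokio::test]
    async fn get_range_calls_are_counted() {
        let store = MemoryStore::default();
        let location = Path::from("page/0.sst");

        store
            .put(&location, Bytes::from_static(b"hello world"))
            .await
            .unwrap();

        let bytes = store.get_range(&location, 0..5).await.unwrap();
        assert_eq!(&bytes[..], b"hello");

        // get_range_counts records one hit per call, so disk-cache tests can
        // assert how many reads actually reached the underlying store.
        assert_eq!(store.get_counts()[&location], 1);
    }
}
```

Tracking per-path `get_range` hits is what lets cache tests verify hits and misses without touching the local filesystem, which is presumably why the tests switched from `LocalFileSystem` to `MemoryStore`.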