Skip to content

Commit

Permalink
feat(index): add explicit adapter between RangeReader and `AsyncRea…
Browse files Browse the repository at this point in the history
…d` (#4724)

Signed-off-by: Zhenchi <[email protected]>
  • Loading branch information
zhongzc authored Sep 18, 2024
1 parent d1dfffc commit 3b5b906
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 21 deletions.
35 changes: 30 additions & 5 deletions src/common/base/src/range_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,21 +60,46 @@ pub trait RangeReader: Send + Unpin {
}
}

/// Lets a mutable borrow of a `RangeReader` be used as a `RangeReader` itself.
///
/// Every method simply forwards to the same method on the borrowed reader `R`,
/// so a caller can hand out `&mut reader` without giving up ownership of it.
#[async_trait]
impl<R: RangeReader + Send + Unpin> RangeReader for &mut R {
    async fn metadata(&mut self) -> io::Result<Metadata> {
        // `self` is `&mut &mut R`; reborrow down to `&mut R` and call `R`'s method.
        R::metadata(&mut **self).await
    }

    async fn read(&mut self, range: Range<u64>) -> io::Result<Bytes> {
        R::read(&mut **self, range).await
    }

    async fn read_into(
        &mut self,
        range: Range<u64>,
        buf: &mut (impl BufMut + Send),
    ) -> io::Result<()> {
        R::read_into(&mut **self, range, buf).await
    }

    async fn read_vec(&mut self, ranges: &[Range<u64>]) -> io::Result<Vec<Bytes>> {
        R::read_vec(&mut **self, ranges).await
    }
}

/// `RangeReaderAdapter` bridges `RangeReader` and `AsyncRead + AsyncSeek`.
///
/// A tuple newtype whose single public field holds the wrapped value. Wrap an
/// `AsyncRead + AsyncSeek` value in it to pass that value where a `RangeReader`
/// is expected, e.g. `RangeReaderAdapter(Cursor::new(blob))`.
pub struct RangeReaderAdapter<R>(pub R);

/// Implements `RangeReader` for a type that implements `AsyncRead + AsyncSeek`.
///
/// TODO(zhongzc): This is a temporary solution for porting the codebase from `AsyncRead + AsyncSeek` to `RangeReader`.
/// Once the codebase is fully ported to `RangeReader`, remove this implementation.
#[async_trait]
impl<R: futures::AsyncRead + futures::AsyncSeek + Send + Unpin> RangeReader for R {
impl<R: futures::AsyncRead + futures::AsyncSeek + Send + Unpin> RangeReader
for RangeReaderAdapter<R>
{
async fn metadata(&mut self) -> io::Result<Metadata> {
let content_length = self.seek(io::SeekFrom::End(0)).await?;
let content_length = self.0.seek(io::SeekFrom::End(0)).await?;
Ok(Metadata { content_length })
}

async fn read(&mut self, range: Range<u64>) -> io::Result<Bytes> {
let mut buf = vec![0; (range.end - range.start) as usize];
self.seek(io::SeekFrom::Start(range.start)).await?;
self.read_exact(&mut buf).await?;
self.0.seek(io::SeekFrom::Start(range.start)).await?;
self.0.read_exact(&mut buf).await?;
Ok(Bytes::from(buf))
}
}
10 changes: 7 additions & 3 deletions src/index/src/inverted_index/format/reader/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ impl<R: RangeReader> InvertedIndexReader for InvertedIndexBlobReader<R> {
#[cfg(test)]
mod tests {
use common_base::bit_vec::prelude::*;
use common_base::range_read::RangeReaderAdapter;
use fst::MapBuilder;
use futures::io::Cursor;
use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas};
Expand Down Expand Up @@ -162,7 +163,8 @@ mod tests {
#[tokio::test]
async fn test_inverted_index_blob_reader_metadata() {
let blob = create_inverted_index_blob();
let mut blob_reader = InvertedIndexBlobReader::new(Cursor::new(blob));
let cursor = RangeReaderAdapter(Cursor::new(blob));
let mut blob_reader = InvertedIndexBlobReader::new(cursor);

let metas = blob_reader.metadata().await.unwrap();
assert_eq!(metas.metas.len(), 2);
Expand All @@ -189,7 +191,8 @@ mod tests {
#[tokio::test]
async fn test_inverted_index_blob_reader_fst() {
let blob = create_inverted_index_blob();
let mut blob_reader = InvertedIndexBlobReader::new(Cursor::new(blob));
let cursor = RangeReaderAdapter(Cursor::new(blob));
let mut blob_reader = InvertedIndexBlobReader::new(cursor);

let metas = blob_reader.metadata().await.unwrap();
let meta = metas.metas.get("tag0").unwrap();
Expand Down Expand Up @@ -221,7 +224,8 @@ mod tests {
#[tokio::test]
async fn test_inverted_index_blob_reader_bitmap() {
let blob = create_inverted_index_blob();
let mut blob_reader = InvertedIndexBlobReader::new(Cursor::new(blob));
let cursor = RangeReaderAdapter(Cursor::new(blob));
let mut blob_reader = InvertedIndexBlobReader::new(cursor);

let metas = blob_reader.metadata().await.unwrap();
let meta = metas.metas.get("tag0").unwrap();
Expand Down
23 changes: 12 additions & 11 deletions src/index/src/inverted_index/format/reader/footer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,18 @@ use crate::inverted_index::error::{
use crate::inverted_index::format::FOOTER_PAYLOAD_SIZE_SIZE;

/// InvertedIndeFooterReader is for reading the footer section of the blob.
pub struct InvertedIndeFooterReader<'a, R> {
source: &'a mut R,
pub struct InvertedIndeFooterReader<R> {
source: R,
blob_size: u64,
}

impl<'a, R> InvertedIndeFooterReader<'a, R> {
pub fn new(source: &'a mut R, blob_size: u64) -> Self {
impl<R> InvertedIndeFooterReader<R> {
pub fn new(source: R, blob_size: u64) -> Self {
Self { source, blob_size }
}
}

impl<'a, R: RangeReader> InvertedIndeFooterReader<'a, R> {
impl<R: RangeReader> InvertedIndeFooterReader<R> {
pub async fn metadata(&mut self) -> Result<InvertedIndexMetas> {
let payload_size = self.read_payload_size().await?;
let metas = self.read_payload(payload_size).await?;
Expand Down Expand Up @@ -113,6 +113,7 @@ impl<'a, R: RangeReader> InvertedIndeFooterReader<'a, R> {

#[cfg(test)]
mod tests {
use common_base::range_read::RangeReaderAdapter;
use futures::io::Cursor;
use prost::Message;

Expand Down Expand Up @@ -142,8 +143,8 @@ mod tests {

let payload_buf = create_test_payload(meta);
let blob_size = payload_buf.len() as u64;
let mut cursor = Cursor::new(payload_buf);
let mut reader = InvertedIndeFooterReader::new(&mut cursor, blob_size);
let cursor = RangeReaderAdapter(Cursor::new(payload_buf));
let mut reader = InvertedIndeFooterReader::new(cursor, blob_size);

let payload_size = reader.read_payload_size().await.unwrap();
let metas = reader.read_payload(payload_size).await.unwrap();
Expand All @@ -163,8 +164,8 @@ mod tests {
let mut payload_buf = create_test_payload(meta);
payload_buf.push(0xff); // Add an extra byte to corrupt the footer
let blob_size = payload_buf.len() as u64;
let mut cursor = Cursor::new(payload_buf);
let mut reader = InvertedIndeFooterReader::new(&mut cursor, blob_size);
let cursor = RangeReaderAdapter(Cursor::new(payload_buf));
let mut reader = InvertedIndeFooterReader::new(cursor, blob_size);

let payload_size_result = reader.read_payload_size().await;
assert!(payload_size_result.is_err());
Expand All @@ -181,8 +182,8 @@ mod tests {

let payload_buf = create_test_payload(meta);
let blob_size = payload_buf.len() as u64;
let mut cursor = Cursor::new(payload_buf);
let mut reader = InvertedIndeFooterReader::new(&mut cursor, blob_size);
let cursor = RangeReaderAdapter(Cursor::new(payload_buf));
let mut reader = InvertedIndeFooterReader::new(cursor, blob_size);

let payload_size = reader.read_payload_size().await.unwrap();
let payload_result = reader.read_payload(payload_size).await;
Expand Down
5 changes: 3 additions & 2 deletions src/index/src/inverted_index/format/writer/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ impl<W: AsyncWrite + Send + Unpin> InvertedIndexBlobWriter<W> {

#[cfg(test)]
mod tests {
use common_base::range_read::RangeReaderAdapter;
use futures::io::Cursor;
use futures::stream;

Expand All @@ -119,7 +120,7 @@ mod tests {
.await
.unwrap();

let cursor = Cursor::new(blob);
let cursor = RangeReaderAdapter(Cursor::new(blob));
let mut reader = InvertedIndexBlobReader::new(cursor);
let metadata = reader.metadata().await.unwrap();
assert_eq!(metadata.total_row_count, 8);
Expand Down Expand Up @@ -160,7 +161,7 @@ mod tests {
.await
.unwrap();

let cursor = Cursor::new(blob);
let cursor = RangeReaderAdapter(Cursor::new(blob));
let mut reader = InvertedIndexBlobReader::new(cursor);
let metadata = reader.metadata().await.unwrap();
assert_eq!(metadata.total_row_count, 8);
Expand Down
2 changes: 2 additions & 0 deletions src/mito2/src/sst/index/inverted_index/applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod builder;

use std::sync::Arc;

use common_base::range_read::RangeReaderAdapter;
use common_telemetry::warn;
use index::inverted_index::format::reader::InvertedIndexBlobReader;
use index::inverted_index::search::index_apply::{
Expand Down Expand Up @@ -108,6 +109,7 @@ impl InvertedIndexApplier {
self.remote_blob_reader(file_id).await?
}
};
let blob = RangeReaderAdapter(blob);

if let Some(index_cache) = &self.inverted_index_cache {
let mut index_reader = CachedInvertedIndexBlobReader::new(
Expand Down

0 comments on commit 3b5b906

Please sign in to comment.