Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(index): add explicit adapter between RangeReader and AsyncRead #4724

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 30 additions & 5 deletions src/common/base/src/range_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,21 +60,46 @@ pub trait RangeReader: Send + Unpin {
}
}

/// Implement `RangeReader` for a type that implements `AsyncRead + AsyncSeek`.
#[async_trait]
impl<R: RangeReader + Send + Unpin> RangeReader for &mut R {
async fn metadata(&mut self) -> io::Result<Metadata> {
(*self).metadata().await
}
async fn read(&mut self, range: Range<u64>) -> io::Result<Bytes> {
(*self).read(range).await
}
async fn read_into(
&mut self,
range: Range<u64>,
buf: &mut (impl BufMut + Send),
) -> io::Result<()> {
(*self).read_into(range, buf).await
}
async fn read_vec(&mut self, ranges: &[Range<u64>]) -> io::Result<Vec<Bytes>> {
(*self).read_vec(ranges).await
}
}

/// `RangeReaderAdapter` bridges `RangeReader` and `AsyncRead + AsyncSeek`.
pub struct RangeReaderAdapter<R>(pub R);

/// Implements `RangeReader` for a type that implements `AsyncRead + AsyncSeek`.
///
/// TODO(zhongzc): It's a temporary solution for porting the codebase from `AsyncRead + AsyncSeek` to `RangeReader`.
/// Until the codebase is fully ported to `RangeReader`, remove this implementation.
#[async_trait]
impl<R: futures::AsyncRead + futures::AsyncSeek + Send + Unpin> RangeReader for R {
impl<R: futures::AsyncRead + futures::AsyncSeek + Send + Unpin> RangeReader
for RangeReaderAdapter<R>
{
async fn metadata(&mut self) -> io::Result<Metadata> {
let content_length = self.seek(io::SeekFrom::End(0)).await?;
let content_length = self.0.seek(io::SeekFrom::End(0)).await?;
Ok(Metadata { content_length })
}

async fn read(&mut self, range: Range<u64>) -> io::Result<Bytes> {
let mut buf = vec![0; (range.end - range.start) as usize];
self.seek(io::SeekFrom::Start(range.start)).await?;
self.read_exact(&mut buf).await?;
self.0.seek(io::SeekFrom::Start(range.start)).await?;
self.0.read_exact(&mut buf).await?;
Ok(Bytes::from(buf))
}
}
10 changes: 7 additions & 3 deletions src/index/src/inverted_index/format/reader/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ impl<R: RangeReader> InvertedIndexReader for InvertedIndexBlobReader<R> {
#[cfg(test)]
mod tests {
use common_base::bit_vec::prelude::*;
use common_base::range_read::RangeReaderAdapter;
use fst::MapBuilder;
use futures::io::Cursor;
use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas};
Expand Down Expand Up @@ -162,7 +163,8 @@ mod tests {
#[tokio::test]
async fn test_inverted_index_blob_reader_metadata() {
let blob = create_inverted_index_blob();
let mut blob_reader = InvertedIndexBlobReader::new(Cursor::new(blob));
let cursor = RangeReaderAdapter(Cursor::new(blob));
let mut blob_reader = InvertedIndexBlobReader::new(cursor);

let metas = blob_reader.metadata().await.unwrap();
assert_eq!(metas.metas.len(), 2);
Expand All @@ -189,7 +191,8 @@ mod tests {
#[tokio::test]
async fn test_inverted_index_blob_reader_fst() {
let blob = create_inverted_index_blob();
let mut blob_reader = InvertedIndexBlobReader::new(Cursor::new(blob));
let cursor = RangeReaderAdapter(Cursor::new(blob));
let mut blob_reader = InvertedIndexBlobReader::new(cursor);

let metas = blob_reader.metadata().await.unwrap();
let meta = metas.metas.get("tag0").unwrap();
Expand Down Expand Up @@ -221,7 +224,8 @@ mod tests {
#[tokio::test]
async fn test_inverted_index_blob_reader_bitmap() {
let blob = create_inverted_index_blob();
let mut blob_reader = InvertedIndexBlobReader::new(Cursor::new(blob));
let cursor = RangeReaderAdapter(Cursor::new(blob));
let mut blob_reader = InvertedIndexBlobReader::new(cursor);

let metas = blob_reader.metadata().await.unwrap();
let meta = metas.metas.get("tag0").unwrap();
Expand Down
23 changes: 12 additions & 11 deletions src/index/src/inverted_index/format/reader/footer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,18 @@ use crate::inverted_index::error::{
use crate::inverted_index::format::FOOTER_PAYLOAD_SIZE_SIZE;

/// InvertedIndeFooterReader is for reading the footer section of the blob.
pub struct InvertedIndeFooterReader<'a, R> {
source: &'a mut R,
pub struct InvertedIndeFooterReader<R> {
source: R,
blob_size: u64,
}

impl<'a, R> InvertedIndeFooterReader<'a, R> {
pub fn new(source: &'a mut R, blob_size: u64) -> Self {
impl<R> InvertedIndeFooterReader<R> {
pub fn new(source: R, blob_size: u64) -> Self {
Self { source, blob_size }
}
}

impl<'a, R: RangeReader> InvertedIndeFooterReader<'a, R> {
impl<R: RangeReader> InvertedIndeFooterReader<R> {
pub async fn metadata(&mut self) -> Result<InvertedIndexMetas> {
let payload_size = self.read_payload_size().await?;
let metas = self.read_payload(payload_size).await?;
Expand Down Expand Up @@ -113,6 +113,7 @@ impl<'a, R: RangeReader> InvertedIndeFooterReader<'a, R> {

#[cfg(test)]
mod tests {
use common_base::range_read::RangeReaderAdapter;
use futures::io::Cursor;
use prost::Message;

Expand Down Expand Up @@ -142,8 +143,8 @@ mod tests {

let payload_buf = create_test_payload(meta);
let blob_size = payload_buf.len() as u64;
let mut cursor = Cursor::new(payload_buf);
let mut reader = InvertedIndeFooterReader::new(&mut cursor, blob_size);
let cursor = RangeReaderAdapter(Cursor::new(payload_buf));
let mut reader = InvertedIndeFooterReader::new(cursor, blob_size);

let payload_size = reader.read_payload_size().await.unwrap();
let metas = reader.read_payload(payload_size).await.unwrap();
Expand All @@ -163,8 +164,8 @@ mod tests {
let mut payload_buf = create_test_payload(meta);
payload_buf.push(0xff); // Add an extra byte to corrupt the footer
let blob_size = payload_buf.len() as u64;
let mut cursor = Cursor::new(payload_buf);
let mut reader = InvertedIndeFooterReader::new(&mut cursor, blob_size);
let cursor = RangeReaderAdapter(Cursor::new(payload_buf));
let mut reader = InvertedIndeFooterReader::new(cursor, blob_size);

let payload_size_result = reader.read_payload_size().await;
assert!(payload_size_result.is_err());
Expand All @@ -181,8 +182,8 @@ mod tests {

let payload_buf = create_test_payload(meta);
let blob_size = payload_buf.len() as u64;
let mut cursor = Cursor::new(payload_buf);
let mut reader = InvertedIndeFooterReader::new(&mut cursor, blob_size);
let cursor = RangeReaderAdapter(Cursor::new(payload_buf));
let mut reader = InvertedIndeFooterReader::new(cursor, blob_size);

let payload_size = reader.read_payload_size().await.unwrap();
let payload_result = reader.read_payload(payload_size).await;
Expand Down
5 changes: 3 additions & 2 deletions src/index/src/inverted_index/format/writer/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ impl<W: AsyncWrite + Send + Unpin> InvertedIndexBlobWriter<W> {

#[cfg(test)]
mod tests {
use common_base::range_read::RangeReaderAdapter;
use futures::io::Cursor;
use futures::stream;

Expand All @@ -119,7 +120,7 @@ mod tests {
.await
.unwrap();

let cursor = Cursor::new(blob);
let cursor = RangeReaderAdapter(Cursor::new(blob));
let mut reader = InvertedIndexBlobReader::new(cursor);
let metadata = reader.metadata().await.unwrap();
assert_eq!(metadata.total_row_count, 8);
Expand Down Expand Up @@ -160,7 +161,7 @@ mod tests {
.await
.unwrap();

let cursor = Cursor::new(blob);
let cursor = RangeReaderAdapter(Cursor::new(blob));
let mut reader = InvertedIndexBlobReader::new(cursor);
let metadata = reader.metadata().await.unwrap();
assert_eq!(metadata.total_row_count, 8);
Expand Down
2 changes: 2 additions & 0 deletions src/mito2/src/sst/index/inverted_index/applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod builder;

use std::sync::Arc;

use common_base::range_read::RangeReaderAdapter;
use common_telemetry::warn;
use index::inverted_index::format::reader::InvertedIndexBlobReader;
use index::inverted_index::search::index_apply::{
Expand Down Expand Up @@ -108,6 +109,7 @@ impl InvertedIndexApplier {
self.remote_blob_reader(file_id).await?
}
};
let blob = RangeReaderAdapter(blob);

if let Some(index_cache) = &self.inverted_index_cache {
let mut index_reader = CachedInvertedIndexBlobReader::new(
Expand Down