Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(index): add RangeReader trait #4718

Merged
merged 4 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/common/base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ workspace = true

[dependencies]
anymap = "1.0.0-beta.2"
async-trait.workspace = true
bitvec = "1.0"
bytes.workspace = true
common-error.workspace = true
common-macro.workspace = true
futures.workspace = true
paste = "1.0"
serde = { version = "1.0", features = ["derive"] }
snafu.workspace = true
Expand Down
6 changes: 6 additions & 0 deletions src/common/base/src/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ impl From<Vec<u8>> for Bytes {
}
}

impl From<Bytes> for Vec<u8> {
fn from(bytes: Bytes) -> Vec<u8> {
bytes.0.into()
}
}

impl Deref for Bytes {
type Target = [u8];

Expand Down
1 change: 1 addition & 0 deletions src/common/base/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod bit_vec;
pub mod buffer;
pub mod bytes;
pub mod plugins;
pub mod range_read;
#[allow(clippy::all)]
pub mod readable_size;
pub mod secrets;
Expand Down
92 changes: 92 additions & 0 deletions src/common/base/src/range_read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io;
use std::ops::Range;

use async_trait::async_trait;
use futures::{AsyncReadExt, AsyncSeekExt};

use crate::buffer::BufferMut;
evenyag marked this conversation as resolved.
Show resolved Hide resolved
use crate::bytes::Bytes;

/// `Metadata` contains the metadata of a source.
pub struct Metadata {
/// The length of the source in bytes.
pub content_length: u64,
}

/// `RangeReader` reads a range of bytes from a source.
#[async_trait]
pub trait RangeReader: Send + Unpin {
/// Returns the metadata of the source.
async fn metadata(&mut self) -> io::Result<Metadata>;

/// Reads the bytes in the given range.
async fn read(&mut self, range: Range<u64>) -> io::Result<Bytes>;

/// Reads the bytes in the given range into the buffer.
///
/// Handles the buffer based on its capacity:
/// - If the buffer is insufficient to hold the bytes, it will either:
/// - Allocate additional space (e.g., for `Vec<u8>`)
/// - Return an error (e.g., for `&mut [u8]`)
async fn read_into(
&mut self,
range: Range<u64>,
buf: &mut (impl BufferMut + Send),
) -> io::Result<()> {
let bytes = self.read(range).await?;
let res = buf.write_from_slice(&bytes);

if let Err(err) = res {
let kind = match err {
crate::buffer::Error::Eof { .. } => io::ErrorKind::UnexpectedEof,
_ => io::ErrorKind::Other,
};

return Err(io::Error::new(kind, err));
}

Ok(())
}

/// Reads the bytes in the given ranges.
async fn read_vec(&mut self, ranges: &[Range<u64>]) -> io::Result<Vec<Bytes>> {
let mut result = Vec::with_capacity(ranges.len());
for range in ranges {
result.push(self.read(range.clone()).await?);
}
Ok(result)
}
}

/// Implement `RangeReader` for a type that implements `AsyncRead + AsyncSeek`.
///
/// TODO(zhongzc): It's a temporary solution for porting the codebase from `AsyncRead + AsyncSeek` to `RangeReader`.
/// Until the codebase is fully ported to `RangeReader`, remove this implementation.
#[async_trait]
impl<R: futures::AsyncRead + futures::AsyncSeek + Send + Unpin> RangeReader for R {
async fn metadata(&mut self) -> io::Result<Metadata> {
let content_length = self.seek(io::SeekFrom::End(0)).await?;
Ok(Metadata { content_length })
}

async fn read(&mut self, range: Range<u64>) -> io::Result<Bytes> {
let mut buf = vec![0; (range.end - range.start) as usize];
self.seek(io::SeekFrom::Start(range.start)).await?;
self.read_exact(&mut buf).await?;
Ok(Bytes::from(buf))
}
}
29 changes: 14 additions & 15 deletions src/index/src/inverted_index/format/reader/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io::SeekFrom;
use std::sync::Arc;

use async_trait::async_trait;
use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
use common_base::range_read::RangeReader;
use greptime_proto::v1::index::InvertedIndexMetas;
use snafu::{ensure, ResultExt};

use crate::inverted_index::error::{ReadSnafu, Result, SeekSnafu, UnexpectedBlobSizeSnafu};
use crate::inverted_index::error::{CommonIoSnafu, Result, UnexpectedBlobSizeSnafu};
use crate::inverted_index::format::reader::footer::InvertedIndeFooterReader;
use crate::inverted_index::format::reader::InvertedIndexReader;
use crate::inverted_index::format::MIN_BLOB_SIZE;
Expand Down Expand Up @@ -49,28 +48,28 @@ impl<R> InvertedIndexBlobReader<R> {
}

#[async_trait]
impl<R: AsyncRead + AsyncSeek + Unpin + Send> InvertedIndexReader for InvertedIndexBlobReader<R> {
impl<R: RangeReader> InvertedIndexReader for InvertedIndexBlobReader<R> {
async fn read_all(&mut self, dest: &mut Vec<u8>) -> Result<usize> {
let metadata = self.source.metadata().await.context(CommonIoSnafu)?;
self.source
.seek(SeekFrom::Start(0))
.read_into(0..metadata.content_length, dest)
.await
.context(SeekSnafu)?;
self.source.read_to_end(dest).await.context(ReadSnafu)
.context(CommonIoSnafu)?;
Ok(metadata.content_length as usize)
}

async fn seek_read(&mut self, offset: u64, size: u32) -> Result<Vec<u8>> {
self.source
.seek(SeekFrom::Start(offset))
let buf = self
.source
.read(offset..offset + size as u64)
.await
.context(SeekSnafu)?;
let mut buf = vec![0u8; size as usize];
self.source.read(&mut buf).await.context(ReadSnafu)?;
Ok(buf)
.context(CommonIoSnafu)?;
Ok(buf.into())
}

async fn metadata(&mut self) -> Result<Arc<InvertedIndexMetas>> {
let end = SeekFrom::End(0);
let blob_size = self.source.seek(end).await.context(SeekSnafu)?;
let metadata = self.source.metadata().await.context(CommonIoSnafu)?;
let blob_size = metadata.content_length;
Self::validate_blob_size(blob_size)?;

let mut footer_reader = InvertedIndeFooterReader::new(&mut self.source, blob_size);
Expand Down
51 changes: 26 additions & 25 deletions src/index/src/inverted_index/format/reader/footer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,59 +12,57 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io::SeekFrom;

use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
use common_base::range_read::RangeReader;
use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas};
use prost::Message;
use snafu::{ensure, ResultExt};

use crate::inverted_index::error::{
DecodeProtoSnafu, ReadSnafu, Result, SeekSnafu, UnexpectedFooterPayloadSizeSnafu,
CommonIoSnafu, DecodeProtoSnafu, Result, UnexpectedFooterPayloadSizeSnafu,
UnexpectedOffsetSizeSnafu, UnexpectedZeroSegmentRowCountSnafu,
};
use crate::inverted_index::format::FOOTER_PAYLOAD_SIZE_SIZE;

/// InvertedIndeFooterReader is for reading the footer section of the blob.
pub struct InvertedIndeFooterReader<R> {
source: R,
pub struct InvertedIndeFooterReader<'a, R> {
source: &'a mut R,
blob_size: u64,
}

impl<R> InvertedIndeFooterReader<R> {
pub fn new(source: R, blob_size: u64) -> Self {
impl<'a, R> InvertedIndeFooterReader<'a, R> {
pub fn new(source: &'a mut R, blob_size: u64) -> Self {
Self { source, blob_size }
}
}

impl<R: AsyncRead + AsyncSeek + Unpin> InvertedIndeFooterReader<R> {
impl<'a, R: RangeReader> InvertedIndeFooterReader<'a, R> {
pub async fn metadata(&mut self) -> Result<InvertedIndexMetas> {
let payload_size = self.read_payload_size().await?;
let metas = self.read_payload(payload_size).await?;
Ok(metas)
}

async fn read_payload_size(&mut self) -> Result<u64> {
let size_offset = SeekFrom::Start(self.blob_size - FOOTER_PAYLOAD_SIZE_SIZE);
self.source.seek(size_offset).await.context(SeekSnafu)?;
let size_buf = &mut [0u8; FOOTER_PAYLOAD_SIZE_SIZE as usize];
self.source.read_exact(size_buf).await.context(ReadSnafu)?;

let payload_size = u32::from_le_bytes(*size_buf) as u64;
let mut size_buf = [0u8; FOOTER_PAYLOAD_SIZE_SIZE as usize];
let end = self.blob_size;
let start = end - FOOTER_PAYLOAD_SIZE_SIZE;
self.source
.read_into(start..end, &mut &mut size_buf[..])
.await
.context(CommonIoSnafu)?;

let payload_size = u32::from_le_bytes(size_buf) as u64;
self.validate_payload_size(payload_size)?;

Ok(payload_size)
}

async fn read_payload(&mut self, payload_size: u64) -> Result<InvertedIndexMetas> {
let payload_offset =
SeekFrom::Start(self.blob_size - FOOTER_PAYLOAD_SIZE_SIZE - payload_size);
self.source.seek(payload_offset).await.context(SeekSnafu)?;

let payload = &mut vec![0u8; payload_size as usize];
self.source.read_exact(payload).await.context(ReadSnafu)?;
let end = self.blob_size - FOOTER_PAYLOAD_SIZE_SIZE;
let start = end - payload_size;
let bytes = self.source.read(start..end).await.context(CommonIoSnafu)?;

let metas = InvertedIndexMetas::decode(&payload[..]).context(DecodeProtoSnafu)?;
let metas = InvertedIndexMetas::decode(&*bytes).context(DecodeProtoSnafu)?;
self.validate_metas(&metas, payload_size)?;

Ok(metas)
Expand Down Expand Up @@ -144,7 +142,8 @@ mod tests {

let payload_buf = create_test_payload(meta);
let blob_size = payload_buf.len() as u64;
let mut reader = InvertedIndeFooterReader::new(Cursor::new(payload_buf), blob_size);
let mut cursor = Cursor::new(payload_buf);
let mut reader = InvertedIndeFooterReader::new(&mut cursor, blob_size);

let payload_size = reader.read_payload_size().await.unwrap();
let metas = reader.read_payload(payload_size).await.unwrap();
Expand All @@ -164,7 +163,8 @@ mod tests {
let mut payload_buf = create_test_payload(meta);
payload_buf.push(0xff); // Add an extra byte to corrupt the footer
let blob_size = payload_buf.len() as u64;
let mut reader = InvertedIndeFooterReader::new(Cursor::new(payload_buf), blob_size);
let mut cursor = Cursor::new(payload_buf);
let mut reader = InvertedIndeFooterReader::new(&mut cursor, blob_size);

let payload_size_result = reader.read_payload_size().await;
assert!(payload_size_result.is_err());
Expand All @@ -181,7 +181,8 @@ mod tests {

let payload_buf = create_test_payload(meta);
let blob_size = payload_buf.len() as u64;
let mut reader = InvertedIndeFooterReader::new(Cursor::new(payload_buf), blob_size);
let mut cursor = Cursor::new(payload_buf);
let mut reader = InvertedIndeFooterReader::new(&mut cursor, blob_size);

let payload_size = reader.read_payload_size().await.unwrap();
let payload_result = reader.read_payload(payload_size).await;
Expand Down