feat: Implements a reader to make schema compatible #2326

13 changes: 13 additions & 0 deletions src/mito2/src/error.rs
@@ -419,6 +419,18 @@ pub enum Error {
region_id: RegionId,
location: Location,
},

#[snafu(display(
"Failed to compat readers for region {}, reason: {}, location: {}",
region_id,
reason,
location
))]
CompatReader {
region_id: RegionId,
reason: String,
location: Location,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
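Snafu derives a context selector per variant, so code in the new compat reader can raise this error through a generated `CompatReaderSnafu` selector, following the crate's existing `*Snafu` naming (for example `InvalidBatchSnafu` imported in read.rs below). A minimal sketch, assuming the selector is exported from `crate::error` and that `location` is captured implicitly as for the other variants; `same_pk_encoding` is a made-up check:

```rust
use snafu::ensure;
use store_api::storage::RegionId;

use crate::error::{CompatReaderSnafu, Result};

// Hypothetical compatibility check; a real implementation in compat.rs would
// compare the reader's metadata against the region's current metadata.
fn ensure_compatible(region_id: RegionId, same_pk_encoding: bool) -> Result<()> {
    ensure!(
        same_pk_encoding,
        CompatReaderSnafu {
            region_id,
            reason: "primary key encodings differ",
        }
    );
    Ok(())
}
```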
@@ -484,6 +496,7 @@ impl ErrorExt for Error {
FlushRegion { source, .. } => source.status_code(),
RegionDropped { .. } => StatusCode::Cancelled,
RegionClosed { .. } => StatusCode::Cancelled,
CompatReader { .. } => StatusCode::Unexpected,
}
}

101 changes: 87 additions & 14 deletions src/mito2/src/read.rs
@@ -14,11 +14,13 @@

//! Common structs and utilities for reading data.

pub mod compat;
pub mod merge;
pub(crate) mod projection;
pub mod projection;
pub(crate) mod scan_region;
pub(crate) mod seq_scan;

use std::collections::HashSet;
use std::sync::Arc;

use api::v1::OpType;
@@ -34,17 +36,18 @@ use datatypes::vectors::{
BooleanVector, Helper, UInt32Vector, UInt64Vector, UInt8Vector, Vector, VectorRef,
};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::RegionMetadata;
use store_api::storage::{ColumnId, SequenceNumber};

use crate::error::{
ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, InvalidBatchSnafu, Result,
};
use crate::memtable::BoxedBatchIterator;

/// Storage internal representation of a batch of rows
/// for a primary key (time series).
/// Storage internal representation of a batch of rows for a primary key (time series).
///
/// Rows are sorted by primary key, timestamp, sequence desc, op_type desc.
/// Rows are sorted by primary key, timestamp, sequence desc, op_type desc. Fields
/// always keep the same relative order as fields in [RegionMetadata](store_api::metadata::RegionMetadata).
#[derive(Debug, PartialEq, Clone)]
pub struct Batch {
/// Primary key encoded in a comparable form.
@@ -77,6 +80,17 @@ impl Batch {
.build()
}

/// Tries to set fields for the batch.
pub fn with_fields(self, fields: Vec<BatchColumn>) -> Result<Batch> {
Batch::new(
self.primary_key,
self.timestamps,
self.sequences,
self.op_types,
fields,
)
}

/// Returns primary key of the batch.
pub fn primary_key(&self) -> &[u8] {
&self.primary_key
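`with_fields` rebuilds the batch through `Batch::new`, so the usual validation still applies: the replacement field columns must have the same row count as the batch's timestamps, and their order must follow the region metadata, as the struct doc above states. A small usage sketch, where `batch` is assumed to come from a reader and column id 2 is made up for the example:

```rust
use std::sync::Arc;

use datatypes::vectors::UInt64Vector;

use crate::error::Result;
use crate::read::{Batch, BatchColumn};

// Replace the fields of a batch; here the batch is assumed to hold one row,
// so the new column also has exactly one value.
fn with_single_field(batch: Batch) -> Result<Batch> {
    batch.with_fields(vec![BatchColumn {
        column_id: 2,
        data: Arc::new(UInt64Vector::from_slice([42])),
    }])
}
```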
@@ -150,6 +164,11 @@ Some(self.get_sequence(self.sequences.len() - 1))
Some(self.get_sequence(self.sequences.len() - 1))
}

/// Replaces the primary key of the batch.
pub fn set_primary_key(&mut self, primary_key: Vec<u8>) {
self.primary_key = primary_key;
}

/// Slice the batch, returning a new batch.
///
/// # Panics
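`set_primary_key` mutates the key in place, which is what a schema-compat step needs when the key columns of old SST data differ from the current region schema. A hedged sketch; `rebuild_key` is a hypothetical converter, not an API added by this PR:

```rust
use crate::read::Batch;

// Hypothetical compat step: re-encode the primary key of a batch read from an
// older SST so it matches the current key layout.
fn rewrite_primary_key(batch: &mut Batch, rebuild_key: impl Fn(&[u8]) -> Vec<u8>) {
    let new_key = rebuild_key(batch.primary_key());
    batch.set_primary_key(new_key);
}
```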
@@ -202,15 +221,22 @@ impl Batch {
reason: "batches have different primary key",
}
);
ensure!(
batches
.iter()
.skip(1)
.all(|b| b.fields().len() == first.fields().len()),
InvalidBatchSnafu {
reason: "batches have different field num",
}
);
for b in batches.iter().skip(1) {
ensure!(
b.fields.len() == first.fields.len(),
InvalidBatchSnafu {
reason: "batches have different field num",
}
);
for (l, r) in b.fields.iter().zip(&first.fields) {
ensure!(
l.column_id == r.column_id,
InvalidBatchSnafu {
reason: "batches have different fields",
}
);
}
}

// We take the primary key from the first batch.
let mut builder = BatchBuilder::new(primary_key);
@@ -311,6 +337,24 @@ impl Batch {
self.take_in_place(&indices)
}

/// Returns ids of fields in the [Batch] after applying the `projection`.
pub(crate) fn projected_fields(
metadata: &RegionMetadata,
projection: &[ColumnId],
) -> Vec<ColumnId> {
let projected_ids: HashSet<_> = projection.iter().copied().collect();
metadata
.field_columns()
.filter_map(|column| {
if projected_ids.contains(&column.column_id) {
Some(column.column_id)
} else {
None
}
})
.collect()
}

/// Takes the batch in place.
fn take_in_place(&mut self, indices: &UInt32Vector) -> Result<()> {
self.timestamps = self.timestamps.take(indices).context(ComputeVectorSnafu)?;
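The ordering contract of `projected_fields` above is worth spelling out: ids come back in the metadata's field order, not in projection order. A standalone sketch of the same filtering logic, using plain id lists in place of `RegionMetadata`:

```rust
use std::collections::HashSet;

// Stand-in for `Batch::projected_fields`: `metadata_field_ids` plays the role
// of `metadata.field_columns()`, so the output preserves metadata order.
fn projected_fields_sketch(metadata_field_ids: &[u32], projection: &[u32]) -> Vec<u32> {
    let projected: HashSet<u32> = projection.iter().copied().collect();
    metadata_field_ids
        .iter()
        .copied()
        .filter(|id| projected.contains(id))
        .collect()
}

#[test]
fn projection_order_is_metadata_order() {
    assert_eq!(projected_fields_sketch(&[3, 4, 5], &[5, 3]), vec![3, 5]);
}
```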
@@ -566,8 +610,6 @@ impl Source {
/// The reader must guarantee [Batch]es returned by it have the same schema.
#[async_trait]
pub trait BatchReader: Send {
// TODO(yingwen): fields of the batch returned.

/// Fetch next [Batch].
///
/// Returns `Ok(None)` when the reader has reached its end and calling `next_batch()`
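The `pub mod compat` added at the top of read.rs is not shown in this excerpt; conceptually it wraps another reader and adapts each batch to the schema the caller expects. A rough sketch of such a wrapper, assuming `next_batch` has the async signature documented above and using a hypothetical `adapt_batch` helper:

```rust
use async_trait::async_trait;

use crate::error::Result;
use crate::read::{Batch, BatchReader};

// Sketch only: wraps an inner reader and adapts every batch it yields, e.g.
// via `Batch::set_primary_key` and `Batch::with_fields`.
struct CompatWrapper<R> {
    inner: R,
}

#[async_trait]
impl<R: BatchReader> BatchReader for CompatWrapper<R> {
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        match self.inner.next_batch().await? {
            Some(batch) => Ok(Some(adapt_batch(batch)?)),
            None => Ok(None),
        }
    }
}

// Hypothetical adaptation step; a real implementation would consult the
// region metadata the reader was created with.
fn adapt_batch(batch: Batch) -> Result<Batch> {
    Ok(batch)
}
```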
@@ -729,6 +771,37 @@ mod tests {
);
}

#[test]
fn test_concat_different_fields() {
let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
let fields = vec![
batch1.fields()[0].clone(),
BatchColumn {
column_id: 2,
data: Arc::new(UInt64Vector::from_slice([2])),
},
];
// Batch 2 has more fields.
let batch2 = batch1.clone().with_fields(fields).unwrap();
let err = Batch::concat(vec![batch1.clone(), batch2]).unwrap_err();
assert!(
matches!(err, Error::InvalidBatch { .. }),
"unexpected err: {err}"
);

// Batch 2 has different field.
let fields = vec![BatchColumn {
column_id: 2,
data: Arc::new(UInt64Vector::from_slice([2])),
}];
let batch2 = batch1.clone().with_fields(fields).unwrap();
let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
assert!(
matches!(err, Error::InvalidBatch { .. }),
"unexpected err: {err}"
);
}

#[test]
fn test_filter_deleted_empty() {
let mut batch = new_batch(&[], &[], &[], &[]);