Commit 5b70982: adding tests
Lordworms committed Jan 16, 2025
1 parent d59d1c8
Showing 1 changed file with 198 additions and 27 deletions.
datafusion/physical-plan/src/sorts/row_serde.rs (225 changes: 198 additions & 27 deletions)
@@ -78,7 +78,7 @@ pub enum CompressionType {
 /// +----------------+------------------+
 #[derive(Debug)]
 pub struct RowWriter {
-    writer: BufWriter<File>,
+    writer: Option<BufWriter<File>>,
     block_offsets: Vec<u64>,
     current_offset: u64,
     compression: CompressionType,
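
A note on the new `Option` wrapper: `finish` can now detach the `BufWriter` with `take()`, and `write_rows` gains a defensive `Internal` error path for the case where the writer is gone. A minimal sketch of the take-on-detach pattern, assuming nothing beyond the standard library (the `Sink` name is illustrative):

    use std::fs::File;
    use std::io::{BufWriter, Result, Write};

    struct Sink {
        writer: Option<BufWriter<File>>,
    }

    impl Sink {
        fn finish(mut self) -> Result<()> {
            // take() moves the writer out and leaves None behind, so any
            // later call can detect the finished state and error cleanly.
            let mut writer = self.writer.take().expect("Sink already finished");
            writer.flush()
        }
    }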
@@ -90,28 +90,39 @@ impl RowWriter {
         compression: Option<CompressionType>,
     ) -> Result<Self, DataFusionError> {
         let file = File::create(path).map_err(|e| {
-            DataFusionError::Execution(format!("Failed to create file at {path:?}: {e}"))
+            DataFusionError::IoError(std::io::Error::new(
+                std::io::ErrorKind::Other,
+                format!("Failed to create file at {path:?}: {e}"),
+            ))
         })?;
 
         Ok(Self {
-            writer: BufWriter::new(file),
+            writer: Some(BufWriter::new(file)),
             block_offsets: Vec::new(),
             current_offset: 0,
             compression: compression.unwrap_or(CompressionType::UNCOMPRESSED),
         })
     }
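
On the error change in `new`: creation failures are now reported as `DataFusionError::IoError` wrapping a `std::io::Error` built with `ErrorKind::Other` and a descriptive message. The same wrapping in isolation, as a small standard-library sketch (the function name is illustrative):

    use std::fs::File;
    use std::io::{Error, ErrorKind};
    use std::path::Path;

    // Attach path context to a file-creation failure.
    fn create_with_context(path: &Path) -> Result<File, Error> {
        File::create(path).map_err(|e| {
            Error::new(ErrorKind::Other, format!("Failed to create file at {path:?}: {e}"))
        })
    }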

     pub fn write_rows(&mut self, rows: &Rows) -> Result<(), DataFusionError> {
-        self.block_offsets.push(self.current_offset);
         let (row_data, row_offsets) = self.prepare_row_data(rows)?;
         let compressed_data = self.compress(&row_data)?;
+        let metadata_size = self.metadata_size(&row_offsets);
+
+        let writer = self.writer.as_mut().ok_or_else(|| {
+            DataFusionError::Internal("Cannot write to finished RowWriter".to_string())
+        })?;
 
-        self.writer.write_all(&compressed_data)?;
+        self.block_offsets.push(self.current_offset);
+
+        writer.write_all(&compressed_data)?;
 
-        self.write_block_metadata(&row_offsets)?;
+        for &offset in &row_offsets {
+            writer.write_all(&offset.to_le_bytes())?;
+        }
+        writer.write_all(&(row_offsets.len() as u32).to_le_bytes())?;
 
-        self.current_offset +=
-            (compressed_data.len() + self.metadata_size(&row_offsets)) as u64;
+        self.current_offset += (compressed_data.len() + metadata_size) as u64;
 
         Ok(())
     }
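
Each `write_rows` call now appends one self-contained block: the compressed row data, then each row offset as a little-endian `u32`, then the row count as a `u32`. A sketch of the per-block layout, inferred from the writes above rather than from a format spec:

    // Per-block layout as written by write_rows:
    //
    // +---------------------+---------------------------+-----------------+
    // | compressed row data | row offsets (u32 LE each) | row count (u32) |
    // +---------------------+---------------------------+-----------------+
    //
    // block_offsets records where each block starts in the file, and
    // current_offset advances by compressed_data.len() + metadata_size.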
@@ -134,31 +145,22 @@ impl RowWriter {
         Ok((row_data, row_offsets))
     }
 
-    fn write_block_metadata(
-        &mut self,
-        row_offsets: &[u32],
-    ) -> Result<(), DataFusionError> {
-        for &offset in row_offsets {
-            self.writer.write_all(&offset.to_le_bytes())?;
-        }
-        self.writer
-            .write_all(&(row_offsets.len() as u32).to_le_bytes())?;
-        Ok(())
-    }
-
     fn metadata_size(&self, row_offsets: &[u32]) -> usize {
-        4 + // row count
-        row_offsets.len() * 4 // row offsets
+        4 + row_offsets.len() * 4
     }
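
The simplified `metadata_size` keeps the arithmetic the deleted comments spelled out: 4 bytes for the trailing row count plus 4 bytes per row offset. For example, a 3-row block carries 4 + 3 * 4 = 16 bytes of block metadata.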

     pub fn finish(mut self) -> Result<(), DataFusionError> {
+        let writer = self.writer.take().ok_or_else(|| {
+            DataFusionError::Internal("RowWriter is already finished".to_string())
+        })?;
+
         let metadata = self.prepare_file_metadata()?;
         let compressed_metadata = self.compress(&metadata)?;
 
-        self.writer.write_all(&compressed_metadata)?;
-        self.writer
-            .write_all(&(compressed_metadata.len() as u32).to_le_bytes())?;
-        self.writer.flush()?;
+        let mut writer = writer;
+        writer.write_all(&compressed_metadata)?;
+        writer.write_all(&(compressed_metadata.len() as u32).to_le_bytes())?;
+        writer.flush()?;
 
         Ok(())
     }
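
Putting the writer changes together, the round trip the new tests below exercise looks like this condensed sketch (it mirrors the tests; the temp file and the single `Int32` sort field are illustrative choices):

    // Write: one block per write_rows call, then the footer via finish().
    let converter = Arc::new(RowConverter::new(vec![SortField::new(DataType::Int32)])?);
    let mut writer = RowWriter::new(temp_file.path(), None)?; // None => UNCOMPRESSED
    let rows = converter.convert_columns(batch.columns())?;
    writer.write_rows(&rows)?;
    writer.finish()?; // consumes the writer

    // Read: iterate blocks back as Rows and convert them to Arrow columns.
    let reader = RowReader::new(temp_file.path(), None, Arc::clone(&converter))?;
    for rows in reader {
        let columns = converter.convert_rows(&rows?)?;
    }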
@@ -330,7 +332,7 @@ mod tests {
         compute::concat_batches,
         row::{RowConverter, SortField},
     };
-    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+    use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
     use arrow_schema::{DataType, Field, Schema};
     use datafusion_common::DataFusionError;
     use std::sync::Arc;
@@ -402,6 +404,175 @@
             b.as_any().downcast_ref::<Int32Array>().unwrap()
         );
 
         Ok(())
     }
+    #[test]
+    fn test_multiple_write_calls() -> Result<(), DataFusionError> {
+        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+        let converter =
+            Arc::new(RowConverter::new(vec![SortField::new(DataType::Int32)])?);
+        let temp_file = NamedTempFile::new()?;
+        let mut row_writer = RowWriter::new(temp_file.path(), None)?;
+
+        // Write multiple batches
+        let batches = vec![
+            RecordBatch::try_new(
+                Arc::clone(&schema),
+                vec![Arc::new(Int32Array::from(vec![1, 2]))],
+            )?,
+            RecordBatch::try_new(
+                Arc::clone(&schema),
+                vec![Arc::new(Int32Array::from(vec![3, 4]))],
+            )?,
+        ];
+
+        for batch in &batches {
+            let rows = converter.convert_columns(batch.columns())?;
+            row_writer.write_rows(&rows)?;
+        }
+        row_writer.finish()?;
+
+        // Read and verify
+        let row_reader = RowReader::new(temp_file.path(), None, Arc::clone(&converter))?;
+        let mut read_rows = 0;
+        for rows in row_reader {
+            let rows = rows?;
+            let columns = converter.convert_rows(&rows)?;
+            let batch = RecordBatch::try_new(Arc::clone(&schema), columns)?;
+            read_rows += batch.num_rows();
+        }
+        assert_eq!(read_rows, 4);
+        Ok(())
+    }
+
+    #[test]
+    fn test_single_row() -> Result<(), DataFusionError> {
+        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+        let converter =
+            Arc::new(RowConverter::new(vec![SortField::new(DataType::Int32)])?);
+        let temp_file = NamedTempFile::new()?;
+        let mut row_writer = RowWriter::new(temp_file.path(), None)?;
+
+        // Write single row
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(Int32Array::from(vec![42]))],
+        )?;
+        let rows = converter.convert_columns(batch.columns())?;
+        row_writer.write_rows(&rows)?;
+        row_writer.finish()?;
+
+        // Read and verify
+        let row_reader = RowReader::new(temp_file.path(), None, Arc::clone(&converter))?;
+        let mut read_value = None;
+        for rows in row_reader {
+            let rows = rows?;
+            let columns = converter.convert_rows(&rows)?;
+            let batch = RecordBatch::try_new(Arc::clone(&schema), columns)?;
+            read_value = Some(
+                batch
+                    .column(0)
+                    .as_any()
+                    .downcast_ref::<Int32Array>()
+                    .unwrap()
+                    .value(0),
+            );
+        }
+        assert_eq!(read_value, Some(42));
+        Ok(())
+    }
+
+    #[test]
+    fn test_repeated_single_row() -> Result<(), DataFusionError> {
+        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+        let converter =
+            Arc::new(RowConverter::new(vec![SortField::new(DataType::Int32)])?);
+        let temp_file = NamedTempFile::new()?;
+        let mut row_writer = RowWriter::new(temp_file.path(), None)?;
+
+        // Write same row multiple times
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(Int32Array::from(vec![99]))],
+        )?;
+
+        for _ in 0..3 {
+            let rows = converter.convert_columns(batch.columns())?;
+            row_writer.write_rows(&rows)?;
+        }
+        row_writer.finish()?;
+
+        // Read and verify
+        let row_reader = RowReader::new(temp_file.path(), None, Arc::clone(&converter))?;
+        let mut read_count = 0;
+        for rows in row_reader {
+            let rows = rows?;
+            let columns = converter.convert_rows(&rows)?;
+            let batch = RecordBatch::try_new(Arc::clone(&schema), columns)?;
+            assert_eq!(
+                batch
+                    .column(0)
+                    .as_any()
+                    .downcast_ref::<Int32Array>()
+                    .unwrap()
+                    .value(0),
+                99
+            );
+            read_count += 1;
+        }
+        assert_eq!(read_count, 3);
+        Ok(())
+    }
+
+    #[test]
+    fn test_variable_length_fields() -> Result<(), DataFusionError> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Utf8, false),
+        ]));
+        let converter = Arc::new(RowConverter::new(vec![
+            SortField::new(DataType::Int32),
+            SortField::new(DataType::Utf8),
+        ])?);
+        let temp_file = NamedTempFile::new()?;
+        let mut row_writer = RowWriter::new(temp_file.path(), None)?;
+
+        // Write batch with string data
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2])),
+                Arc::new(StringArray::from(vec!["hello", "world"])),
+            ],
+        )?;
+
+        let rows = converter.convert_columns(batch.columns())?;
+        row_writer.write_rows(&rows)?;
+        row_writer.finish()?;
+
+        // Read and verify
+        let row_reader = RowReader::new(temp_file.path(), None, Arc::clone(&converter))?;
+        let mut read_batches = Vec::new();
+
+        for rows in row_reader {
+            let rows = rows?;
+            let columns = converter.convert_rows(&rows)?;
+            let batch = RecordBatch::try_new(Arc::clone(&schema), columns)?;
+            read_batches.push(batch);
+        }
+
+        let read_batch = concat_batches(&schema, &read_batches)?;
+        assert_eq!(read_batch.num_rows(), 2);
+
+        let string_array = read_batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+
+        assert_eq!(string_array.value(0), "hello");
+        assert_eq!(string_array.value(1), "world");
+
+        Ok(())
+    }
 }
