diff --git a/Cargo.toml b/Cargo.toml
index dccc6bdf1..3234bd07a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -34,9 +34,11 @@ apache-avro = "0.16"
 arrow-arith = { version = ">=46" }
 arrow-array = { version = ">=46" }
 arrow-schema = { version = ">=46" }
+async-stream = "0.3.5"
 async-trait = "0.1"
 bimap = "0.6"
 bitvec = "1.0.1"
+bytes = "1.5"
 chrono = "0.4"
 derive_builder = "0.20.0"
 either = "1"
@@ -52,6 +54,7 @@ murmur3 = "0.5.2"
 once_cell = "1"
 opendal = "0.45"
 ordered-float = "4.0.0"
+parquet = "50"
 pretty_assertions = "1.4.0"
 port_scanner = "0.1.5"
 reqwest = { version = "^0.11", features = ["json"] }
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 6af68a106..2231607df 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -34,9 +34,11 @@ apache-avro = { workspace = true }
 arrow-arith = { workspace = true }
 arrow-array = { workspace = true }
 arrow-schema = { workspace = true }
+async-stream = { workspace = true }
 async-trait = { workspace = true }
 bimap = { workspace = true }
 bitvec = { workspace = true }
+bytes = { workspace = true }
 chrono = { workspace = true }
 derive_builder = { workspace = true }
 either = { workspace = true }
@@ -48,6 +50,7 @@ murmur3 = { workspace = true }
 once_cell = { workspace = true }
 opendal = { workspace = true }
 ordered-float = { workspace = true }
+parquet = { workspace = true, features = ["async"] }
 reqwest = { workspace = true }
 rust_decimal = { workspace = true }
 serde = { workspace = true }
@@ -56,6 +59,7 @@ serde_derive = { workspace = true }
 serde_json = { workspace = true }
 serde_repr = { workspace = true }
 serde_with = { workspace = true }
+tokio = { workspace = true }
 typed-builder = { workspace = true }
 url = { workspace = true }
 urlencoding = { workspace = true }
diff --git a/crates/iceberg/src/io.rs b/crates/iceberg/src/io.rs
index 3a7c85f42..410d87076 100644
--- a/crates/iceberg/src/io.rs
+++ b/crates/iceberg/src/io.rs
@@ -54,6 +54,7 @@ use crate::{error::Result, Error, ErrorKind};
 use futures::{AsyncRead, AsyncSeek, AsyncWrite};
 use once_cell::sync::Lazy;
 use opendal::{Operator, Scheme};
+use tokio::io::{AsyncRead as TokioAsyncRead, AsyncSeek as TokioAsyncSeek};
 use url::Url;
 
 /// Following are arguments for [s3 file io](https://py.iceberg.apache.org/configuration/#s3).
@@ -215,9 +216,12 @@ pub struct InputFile {
 }
 
 /// Trait for reading file.
-pub trait FileRead: AsyncRead + AsyncSeek {}
+pub trait FileRead: AsyncRead + AsyncSeek + Send + Unpin + TokioAsyncRead + TokioAsyncSeek {}
 
-impl<T> FileRead for T where T: AsyncRead + AsyncSeek {}
+impl<T> FileRead for T where
+    T: AsyncRead + AsyncSeek + Send + Unpin + TokioAsyncRead + TokioAsyncSeek
+{
+}
 
 impl InputFile {
     /// Absolute path to root uri.
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 0a3b9a915..99ddaa097 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -22,8 +22,14 @@ use crate::spec::{DataContentType, ManifestEntryRef, SchemaRef, SnapshotRef, Tab
 use crate::table::Table;
 use crate::{Error, ErrorKind};
 use arrow_array::RecordBatch;
+use async_stream::try_stream;
 use futures::stream::{iter, BoxStream};
-use futures::StreamExt;
+use futures::{StreamExt, TryStreamExt};
+use parquet::arrow::arrow_reader::RowSelection;
+use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+
+/// Default arrow record batch size
+const DEFAULT_BATCH_SIZE: usize = 1024;
 
 /// Builder to create table scan.
 pub struct TableScanBuilder<'a> {
@@ -31,6 +37,7 @@ pub struct TableScanBuilder<'a> {
     // Empty column names means to select all columns
     column_names: Vec<String>,
     snapshot_id: Option<i64>,
+    batch_size: Option<usize>,
 }
 
 impl<'a> TableScanBuilder<'a> {
@@ -39,6 +46,7 @@ impl<'a> TableScanBuilder<'a> {
             table,
             column_names: vec![],
             snapshot_id: None,
+            batch_size: None,
         }
     }
 
@@ -63,6 +71,11 @@ impl<'a> TableScanBuilder<'a> {
         self
     }
 
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = Some(batch_size);
+        self
+    }
+
     /// Build the table scan.
     pub fn build(self) -> crate::Result<TableScan> {
         let snapshot = match self.snapshot_id {
@@ -110,6 +123,7 @@ impl<'a> TableScanBuilder<'a> {
             table_metadata: self.table.metadata_ref(),
             column_names: self.column_names,
             schema,
+            batch_size: self.batch_size,
         })
     }
 }
@@ -123,6 +137,7 @@ pub struct TableScan {
     file_io: FileIO,
     column_names: Vec<String>,
     schema: SchemaRef,
+    batch_size: Option<usize>,
 }
 
 /// A stream of [`FileScanTask`].
@@ -163,6 +178,54 @@ impl TableScan {
 
         Ok(iter(file_scan_tasks).boxed())
     }
+
+    /// Transforms a stream of FileScanTasks from plan_files into a stream of
+    /// Arrow RecordBatches.
+    pub fn open(&self, mut tasks: FileScanTaskStream) -> crate::Result<ArrowRecordBatchStream> {
+        let file_io = self.file_io.clone();
+        let batch_size = self.batch_size.unwrap_or(DEFAULT_BATCH_SIZE);
+        let projection_mask = self.get_arrow_projection_mask();
+        let row_selection = self.get_arrow_row_selection();
+
+        Ok(
+            try_stream! {
+                while let Some(Ok(task)) = tasks.next().await {
+                    let parquet_reader = file_io
+                        .new_input(task.data_file().file_path())?
+                        .reader()
+                        .await?;
+
+                    let mut batch_stream = ParquetRecordBatchStreamBuilder::new(parquet_reader)
+                        .await
+                        .map_err(|err| {
+                            Error::new(ErrorKind::Unexpected, "failed to load parquet file").with_source(err)
+                        })?
+                        .with_batch_size(batch_size)
+                        .with_offset(task.start() as usize)
+                        .with_limit(task.length() as usize)
+                        .with_projection(projection_mask.clone())
+                        .with_row_selection(row_selection.clone())
+                        .build()
+                        .map_err(|err| Error::new(ErrorKind::Unexpected, "failed to build record batch stream").with_source(err))?
+                        .map_err(|err| Error::new(ErrorKind::Unexpected, "failed to read data").with_source(err));
+
+                    while let Some(batch) = batch_stream.next().await {
+                        yield batch?;
+                    }
+                }
+            }.boxed()
+        )
+    }
+
+    fn get_arrow_projection_mask(&self) -> ProjectionMask {
+        // TODO, dummy implementation
+        todo!()
+    }
+
+    fn get_arrow_row_selection(&self) -> RowSelection {
+        // TODO, dummy implementation
+        todo!()
+    }
 }
 
 /// A task to scan part of file.
@@ -178,9 +241,16 @@ pub struct FileScanTask {
 pub type ArrowRecordBatchStream = BoxStream<'static, crate::Result<RecordBatch>>;
 
 impl FileScanTask {
-    /// Returns a stream of arrow record batches.
-    pub async fn execute(&self) -> crate::Result<ArrowRecordBatchStream> {
-        todo!()
-    }
+    pub fn data_file(&self) -> ManifestEntryRef {
+        self.data_file.clone()
+    }
+
+    pub fn start(&self) -> u64 {
+        self.start
+    }
+
+    pub fn length(&self) -> u64 {
+        self.length
+    }
 }
 
@@ -194,8 +264,15 @@ mod tests {
     };
     use crate::table::Table;
     use crate::TableIdent;
+    use arrow_array::{ArrayRef, Int64Array, RecordBatch};
     use futures::TryStreamExt;
+    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
+    use parquet::basic::Compression;
+    use parquet::file::properties::WriterProperties;
+    use std::collections::HashMap;
     use std::fs;
+    use std::fs::File;
+    use std::sync::Arc;
     use tempfile::TempDir;
     use tera::{Context, Tera};
     use uuid::Uuid;
@@ -257,6 +334,125 @@ mod tests {
         ))
         .unwrap()
     }
+
+        async fn setup_manifest_files(&mut self) {
+            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
+            let parent_snapshot = current_snapshot
+                .parent_snapshot(self.table.metadata())
+                .unwrap();
+            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
+            let current_partition_spec = self.table.metadata().default_partition_spec().unwrap();
+
+            // Write data files
+            let data_file_manifest = ManifestWriter::new(
+                self.next_manifest_file(),
+                current_snapshot.snapshot_id(),
+                vec![],
+            )
+            .write(Manifest::new(
+                ManifestMetadata::builder()
+                    .schema((*current_schema).clone())
+                    .content(ManifestContentType::Data)
+                    .format_version(FormatVersion::V2)
+                    .partition_spec((**current_partition_spec).clone())
+                    .schema_id(current_schema.schema_id())
+                    .build(),
+                vec![
+                    ManifestEntry::builder()
+                        .status(ManifestStatus::Added)
+                        .data_file(
+                            DataFile::builder()
+                                .content(DataContentType::Data)
+                                .file_path(format!("{}/1.parquet", &self.table_location))
+                                .file_format(DataFileFormat::Parquet)
+                                .file_size_in_bytes(100)
+                                .record_count(1)
+                                .partition(Struct::from_iter([Some(Literal::long(100))]))
+                                .build(),
+                        )
+                        .build(),
+                    ManifestEntry::builder()
+                        .status(ManifestStatus::Deleted)
+                        .snapshot_id(parent_snapshot.snapshot_id())
+                        .sequence_number(parent_snapshot.sequence_number())
+                        .file_sequence_number(parent_snapshot.sequence_number())
+                        .data_file(
+                            DataFile::builder()
+                                .content(DataContentType::Data)
+                                .file_path(format!("{}/2.parquet", &self.table_location))
+                                .file_format(DataFileFormat::Parquet)
+                                .file_size_in_bytes(100)
+                                .record_count(1)
+                                .partition(Struct::from_iter([Some(Literal::long(200))]))
+                                .build(),
+                        )
+                        .build(),
+                    ManifestEntry::builder()
+                        .status(ManifestStatus::Existing)
+                        .snapshot_id(parent_snapshot.snapshot_id())
+                        .sequence_number(parent_snapshot.sequence_number())
+                        .file_sequence_number(parent_snapshot.sequence_number())
+                        .data_file(
+                            DataFile::builder()
+                                .content(DataContentType::Data)
+                                .file_path(format!("{}/3.parquet", &self.table_location))
+                                .file_format(DataFileFormat::Parquet)
+                                .file_size_in_bytes(100)
+                                .record_count(1)
+                                .partition(Struct::from_iter([Some(Literal::long(300))]))
+                                .build(),
+                        )
+                        .build(),
+                ],
+            ))
+            .await
+            .unwrap();
+
+            // Write to manifest list
+            let mut manifest_list_write = ManifestListWriter::v2(
+                self.table
+                    .file_io()
+                    .new_output(current_snapshot.manifest_list())
+                    .unwrap(),
+                current_snapshot.snapshot_id(),
+                current_snapshot
+                    .parent_snapshot_id()
+                    .unwrap_or(EMPTY_SNAPSHOT_ID),
+                current_snapshot.sequence_number(),
+            );
+            manifest_list_write
+                .add_manifest_entries(vec![data_file_manifest].into_iter())
+                .unwrap();
+            manifest_list_write.close().await.unwrap();
+
+            // prepare data
+            let schema = {
+                let fields =
+                    vec![
+                        arrow_schema::Field::new("col", arrow_schema::DataType::Int64, true)
+                            .with_metadata(HashMap::from([(
+                                PARQUET_FIELD_ID_META_KEY.to_string(),
+                                "0".to_string(),
+                            )])),
+                    ];
+                Arc::new(arrow_schema::Schema::new(fields))
+            };
+            let col = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
+            let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
+
+            // Write the Parquet files
+            let props = WriterProperties::builder()
+                .set_compression(Compression::SNAPPY)
+                .build();
+
+            let file = File::create(format!("{}/1.parquet", &self.table_location)).unwrap();
+            let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
+
+            writer.write(&to_write).expect("Writing batch");
+
+            // writer must be closed to write footer
+            writer.close().unwrap();
+        }
     }
 
     #[test]
@@ -324,97 +520,8 @@
 
     #[tokio::test]
     async fn test_plan_files_no_deletions() {
-        let fixture = TableTestFixture::new();
-
-        let current_snapshot = fixture.table.metadata().current_snapshot().unwrap();
-        let parent_snapshot = current_snapshot
-            .parent_snapshot(fixture.table.metadata())
-            .unwrap();
-        let current_schema = current_snapshot.schema(fixture.table.metadata()).unwrap();
-        let current_partition_spec = fixture.table.metadata().default_partition_spec().unwrap();
-
-        // Write data files
-        let data_file_manifest = ManifestWriter::new(
-            fixture.next_manifest_file(),
-            current_snapshot.snapshot_id(),
-            vec![],
-        )
-        .write(Manifest::new(
-            ManifestMetadata::builder()
-                .schema((*current_schema).clone())
-                .content(ManifestContentType::Data)
-                .format_version(FormatVersion::V2)
-                .partition_spec((**current_partition_spec).clone())
-                .schema_id(current_schema.schema_id())
-                .build(),
-            vec![
-                ManifestEntry::builder()
-                    .status(ManifestStatus::Added)
-                    .data_file(
-                        DataFile::builder()
-                            .content(DataContentType::Data)
-                            .file_path(format!("{}/1.parquet", &fixture.table_location))
-                            .file_format(DataFileFormat::Parquet)
-                            .file_size_in_bytes(100)
-                            .record_count(1)
-                            .partition(Struct::from_iter([Some(Literal::long(100))]))
-                            .build(),
-                    )
-                    .build(),
-                ManifestEntry::builder()
-                    .status(ManifestStatus::Deleted)
-                    .snapshot_id(parent_snapshot.snapshot_id())
-                    .sequence_number(parent_snapshot.sequence_number())
-                    .file_sequence_number(parent_snapshot.sequence_number())
-                    .data_file(
-                        DataFile::builder()
-                            .content(DataContentType::Data)
-                            .file_path(format!("{}/2.parquet", &fixture.table_location))
-                            .file_format(DataFileFormat::Parquet)
-                            .file_size_in_bytes(100)
-                            .record_count(1)
-                            .partition(Struct::from_iter([Some(Literal::long(200))]))
-                            .build(),
-                    )
-                    .build(),
-                ManifestEntry::builder()
-                    .status(ManifestStatus::Existing)
-                    .snapshot_id(parent_snapshot.snapshot_id())
-                    .sequence_number(parent_snapshot.sequence_number())
-                    .file_sequence_number(parent_snapshot.sequence_number())
-                    .data_file(
-                        DataFile::builder()
-                            .content(DataContentType::Data)
-                            .file_path(format!("{}/3.parquet", &fixture.table_location))
-                            .file_format(DataFileFormat::Parquet)
-                            .file_size_in_bytes(100)
-                            .record_count(1)
-                            .partition(Struct::from_iter([Some(Literal::long(300))]))
-                            .build(),
-                    )
-                    .build(),
-            ],
-        ))
-        .await
-        .unwrap();
-
-        // Write to manifest list
-        let mut manifest_list_write = ManifestListWriter::v2(
-            fixture
-                .table
-                .file_io()
-                .new_output(current_snapshot.manifest_list())
-                .unwrap(),
-            current_snapshot.snapshot_id(),
-            current_snapshot
-                .parent_snapshot_id()
-                .unwrap_or(EMPTY_SNAPSHOT_ID),
-            current_snapshot.sequence_number(),
-        );
-        manifest_list_write
-            .add_manifest_entries(vec![data_file_manifest].into_iter())
-            .unwrap();
-        manifest_list_write.close().await.unwrap();
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
 
         // Create table scan for current snapshot and plan files
         let table_scan = fixture.table.scan().build().unwrap();
@@ -445,4 +552,23 @@
             format!("{}/3.parquet", &fixture.table_location)
         );
     }
+
+    #[tokio::test]
+    async fn test_open_parquet_no_deletions() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Create table scan for current snapshot and plan files
+        let table_scan = fixture.table.scan().build().unwrap();
+        let tasks = table_scan.plan_files().await.unwrap();
+
+        let batch_stream = table_scan.open(tasks).unwrap();
+
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        let col = batches[0].column_by_name("col").unwrap();
+
+        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
+        assert_eq!(int64_arr.value(0), 1);
+    }
 }
diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs
index 7af8a1bbd..76dd2a5d8 100644
--- a/crates/iceberg/src/spec/manifest.rs
+++ b/crates/iceberg/src/spec/manifest.rs
@@ -855,6 +855,11 @@ impl ManifestEntry {
         self.data_file.content
     }
 
+    /// File format of this manifest entry.
+    pub fn file_format(&self) -> DataFileFormat {
+        self.data_file.file_format
+    }
+
     /// Data file path of this manifest entry.
     pub fn file_path(&self) -> &str {
         &self.data_file.file_path
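
Usage sketch (not part of the diff above): `TableScan::open` consumes the `FileScanTaskStream` produced by `plan_files` and yields Arrow record batches; the extra tokio bounds on `FileRead` are what allow the opendal-backed reader to be handed to `ParquetRecordBatchStreamBuilder`, which accepts readers implementing parquet's `AsyncFileReader`. A minimal end-to-end call, mirroring the `test_open_parquet_no_deletions` test (the table value and the batch size of 1024 are assumptions, not part of the change):

```rust
use futures::TryStreamExt;
use iceberg::table::Table;

// Read the current snapshot of an already-loaded table into Arrow record batches.
async fn read_table(table: &Table) -> iceberg::Result<()> {
    // The batch size is optional; DEFAULT_BATCH_SIZE (1024) is used when unset.
    let table_scan = table.scan().with_batch_size(1024).build()?;

    // Plan the file scan tasks, then open them as a stream of RecordBatches.
    let tasks = table_scan.plan_files().await?;
    let batch_stream = table_scan.open(tasks)?;

    // Drain the stream; each item is an arrow_array::RecordBatch.
    let batches: Vec<_> = batch_stream.try_collect().await?;
    println!("read {} record batches", batches.len());
    Ok(())
}
```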
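`get_arrow_projection_mask` and `get_arrow_row_selection` are still `todo!()` placeholders in this diff. For orientation only, a hypothetical sketch of the parquet-side values they will eventually have to produce; the helper names, the column-name matching strategy, and the fixed 100-row selection are made up and not part of the change:

```rust
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::arrow::ProjectionMask;
use parquet::schema::types::SchemaDescriptor;

// Hypothetical helper: project only the top-level Parquet columns whose names are in `selected`.
fn projection_for(schema: &SchemaDescriptor, selected: &[&str]) -> ProjectionMask {
    let indices = schema
        .root_schema()
        .get_fields()
        .iter()
        .enumerate()
        .filter(|(_, field)| selected.contains(&field.name()))
        .map(|(i, _)| i);
    ProjectionMask::roots(schema, indices)
}

// Hypothetical helper: keep the first 100 rows of a file and skip the rest.
fn first_hundred_rows(total_rows: usize) -> RowSelection {
    RowSelection::from(vec![
        RowSelector::select(100.min(total_rows)),
        RowSelector::skip(total_rows.saturating_sub(100)),
    ])
}
```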