commit 8a08774: support parquet writer

ZENOTME committed Jan 31, 2024 (1 parent: d9d6cfc)

Showing 10 changed files with 762 additions and 29 deletions.

Cargo.toml: 2 additions, 0 deletions

@@ -36,6 +36,7 @@ arrow-schema = { version = ">=46" }
 async-trait = "0.1"
 bimap = "0.6"
 bitvec = "1.0.1"
+bytes = "1.5"
 chrono = "0.4"
 derive_builder = "0.12.0"
 either = "1"
@@ -53,6 +54,7 @@ opendal = "0.44"
 ordered-float = "4.0.0"
 pretty_assertions = "1.4.0"
 port_scanner = "0.1.5"
+parquet = { version = ">=46", features = ["async"] }
 reqwest = { version = "^0.11", features = ["json"] }
 rust_decimal = "1.31.0"
 serde = { version = "^1.0", features = ["rc"] }

crates/iceberg/Cargo.toml: 3 additions, 1 deletion

@@ -36,6 +36,7 @@ arrow-schema = { workspace = true }
 async-trait = { workspace = true }
 bimap = { workspace = true }
 bitvec = { workspace = true }
+bytes ={ workspace = true }
 chrono = { workspace = true }
 derive_builder = { workspace = true }
 either = { workspace = true }
@@ -47,6 +48,7 @@ murmur3 = { workspace = true }
 once_cell = { workspace = true }
 opendal = { workspace = true }
 ordered-float = { workspace = true }
+parquet ={ workspace = true }
 reqwest = { workspace = true }
 rust_decimal = { workspace = true }
 serde = { workspace = true }
@@ -55,6 +57,7 @@ serde_derive = { workspace = true }
 serde_json = { workspace = true }
 serde_repr = { workspace = true }
 serde_with = { workspace = true }
+tokio = { workspace = true }
 typed-builder = { workspace = true }
 url = { workspace = true }
 urlencoding = { workspace = true }
@@ -64,4 +67,3 @@ uuid = { workspace = true }
 pretty_assertions = { workspace = true }
 tempfile = { workspace = true }
 tera = { workspace = true }
-tokio = { workspace = true }

crates/iceberg/src/error.rs: 4 additions, 0 deletions

@@ -44,6 +44,9 @@ pub enum ErrorKind {
     ///
     /// This error is returned when given iceberg feature is not supported.
     FeatureUnsupported,
+    /// Iceberg meets an unexpected parameter. Please refer the related doc or error message to
+    /// check if the parameter is valid.
+    UnexpectedParameter,
 }
 
 impl ErrorKind {
@@ -59,6 +62,7 @@ impl From<ErrorKind> for &'static str {
             ErrorKind::Unexpected => "Unexpected",
             ErrorKind::DataInvalid => "DataInvalid",
             ErrorKind::FeatureUnsupported => "FeatureUnsupported",
+            ErrorKind::UnexpectedParameter => "UnexpectedParameter",
         }
     }
 }

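For context, the new variant plugs into the crate's existing `Error::new` constructor. A minimal sketch of how a caller might use it (the guard function, parameter name, and message are illustrative, not part of this commit, and assume the crate's root re-exports of `Error`, `ErrorKind`, and `Result`):

    use iceberg::{Error, ErrorKind, Result};

    // Hypothetical validation in a writer: reject a bad argument with the
    // new error kind instead of the catch-all `Unexpected`/`DataInvalid`.
    fn check_buffer_size(buffer_size: usize) -> Result<()> {
        if buffer_size == 0 {
            return Err(Error::new(
                ErrorKind::UnexpectedParameter,
                "buffer_size must be non-zero",
            ));
        }
        Ok(())
    }
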
crates/iceberg/src/io.rs: 7 additions, 4 deletions

@@ -54,6 +54,7 @@ use crate::{error::Result, Error, ErrorKind};
 use futures::{AsyncRead, AsyncSeek, AsyncWrite};
 use once_cell::sync::Lazy;
 use opendal::{Operator, Scheme};
+use tokio::io::AsyncWrite as TokioAsyncWrite;
 use url::Url;
 
 /// Following are arguments for [s3 file io](https://py.iceberg.apache.org/configuration/#s3).
@@ -240,9 +241,9 @@ impl InputFile {
 }
 
 /// Trait for writing file.
-pub trait FileWrite: AsyncWrite {}
+pub trait FileWrite: AsyncWrite + TokioAsyncWrite + Send + Unpin {}
 
-impl<T> FileWrite for T where T: AsyncWrite {}
+impl<T> FileWrite for T where T: AsyncWrite + TokioAsyncWrite + Send + Unpin {}
 
 /// Output file is used for writing to files..
 #[derive(Debug)]
@@ -278,8 +279,10 @@ impl OutputFile {
     }
 
     /// Creates output file for writing.
-    pub async fn writer(&self) -> Result<impl FileWrite> {
-        Ok(self.op.writer(&self.path[self.relative_path_pos..]).await?)
+    pub async fn writer(&self) -> Result<Box<dyn FileWrite>> {
+        Ok(Box::new(
+            self.op.writer(&self.path[self.relative_path_pos..]).await?,
+        ))
     }
 }

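The added tokio `AsyncWrite` bound is what lets the boxed writer feed tokio-based consumers, notably the async writer of the `parquet` crate this commit pulls in. A minimal sketch of driving the returned writer directly, assuming an `OutputFile` is already in hand; only the trait methods guaranteed by the new bounds are used:

    use iceberg::io::OutputFile;
    use tokio::io::AsyncWriteExt;

    // Sketch: `Box<dyn FileWrite>` is now `tokio::io::AsyncWrite + Unpin`,
    // so tokio's extension methods apply to it directly.
    async fn write_demo(output: OutputFile) -> Result<(), Box<dyn std::error::Error>> {
        let mut writer = output.writer().await?;
        writer.write_all(b"hello, iceberg").await?;
        writer.shutdown().await?;
        Ok(())
    }
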
crates/iceberg/src/scan.rs: 10 additions, 7 deletions

@@ -188,7 +188,7 @@ impl FileScanTask {
 mod tests {
     use crate::io::{FileIO, OutputFile};
     use crate::spec::{
-        DataContentType, DataFile, DataFileFormat, FormatVersion, Literal, Manifest,
+        DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Manifest,
         ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus,
         ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID,
     };
@@ -351,14 +351,15 @@ mod tests {
                 ManifestEntry::builder()
                     .status(ManifestStatus::Added)
                     .data_file(
-                        DataFile::builder()
+                        DataFileBuilder::default()
                             .content(DataContentType::Data)
                             .file_path(format!("{}/1.parquet", &fixture.table_location))
                             .file_format(DataFileFormat::Parquet)
                             .file_size_in_bytes(100)
                             .record_count(1)
                             .partition(Struct::from_iter([Some(Literal::long(100))]))
-                            .build(),
+                            .build()
+                            .unwrap(),
                     )
                     .build(),
                 ManifestEntry::builder()
@@ -367,14 +368,15 @@
                     .sequence_number(parent_snapshot.sequence_number())
                     .file_sequence_number(parent_snapshot.sequence_number())
                     .data_file(
-                        DataFile::builder()
+                        DataFileBuilder::default()
                             .content(DataContentType::Data)
                             .file_path(format!("{}/2.parquet", &fixture.table_location))
                             .file_format(DataFileFormat::Parquet)
                             .file_size_in_bytes(100)
                             .record_count(1)
                             .partition(Struct::from_iter([Some(Literal::long(200))]))
-                            .build(),
+                            .build()
+                            .unwrap(),
                     )
                     .build(),
                 ManifestEntry::builder()
@@ -383,14 +385,15 @@
                     .sequence_number(parent_snapshot.sequence_number())
                     .file_sequence_number(parent_snapshot.sequence_number())
                     .data_file(
-                        DataFile::builder()
+                        DataFileBuilder::default()
                             .content(DataContentType::Data)
                             .file_path(format!("{}/3.parquet", &fixture.table_location))
                             .file_format(DataFileFormat::Parquet)
                             .file_size_in_bytes(100)
                             .record_count(1)
                             .partition(Struct::from_iter([Some(Literal::long(300))]))
-                            .build(),
+                            .build()
+                            .unwrap(),
                     )
                     .build(),
             ],

crates/iceberg/src/spec/manifest.rs: 17 additions, 17 deletions

@@ -924,34 +924,34 @@ impl TryFrom<i32> for ManifestStatus {
 }
 
 /// Data file carries data file path, partition tuple, metrics, …
-#[derive(Debug, PartialEq, Clone, Eq, TypedBuilder)]
+#[derive(Debug, PartialEq, Clone, Eq, Builder)]
 pub struct DataFile {
     /// field id: 134
     ///
     /// Type of content stored by the data file: data, equality deletes,
     /// or position deletes (all v1 files are data files)
-    content: DataContentType,
+    pub(crate) content: DataContentType,
     /// field id: 100
     ///
     /// Full URI for the file with FS scheme
-    file_path: String,
+    pub(crate) file_path: String,
     /// field id: 101
     ///
     /// String file format name, avro, orc or parquet
-    file_format: DataFileFormat,
+    pub(crate) file_format: DataFileFormat,
     /// field id: 102
     ///
     /// Partition data tuple, schema based on the partition spec output using
     /// partition field ids for the struct field ids
-    partition: Struct,
+    pub(crate) partition: Struct,
     /// field id: 103
     ///
     /// Number of records in this file
-    record_count: u64,
+    pub(crate) record_count: u64,
     /// field id: 104
     ///
     /// Total file size in bytes
-    file_size_in_bytes: u64,
+    pub(crate) file_size_in_bytes: u64,
     /// field id: 108
     /// key field id: 117
     /// value field id: 118
@@ -960,29 +960,29 @@ pub struct DataFile {
     /// store the column. Does not include bytes necessary to read other
     /// columns, like footers. Leave null for row-oriented formats (Avro)
     #[builder(default)]
-    column_sizes: HashMap<i32, u64>,
+    pub(crate) column_sizes: HashMap<i32, u64>,
     /// field id: 109
     /// key field id: 119
     /// value field id: 120
     ///
     /// Map from column id to number of values in the column (including null
     /// and NaN values)
     #[builder(default)]
-    value_counts: HashMap<i32, u64>,
+    pub(crate) value_counts: HashMap<i32, u64>,
     /// field id: 110
     /// key field id: 121
     /// value field id: 122
     ///
     /// Map from column id to number of null values in the column
     #[builder(default)]
-    null_value_counts: HashMap<i32, u64>,
+    pub(crate) null_value_counts: HashMap<i32, u64>,
     /// field id: 137
     /// key field id: 138
     /// value field id: 139
     ///
     /// Map from column id to number of NaN values in the column
     #[builder(default)]
-    nan_value_counts: HashMap<i32, u64>,
+    pub(crate) nan_value_counts: HashMap<i32, u64>,
     /// field id: 125
     /// key field id: 126
     /// value field id: 127
@@ -995,7 +995,7 @@ pub struct DataFile {
     ///
     /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
     #[builder(default)]
-    lower_bounds: HashMap<i32, Literal>,
+    pub(crate) lower_bounds: HashMap<i32, Literal>,
     /// field id: 128
     /// key field id: 129
     /// value field id: 130
@@ -1008,19 +1008,19 @@
     ///
     /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
     #[builder(default)]
-    upper_bounds: HashMap<i32, Literal>,
+    pub(crate) upper_bounds: HashMap<i32, Literal>,
     /// field id: 131
     ///
     /// Implementation-specific key metadata for encryption
     #[builder(default)]
-    key_metadata: Vec<u8>,
+    pub(crate) key_metadata: Vec<u8>,
     /// field id: 132
     /// element field id: 133
     ///
     /// Split offsets for the data file. For example, all row group offsets
     /// in a Parquet file. Must be sorted ascending
     #[builder(default)]
-    split_offsets: Vec<i64>,
+    pub(crate) split_offsets: Vec<i64>,
     /// field id: 135
     /// element field id: 136
     ///
@@ -1029,7 +1029,7 @@
     /// otherwise. Fields with ids listed in this column must be present
     /// in the delete file
     #[builder(default)]
-    equality_ids: Vec<i32>,
+    pub(crate) equality_ids: Vec<i32>,
     /// field id: 140
     ///
     /// ID representing sort order for this file.
@@ -1041,7 +1041,7 @@
     /// order id to null. Readers must ignore sort order id for position
     /// delete files.
     #[builder(default, setter(strip_option))]
-    sort_order_id: Option<i32>,
+    pub(crate) sort_order_id: Option<i32>,
 }
 
 /// Type of content stored by the data file: data, equality deletes, or

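Since `derive_builder` generates a fallible `build()` (unlike the compile-time-checked `TypedBuilder` it replaces), a forgotten required field now surfaces as a runtime `Err`, which is why the scan.rs test updates above add `.unwrap()`. A minimal sketch of the generated builder, mirroring the test code with illustrative values and assuming `DataFileBuilder` is reachable through `iceberg::spec`:

    use iceberg::spec::{
        DataContentType, DataFile, DataFileBuilder, DataFileFormat, Literal, Struct,
    };

    // Sketch: build() returns Result<DataFile, DataFileBuilderError>, so a
    // missing required field (e.g. record_count) is caught at runtime.
    fn example_data_file() -> DataFile {
        DataFileBuilder::default()
            .content(DataContentType::Data)
            .file_path("s3://bucket/data/1.parquet".to_string())
            .file_format(DataFileFormat::Parquet)
            .file_size_in_bytes(100)
            .record_count(1)
            .partition(Struct::from_iter([Some(Literal::long(100))]))
            .build()
            .expect("all required fields set")
    }
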