Skip to content

Commit

Permalink
fix: fix parse partitions in manifest_list (apache#122)
Browse files Browse the repository at this point in the history
* fix parse partitions in manifest_list

* support init default in manifest

* add error context

---------

Co-authored-by: ZENOTME <[email protected]>
  • Loading branch information
ZENOTME and ZENOTME authored Dec 19, 2023
1 parent d98c325 commit 7d06a85
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 41 deletions.
12 changes: 6 additions & 6 deletions crates/iceberg/src/spec/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1104,8 +1104,8 @@ mod _serde {
Ok(ManifestEntry {
status: self.status.try_into()?,
snapshot_id: Some(self.snapshot_id),
sequence_number: None,
file_sequence_number: None,
sequence_number: Some(0),
file_sequence_number: Some(0),
data_file: self.data_file.try_into(partition_type, schema)?,
})
}
Expand Down Expand Up @@ -1620,8 +1620,8 @@ mod tests {
entries: vec![ManifestEntry {
status: ManifestStatus::Added,
snapshot_id: Some(0),
sequence_number: None,
file_sequence_number: None,
sequence_number: Some(0),
file_sequence_number: Some(0),
data_file: DataFile {
content: DataContentType::Data,
file_path: "s3://testbucket/iceberg_data/iceberg_ctl/iceberg_db/iceberg_tbl/data/00000-7-45268d71-54eb-476c-b42c-942d880c04a1-00001.parquet".to_string(),
Expand Down Expand Up @@ -1690,8 +1690,8 @@ mod tests {
ManifestEntry {
status: ManifestStatus::Added,
snapshot_id: Some(0),
sequence_number: None,
file_sequence_number: None,
sequence_number: Some(0),
file_sequence_number: Some(0),
data_file: DataFile {
content: DataContentType::Data,
file_path: "s3://testbucket/prod/db/sample/data/category=x/00010-1-d5c93668-1e52-41ac-92a6-bba590cbf249-00001.parquet".to_string(),
Expand Down
197 changes: 162 additions & 35 deletions crates/iceberg/src/spec/manifest_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,18 @@ impl ManifestList {
pub fn parse_with_version(
bs: &[u8],
version: FormatVersion,
partition_type: &StructType,
partition_types: &HashMap<i32, StructType>,
) -> Result<ManifestList, Error> {
match version {
FormatVersion::V1 => {
let reader = Reader::with_schema(&MANIFEST_LIST_AVRO_SCHEMA_V1, bs)?;
let values = Value::Array(reader.collect::<Result<Vec<Value>, _>>()?);
from_value::<_serde::ManifestListV1>(&values)?.try_into(partition_type)
from_value::<_serde::ManifestListV1>(&values)?.try_into(partition_types)
}
FormatVersion::V2 => {
let reader = Reader::with_schema(&MANIFEST_LIST_AVRO_SCHEMA_V2, bs)?;
let values = Value::Array(reader.collect::<Result<Vec<Value>, _>>()?);
from_value::<_serde::ManifestListV2>(&values)?.try_into(partition_type)
from_value::<_serde::ManifestListV2>(&values)?.try_into(partition_types)
}
}
}
Expand Down Expand Up @@ -657,6 +657,8 @@ pub struct FieldSummary {
/// and then converted into the [ManifestListEntry] struct. Serialization works the other way around.
/// [ManifestListEntryV1] and [ManifestListEntryV2] are internal struct that are only used for serialization and deserialization.
pub(super) mod _serde {
use std::collections::HashMap;

pub use serde_bytes::ByteBuf;
use serde_derive::{Deserialize, Serialize};

Expand All @@ -682,12 +684,26 @@ pub(super) mod _serde {
impl ManifestListV2 {
/// Converts the [ManifestListV2] into a [ManifestList].
/// The convert of [entries] need the partition_type info so use this function instead of [std::TryFrom] trait.
pub fn try_into(self, partition_type: &StructType) -> Result<super::ManifestList, Error> {
pub fn try_into(
self,
partition_types: &HashMap<i32, StructType>,
) -> Result<super::ManifestList, Error> {
Ok(super::ManifestList {
entries: self
.entries
.into_iter()
.map(|v| v.try_into(partition_type))
.map(|v| {
let partition_spec_id = v.partition_spec_id;
let manifest_path = v.manifest_path.clone();
v.try_into(partition_types.get(&partition_spec_id))
.map_err(|err| {
err.with_context("manifest file path", manifest_path)
.with_context(
"partition spec id",
partition_spec_id.to_string(),
)
})
})
.collect::<Result<Vec<_>, _>>()?,
})
}
Expand All @@ -710,12 +726,26 @@ pub(super) mod _serde {
impl ManifestListV1 {
/// Converts the [ManifestListV1] into a [ManifestList].
/// The convert of [entries] need the partition_type info so use this function instead of [std::TryFrom] trait.
pub fn try_into(self, partition_type: &StructType) -> Result<super::ManifestList, Error> {
pub fn try_into(
self,
partition_types: &HashMap<i32, StructType>,
) -> Result<super::ManifestList, Error> {
Ok(super::ManifestList {
entries: self
.entries
.into_iter()
.map(|v| v.try_into(partition_type))
.map(|v| {
let partition_spec_id = v.partition_spec_id;
let manifest_path = v.manifest_path.clone();
v.try_into(partition_types.get(&partition_spec_id))
.map_err(|err| {
err.with_context("manifest file path", manifest_path)
.with_context(
"partition spec id",
partition_spec_id.to_string(),
)
})
})
.collect::<Result<Vec<_>, _>>()?,
})
}
Expand Down Expand Up @@ -800,10 +830,10 @@ pub(super) mod _serde {

fn try_convert_to_field_summary(
partitions: Option<Vec<FieldSummary>>,
partition_type: &StructType,
partition_type: Option<&StructType>,
) -> Result<Vec<super::FieldSummary>, Error> {
Ok(partitions
.map(|partitions| {
if let Some(partitions) = partitions {
if let Some(partition_type) = partition_type {
let partition_types = partition_type.fields();
if partitions.len() != partition_types.len() {
return Err(Error::new(
Expand All @@ -820,15 +850,24 @@ pub(super) mod _serde {
.zip(partition_types)
.map(|(v, field)| v.try_into(&field.field_type))
.collect::<Result<Vec<_>, _>>()
})
.transpose()?
.unwrap_or_default())
} else {
Err(Error::new(
crate::ErrorKind::DataInvalid,
"Invalid partition spec. Partition type is required",
))
}
} else {
Ok(Vec::new())
}
}

impl ManifestListEntryV2 {
/// Converts the [ManifestListEntryV2] into a [ManifestListEntry].
/// The convert of [partitions] need the partition_type info so use this function instead of [std::TryFrom] trait.
pub fn try_into(self, partition_type: &StructType) -> Result<ManifestListEntry, Error> {
pub fn try_into(
self,
partition_type: Option<&StructType>,
) -> Result<ManifestListEntry, Error> {
let partitions = try_convert_to_field_summary(self.partitions, partition_type)?;
Ok(ManifestListEntry {
manifest_path: self.manifest_path,
Expand All @@ -853,7 +892,10 @@ pub(super) mod _serde {
impl ManifestListEntryV1 {
/// Converts the [ManifestListEntryV1] into a [ManifestListEntry].
/// The convert of [partitions] need the partition_type info so use this function instead of [std::TryFrom] trait.
pub fn try_into(self, partition_type: &StructType) -> Result<ManifestListEntry, Error> {
pub fn try_into(
self,
partition_type: Option<&StructType>,
) -> Result<ManifestListEntry, Error> {
let partitions = try_convert_to_field_summary(self.partitions, partition_type)?;
Ok(ManifestListEntry {
manifest_path: self.manifest_path,
Expand Down Expand Up @@ -1032,7 +1074,7 @@ pub(super) mod _serde {

#[cfg(test)]
mod test {
use std::{fs, sync::Arc};
use std::{collections::HashMap, fs, sync::Arc};

use tempfile::TempDir;

Expand Down Expand Up @@ -1090,12 +1132,9 @@ mod test {

let bs = fs::read(full_path).expect("read_file must succeed");

let parsed_manifest_list = ManifestList::parse_with_version(
&bs,
crate::spec::FormatVersion::V1,
&StructType::new(vec![]),
)
.unwrap();
let parsed_manifest_list =
ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V1, &HashMap::new())
.unwrap();

assert_eq!(manifest_list, parsed_manifest_list);
}
Expand All @@ -1120,6 +1159,23 @@ mod test {
deleted_rows_count: Some(0),
partitions: vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Literal::long(1)), upper_bound: Some(Literal::long(1))}],
key_metadata: vec![],
},
ManifestListEntry {
manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m1.avro".to_string(),
manifest_length: 6926,
partition_spec_id: 2,
content: ManifestContentType::Data,
sequence_number: 1,
min_sequence_number: 1,
added_snapshot_id: 377075049360453639,
added_data_files_count: Some(1),
existing_data_files_count: Some(0),
deleted_data_files_count: Some(0),
added_rows_count: Some(3),
existing_rows_count: Some(0),
deleted_rows_count: Some(0),
partitions: vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Literal::float(1.1)), upper_bound: Some(Literal::float(2.1))}],
key_metadata: vec![],
}
]
};
Expand Down Expand Up @@ -1147,11 +1203,24 @@ mod test {
let parsed_manifest_list = ManifestList::parse_with_version(
&bs,
crate::spec::FormatVersion::V2,
&StructType::new(vec![Arc::new(NestedField::required(
1,
"test",
Type::Primitive(PrimitiveType::Long),
))]),
&HashMap::from([
(
1,
StructType::new(vec![Arc::new(NestedField::required(
1,
"test",
Type::Primitive(PrimitiveType::Long),
))]),
),
(
2,
StructType::new(vec![Arc::new(NestedField::required(
1,
"test",
Type::Primitive(PrimitiveType::Float),
))]),
),
]),
)
.unwrap();

Expand Down Expand Up @@ -1251,11 +1320,14 @@ mod test {
let manifest_list = ManifestList::parse_with_version(
&bs,
crate::spec::FormatVersion::V1,
&StructType::new(vec![Arc::new(NestedField::required(
&HashMap::from([(
1,
"test",
Type::Primitive(PrimitiveType::Long),
))]),
StructType::new(vec![Arc::new(NestedField::required(
1,
"test",
Type::Primitive(PrimitiveType::Long),
))]),
)]),
)
.unwrap();
assert_eq!(manifest_list, expected_manifest_list);
Expand Down Expand Up @@ -1302,11 +1374,14 @@ mod test {
let manifest_list = ManifestList::parse_with_version(
&bs,
crate::spec::FormatVersion::V2,
&StructType::new(vec![Arc::new(NestedField::required(
&HashMap::from([(
1,
"test",
Type::Primitive(PrimitiveType::Long),
))]),
StructType::new(vec![Arc::new(NestedField::required(
1,
"test",
Type::Primitive(PrimitiveType::Long),
))]),
)]),
)
.unwrap();
expected_manifest_list.entries[0].sequence_number = seq_num;
Expand All @@ -1315,4 +1390,56 @@ mod test {

temp_dir.close().unwrap();
}

#[tokio::test]
async fn test_manifest_list_writer_v1_as_v2() {
let expected_manifest_list = ManifestList {
entries: vec![ManifestListEntry {
manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(),
manifest_length: 5806,
partition_spec_id: 1,
content: ManifestContentType::Data,
sequence_number: 0,
min_sequence_number: 0,
added_snapshot_id: 1646658105718557341,
added_data_files_count: Some(3),
existing_data_files_count: Some(0),
deleted_data_files_count: Some(0),
added_rows_count: Some(3),
existing_rows_count: Some(0),
deleted_rows_count: Some(0),
partitions: vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Literal::long(1)), upper_bound: Some(Literal::long(1))}],
key_metadata: vec![],
}]
};

let temp_dir = TempDir::new().unwrap();
let path = temp_dir.path().join("manifest_list_v1.avro");
let io = FileIOBuilder::new_fs_io().build().unwrap();
let output_file = io.new_output(path.to_str().unwrap()).unwrap();

let mut writer = ManifestListWriter::v2(output_file, 1646658105718557341, 0, 1);
writer
.add_manifest_entries(expected_manifest_list.entries.clone().into_iter())
.unwrap();
writer.close().await.unwrap();

let bs = fs::read(path).unwrap();
let manifest_list = ManifestList::parse_with_version(
&bs,
crate::spec::FormatVersion::V2,
&HashMap::from([(
1,
StructType::new(vec![Arc::new(NestedField::required(
1,
"test",
Type::Primitive(PrimitiveType::Long),
))]),
)]),
)
.unwrap();
assert_eq!(manifest_list, expected_manifest_list);

temp_dir.close().unwrap();
}
}

0 comments on commit 7d06a85

Please sign in to comment.