From 50ea8246b105d65018e24fb6afc8b80514f4127e Mon Sep 17 00:00:00 2001 From: Aitozi Date: Fri, 6 Sep 2024 11:58:46 +0800 Subject: [PATCH 1/5] feat(spec): Impl the ManifestFile read functionality --- crates/paimon/Cargo.toml | 1 + crates/paimon/src/spec/data_file.rs | 74 +++++- crates/paimon/src/spec/manifest_entry.rs | 154 ++++++++++++ crates/paimon/src/spec/manifest_file_meta.rs | 59 +---- crates/paimon/src/spec/manifest_list.rs | 111 --------- crates/paimon/src/spec/mod.rs | 7 +- crates/paimon/src/spec/objects_file.rs | 224 ++++++++++++++++++ crates/paimon/src/spec/stats.rs | 77 ++++++ ...est-8ded1f09-fcda-489e-9167-582ac0f9f846-0 | Bin 0 -> 1787 bytes 9 files changed, 524 insertions(+), 183 deletions(-) create mode 100644 crates/paimon/src/spec/manifest_entry.rs delete mode 100644 crates/paimon/src/spec/manifest_list.rs create mode 100644 crates/paimon/src/spec/objects_file.rs create mode 100644 crates/paimon/src/spec/stats.rs create mode 100644 crates/paimon/tests/fixtures/manifest/manifest-8ded1f09-fcda-489e-9167-582ac0f9f846-0 diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml index 959475f..ceb6100 100644 --- a/crates/paimon/Cargo.toml +++ b/crates/paimon/Cargo.toml @@ -44,6 +44,7 @@ serde = { version = "1", features = ["derive"] } serde_bytes = "0.11.15" serde_json = "1.0.120" serde_with = "3.9.0" +serde_repr = "0.1" snafu = "0.8.3" typed-builder = "^0.19" opendal = { version = "0.49", features = ["services-fs"] } diff --git a/crates/paimon/src/spec/data_file.rs b/crates/paimon/src/spec/data_file.rs index 37165e6..124b38c 100644 --- a/crates/paimon/src/spec/data_file.rs +++ b/crates/paimon/src/spec/data_file.rs @@ -15,7 +15,10 @@ // specific language governing permissions and limitations // under the License. +use crate::spec::stats::BinaryTableStats; use crate::spec::RowType; +use chrono::serde::ts_milliseconds::deserialize as from_millis; +use chrono::serde::ts_milliseconds::serialize as to_millis; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use std::fmt::{Display, Formatter}; @@ -48,12 +51,6 @@ impl BinaryRow { } } -/// TODO: implement me. -/// The statistics for columns, supports the following stats. -/// -/// Impl References: -type SimpleStats = (); - /// The Source of a file. /// TODO: move me to the manifest module. /// @@ -72,25 +69,43 @@ pub enum FileSource { #[derive(Debug, Eq, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct DataFileMeta { + #[serde(rename = "_FILE_NAME")] pub file_name: String, + #[serde(rename = "_FILE_SIZE")] pub file_size: i64, // row_count tells the total number of rows (including add & delete) in this file. + #[serde(rename = "_ROW_COUNT")] pub row_count: i64, - pub min_key: BinaryRow, - pub max_key: BinaryRow, - pub key_stats: SimpleStats, - pub value_stats: SimpleStats, + #[serde(rename = "_MIN_KEY", with = "serde_bytes")] + pub min_key: Vec, + #[serde(rename = "_MAX_KEY", with = "serde_bytes")] + pub max_key: Vec, + #[serde(rename = "_KEY_STATS")] + pub key_stats: BinaryTableStats, + #[serde(rename = "_VALUE_STATS")] + pub value_stats: BinaryTableStats, + #[serde(rename = "_MIN_SEQUENCE_NUMBER")] pub min_sequence_number: i64, + #[serde(rename = "_MAX_SEQUENCE_NUMBER")] pub max_sequence_number: i64, + #[serde(rename = "_SCHEMA_ID")] pub schema_id: i64, + #[serde(rename = "_LEVEL")] pub level: i32, + #[serde(rename = "_EXTRA_FILES")] pub extra_files: Vec, + #[serde( + rename = "_CREATION_TIME", + serialize_with = "to_millis", + deserialize_with = "from_millis" + )] pub creation_time: DateTime, + #[serde(rename = "_DELETE_ROW_COUNT")] // rowCount = add_row_count + delete_row_count. pub delete_row_count: Option, // file index filter bytes, if it is small, store in data file meta + #[serde(rename = "_EMBEDDED_FILE_INDEX", with = "serde_bytes")] pub embedded_index: Option>, - pub file_source: Option, } impl Display for DataFileMeta { @@ -99,7 +114,44 @@ impl Display for DataFileMeta { } } +#[allow(clippy::too_many_arguments)] impl DataFileMeta { // TODO: implement me pub const SCHEMA: RowType = RowType::new(vec![]); + + pub fn new( + file_name: String, + file_size: i64, + row_count: i64, + min_key: Vec, + max_key: Vec, + key_stats: BinaryTableStats, + value_stats: BinaryTableStats, + min_sequence_number: i64, + max_sequence_number: i64, + schema_id: i64, + level: i32, + extra_files: Vec, + creation_time: DateTime, + delete_row_count: Option, + embedded_index: Option>, + ) -> Self { + DataFileMeta { + file_name, + file_size, + row_count, + min_key, + max_key, + key_stats, + value_stats, + min_sequence_number, + max_sequence_number, + schema_id, + level, + extra_files, + creation_time, + delete_row_count, + embedded_index, + } + } } diff --git a/crates/paimon/src/spec/manifest_entry.rs b/crates/paimon/src/spec/manifest_entry.rs new file mode 100644 index 0000000..be2989a --- /dev/null +++ b/crates/paimon/src/spec/manifest_entry.rs @@ -0,0 +1,154 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::spec::DataFileMeta; +use serde::Deserialize; +use serde_repr::{Deserialize_repr, Serialize_repr}; +use serde_with::serde_derive::Serialize; + +/// Entry representing a file. +/// +/// Impl Reference: +#[allow(dead_code)] +pub trait FileEntry { + fn kind(&self) -> &FileKind; + + fn partition(&self) -> &Vec; + + fn bucket(&self) -> i32; + + fn level(&self) -> i32; + + fn file_name(&self) -> &str; + + fn min_key(&self) -> &Vec; + + fn max_key(&self) -> &Vec; + + fn identifier(&self) -> Identifier; +} + +/// The same {@link Identifier} indicates that the {@link ManifestEntry} refers to the same data file. +/// +/// Impl Reference: +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct Identifier { + pub partition: Vec, + pub bucket: i32, + pub level: i32, + pub file_name: String, +} +/// Kind of a file. +/// Impl Reference: +#[derive(PartialEq, Eq, Debug, Clone, Serialize_repr, Deserialize_repr)] +#[repr(u8)] +pub enum FileKind { + Add = 0, + Delete = 1, +} + +/// Entry of a manifest file, representing an addition / deletion of a data file. +/// Impl Reference: +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct ManifestEntry { + #[serde(rename = "_KIND")] + kind: FileKind, + + #[serde(rename = "_PARTITION", with = "serde_bytes")] + partition: Vec, + + #[serde(rename = "_BUCKET")] + bucket: i32, + + #[serde(rename = "_TOTAL_BUCKETS")] + total_buckets: i32, + + #[serde(rename = "_FILE")] + file: DataFileMeta, + + #[serde(rename = "_VERSION")] + version: i32, +} + +#[allow(dead_code)] +impl FileEntry for ManifestEntry { + fn kind(&self) -> &FileKind { + &self.kind + } + + fn partition(&self) -> &Vec { + &self.partition + } + + fn bucket(&self) -> i32 { + self.bucket + } + + fn level(&self) -> i32 { + self.file.level + } + + fn file_name(&self) -> &str { + &self.file.file_name + } + + fn min_key(&self) -> &Vec { + &self.file.min_key + } + + fn max_key(&self) -> &Vec { + &self.file.max_key + } + + fn identifier(&self) -> Identifier { + Identifier { + partition: self.partition.clone(), + bucket: self.bucket, + level: self.file.level, + file_name: self.file.file_name.clone(), + } + } +} + +#[allow(dead_code)] +impl ManifestEntry { + pub fn total_buckets(&self) -> i32 { + self.total_buckets + } + + pub fn file(&self) -> &DataFileMeta { + &self.file + } + + pub fn new( + kind: FileKind, + partition: Vec, + bucket: i32, + total_buckets: i32, + file: DataFileMeta, + version: i32, + ) -> Self { + ManifestEntry { + kind, + partition, + bucket, + total_buckets, + file, + version, + } + } +} diff --git a/crates/paimon/src/spec/manifest_file_meta.rs b/crates/paimon/src/spec/manifest_file_meta.rs index 382d579..36f92b9 100644 --- a/crates/paimon/src/spec/manifest_file_meta.rs +++ b/crates/paimon/src/spec/manifest_file_meta.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use crate::spec::stats::BinaryTableStats; use serde::{Deserialize, Serialize}; use std::fmt::{Display, Formatter}; @@ -128,61 +129,3 @@ impl Display for ManifestFileMeta { ) } } - -/// The statistics for columns, supports the following stats. -/// -/// All statistics are stored in the form of a Binary, which can significantly reduce its memory consumption, but the cost is that the column type needs to be known when getting. -/// -/// Impl Reference: -#[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize)] -pub struct BinaryTableStats { - /// the minimum values of the columns - #[serde(rename = "_MIN_VALUES", with = "serde_bytes")] - min_values: Vec, - - /// the maximum values of the columns - #[serde(rename = "_MAX_VALUES", with = "serde_bytes")] - max_values: Vec, - - /// the number of nulls of the columns - #[serde(rename = "_NULL_COUNTS")] - null_counts: Vec, -} - -impl BinaryTableStats { - /// Get the minimum values of the columns - #[inline] - pub fn min_values(&self) -> &[u8] { - &self.min_values - } - - /// Get the maximum values of the columns - #[inline] - pub fn max_values(&self) -> &[u8] { - &self.max_values - } - - /// Get the number of nulls of the columns - #[inline] - pub fn null_counts(&self) -> &Vec { - &self.null_counts - } - - pub fn new( - min_values: Vec, - max_values: Vec, - null_counts: Vec, - ) -> BinaryTableStats { - Self { - min_values, - max_values, - null_counts, - } - } -} - -impl Display for BinaryTableStats { - fn fmt(&self, _: &mut Formatter<'_>) -> std::fmt::Result { - todo!() - } -} diff --git a/crates/paimon/src/spec/manifest_list.rs b/crates/paimon/src/spec/manifest_list.rs deleted file mode 100644 index 2cffd5c..0000000 --- a/crates/paimon/src/spec/manifest_list.rs +++ /dev/null @@ -1,111 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use super::manifest_file_meta::ManifestFileMeta; -use crate::io::FileIO; -use crate::{Error, Result}; -use apache_avro::types::Value; -use apache_avro::{from_value, Reader}; -use serde::{Deserialize, Serialize}; - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -#[serde(transparent)] -/// This file includes several [`ManifestFileMeta`], representing all data of the whole table at the corresponding snapshot. -pub struct ManifestList { - entries: Vec, -} - -impl ManifestList { - pub fn entries(&self) -> &Vec { - &self.entries - } - - pub fn from_avro_bytes(bytes: &[u8]) -> Result { - let reader = Reader::new(bytes).map_err(Error::from)?; - let records = reader - .collect::, _>>() - .map_err(Error::from)?; - let values = Value::Array(records); - from_value::(&values).map_err(Error::from) - } -} - -pub struct ManifestListFactory { - file_io: FileIO, -} - -/// The factory to read and write [`ManifestList`] -impl ManifestListFactory { - pub fn new(file_io: FileIO) -> ManifestListFactory { - Self { file_io } - } - - /// Write several [`ManifestFileMeta`]s into a manifest list. - /// - /// NOTE: This method is atomic. - pub fn write(&mut self, _metas: Vec) -> &str { - todo!() - } - - /// Read [`ManifestList`] from the manifest file. - pub async fn read(&self, path: &str) -> Result { - let bs = self.file_io.new_input(path)?.read().await?; - // todo support other formats - ManifestList::from_avro_bytes(bs.as_ref()) - } -} - -#[cfg(test)] -mod tests { - use crate::spec::{BinaryTableStats, ManifestFileMeta, ManifestList}; - - #[tokio::test] - async fn test_read_manifest_list() { - let workdir = - std::env::current_dir().unwrap_or_else(|err| panic!("current_dir must exist: {err}")); - let path = workdir - .join("tests/fixtures/manifest/manifest-list-5c7399a0-46ae-4a5e-9c13-3ab07212cdb6-0"); - let v = std::fs::read(path.to_str().unwrap()).unwrap(); - let res = ManifestList::from_avro_bytes(&v).unwrap(); - let value_bytes = vec![ - 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 49, 0, 0, 0, 0, 0, 0, 129, - ]; - assert_eq!( - res, - ManifestList { - entries: vec![ - ManifestFileMeta::new( - "manifest-19d138df-233f-46f7-beb6-fadaf4741c0e".to_string(), - 10, - 10, - 10, - BinaryTableStats::new(value_bytes.clone(), value_bytes.clone(), vec![1, 2]), - 1 - ), - ManifestFileMeta::new( - "manifest-a703ee48-c411-413e-b84e-c03bdb179631".to_string(), - 11, - 0, - 10, - BinaryTableStats::new(value_bytes.clone(), value_bytes.clone(), vec![1, 2]), - 2 - ) - ], - } - ); - } -} diff --git a/crates/paimon/src/spec/mod.rs b/crates/paimon/src/spec/mod.rs index 80fb47d..59c7d00 100644 --- a/crates/paimon/src/spec/mod.rs +++ b/crates/paimon/src/spec/mod.rs @@ -34,8 +34,9 @@ pub use snapshot::*; mod manifest_file_meta; pub use manifest_file_meta::*; -mod manifest_list; -pub use manifest_list::*; - +mod manifest_entry; +mod objects_file; +mod stats; mod types; + pub use types::*; diff --git a/crates/paimon/src/spec/objects_file.rs b/crates/paimon/src/spec/objects_file.rs new file mode 100644 index 0000000..dabec24 --- /dev/null +++ b/crates/paimon/src/spec/objects_file.rs @@ -0,0 +1,224 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::io::FileIO; +use crate::spec::manifest_entry::ManifestEntry; +use crate::spec::ManifestFileMeta; +use crate::Error; +use apache_avro::types::Value; +use apache_avro::{from_value, Reader}; +use serde::de::DeserializeOwned; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(transparent)] +/// This file includes several [T] +pub struct ObjectsFile { + entries: Vec, +} + +#[allow(dead_code)] +impl ObjectsFile +where + T: DeserializeOwned, +{ + pub fn entries(&self) -> &Vec { + &self.entries + } + + pub fn from_avro_bytes(bytes: &[u8]) -> crate::Result> { + let reader = Reader::new(bytes).map_err(Error::from)?; + let records = reader + .collect::, _>>() + .map_err(Error::from)?; + let values = Value::Array(records); + from_value::>(&values).map_err(Error::from) + } +} + +pub struct ObjectsFileFactory { + file_io: FileIO, +} + +/// The factory to read and write [`ObjectsFile`] +#[allow(dead_code)] +impl ObjectsFileFactory { + pub fn new(file_io: FileIO) -> ObjectsFileFactory { + Self { file_io } + } + + /// Write several [`ManifestFileMeta`]s into a manifest list. + /// + /// NOTE: This method is atomic. + pub fn write(&mut self, _metas: Vec) -> &str { + todo!() + } + + /// Read [`ManifestList`] from the manifest file. + pub async fn read_manifest_file_meta( + &self, + path: &str, + ) -> crate::Result> { + let bs = self.file_io.new_input(path)?.read().await?; + // todo support other formats + ObjectsFile::::from_avro_bytes(bs.as_ref()) + } + + /// Read [`ManifestEntry`] from the manifest file. + pub async fn read_manifest_entry( + &self, + path: &str, + ) -> crate::Result> { + let bs = self.file_io.new_input(path)?.read().await?; + // todo support other formats + ObjectsFile::::from_avro_bytes(bs.as_ref()) + } +} + +#[cfg(test)] +mod tests { + use crate::spec::manifest_entry::{FileKind, ManifestEntry}; + use crate::spec::objects_file::ObjectsFile; + use crate::spec::stats::BinaryTableStats; + use crate::spec::{DataFileMeta, ManifestFileMeta}; + use chrono::{DateTime, Utc}; + + #[tokio::test] + async fn test_read_manifest_list() { + let workdir = + std::env::current_dir().unwrap_or_else(|err| panic!("current_dir must exist: {err}")); + let path = workdir + .join("tests/fixtures/manifest/manifest-list-5c7399a0-46ae-4a5e-9c13-3ab07212cdb6-0"); + let v = std::fs::read(path.to_str().unwrap()).unwrap(); + let res = ObjectsFile::::from_avro_bytes(&v).unwrap(); + let value_bytes = vec![ + 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 49, 0, 0, 0, 0, 0, 0, 129, + ]; + assert_eq!( + res, + ObjectsFile { + entries: vec![ + ManifestFileMeta::new( + "manifest-19d138df-233f-46f7-beb6-fadaf4741c0e".to_string(), + 10, + 10, + 10, + BinaryTableStats::new(value_bytes.clone(), value_bytes.clone(), vec![1, 2]), + 1 + ), + ManifestFileMeta::new( + "manifest-a703ee48-c411-413e-b84e-c03bdb179631".to_string(), + 11, + 0, + 10, + BinaryTableStats::new(value_bytes.clone(), value_bytes.clone(), vec![1, 2]), + 2 + ) + ], + } + ); + } + + #[tokio::test] + async fn test_read_manifest_entry() { + let workdir = + std::env::current_dir().unwrap_or_else(|err| panic!("current_dir must exist: {err}")); + let path = + workdir.join("tests/fixtures/manifest/manifest-8ded1f09-fcda-489e-9167-582ac0f9f846-0"); + let v = std::fs::read(path.to_str().unwrap()).unwrap(); + let res = ObjectsFile::::from_avro_bytes(&v).unwrap(); + let value_bytes = vec![ + 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 49, 0, 0, 0, 0, 0, 0, 129, 1, 0, 0, 0, 0, 0, 0, 0, + ]; + let single_value = vec![0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0]; + assert_eq!( + res, + ObjectsFile { + entries: vec![ + ManifestEntry::new( + FileKind::Delete, + single_value.clone(), + 1, + 10, + DataFileMeta::new( + "f1.parquet".to_string(), + 10, + 100, + single_value.clone(), + single_value.clone(), + BinaryTableStats::new( + value_bytes.clone(), + value_bytes.clone(), + vec![1, 2] + ), + BinaryTableStats::new( + value_bytes.clone(), + value_bytes.clone(), + vec![1, 2] + ), + 1, + 100, + 0, + 1, + vec![], + "2024-09-06T07:45:55.039+00:00" + .parse::>() + .unwrap(), + Some(0), + None, + ), + 2 + ), + ManifestEntry::new( + FileKind::Add, + single_value.clone(), + 2, + 10, + DataFileMeta::new( + "f2.parquet".to_string(), + 10, + 100, + single_value.clone(), + single_value.clone(), + BinaryTableStats::new( + value_bytes.clone(), + value_bytes.clone(), + vec![1, 2] + ), + BinaryTableStats::new( + value_bytes.clone(), + value_bytes.clone(), + vec![1, 2] + ), + 1, + 100, + 0, + 1, + vec![], + "2024-09-06T07:45:55.039+00:00" + .parse::>() + .unwrap(), + Some(1), + None, + ), + 2 + ), + ] + } + ) + } +} diff --git a/crates/paimon/src/spec/stats.rs b/crates/paimon/src/spec/stats.rs new file mode 100644 index 0000000..98923ce --- /dev/null +++ b/crates/paimon/src/spec/stats.rs @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use serde::{Deserialize, Serialize}; +use std::fmt::{Display, Formatter}; + +/// The statistics for columns, supports the following stats. +/// +/// All statistics are stored in the form of a Binary, which can significantly reduce its memory consumption, but the cost is that the column type needs to be known when getting. +/// +/// Impl Reference: +#[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize)] +pub struct BinaryTableStats { + /// the minimum values of the columns + #[serde(rename = "_MIN_VALUES", with = "serde_bytes")] + min_values: Vec, + + /// the maximum values of the columns + #[serde(rename = "_MAX_VALUES", with = "serde_bytes")] + max_values: Vec, + + /// the number of nulls of the columns + #[serde(rename = "_NULL_COUNTS")] + null_counts: Vec, +} + +impl BinaryTableStats { + /// Get the minimum values of the columns + #[inline] + pub fn min_values(&self) -> &[u8] { + &self.min_values + } + + /// Get the maximum values of the columns + #[inline] + pub fn max_values(&self) -> &[u8] { + &self.max_values + } + + /// Get the number of nulls of the columns + #[inline] + pub fn null_counts(&self) -> &Vec { + &self.null_counts + } + + pub fn new( + min_values: Vec, + max_values: Vec, + null_counts: Vec, + ) -> BinaryTableStats { + Self { + min_values, + max_values, + null_counts, + } + } +} + +impl Display for BinaryTableStats { + fn fmt(&self, _: &mut Formatter<'_>) -> std::fmt::Result { + todo!() + } +} diff --git a/crates/paimon/tests/fixtures/manifest/manifest-8ded1f09-fcda-489e-9167-582ac0f9f846-0 b/crates/paimon/tests/fixtures/manifest/manifest-8ded1f09-fcda-489e-9167-582ac0f9f846-0 new file mode 100644 index 0000000000000000000000000000000000000000..57a7ef44bdbd3f0091b686221da3a3496518e16c GIT binary patch literal 1787 zcmds2L5tHs6rRK!EV$r8mAwcu2NA4=dXjo++Kw2SrkbX;yUQ}nrnB8FNt#Vkp)5TJ z;?=8vLGb9oljl8o^WtSMLP7iiUR<0cZIU)EU9Un8&Ae~E_uiN9y|xZdIN>&VU3d_)?`q1se0!Z&_Rg<&@g2S;-y#_yZ5gD02EC)j|S#gvyYgkm=U*zaH z<%VgBoPwk7V~YPzqTma+(TfgJYRvteW)-Q4C$p|)mb^N;U3-R_B9>AMBf}`8E|Qas z%j*X9G*t|THZe{7<(AA6k865g)l?(~3khpdYO*sX2OG*|0z3R;hT$0olycX>>+N0l4#=~sJTF{U0avY@b1ck%`E&8(bDw8W bq5Ba4ft!Lyms1cuTT_7txwjAAEEn<@4XHLQ literal 0 HcmV?d00001 From 3335114317b761280611fda8cffd4bdda4dd88b1 Mon Sep 17 00:00:00 2001 From: Aitozi Date: Fri, 6 Sep 2024 12:44:41 +0800 Subject: [PATCH 2/5] fix comments --- crates/paimon/src/spec/data_file.rs | 44 +---- crates/paimon/src/spec/objects_file.rs | 226 +++++++++++-------------- 2 files changed, 103 insertions(+), 167 deletions(-) diff --git a/crates/paimon/src/spec/data_file.rs b/crates/paimon/src/spec/data_file.rs index 124b38c..17fab24 100644 --- a/crates/paimon/src/spec/data_file.rs +++ b/crates/paimon/src/spec/data_file.rs @@ -16,7 +16,6 @@ // under the License. use crate::spec::stats::BinaryTableStats; -use crate::spec::RowType; use chrono::serde::ts_milliseconds::deserialize as from_millis; use chrono::serde::ts_milliseconds::serialize as to_millis; use chrono::{DateTime, Utc}; @@ -114,44 +113,5 @@ impl Display for DataFileMeta { } } -#[allow(clippy::too_many_arguments)] -impl DataFileMeta { - // TODO: implement me - pub const SCHEMA: RowType = RowType::new(vec![]); - - pub fn new( - file_name: String, - file_size: i64, - row_count: i64, - min_key: Vec, - max_key: Vec, - key_stats: BinaryTableStats, - value_stats: BinaryTableStats, - min_sequence_number: i64, - max_sequence_number: i64, - schema_id: i64, - level: i32, - extra_files: Vec, - creation_time: DateTime, - delete_row_count: Option, - embedded_index: Option>, - ) -> Self { - DataFileMeta { - file_name, - file_size, - row_count, - min_key, - max_key, - key_stats, - value_stats, - min_sequence_number, - max_sequence_number, - schema_id, - level, - extra_files, - creation_time, - delete_row_count, - embedded_index, - } - } -} +#[allow(dead_code)] +impl DataFileMeta {} diff --git a/crates/paimon/src/spec/objects_file.rs b/crates/paimon/src/spec/objects_file.rs index dabec24..dd0bf48 100644 --- a/crates/paimon/src/spec/objects_file.rs +++ b/crates/paimon/src/spec/objects_file.rs @@ -22,32 +22,14 @@ use crate::Error; use apache_avro::types::Value; use apache_avro::{from_value, Reader}; use serde::de::DeserializeOwned; -use serde::{Deserialize, Serialize}; -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -#[serde(transparent)] -/// This file includes several [T] -pub struct ObjectsFile { - entries: Vec, -} - -#[allow(dead_code)] -impl ObjectsFile -where - T: DeserializeOwned, -{ - pub fn entries(&self) -> &Vec { - &self.entries - } - - pub fn from_avro_bytes(bytes: &[u8]) -> crate::Result> { - let reader = Reader::new(bytes).map_err(Error::from)?; - let records = reader - .collect::, _>>() - .map_err(Error::from)?; - let values = Value::Array(records); - from_value::>(&values).map_err(Error::from) - } +pub fn from_avro_bytes(bytes: &[u8]) -> crate::Result> { + let reader = Reader::new(bytes).map_err(Error::from)?; + let records = reader + .collect::, _>>() + .map_err(Error::from)?; + let values = Value::Array(records); + from_value::>(&values).map_err(Error::from) } pub struct ObjectsFileFactory { @@ -72,27 +54,24 @@ impl ObjectsFileFactory { pub async fn read_manifest_file_meta( &self, path: &str, - ) -> crate::Result> { + ) -> crate::Result> { let bs = self.file_io.new_input(path)?.read().await?; // todo support other formats - ObjectsFile::::from_avro_bytes(bs.as_ref()) + from_avro_bytes::(bs.as_ref()) } /// Read [`ManifestEntry`] from the manifest file. - pub async fn read_manifest_entry( - &self, - path: &str, - ) -> crate::Result> { + pub async fn read_manifest_entry(&self, path: &str) -> crate::Result> { let bs = self.file_io.new_input(path)?.read().await?; // todo support other formats - ObjectsFile::::from_avro_bytes(bs.as_ref()) + from_avro_bytes::(bs.as_ref()) } } #[cfg(test)] mod tests { use crate::spec::manifest_entry::{FileKind, ManifestEntry}; - use crate::spec::objects_file::ObjectsFile; + use crate::spec::objects_file::from_avro_bytes; use crate::spec::stats::BinaryTableStats; use crate::spec::{DataFileMeta, ManifestFileMeta}; use chrono::{DateTime, Utc}; @@ -104,32 +83,30 @@ mod tests { let path = workdir .join("tests/fixtures/manifest/manifest-list-5c7399a0-46ae-4a5e-9c13-3ab07212cdb6-0"); let v = std::fs::read(path.to_str().unwrap()).unwrap(); - let res = ObjectsFile::::from_avro_bytes(&v).unwrap(); + let res = from_avro_bytes::(&v).unwrap(); let value_bytes = vec![ 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 49, 0, 0, 0, 0, 0, 0, 129, ]; assert_eq!( res, - ObjectsFile { - entries: vec![ - ManifestFileMeta::new( - "manifest-19d138df-233f-46f7-beb6-fadaf4741c0e".to_string(), - 10, - 10, - 10, - BinaryTableStats::new(value_bytes.clone(), value_bytes.clone(), vec![1, 2]), - 1 - ), - ManifestFileMeta::new( - "manifest-a703ee48-c411-413e-b84e-c03bdb179631".to_string(), - 11, - 0, - 10, - BinaryTableStats::new(value_bytes.clone(), value_bytes.clone(), vec![1, 2]), - 2 - ) - ], - } + vec![ + ManifestFileMeta::new( + "manifest-19d138df-233f-46f7-beb6-fadaf4741c0e".to_string(), + 10, + 10, + 10, + BinaryTableStats::new(value_bytes.clone(), value_bytes.clone(), vec![1, 2]), + 1 + ), + ManifestFileMeta::new( + "manifest-a703ee48-c411-413e-b84e-c03bdb179631".to_string(), + 11, + 0, + 10, + BinaryTableStats::new(value_bytes.clone(), value_bytes.clone(), vec![1, 2]), + 2 + ) + ], ); } @@ -140,85 +117,84 @@ mod tests { let path = workdir.join("tests/fixtures/manifest/manifest-8ded1f09-fcda-489e-9167-582ac0f9f846-0"); let v = std::fs::read(path.to_str().unwrap()).unwrap(); - let res = ObjectsFile::::from_avro_bytes(&v).unwrap(); + let res = from_avro_bytes::(&v).unwrap(); let value_bytes = vec![ 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 49, 0, 0, 0, 0, 0, 0, 129, 1, 0, 0, 0, 0, 0, 0, 0, ]; let single_value = vec![0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0]; assert_eq!( res, - ObjectsFile { - entries: vec![ - ManifestEntry::new( - FileKind::Delete, - single_value.clone(), - 1, - 10, - DataFileMeta::new( - "f1.parquet".to_string(), - 10, - 100, - single_value.clone(), - single_value.clone(), - BinaryTableStats::new( - value_bytes.clone(), - value_bytes.clone(), - vec![1, 2] - ), - BinaryTableStats::new( - value_bytes.clone(), - value_bytes.clone(), - vec![1, 2] - ), - 1, - 100, - 0, - 1, - vec![], - "2024-09-06T07:45:55.039+00:00" - .parse::>() - .unwrap(), - Some(0), - None, + vec![ + ManifestEntry::new( + FileKind::Delete, + single_value.clone(), + 1, + 10, + DataFileMeta { + file_name: "f1.parquet".to_string(), + + file_size: 10, + row_count: 100, + min_key: single_value.clone(), + max_key: single_value.clone(), + key_stats: BinaryTableStats::new( + value_bytes.clone(), + value_bytes.clone(), + vec![1, 2] + ), + value_stats: BinaryTableStats::new( + value_bytes.clone(), + value_bytes.clone(), + vec![1, 2] + ), + min_sequence_number: 1, + max_sequence_number: 100, + schema_id: 0, + level: 1, + extra_files: vec![], + creation_time: "2024-09-06T07:45:55.039+00:00" + .parse::>() + .unwrap(), + delete_row_count: Some(0), + embedded_index: None, + }, + 2 + ), + ManifestEntry::new( + FileKind::Add, + single_value.clone(), + 2, + 10, + DataFileMeta { + file_name: "f2.parquet".to_string(), + file_size: 10, + row_count: 100, + min_key: single_value.clone(), + max_key: single_value.clone(), + key_stats: BinaryTableStats::new( + value_bytes.clone(), + value_bytes.clone(), + vec![1, 2] ), - 2 - ), - ManifestEntry::new( - FileKind::Add, - single_value.clone(), - 2, - 10, - DataFileMeta::new( - "f2.parquet".to_string(), - 10, - 100, - single_value.clone(), - single_value.clone(), - BinaryTableStats::new( - value_bytes.clone(), - value_bytes.clone(), - vec![1, 2] - ), - BinaryTableStats::new( - value_bytes.clone(), - value_bytes.clone(), - vec![1, 2] - ), - 1, - 100, - 0, - 1, - vec![], - "2024-09-06T07:45:55.039+00:00" - .parse::>() - .unwrap(), - Some(1), - None, + value_stats: BinaryTableStats::new( + value_bytes.clone(), + value_bytes.clone(), + vec![1, 2] ), - 2 - ), - ] - } + min_sequence_number: 1, + max_sequence_number: 100, + schema_id: 0, + level: 1, + extra_files: vec![], + creation_time: "2024-09-06T07:45:55.039+00:00" + .parse::>() + .unwrap(), + delete_row_count: Some(1), + embedded_index: None, + }, + 2 + ), + ] ) } } From a16af9322d1e1437653d9128323d21754b0f015f Mon Sep 17 00:00:00 2001 From: Aitozi Date: Fri, 6 Sep 2024 12:46:31 +0800 Subject: [PATCH 3/5] remove ObjectsFileFactory --- crates/paimon/src/spec/objects_file.rs | 36 -------------------------- 1 file changed, 36 deletions(-) diff --git a/crates/paimon/src/spec/objects_file.rs b/crates/paimon/src/spec/objects_file.rs index dd0bf48..93b3a26 100644 --- a/crates/paimon/src/spec/objects_file.rs +++ b/crates/paimon/src/spec/objects_file.rs @@ -32,42 +32,6 @@ pub fn from_avro_bytes(bytes: &[u8]) -> crate::Result>(&values).map_err(Error::from) } -pub struct ObjectsFileFactory { - file_io: FileIO, -} - -/// The factory to read and write [`ObjectsFile`] -#[allow(dead_code)] -impl ObjectsFileFactory { - pub fn new(file_io: FileIO) -> ObjectsFileFactory { - Self { file_io } - } - - /// Write several [`ManifestFileMeta`]s into a manifest list. - /// - /// NOTE: This method is atomic. - pub fn write(&mut self, _metas: Vec) -> &str { - todo!() - } - - /// Read [`ManifestList`] from the manifest file. - pub async fn read_manifest_file_meta( - &self, - path: &str, - ) -> crate::Result> { - let bs = self.file_io.new_input(path)?.read().await?; - // todo support other formats - from_avro_bytes::(bs.as_ref()) - } - - /// Read [`ManifestEntry`] from the manifest file. - pub async fn read_manifest_entry(&self, path: &str) -> crate::Result> { - let bs = self.file_io.new_input(path)?.read().await?; - // todo support other formats - from_avro_bytes::(bs.as_ref()) - } -} - #[cfg(test)] mod tests { use crate::spec::manifest_entry::{FileKind, ManifestEntry}; From 178d73112f175cf5387e99a4a93401c060023550 Mon Sep 17 00:00:00 2001 From: Aitozi Date: Fri, 6 Sep 2024 12:47:30 +0800 Subject: [PATCH 4/5] fmt --- crates/paimon/src/spec/objects_file.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/paimon/src/spec/objects_file.rs b/crates/paimon/src/spec/objects_file.rs index 93b3a26..5135c32 100644 --- a/crates/paimon/src/spec/objects_file.rs +++ b/crates/paimon/src/spec/objects_file.rs @@ -15,14 +15,12 @@ // specific language governing permissions and limitations // under the License. -use crate::io::FileIO; -use crate::spec::manifest_entry::ManifestEntry; -use crate::spec::ManifestFileMeta; use crate::Error; use apache_avro::types::Value; use apache_avro::{from_value, Reader}; use serde::de::DeserializeOwned; +#[allow(dead_code)] pub fn from_avro_bytes(bytes: &[u8]) -> crate::Result> { let reader = Reader::new(bytes).map_err(Error::from)?; let records = reader From ff724e2af151bf23cd7f033d3c566dcd46c12528 Mon Sep 17 00:00:00 2001 From: Aitozi Date: Fri, 6 Sep 2024 12:51:00 +0800 Subject: [PATCH 5/5] remove FileEntry --- crates/paimon/src/spec/manifest_entry.rs | 28 ++---------------------- 1 file changed, 2 insertions(+), 26 deletions(-) diff --git a/crates/paimon/src/spec/manifest_entry.rs b/crates/paimon/src/spec/manifest_entry.rs index be2989a..848d19b 100644 --- a/crates/paimon/src/spec/manifest_entry.rs +++ b/crates/paimon/src/spec/manifest_entry.rs @@ -20,28 +20,6 @@ use serde::Deserialize; use serde_repr::{Deserialize_repr, Serialize_repr}; use serde_with::serde_derive::Serialize; -/// Entry representing a file. -/// -/// Impl Reference: -#[allow(dead_code)] -pub trait FileEntry { - fn kind(&self) -> &FileKind; - - fn partition(&self) -> &Vec; - - fn bucket(&self) -> i32; - - fn level(&self) -> i32; - - fn file_name(&self) -> &str; - - fn min_key(&self) -> &Vec; - - fn max_key(&self) -> &Vec; - - fn identifier(&self) -> Identifier; -} - /// The same {@link Identifier} indicates that the {@link ManifestEntry} refers to the same data file. /// /// Impl Reference: @@ -52,6 +30,7 @@ pub struct Identifier { pub level: i32, pub file_name: String, } + /// Kind of a file. /// Impl Reference: #[derive(PartialEq, Eq, Debug, Clone, Serialize_repr, Deserialize_repr)] @@ -85,7 +64,7 @@ pub struct ManifestEntry { } #[allow(dead_code)] -impl FileEntry for ManifestEntry { +impl ManifestEntry { fn kind(&self) -> &FileKind { &self.kind } @@ -122,10 +101,7 @@ impl FileEntry for ManifestEntry { file_name: self.file.file_name.clone(), } } -} -#[allow(dead_code)] -impl ManifestEntry { pub fn total_buckets(&self) -> i32 { self.total_buckets }