Commit d7a4156

Get basic tests passing

1 parent dd3a80e commit d7a4156

2 files changed: +217 -23 lines

parquet/src/file/page_index/index.rs

Lines changed: 35 additions & 0 deletions
@@ -168,6 +168,41 @@ impl<T: ParquetValueType> NativeIndex<T> {
             boundary_order: index.boundary_order,
         })
     }
+
+    pub(crate) fn to_column_index(&self) -> ColumnIndex {
+        let min_values = self
+            .indexes
+            .iter()
+            .map(|x| x.min_bytes().map(|x| x.to_vec()))
+            .collect::<Option<Vec<_>>>()
+            .unwrap_or_else(|| vec![vec![]; self.indexes.len()]);
+
+        let max_values = self
+            .indexes
+            .iter()
+            .map(|x| x.max_bytes().map(|x| x.to_vec()))
+            .collect::<Option<Vec<_>>>()
+            .unwrap_or_else(|| vec![vec![]; self.indexes.len()]);
+
+        let null_counts = self
+            .indexes
+            .iter()
+            .map(|x| x.null_count())
+            .collect::<Option<Vec<_>>>()
+            .unwrap_or_else(|| vec![0; self.indexes.len()]);
+
+        ColumnIndex {
+            min_values,
+            max_values,
+            null_pages: self
+                .indexes
+                .iter()
+                .map(|x| x.min().is_none())
+                .collect(),
+            null_counts: Some(null_counts),
+            boundary_order: self.boundary_order,
+        }
+    }
 }
 
 #[cfg(test)]
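
Aside: the conversion above relies on collecting an iterator of Option<T> into Option<Vec<T>>, which yields None as soon as any element is None; unwrap_or_else then substitutes placeholder values. A minimal standalone sketch of that fallback pattern (illustrative names only, not part of the commit):

fn main() {
    // Any single missing per-page min collapses the whole collect to None...
    let page_mins: Vec<Option<Vec<u8>>> = vec![Some(vec![1]), None, Some(vec![3])];
    let min_values = page_mins
        .iter()
        .cloned()
        .collect::<Option<Vec<_>>>()
        // ...so placeholder empty byte vectors are substituted for every page,
        // mirroring min_values/max_values/null_counts in to_column_index.
        .unwrap_or_else(|| vec![vec![]; page_mins.len()]);
    assert_eq!(min_values, vec![vec![]; 3]);
}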

parquet/src/file/writer.rs

Lines changed: 182 additions & 23 deletions
@@ -19,6 +19,8 @@
 //! using row group writers and column writers respectively.
 
 use crate::bloom_filter::Sbbf;
+use crate::data_type::private::ParquetValueType;
+use crate::file::page_index::index::{Index, NativeIndex};
 use crate::format as parquet;
 use crate::format::{ColumnIndex, OffsetIndex, RowGroup};
 use crate::thrift::TSerializable;
@@ -260,7 +262,7 @@ impl<W: Write + Send> SerializedFileWriter<W> {
             None => Some(self.kv_metadatas.clone()),
         };
 
-        ParquetMetadataEncoder::write(
+        let mut encoder = ParquetMetadataWriter::new(
             &mut self.buf,
             &self.schema,
             &self.descr,
@@ -269,9 +271,10 @@ impl<W: Write + Send> SerializedFileWriter<W> {
             &self.column_indexes,
             &self.offset_indexes,
             &key_value_metadata,
-            self.props.created_by(),
+            Some(self.props.created_by().to_string()),
             self.props.writer_version(),
-        )
+        );
+        encoder.finish()
     }
 
     #[inline]
@@ -675,20 +678,20 @@ impl<'a, W: Write + Send> PageWriter for SerializedPageWriter<'a, W> {
     }
 }
 
-pub struct ParquetMetadataEncoder<'a, W: Write> {
+struct ParquetMetadataWriter<'a, W: Write> {
     buf: &'a mut TrackedWrite<W>,
     schema: &'a TypePtr,
     schema_descr: &'a SchemaDescPtr,
-    row_groups: &'a Vec<RowGroupMetaDataPtr>,
-    bloom_filters: &'a Vec<Vec<Option<Sbbf>>>,
-    column_indexes: &'a Vec<Vec<Option<ColumnIndex>>>,
-    offset_indexes: &'a Vec<Vec<Option<OffsetIndex>>>,
+    row_groups: &'a [RowGroupMetaDataPtr],
+    bloom_filters: &'a [Vec<Option<Sbbf>>],
+    column_indexes: &'a [Vec<Option<ColumnIndex>>],
+    offset_indexes: &'a [Vec<Option<OffsetIndex>>],
     key_value_metadata: &'a Option<Vec<KeyValue>>,
-    created_by: &'a str,
+    created_by: Option<String>,
     writer_version: WriterVersion,
 }
 
-impl<'a, W: Write> ParquetMetadataEncoder<'a, W> {
+impl<'a, W: Write> ParquetMetadataWriter<'a, W> {
     /// Serialize all the offset index to the file
     fn write_offset_indexes(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
         // iter row group
@@ -768,12 +771,11 @@ impl<'a, W: Write> ParquetMetadataEncoder<'a, W> {
     }
 
     /// Assembles and writes metadata at the end of the file.
-    fn write_metadata(&mut self) -> Result<parquet::FileMetaData> {
+    pub fn finish(&mut self) -> Result<parquet::FileMetaData> {
         let num_rows = self.row_groups.iter().map(|x| x.num_rows()).sum();
 
         let mut row_groups = self
             .row_groups
-            .as_slice()
             .iter()
             .map(|v| v.to_thrift())
             .collect::<Vec<_>>();
@@ -802,7 +804,7 @@ impl<'a, W: Write> ParquetMetadataEncoder<'a, W> {
             key_value_metadata: self.key_value_metadata.clone(),
             version: self.writer_version.as_num(),
             schema: types::to_thrift(self.schema.as_ref())?,
-            created_by: Some(self.created_by.to_string()),
+            created_by: self.created_by.clone(),
             column_orders,
             encryption_algorithm: None,
             footer_signing_key_metadata: None,
@@ -824,19 +826,19 @@ impl<'a, W: Write> ParquetMetadataEncoder<'a, W> {
         Ok(file_metadata)
     }
 
-    pub(self) fn write(
+    pub(self) fn new(
         buf: &'a mut TrackedWrite<W>,
         schema: &'a TypePtr,
         schema_descr: &'a SchemaDescPtr,
-        row_groups: &'a Vec<RowGroupMetaDataPtr>,
-        bloom_filters: &'a Vec<Vec<Option<Sbbf>>>,
-        column_indexes: &'a Vec<Vec<Option<ColumnIndex>>>,
-        offset_indexes: &'a Vec<Vec<Option<OffsetIndex>>>,
+        row_groups: &'a [RowGroupMetaDataPtr],
+        bloom_filters: &'a [Vec<Option<Sbbf>>],
+        column_indexes: &'a [Vec<Option<ColumnIndex>>],
+        offset_indexes: &'a [Vec<Option<OffsetIndex>>],
         key_value_metadata: &'a Option<Vec<KeyValue>>,
-        created_by: &'a str,
+        created_by: Option<String>,
         writer_version: WriterVersion,
-    ) -> Result<parquet::FileMetaData> {
-        let mut encoder = Self {
+    ) -> Self {
+        Self {
             buf,
             schema,
             schema_descr,
@@ -847,25 +849,140 @@ impl<'a, W: Write> ParquetMetadataEncoder<'a, W> {
             key_value_metadata,
             created_by,
             writer_version,
+        }
+    }
+}
+
+struct ParquetMetadataEncoderBuilderOpt<'a, W: Write> {
+    buf: TrackedWrite<W>,
+    write_page_index: bool,
+    bloom_filters: Option<&'a [Vec<Option<Sbbf>>]>,
+    metadata: &'a ParquetMetaData,
+}
+
+impl<'a, W: Write> ParquetMetadataEncoderBuilderOpt<'a, W> {
+    pub fn new(buf: W, metadata: &'a ParquetMetaData) -> Self {
+        Self {
+            buf: TrackedWrite::new(buf),
+            write_page_index: true,
+            bloom_filters: None,
+            metadata,
+        }
+    }
+
+    fn write_page_index(&mut self, write_page_index: bool) -> &mut Self {
+        self.write_page_index = write_page_index;
+        self
+    }
+
+    fn with_bloom_filters(&mut self, bloom_filters: &'a [Vec<Option<Sbbf>>]) -> &mut Self {
+        self.bloom_filters = Some(bloom_filters);
+        self
+    }
+
+    fn finish(&mut self) -> Result<()> {
+        let file_metadata = self.metadata.file_metadata();
+
+        let schema = Arc::new(file_metadata.schema().clone());
+        let schema_descr = Arc::new(SchemaDescriptor::new(schema.clone()));
+        let created_by = file_metadata.created_by().map(str::to_string);
+        let writer_version = WriterVersion::PARQUET_2_0; // TODO: how can we get this from ParquetMetadata?
+
+        let row_groups = self.metadata.row_groups().iter().map(|rg| Arc::new(rg.clone())).collect::<Vec<_>>();
+
+        let key_value_metadata = file_metadata.key_value_metadata().cloned();
+
+        let column_indexes = self.convert_column_indexes();
+        let offset_indexes = self.convert_offset_index();
+
+        // if the outer bloom filters is None, iterate over the row groups and create a Vec of None
+        let bloom_filters = match self.bloom_filters {
+            Some(bloom_filters) => bloom_filters,
+            None => {
+                &row_groups.iter().map(|rg| {
+                    vec![None; rg.columns().len()]
+                }).collect::<Vec<_>>()
+            }
         };
-        encoder.write_metadata()
+
+        let mut encoder = ParquetMetadataWriter::new(
+            &mut self.buf,
+            &schema,
+            &schema_descr,
+            &row_groups,
+            bloom_filters,
+            &column_indexes,
+            &offset_indexes,
+            &key_value_metadata,
+            created_by,
+            writer_version,
+        );
+        encoder.finish()?;
+
+        Ok(())
+    }
+
+    fn convert_column_indexes(&self) -> Vec<Vec<Option<ColumnIndex>>> {
+        if let Some(row_group_column_indexes) = self.metadata.column_index() {
+            self.metadata.row_groups().iter().enumerate().map(|(rg_idx, rg)| {
+                let column_indexes = &row_group_column_indexes[rg_idx];
+                column_indexes.iter().map(|column_index| {
+                    match column_index {
+                        Index::NONE => None,
+                        Index::BOOLEAN(column_index) => Some(column_index.to_column_index()),
+                        Index::BYTE_ARRAY(column_index) => Some(column_index.to_column_index()),
+                        Index::DOUBLE(column_index) => Some(column_index.to_column_index()),
+                        Index::FIXED_LEN_BYTE_ARRAY(column_index) => Some(column_index.to_column_index()),
+                        Index::FLOAT(column_index) => Some(column_index.to_column_index()),
+                        Index::INT32(column_index) => Some(column_index.to_column_index()),
+                        Index::INT64(column_index) => Some(column_index.to_column_index()),
+                        Index::INT96(column_index) => Some(column_index.to_column_index()),
+                    }
+                }).collect()
+            }).collect()
+        } else {
+            // make a None for each row group, for each column
+            self.metadata.row_groups().iter().enumerate().map(|(rg_idx, rg)| {
+                std::iter::repeat(None).take(rg.columns().len()).collect()
+            }).collect()
+        }
+    }
+
+    fn convert_offset_index(&self) -> Vec<Vec<Option<OffsetIndex>>> {
+        if let Some(row_group_offset_indexes) = self.metadata.offset_index() {
+            self.metadata.row_groups().iter().enumerate().map(|(rg_idx, rg)| {
+                let column_indexes = &row_group_offset_indexes[rg_idx];
+                column_indexes.iter().map(|column_index| {
+                    Some(OffsetIndex::new(column_index.clone()))
+                }).collect()
+            }).collect()
+        } else {
+            // make a None for each row group, for each column
+            self.metadata.row_groups().iter().enumerate().map(|(rg_idx, rg)| {
+                std::iter::repeat(None).take(rg.columns().len()).collect()
+            }).collect()
+        }
     }
 }
 
 #[cfg(test)]
 mod tests {
     use super::*;
 
-    use bytes::Bytes;
+    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+    use arrow_schema::{DataType as ArrowDataType, Field, Schema};
+    use bytes::{Buf, BufMut, Bytes, BytesMut};
     use std::fs::File;
 
+    use crate::arrow::ArrowWriter;
     use crate::basic::{
         ColumnOrder, Compression, ConvertedType, Encoding, LogicalType, Repetition, SortOrder, Type,
     };
     use crate::column::page::{Page, PageReader};
     use crate::column::reader::get_typed_column_reader;
     use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
     use crate::data_type::{BoolType, Int32Type};
+    use crate::file::footer::decode_metadata;
     use crate::file::page_index::index::Index;
     use crate::file::properties::EnabledStatistics;
     use crate::file::serialized_reader::ReadOptionsBuilder;
@@ -1889,4 +2006,46 @@ mod tests {
         let b_idx = &column_index[0][1];
         assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
     }
+
+    fn get_test_metadata() -> ParquetMetaData {
+        let mut buf = BytesMut::new().writer();
+        let schema: Arc<Schema> = Arc::new(
+            Schema::new(vec![Field::new("a", ArrowDataType::Int32, true)]),
+        );
+
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(2)]));
+
+        let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();
+
+        let mut writer = ArrowWriter::try_new(&mut buf, schema, None).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        let data = buf.into_inner().freeze();
+
+        let reader = SerializedFileReader::new(data).unwrap();
+        let metadata = reader.metadata().clone();
+        metadata
+    }
+
+    #[test]
+    fn test_encode_parquet_metadata() {
+        let metadata = get_test_metadata();
+
+        let mut buf = BytesMut::new().writer();
+        {
+            let mut writer = ParquetMetadataEncoderBuilderOpt::new(
+                &mut buf,
+                &metadata,
+            );
+            writer.finish().unwrap();
+        }
+
+        let data = buf.into_inner().freeze();
+
+        let _decoded_metadata = decode_metadata(&data).unwrap();
+
+        // TODO: decode the metadata and check that it matches
+        // This requires implementing a metadata decoder
+    }
 }
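
Aside: beyond what test_encode_parquet_metadata exercises, here is a hedged sketch of chaining the builder's optional knobs, assuming only the API added in this diff (note that finish() does not yet consult the write_page_index flag):

// Hypothetical usage, not part of this commit's tests.
let metadata = get_test_metadata();
let mut buf = BytesMut::new().writer();
let mut encoder = ParquetMetadataEncoderBuilderOpt::new(&mut buf, &metadata);
encoder.write_page_index(false); // builder flag; currently unread by finish()
encoder.finish().unwrap();
let _footer_bytes = buf.into_inner().freeze(); // thrift-encoded footer metadata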
