Skip to content

Commit 6346928

Browse files
author
Devdutt Shenoi
authored
fix: ignore corrupt parquet files (#1169)
first write to `.part` file, then rename to `.parquet` only on success
1 parent f5193cd commit 6346928

File tree

1 file changed

+24
-10
lines changed

1 file changed

+24
-10
lines changed

src/staging/streams.rs

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,15 @@ use itertools::Itertools;
3434
use parquet::{
3535
arrow::ArrowWriter,
3636
basic::Encoding,
37-
file::properties::{WriterProperties, WriterPropertiesBuilder},
37+
file::{
38+
properties::{WriterProperties, WriterPropertiesBuilder},
39+
FOOTER_SIZE,
40+
},
3841
format::SortingColumn,
3942
schema::types::ColumnPath,
4043
};
4144
use rand::distributions::DistString;
42-
use tracing::error;
45+
use tracing::{error, trace};
4346

4447
use crate::{
4548
cli::Options,
@@ -218,7 +221,10 @@ impl<'a> Stream<'a> {
218221

219222
dir.flatten()
220223
.map(|file| file.path())
221-
.filter(|file| file.extension().is_some_and(|ext| ext.eq("parquet")))
224+
.filter(|file| {
225+
file.extension().is_some_and(|ext| ext.eq("parquet"))
226+
&& std::fs::metadata(file).is_ok_and(|meta| meta.len() > FOOTER_SIZE as u64)
227+
})
222228
.collect()
223229
}
224230

@@ -350,24 +356,32 @@ impl<'a> Stream<'a> {
350356
.build();
351357
schemas.push(merged_schema.clone());
352358
let schema = Arc::new(merged_schema);
353-
let parquet_file = OpenOptions::new()
359+
let mut part_path = parquet_path.to_owned();
360+
part_path.set_extension("part");
361+
let mut part_file = OpenOptions::new()
354362
.create(true)
355363
.append(true)
356-
.open(&parquet_path)
364+
.open(&part_path)
357365
.map_err(|_| StagingError::Create)?;
358-
let mut writer = ArrowWriter::try_new(&parquet_file, schema.clone(), Some(props))?;
366+
let mut writer = ArrowWriter::try_new(&mut part_file, schema.clone(), Some(props))?;
359367
for ref record in record_reader.merged_iter(schema, time_partition.cloned()) {
360368
writer.write(record)?;
361369
}
362-
363370
writer.close()?;
364-
if parquet_file.metadata().unwrap().len() < parquet::file::FOOTER_SIZE as u64 {
371+
372+
if part_file.metadata().unwrap().len() < parquet::file::FOOTER_SIZE as u64 {
365373
error!(
366374
"Invalid parquet file {:?} detected for stream {}, removing it",
367-
&parquet_path, &self.stream_name
375+
&part_path, &self.stream_name
368376
);
369-
remove_file(parquet_path).unwrap();
377+
remove_file(part_path).unwrap();
370378
} else {
379+
trace!("Parquet file successfully constructed");
380+
if let Err(e) = std::fs::rename(&part_path, &parquet_path) {
381+
error!(
382+
"Couldn't rename part file: {part_path:?} -> {parquet_path:?}, error = {e}"
383+
);
384+
}
371385
for file in arrow_files {
372386
// warn!("file-\n{file:?}\n");
373387
let file_size = file.metadata().unwrap().len();

0 commit comments

Comments
 (0)