Skip to content

Commit 7f7d0f2

Browse files
fix for invalid parquet issue (#829)
Detect the 0 sized parquet, log the error and delete the parquet and retain the grouped arrow files. The arrow files will be converted to parquet in the next sync cycle (every 60 secs)
1 parent 4c0d2a8 commit 7f7d0f2

File tree

1 file changed

+16
-14
lines changed

1 file changed

+16
-14
lines changed

server/src/storage/staging.rs

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -255,34 +255,36 @@ pub fn convert_disk_files_to_parquet(
255255
custom_partition_fields.insert(custom_partition_field.to_string(), index);
256256
}
257257
}
258-
let parquet_file = fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?;
259258
let props = parquet_writer_props(
260259
time_partition.clone(),
261260
index_time_partition,
262261
custom_partition_fields,
263262
)
264263
.build();
265-
266264
schemas.push(merged_schema.clone());
267265
let schema = Arc::new(merged_schema);
268-
let mut writer = ArrowWriter::try_new(parquet_file, schema.clone(), Some(props))?;
266+
let parquet_file = fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?;
267+
let mut writer = ArrowWriter::try_new(&parquet_file, schema.clone(), Some(props))?;
269268
for ref record in record_reader.merged_iter(schema, time_partition.clone()) {
270269
writer.write(record)?;
271270
}
272271

273272
writer.close()?;
274-
275-
for file in files {
276-
let file_size = file.metadata().unwrap().len();
277-
let file_type = file.extension().unwrap().to_str().unwrap();
278-
279-
if fs::remove_file(file.clone()).is_err() {
280-
log::error!("Failed to delete file. Unstable state");
281-
process::abort()
273+
if parquet_file.metadata().unwrap().len() == 0 {
274+
log::error!("Invalid parquet file detected, removing it");
275+
fs::remove_file(parquet_path).unwrap();
276+
} else {
277+
for file in files {
278+
let file_size = file.metadata().unwrap().len();
279+
let file_type = file.extension().unwrap().to_str().unwrap();
280+
if fs::remove_file(file.clone()).is_err() {
281+
log::error!("Failed to delete file. Unstable state");
282+
process::abort()
283+
}
284+
metrics::STORAGE_SIZE
285+
.with_label_values(&["staging", stream, file_type])
286+
.sub(file_size as i64);
282287
}
283-
metrics::STORAGE_SIZE
284-
.with_label_values(&["staging", stream, file_type])
285-
.sub(file_size as i64);
286288
}
287289
}
288290

0 commit comments

Comments
 (0)