Skip to content

Commit 8e0aaad

Browse files
xmakroetseidl
andauthored
Parquet: Verify 32-bit CRC checksum when decoding pages (#6290)
* Parquet: Verify 32-bit CRC checksum when decoding pages * Undo cargo toml * a * enable crc by default * a * Address comments * Add tests that verify crc checks * Document feature flag * Move documentation around * Update parquet/Cargo.toml Co-authored-by: Ed Seidl <[email protected]> * Update parquet/src/file/serialized_reader.rs Co-authored-by: Ed Seidl <[email protected]> * Add license * Run cargo +stable fmt --all * Revert MD034 * Applye readme suggestion --------- Co-authored-by: xmakro <makro@> Co-authored-by: Ed Seidl <[email protected]>
1 parent ebcc4a5 commit 8e0aaad

File tree

5 files changed

+89
-1
lines changed

5 files changed

+89
-1
lines changed

parquet/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ twox-hash = { version = "1.6", default-features = false }
6868
paste = { version = "1.0" }
6969
half = { version = "2.1", default-features = false, features = ["num-traits"] }
7070
sysinfo = { version = "0.31.2", optional = true, default-features = false, features = ["system"] }
71+
crc32fast = { version = "1.4.2", optional = true, default-features = false }
7172

7273
[dev-dependencies]
7374
base64 = { version = "0.22", default-features = false, features = ["std"] }
@@ -117,6 +118,8 @@ object_store = ["dep:object_store", "async"]
117118
zstd = ["dep:zstd", "zstd-sys"]
118119
# Display memory in example/write_parquet.rs
119120
sysinfo = ["dep:sysinfo"]
121+
# Verify 32-bit CRC checksum when decoding parquet pages
122+
crc = ["dep:crc32fast"]
120123

121124
[[example]]
122125
name = "read_parquet"

parquet/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ The `parquet` crate provides the following features which may be enabled in your
6060
- `zstd` (default) - support for parquet using `zstd` compression
6161
- `snap` (default) - support for parquet using `snappy` compression
6262
- `cli` - parquet [CLI tools](https://github.com/apache/arrow-rs/tree/master/parquet/src/bin)
63+
- `crc` - enables functionality to automatically verify checksums of each page (if present) when decoding
6364
- `experimental` - Experimental APIs which may change, even between minor releases
6465

6566
## Parquet Feature Status
@@ -82,4 +83,4 @@ The `parquet` crate provides the following features which may be enabled in your
8283

8384
## License
8485

85-
Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0.
86+
Licensed under the Apache License, Version 2.0: <http://www.apache.org/licenses/LICENSE-2.0>.

parquet/src/file/serialized_reader.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,15 @@ pub(crate) fn decode_page(
390390
physical_type: Type,
391391
decompressor: Option<&mut Box<dyn Codec>>,
392392
) -> Result<Page> {
393+
// Verify the 32-bit CRC checksum of the page
394+
#[cfg(feature = "crc")]
395+
if let Some(expected_crc) = page_header.crc {
396+
let crc = crc32fast::hash(&buffer);
397+
if crc != expected_crc as u32 {
398+
return Err(general_err!("Page CRC checksum mismatch"));
399+
}
400+
}
401+
393402
// When processing data page v2, depending on enabled compression for the
394403
// page, we should account for uncompressed data ('offset') of
395404
// repetition and definition levels.
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! This file contains an end to end test for verifying checksums when reading parquet files.
19+
20+
use std::path::PathBuf;
21+
22+
use arrow::util::test_util::parquet_test_data;
23+
use parquet::arrow::arrow_reader::ArrowReaderBuilder;
24+
25+
#[test]
26+
fn test_datapage_v1_corrupt_checksum() {
27+
let errors = read_file_batch_errors("datapage_v1-corrupt-checksum.parquet");
28+
assert_eq!(errors, [
29+
Err("Parquet argument error: Parquet error: Page CRC checksum mismatch".to_string()),
30+
Ok(()),
31+
Ok(()),
32+
Err("Parquet argument error: Parquet error: Page CRC checksum mismatch".to_string()),
33+
Err("Parquet argument error: Parquet error: Not all children array length are the same!".to_string())
34+
]);
35+
}
36+
37+
#[test]
38+
fn test_datapage_v1_uncompressed_checksum() {
39+
let errors = read_file_batch_errors("datapage_v1-uncompressed-checksum.parquet");
40+
assert_eq!(errors, [Ok(()), Ok(()), Ok(()), Ok(()), Ok(())]);
41+
}
42+
43+
#[test]
44+
fn test_datapage_v1_snappy_compressed_checksum() {
45+
let errors = read_file_batch_errors("datapage_v1-snappy-compressed-checksum.parquet");
46+
assert_eq!(errors, [Ok(()), Ok(()), Ok(()), Ok(()), Ok(())]);
47+
}
48+
49+
#[test]
50+
fn test_plain_dict_uncompressed_checksum() {
51+
let errors = read_file_batch_errors("plain-dict-uncompressed-checksum.parquet");
52+
assert_eq!(errors, [Ok(())]);
53+
}
54+
#[test]
55+
fn test_rle_dict_snappy_checksum() {
56+
let errors = read_file_batch_errors("rle-dict-snappy-checksum.parquet");
57+
assert_eq!(errors, [Ok(())]);
58+
}
59+
60+
/// Reads a file and returns a vector with one element per record batch.
61+
/// The record batch data is replaced with () and errors are stringified.
62+
fn read_file_batch_errors(name: &str) -> Vec<Result<(), String>> {
63+
let path = PathBuf::from(parquet_test_data()).join(name);
64+
println!("Reading file: {:?}", path);
65+
let file = std::fs::File::open(&path).unwrap();
66+
let reader = ArrowReaderBuilder::try_new(file).unwrap().build().unwrap();
67+
reader
68+
.map(|x| match x {
69+
Ok(_) => Ok(()),
70+
Err(e) => Err(e.to_string()),
71+
})
72+
.collect()
73+
}

parquet/tests/arrow_reader/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ use std::sync::Arc;
3636
use tempfile::NamedTempFile;
3737

3838
mod bad_data;
39+
#[cfg(feature = "crc")]
40+
mod checksum;
3941
mod statistics;
4042

4143
// returns a struct array with columns "int32_col", "float32_col" and "float64_col" with the specified values

0 commit comments

Comments
 (0)