Skip to content

Commit

Permalink
Add force and check-all-columns options for CheckParquet251Command.
Browse files Browse the repository at this point in the history
  • Loading branch information
yaogai.zhu committed Jun 25, 2024
1 parent fb9c174 commit 309cdf4
Showing 1 changed file with 21 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ public CheckParquet251Command(Logger console) {
@Parameter(description = "<files>", required = true)
List<String> files;

@Parameter(
names = {"-f", "--force"},
description = "Force to check the parquet file.")
boolean force;

@Parameter(
names = {"-a", "--all"},
description = "Check all columns in the parquet file.")
boolean checkAllColumns;

@Override
public int run() throws IOException {
boolean badFiles = false;
Expand All @@ -90,7 +100,7 @@ private String check(String file) throws IOException {

FileMetaData meta = footer.getFileMetaData();
String createdBy = meta.getCreatedBy();
if (CorruptStatistics.shouldIgnoreStatistics(createdBy, BINARY)) {
if (force || CorruptStatistics.shouldIgnoreStatistics(createdBy, BINARY)) {
// create fake metadata that will read corrupt stats and return them
FileMetaData fakeMeta =
new FileMetaData(meta.getSchema(), meta.getKeyValueMetaData(), Version.FULL_VERSION);
Expand All @@ -101,17 +111,17 @@ private String check(String file) throws IOException {
columns, Iterables.filter(meta.getSchema().getColumns(), new Predicate<ColumnDescriptor>() {
@Override
public boolean apply(@Nullable ColumnDescriptor input) {
return input != null && input.getType() == BINARY;
return checkAllColumns || (input != null && input.getType() == BINARY);
}
}));

// now check to see if the data is actually corrupt
try (ParquetFileReader reader =
new ParquetFileReader(getConf(), fakeMeta, path, footer.getBlocks(), columns)) {
new ParquetFileReader(getConf(), fakeMeta, path, footer.getBlocks(), columns)) {
PageStatsValidator validator = new PageStatsValidator();
for (PageReadStore pages = reader.readNextRowGroup();
pages != null;
pages = reader.readNextRowGroup()) {
pages != null;
pages = reader.readNextRowGroup()) {
validator.validate(columns, pages);
pages.close();
}
Expand All @@ -125,7 +135,10 @@ public boolean apply(@Nullable ColumnDescriptor input) {

@Override
public List<String> getExamples() {
return Arrays.asList("# Check file1.parquet for corrupt page and column stats", "file1.parquet");
return Arrays.asList(
"# Check file1.parquet for corrupt page and column stats", "file1.parquet",
"# Force to check all columns in file1.parquet for corrupt page and column stats", "file1.parquet -f -a"
);
}

public static class BadStatsException extends RuntimeException {
Expand Down Expand Up @@ -287,7 +300,7 @@ public void addBinary(Binary value) {
}

private static final DynConstructors.Ctor<ColumnReader> COL_READER_CTOR = new DynConstructors.Builder(
ColumnReader.class)
ColumnReader.class)
.hiddenImpl(
"org.apache.parquet.column.impl.ColumnReaderImpl",
ColumnDescriptor.class,
Expand Down Expand Up @@ -336,7 +349,7 @@ private void validateStatsForPage(DataPage page, DictionaryPage dict, ColumnDesc
column.consume();
}

if (numNulls != stats.getNumNulls()) {
if (stats.isNumNullsSet() && numNulls != stats.getNumNulls()) {
throw new BadStatsException("Number of nulls doesn't match.");
}

Expand Down

0 comments on commit 309cdf4

Please sign in to comment.