Add force and check all columns options for CheckParquet251Command. #2934

Open · wants to merge 1 commit into base: master
@@ -68,6 +68,16 @@ public CheckParquet251Command(Logger console) {
   @Parameter(description = "<files>", required = true)
   List<String> files;
 
+  @Parameter(
+      names = {"-f", "--force"},
+      description = "Force to check the parquet file.")
+  boolean force;
+
+  @Parameter(
+      names = {"-a", "--all"},
+      description = "Check all columns in the parquet file.")
+  boolean checkAllColumns;
+
   @Override
   public int run() throws IOException {
     boolean badFiles = false;
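For context, the two new options are plain JCommander boolean flags: each field defaults to false and flips to true only when the corresponding switch (-f/--force or -a/--all) appears on the command line, so existing invocations keep their current behavior. A minimal, self-contained sketch of how such flags parse; the FlagDemo class below is illustrative only and not part of this patch:

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import java.util.List;

public class FlagDemo {
  // Illustrative stand-ins for the fields added to CheckParquet251Command.
  @Parameter(description = "<files>", required = true)
  List<String> files;

  @Parameter(names = {"-f", "--force"}, description = "Force the check even for trusted writer versions.")
  boolean force; // false unless -f/--force is passed

  @Parameter(names = {"-a", "--all"}, description = "Check all columns, not only BINARY ones.")
  boolean checkAllColumns; // false unless -a/--all is passed

  public static void main(String[] argv) {
    FlagDemo demo = new FlagDemo();
    JCommander.newBuilder().addObject(demo).build().parse(argv);
    // e.g. "file1.parquet -f -a" yields files=[file1.parquet], force=true, checkAllColumns=true
    System.out.println(demo.files + " force=" + demo.force + " all=" + demo.checkAllColumns);
  }
}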
@@ -90,7 +100,7 @@ private String check(String file) throws IOException {
 
     FileMetaData meta = footer.getFileMetaData();
     String createdBy = meta.getCreatedBy();
-    if (CorruptStatistics.shouldIgnoreStatistics(createdBy, BINARY)) {
+    if (force || CorruptStatistics.shouldIgnoreStatistics(createdBy, BINARY)) {
       // create fake metadata that will read corrupt stats and return them
       FileMetaData fakeMeta =
           new FileMetaData(meta.getSchema(), meta.getKeyValueMetaData(), Version.FULL_VERSION);
@@ -101,17 +111,17 @@ private String check(String file) throws IOException {
           columns, Iterables.filter(meta.getSchema().getColumns(), new Predicate<ColumnDescriptor>() {
             @Override
             public boolean apply(@Nullable ColumnDescriptor input) {
-              return input != null && input.getType() == BINARY;
+              return checkAllColumns || (input != null && input.getType() == BINARY);
             }
           }));
 
       // now check to see if the data is actually corrupt
       try (ParquetFileReader reader =
-          new ParquetFileReader(getConf(), fakeMeta, path, footer.getBlocks(), columns)) {
+              new ParquetFileReader(getConf(), fakeMeta, path, footer.getBlocks(), columns)) {
         PageStatsValidator validator = new PageStatsValidator();
         for (PageReadStore pages = reader.readNextRowGroup();
-            pages != null;
-            pages = reader.readNextRowGroup()) {
+                pages != null;
+                pages = reader.readNextRowGroup()) {
           validator.validate(columns, pages);
           pages.close();
         }
@@ -125,7 +135,10 @@ public boolean apply(@Nullable ColumnDescriptor input) {
 
   @Override
   public List<String> getExamples() {
-    return Arrays.asList("# Check file1.parquet for corrupt page and column stats", "file1.parquet");
+    return Arrays.asList(
+        "# Check file1.parquet for corrupt page and column stats", "file1.parquet",
+        "# Force to check all columns in file1.parquet for corrupt page and column stats", "file1.parquet -f -a"
+    );
   }
 
   public static class BadStatsException extends RuntimeException {
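The updated getExamples() output pairs each description with the arguments that follow the subcommand. A hedged usage sketch; the "parquet check-stats" subcommand name below is an assumption for illustration and is not shown in this diff:

# Default: check only BINARY columns, skipping files whose writer version
# is known to produce trustworthy statistics
parquet check-stats file1.parquet

# With the new flags: force the check regardless of writer version (-f)
# and validate every column rather than only BINARY ones (-a)
parquet check-stats file1.parquet -f -a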
@@ -287,7 +300,7 @@ public void addBinary(Binary value) {
   }
 
   private static final DynConstructors.Ctor<ColumnReader> COL_READER_CTOR = new DynConstructors.Builder(
-          ColumnReader.class)
+              ColumnReader.class)
       .hiddenImpl(
           "org.apache.parquet.column.impl.ColumnReaderImpl",
           ColumnDescriptor.class,
@@ -336,7 +349,7 @@ private void validateStatsForPage(DataPage page, DictionaryPage dict, ColumnDesc
       column.consume();
     }
 
-    if (numNulls != stats.getNumNulls()) {
+    if (stats.isNumNullsSet() && numNulls != stats.getNumNulls()) {
       throw new BadStatsException("Number of nulls doesn't match.");
     }
 
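The last hunk also tightens the validator: the null-count comparison now runs only when the page statistics actually carry a null count, so statistics with an unset numNulls are no longer reported as corrupt. A minimal sketch of that guard, using a hypothetical helper around org.apache.parquet.column.statistics.Statistics:

import org.apache.parquet.column.statistics.Statistics;

final class NullCountCheck {
  // Hypothetical helper mirroring the patched validateStatsForPage logic:
  // compare null counts only when the statistics object has one set.
  static void checkNulls(long countedNulls, Statistics<?> stats) {
    if (stats.isNumNullsSet() && countedNulls != stats.getNumNulls()) {
      throw new RuntimeException("Number of nulls doesn't match.");
    }
    // If isNumNullsSet() is false, the recorded count is absent and is not validated.
  }
}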