Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CDAP-16739] Added hard limit for Avro data file, proper error messag… #406

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2017-2019 Cask Data, Inc.
* Copyright © 2017-2020 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
Expand All @@ -16,11 +16,14 @@

package io.cdap.directives.parser;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import com.google.gson.internal.LazilyParsedNumber;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
Expand All @@ -37,13 +40,14 @@
import io.cdap.wrangler.api.lineage.Lineage;
import io.cdap.wrangler.api.lineage.Many;
import io.cdap.wrangler.api.lineage.Mutation;
import io.cdap.wrangler.api.parser.Bool;
import io.cdap.wrangler.api.parser.ColumnName;
import io.cdap.wrangler.api.parser.Numeric;
import io.cdap.wrangler.api.parser.TokenType;
import io.cdap.wrangler.api.parser.UsageDefinition;
import io.cdap.wrangler.dq.TypeInference;
import org.json.JSONException;

import java.lang.reflect.Type;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
Expand All @@ -67,14 +71,29 @@ public class JsParser implements Directive, Lineage {
// Max depth to which the JSON needs to be parsed.
private int depth;

// JSON parser.
private static final JsonParser parser = new JsonParser();
// Ignore error in parsing for record and proceed.
private boolean ignoreError;

// Gson parser with adapter to ensure integers are not represented as doubles.
private static final Gson parser = new GsonBuilder()
.setLenient()
.registerTypeAdapter(Double.class, new JsonSerializer<Double>() {
@Override
public JsonElement serialize(Double src, Type typeOfSrc, JsonSerializationContext context) {
if (src == src.longValue()) {
return new JsonPrimitive(src.longValue());
}
return new JsonPrimitive(src.doubleValue());
}
})
.create();

@Override
public UsageDefinition define() {
UsageDefinition.Builder builder = UsageDefinition.builder(NAME);
builder.define("column", TokenType.COLUMN_NAME);
builder.define("depth", TokenType.NUMERIC, Optional.TRUE);
builder.define("ignore-error", TokenType.BOOLEAN, Optional.TRUE);
return builder.build();
}

Expand All @@ -86,6 +105,11 @@ public void initialize(Arguments args) throws DirectiveParseException {
} else {
this.depth = Integer.MAX_VALUE;
}
if (args.contains("ignore-error")) {
this.ignoreError = ((Bool) args.value("ignore-error")).value();
} else {
this.ignoreError = false; // backward compaibility.
}
}

@Override
Expand All @@ -112,7 +136,7 @@ public List<Row> execute(List<Row> rows, ExecutorContext context)
JsonElement element = null;
if (value instanceof String) {
String document = (String) value;
element = parser.parse(document.trim());
element = parser.fromJson(document.trim(), JsonElement.class);
} else if (value instanceof JsonObject || value instanceof JsonArray) {
element = (JsonElement) value;
} else {
Expand Down Expand Up @@ -143,8 +167,19 @@ public List<Row> execute(List<Row> rows, ExecutorContext context)
row.add(column, getValue(element.getAsJsonPrimitive()));
}
}
} catch (JSONException e) {
throw new ErrorRowException(NAME, e.getMessage(), 1);
} catch (Exception e) {
String msg = e.getCause() != null ? e.getCause().getMessage() : e.getMessage();
// In case there are more rows being handled, we attempt to surface the Json that is
// causing an issue to make it easier for users to identify the problem quickly.
if (rows.size() > 1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't rows always size 1? Otherwise throwing an ErrorRowException would incorrectly mark every Row as an error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not always. If we parsing AvroFile. Parse-as-avro-file will generate records > 1.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, but the RecipeExecutor will break that into multiple lists of size 1. Either this is always size 1, or the entire ErrorRowException contract is wrong.

msg = String.format("Incorrectly constructed json '%s', %s", value, e.getCause() != null ?
e.getCause().getMessage() : e.getMessage());
}

// If ignore error set, then don't throw exception, just move to next record.
if (!ignoreError) {
throw new ErrorRowException(NAME, msg, 1);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2017-2019 Cask Data, Inc.
* Copyright © 2017-2020 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
Expand Down Expand Up @@ -54,6 +54,7 @@
@Description("parse-as-avro-file <column>.")
public class ParseAvroFile implements Directive, Lineage {
public static final String NAME = "parse-as-avro-file";
private static final int AVRO_FILE_SIZE_LIMIT = 5 * 1024 * 1024;
private String column;
private Gson gson;

Expand Down Expand Up @@ -83,6 +84,15 @@ public List<Row> execute(List<Row> rows, final ExecutorContext context) throws D
if (idx != -1) {
Object object = row.getValue(idx);
if (object instanceof byte[]) {
int size = ((byte[]) object).length;
if (size > AVRO_FILE_SIZE_LIMIT) {
throw new DirectiveExecutionException(
NAME,
String.format("Avro file greater than 5 MB are not currently supported by this directive " +
"(Current size : %d). Use File source connector with 'avro' as format to " +
"read large files.", size)
);
}
DataFileReader<GenericRecord> reader = null;
try {
reader =
Expand All @@ -105,7 +115,11 @@ public List<Row> execute(List<Row> rows, final ExecutorContext context) throws D
}
} else {
throw new DirectiveExecutionException(
NAME, String.format("Column '%s' is of invalid type. It should be of type 'byte array'.", column));
NAME, String.format("Avro data file parsing directive requires '%s' to be a bytes field. " +
"Change type in input to bytes and make sure the format in File source is " +
"set as 'blob'. We recommend using Avro format in file source instead of using " +
"this directive",
column));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2017-2019 Cask Data, Inc.
* Copyright © 2017-2020 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
Expand Down Expand Up @@ -52,6 +52,40 @@ public void testParseAsAvroFile() throws Exception {
Assert.assertEquals(1495194308245L, results.get(1688).getValue("timestamp"));
}

@Test
public void testAvroWithJsonFieldParsingWithParseJsonIgnoreError() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these new tests are testing two directives. More specifically, these are not testing any change in behavior in the avro directive, they are just testing the changes in json directive.

Unit tests should be testing isolated components when possible. It would be better to enhance the parse json test. For example, if we remove the parse avro file directive, we could easily remove test coverage of the parse json directive accidentally.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a problem customer had. We have other tests for catching parse-as-json problems. This was when a json in avro file have. Using redacted user data as test.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and this is a unit test, meaning it should be testing very specific parts of the code. With the ignore-error changed introduced, it should be testing that the json directive returns empty results when given bad data and ignore-error is true.

InputStream stream = ParseAvroFileTest.class.getClassLoader().getResourceAsStream("bad.avro");
byte[] data = IOUtils.toByteArray(stream);

String[] directives = new String[] {
"parse-as-avro-file body",
"parse-as-json :message_contents 2 true"
};

List<Row> rows = new ArrayList<>();
rows.add(new Row("body", data));

List<Row> results = TestingRig.execute(directives, rows);
Assert.assertEquals(6693, results.size()); // total 6670, 7 records are bad.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these types of asserts are not specific enough. For example, how can we be sure the right rows are filtered? It would be better to just send a single row in as input.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using customer test data to make sure we don't break it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this doesn't make sure of anything besides the size

}

@Test
public void testAvroWithJsonFieldParsingWithParseJsonIgnoreErrorFail() throws Exception {
InputStream stream = ParseAvroFileTest.class.getClassLoader().getResourceAsStream("bad.avro");
byte[] data = IOUtils.toByteArray(stream);

String[] directives = new String[] {
"parse-as-avro-file body",
"parse-as-json :message_contents 2"
};

List<Row> rows = new ArrayList<>();
rows.add(new Row("body", data));

List<Row> results = TestingRig.execute(directives, rows);
Assert.assertEquals(0, results.size()); // total 6670, 7 records are bad.
}

@Test(expected = RecipeException.class)
public void testIncorrectType() throws Exception {
String[] directives = new String[] {
Expand Down
Binary file added wrangler-core/src/test/resources/bad.avro
Binary file not shown.