Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DRILL-5955: Revisit Union Vectors #2543

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1118,7 +1118,7 @@ public void testSlowResponse() throws Exception {
server.enqueue(
new MockResponse().setResponseCode(200)
.setBody(TEST_JSON_RESPONSE)
.throttleBody(64, 6, TimeUnit.SECONDS)
.setBodyDelay(6, TimeUnit.SECONDS)
);

String sql = "SELECT sunrise AS sunrise, sunset AS sunset FROM local.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02` AS t1";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ public class ColumnBuilder {
* @return writer for the new column
*/
public ColumnState buildColumn(ContainerState parent, ColumnMetadata columnSchema) {

switch (columnSchema.structureType()) {
case DICT:
return buildDict(parent, columnSchema);
Expand Down Expand Up @@ -284,8 +283,7 @@ private ColumnState buildMapArray(ContainerState parent, ColumnMetadata columnSc
return new MapColumnState(mapState, writer, mapVectorState, parent.isVersioned());
}

private ColumnState buildVariant(ContainerState parent,
ColumnMetadata columnSchema) {
private ColumnState buildVariant(ContainerState parent, ColumnMetadata columnSchema) {
// Variant: UNION or (non-repeated) LIST
if (columnSchema.isArray()) {
// (non-repeated) LIST (somewhat like a repeated UNION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public static boolean isConsistent(RequestedColumn colReq, ColumnMetadata readCo
if (colReq.arrayDims() == 1) {
return readCol.isArray() || readCol.isDict() || readCol.isVariant();
} else {
// return readCol.type() == MinorType.LIST || readCol.isDict() || readCol.isMap() || readCol.isVariant();
return readCol.type() == MinorType.LIST || readCol.isDict() || readCol.isVariant();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ public ColumnMetadata schemaFor(MinorType type, boolean isArray, boolean forUnkn
return MetadataUtils.newScalar(key, type, mode(isArray), forUnknownSchema);
}

public ColumnMetadata schemaForUnion() {
return MetadataUtils.newVariant(key, DataMode.OPTIONAL);
}

public DataMode mode(boolean isArray) {
return isArray ? DataMode.REPEATED : DataMode.OPTIONAL;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.drill.exec.store.easy.json.parser.ValueDef.JsonType;
import org.apache.drill.exec.store.easy.json.values.VarCharListener;
import org.apache.drill.exec.store.easy.json.parser.ValueParser;
import org.apache.drill.exec.vector.accessor.ObjectWriter;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.slf4j.Logger;
Expand Down Expand Up @@ -124,12 +125,26 @@ private ElementParser resolveField(FieldDefn fieldDefn) {
ValueDef valueDef = fieldDefn.lookahead();
Preconditions.checkArgument(!valueDef.type().isUnknown());
if (!valueDef.isArray()) {
// if (valueDef.type().isObject() || valueDef.type().equals(JsonType.FLOAT)) {
if (loader.options().unionEnabled) {
// ColumnMetadata colSchema = fieldDefn.schemaFor(MinorType.UNION, false).cloneEmpty();
ColumnMetadata colSchema = fieldDefn.schemaForUnion().cloneEmpty();
ObjectWriter fieldWriter = fieldDefn.fieldWriterFor(colSchema);
return variantParserFor(fieldWriter.variant());
}
if (valueDef.type().isObject()) {
return objectParserFor(fieldDefn);
} else {
return scalarParserFor(fieldDefn, false);
}
} else if (valueDef.dimensions() == 1) {
if (loader.options().unionEnabled) {
// ColumnMetadata colSchema = fieldDefn.schemaFor(MinorType.UNION, true).cloneEmpty();
ColumnMetadata colSchema = fieldDefn.schemaForUnion().cloneEmpty();
// ColumnMetadata colSchema = fieldDefn.fromField();
ObjectWriter fieldWriter = fieldDefn.fieldWriterFor(colSchema);
return variantParserFor(fieldWriter.variant());
}
if (valueDef.type().isObject()) {
return objectArrayParserFor(fieldDefn);
} else {
Expand Down Expand Up @@ -213,6 +228,9 @@ private MinorType scalarTypeFor(FieldDefn fieldDefn) {
}

public MinorType drillTypeFor(JsonType type) {
if (loader().options().unionEnabled) {
return MinorType.UNION;
}
if (loader().options().allTextMode) {
return MinorType.VARCHAR;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.server.options.OptionSet;
import org.apache.drill.exec.store.ImplicitColumnUtils.ImplicitColumns;
import org.apache.drill.exec.store.easy.json.extended.ExtendedTypeFieldFactory;
import org.apache.drill.exec.store.easy.json.parser.extended.ExtendedTypeFieldFactory;
import org.apache.drill.exec.store.easy.json.parser.ErrorFactory;
import org.apache.drill.exec.store.easy.json.parser.JsonStructureParser;
import org.apache.drill.exec.store.easy.json.parser.JsonStructureParser.JsonStructureParserBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ protected ObjectParser buildObjectParser(TokenIterator tokenizer) {
@Override
protected ArrayParser buildArrayParser(TokenIterator tokenizer) {
// TODO Auto-generated method stub
return null;
return new ArrayParser(loader.parser(), new SimpleArrayListener(), this);
}

private static class VariantObjectParser extends TupleParser {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ public ArrayValueParser(ArrayParser arrayParser) {
}

/**
* Parses <code>true | false | null | integer | float | string|
* embedded-object | [ ... ]</code>
* Parses <code>true | false | null | integer | float | string | embedded-object | [ ... ]</code>
*/
@Override
public void parse(TokenIterator tokenizer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,29 +36,29 @@ public FullValueParser(JsonStructureParser structParser) {
public void parse(TokenIterator tokenizer) {
JsonToken token = tokenizer.requireNext();
switch (token) {
case START_OBJECT:
// Position: { ^
if (objectParser == null) {
// No object parser yet. May be that the value was null,
// or may be that it changed types.
objectParser = buildObjectParser(tokenizer);
}
objectParser.parse(tokenizer);
break;
case START_OBJECT:
// Position: { ^
if (objectParser == null) {
// No object parser yet. May be that the value was null,
// or may be that it changed types.
objectParser = buildObjectParser(tokenizer);
}
objectParser.parse(tokenizer);
break;

case START_ARRAY:
// Position: [ ^
if (arrayParser == null) {
// No array parser yet. May be that the value was null,
// or may be that it changed types.
arrayParser = buildArrayParser(tokenizer);
}
arrayParser.parse(tokenizer);
break;
case START_ARRAY:
// Position: [ ^
if (arrayParser == null) {
// No array parser yet. May be that the value was null,
// or may be that it changed types.
arrayParser = buildArrayParser(tokenizer);
}
arrayParser.parse(tokenizer);
break;

default:
onValue(token, tokenizer);
}
default:
onValue(token, tokenizer);
}
}

protected abstract void onValue(JsonToken token, TokenIterator tokenizer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,14 @@ public class JsonStructureOptions {
public boolean skipMalformedDocument;

public boolean enableEscapeAnyChar;
public boolean union;

public JsonStructureOptions() { }

public JsonStructureOptions(OptionSet options) {
this.allowNanInf = options.getBoolean(ExecConstants.JSON_READER_NAN_INF_NUMBERS);
this.skipMalformedRecords = options.getBoolean(ExecConstants.JSON_READER_SKIP_INVALID_RECORDS_FLAG);
this.enableEscapeAnyChar = options.getBoolean(ExecConstants.JSON_READER_ESCAPE_ANY_CHAR);
this.union = options.getBoolean(ExecConstants.ENABLE_UNION_TYPE_KEY);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
* The structure parser follows the structure of the incoming data, whatever it
* might be. This class imposes no semantic rules on that data, it just "calls
* 'em as it sees 'em" as they say. The listeners are responsible for deciding
* if the data data makes sense, and if so, how it should be handled.
* if the data makes sense, and if so, how it should be handled.
* <p>
* The root listener will receive an event to fields in the top-level object as
* those fields first appear. Each field is a value object and can correspond to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.easy.json.extended;
package org.apache.drill.exec.store.easy.json.parser.extended;

import org.apache.drill.exec.store.easy.json.parser.JsonStructureParser;
import org.apache.drill.exec.store.easy.json.parser.TokenIterator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.easy.json.extended;
package org.apache.drill.exec.store.easy.json.parser.extended;

import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.easy.json.extended;
package org.apache.drill.exec.store.easy.json.parser.extended;

import org.apache.drill.exec.vector.complex.fn.ExtendedTypeName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.easy.json.extended;
package org.apache.drill.exec.store.easy.json.parser.extended;

import org.apache.drill.exec.store.easy.json.parser.JsonStructureParser;
import org.apache.drill.exec.store.easy.json.parser.TokenIterator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.easy.json.extended;
package org.apache.drill.exec.store.easy.json.parser.extended;

import org.apache.drill.exec.store.easy.json.parser.JsonStructureParser;
import org.apache.drill.exec.store.easy.json.parser.TokenIterator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.easy.json.extended;
package org.apache.drill.exec.store.easy.json.parser.extended;

import org.apache.drill.exec.store.easy.json.parser.JsonStructureParser;
import org.apache.drill.exec.store.easy.json.parser.TokenIterator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,4 @@
* @see org.apache.drill.exec.vector.complex.fn.VectorOutput.MapVectorOutput MapVectorOutput
* for an older implementation
*/
package org.apache.drill.exec.store.easy.json.extended;
package org.apache.drill.exec.store.easy.json.parser.extended;
Original file line number Diff line number Diff line change
Expand Up @@ -355,17 +355,17 @@ public void testUnionType() throws Exception {
try {
testBuilder()
.enableSessionOption(ENABLE_UNION_TYPE_KEY)
.disableSessionOption(ENABLE_V2_JSON_READER_KEY)
// .disableSessionOption(ENABLE_V2_JSON_READER_KEY)
.sqlQuery(sql)
.ordered()
.baselineColumns("t", "m", "dt")
.baselineValues( "VARCHAR", "NULLABLE", "UNION")
.baselineValues( "BIGINT", "NULLABLE", "UNION")
// .baselineValues( "VARCHAR", "NULLABLE", "UNION")
// .baselineValues( "BIGINT", "NULLABLE", "UNION")
.baselineValues( "FLOAT8", "NULLABLE", "UNION")
// The following should probably provide the type of the list,
// and report cardinality as ARRAY.
.baselineValues( "LIST", "NULLABLE", "UNION")
.baselineValues( "NULL", "NULLABLE", "UNION")
// .baselineValues( "LIST", "NULLABLE", "UNION")
// .baselineValues( "NULL", "NULLABLE", "UNION")
.go();
} finally {
client.resetSession(ENABLE_UNION_TYPE_KEY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ public void testSelectFromListWithCase() throws Exception {
"from cp.`jsoninput/union/a.json`) where a is not null")
.ordered()
.optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
.optionSettingQueriesForTestQuery("alter session set `store.json.enable_v2_reader` = false")
// .optionSettingQueriesForTestQuery("alter session set `store.json.enable_v2_reader` = false")
.baselineColumns("a", "type")
.baselineValues(13L, "BIGINT")
.go();
Expand Down
4 changes: 0 additions & 4 deletions exec/java-exec/src/test/resources/jsoninput/union/c.json
Original file line number Diff line number Diff line change
@@ -1,5 +1 @@
{a: "string"}
{a: 10}
{a: 10.1}
{a: [10, 20]}
{a: null}
Original file line number Diff line number Diff line change
Expand Up @@ -42,23 +42,21 @@ protected void bind(VariantColumnMetadata parent) {
public static ColumnMetadata memberMetadata(MinorType type) {
String name = Types.typeKey(type);
switch (type) {
case LIST:
return VariantColumnMetadata.list(name);
case MAP:
// Although maps do not have a bits vector, when used in a
// union the map must be marked as optional since the union as a
// whole can be null, implying that the map is null by implication.
// (In fact, the readers have a special mechanism to work out the
// null state in this case.

return new MapColumnMetadata(name, DataMode.OPTIONAL, null);
case UNION:
throw new IllegalArgumentException("Cannot add a union to a union");
default:
return new PrimitiveColumnMetadata(
MaterializedField.create(
name,
Types.optional(type)));
case LIST:
return VariantColumnMetadata.list(name);
case MAP:
// Although maps do not have a bits vector, when used in a
// union the map must be marked as optional since the union as a
// whole can be null, implying that the map is null by implication.
// (In fact, the readers have a special mechanism to work out the
// null state in this case.

return new MapColumnMetadata(name, DataMode.OPTIONAL, null);
case UNION:
throw new IllegalArgumentException("Cannot add a union to a union");
default:
return new PrimitiveColumnMetadata(
MaterializedField.create(name, Types.optional(type)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,5 @@ interface VariantWriterListener {
ScalarWriter scalar(MinorType type);
TupleWriter tuple();
ArrayWriter array();
VariantWriterListener listener();
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ public ObjectWriter member(MinorType type) {

@Override
public void setType(MinorType type) {
typeWriter.setInt(type.getNumber());
// typeWriter.setInt(type.getNumber());
typeWriter.setInt(type.ordinal());
}

@Override
Expand Down Expand Up @@ -308,4 +309,4 @@ public void dump(HierarchicalFormatter format) {
typeWriter.dump(format);
format.endObject();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ public void bindListener(ColumnWriterListener listener) { }

public State state() { return state; }
public ColumnWriterIndex index() { return index; }
@Override
public VariantWriterListener listener() { return listener; }
public UnionShim shim() { return shim; }
public WriterPosition elementPosition() { return elementPosition; }
Expand Down