diff --git a/contrib/format-daffodil/README.md b/contrib/format-daffodil/README.md
new file mode 100644
index 00000000000..d036c63b2e8
--- /dev/null
+++ b/contrib/format-daffodil/README.md
@@ -0,0 +1,19 @@
+# Daffodil 'Format' Reader
+This plugin enables Drill to read DFDL-described data from files by way of the Apache Daffodil DFDL implementation.
+
+## Limitations: TBD
+
+At the moment, the DFDL schema is found on the local file system, which won't continue to work.
+
+There are restrictions on the DFDL schemas that this can handle.
+
+In particular, all element children must have distinct element names, including across choice branches.
+(This rules out a number of large DFDL schemas.)
+
+The data is parsed fully from its native form into a Drill data structure held in memory.
+No attempt is made to avoid access to parts of the DFDL-described data that are not needed to answer the query.
+
+If the data is not well-formed, an error occurs and the query fails.
+
+If the data is invalid, and validity checking by Daffodil is enabled, then an error occurs and the query fails.
+
diff --git a/contrib/format-daffodil/pom.xml b/contrib/format-daffodil/pom.xml
new file mode 100644
index 00000000000..01955c746dc
--- /dev/null
+++ b/contrib/format-daffodil/pom.xml
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- NOTE(review): the XML tags of this pom were stripped by a text-extraction step;
+     the markup below is reconstructed from the surviving element text. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>drill-contrib-parent</artifactId>
+    <groupId>org.apache.drill.contrib</groupId>
+    <version>1.22.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>drill-format-daffodil</artifactId>
+  <name>Drill : Contrib : Format : Daffodil</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.daffodil</groupId>
+      <artifactId>daffodil-japi_2.12</artifactId>
+      <version>3.7.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.daffodil</groupId>
+      <artifactId>daffodil-runtime1_2.12</artifactId>
+      <version>3.7.0-SNAPSHOT</version>
+    </dependency>
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>drill-common</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-java-sources</id>
+            <phase>process-sources</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${basedir}/target/classes/org/apache/drill/exec/store/daffodil</outputDirectory>
+              <resources>
+                <resource>
+                  <directory>src/main/java/org/apache/drill/exec/store/daffodil</directory>
+                  <filtering>true</filtering>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java
new file mode 100644
index 00000000000..20a0d81d4e0
--- /dev/null
+++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Objects;
+
+import org.apache.daffodil.japi.DataProcessor;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DaffodilDataProcessorFactory;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils.daffodilDataProcessorToDrillSchema;
+
+
+/**
+ * Drill batch reader that parses DFDL-described files via Apache Daffodil.
+ *
+ * One instance is created per file split. The constructor resolves and loads the
+ * DFDL schema, converts it to a Drill schema, opens the data stream, and wires a
+ * Daffodil InfosetOutputter to Drill's RowSetLoader. {@link #next()} then drives
+ * the parse loop until the batch is full or the data is exhausted.
+ */
+public class DaffodilBatchReader implements ManagedReader {
+
+  private static final Logger logger = LoggerFactory.getLogger(DaffodilBatchReader.class);
+  private final DaffodilFormatConfig dafConfig;
+  private final RowSetLoader rowSetLoader;
+  private final CustomErrorContext errorContext;
+  // Stateful Daffodil parse driver; owned by this reader, not shareable across threads.
+  private final DaffodilMessageParser dafParser;
+  private final InputStream dataInputStream;
+
+  // Thin carrier handing the format plugin (and thus its config) to each reader instance.
+  static class DaffodilReaderConfig {
+    final DaffodilFormatPlugin plugin;
+    DaffodilReaderConfig(DaffodilFormatPlugin plugin) {
+      this.plugin = plugin;
+    }
+  }
+
+  /**
+   * Builds the reader: loads the DFDL schema, negotiates the Drill schema, and
+   * opens the data stream.
+   *
+   * @param readerConfig carries the format plugin and its configuration
+   * @param scan the sub-scan definition (currently unused in this constructor)
+   * @param negotiator supplies the file, file system, and schema negotiation
+   * @throws UserException if the schema URI is malformed, the Daffodil processor
+   *         cannot be created, or the data file cannot be opened
+   */
+  public DaffodilBatchReader (DaffodilReaderConfig readerConfig, EasySubScan scan, FileSchemaNegotiator negotiator) {
+
+    errorContext = negotiator.parentErrorContext();
+    this.dafConfig = readerConfig.plugin.getConfig();
+
+    String schemaURIString = dafConfig.getSchemaURI(); // "schema/complexArray1.dfdl.xsd";
+    String rootName = dafConfig.getRootName();
+    String rootNamespace = dafConfig.getRootNamespace();
+    boolean validationMode = dafConfig.getValidationMode();
+
+    URI dfdlSchemaURI;
+    try {
+      dfdlSchemaURI = new URI(schemaURIString);
+    } catch (URISyntaxException e) {
+      throw UserException.validationError(e)
+          .build(logger);
+    }
+
+    FileDescrip file = negotiator.file();
+    DrillFileSystem fs = file.fileSystem();
+    // Resolve a possibly-relative schema URI against the data file system's URI.
+    // NOTE(review): this assumes the schema is reachable through the same file
+    // system as the data (see README limitation about local schemas) — confirm.
+    URI fsSchemaURI = fs.getUri().resolve(dfdlSchemaURI);
+
+
+    DaffodilDataProcessorFactory dpf = new DaffodilDataProcessorFactory();
+    DataProcessor dp;
+    try {
+      dp = dpf.getDataProcessor(fsSchemaURI, validationMode, rootName, rootNamespace);
+    } catch (Exception e) {
+      throw UserException.dataReadError(e)
+          .message(String.format("Failed to get Daffodil DFDL processor for: %s", fsSchemaURI))
+          .addContext(errorContext).addContext(e.getMessage()).build(logger);
+    }
+    // Create the corresponding Drill schema.
+    // Note: this could be a very large schema. Think of a large complex RDBMS schema,
+    // all of it, hundreds of tables, but all part of the same metadata tree.
+    TupleMetadata drillSchema = daffodilDataProcessorToDrillSchema(dp);
+    // Inform Drill about the schema
+    negotiator.tableSchema(drillSchema, true);
+
+    //
+    // DATA TIME: Next we construct the runtime objects, and open files.
+    //
+    // We get the DaffodilMessageParser, which is a stateful driver for daffodil that
+    // actually does the parsing.
+    rowSetLoader = negotiator.build().writer();
+
+    // We construct the Daffodil InfosetOutputter which the daffodil parser uses to
+    // convert infoset event calls to fill in a Drill row via a rowSetLoader.
+    DaffodilDrillInfosetOutputter outputter = new DaffodilDrillInfosetOutputter(rowSetLoader);
+
+    // Now we can setup the dafParser with the outputter it will drive with
+    // the parser-produced infoset.
+    dafParser = new DaffodilMessageParser(dp); // needs further initialization after this.
+    dafParser.setInfosetOutputter(outputter);
+
+    Path dataPath = file.split().getPath();
+    // Lastly, we open the data stream
+    try {
+      dataInputStream = fs.openPossiblyCompressedStream(dataPath);
+    } catch (Exception e) {
+      throw UserException.dataReadError(e)
+          .message(String.format("Failed to open input file: %s", dataPath.toString()))
+          .addContext(errorContext).addContext(e.getMessage()).build(logger);
+    }
+    // And lastly,... tell daffodil the input data stream.
+    dafParser.setInputStream(dataInputStream);
+  }
+
+
+  /**
+   * This is the core of actual processing - data movement from Daffodil to Drill.
+   *
+   * If there is space in the batch, and there is data available to parse
+   * then this calls the daffodil parser, which parses data, delivering it to the rowWriter
+   * by way of the infoset outputter.
+   *
+   * Repeats until the rowWriter is full (a batch is full), or there is no more data, or
+   * a parse error ends execution with a throw.
+   *
+   * Validation errors and other warnings are not errors and are logged but do not cause
+   * parsing to fail/throw.
+   * @return true if there are rows retrieved, false if no rows were retrieved, which means
+   * no more will ever be retrieved (end of data).
+   * @throws RuntimeException on parse errors.
+   */
+  @Override
+  public boolean next() {
+    // Check assumed invariants
+    // We don't know if there is data or not. This could be called on an empty data file.
+    // We DO know that this won't be called if there is no space in the batch for even 1
+    // row.
+    if (dafParser.isEOF()) {
+      return false; // return without even checking for more rows or trying to parse.
+    }
+    while (rowSetLoader.start() && !dafParser.isEOF()) { // we never zero-trip this loop.
+      // the predicate is always true once.
+      try {
+        dafParser.parse();
+        if (dafParser.isProcessingError()) {
+          assert(Objects.nonNull(dafParser.getDiagnostics()));
+          // NOTE(review): this UserException is thrown inside the try block, so the
+          // catch below re-wraps it with a different message — possibly unintended;
+          // confirm whether it should be thrown after the try instead.
+          throw UserException.dataReadError().message(dafParser.getDiagnosticsAsString())
+              .addContext(errorContext).build(logger);
+        }
+        if (dafParser.isValidationError()) {
+          logger.warn(dafParser.getDiagnosticsAsString());
+          // Note that even if daffodil is set to not validate, validation errors may still occur
+          // from DFDL's "recoverableError" assertions.
+        }
+      } catch (Exception e) {
+        throw UserException.dataReadError(e).message("Error parsing file: " + e.getMessage())
+            .addContext(errorContext).build(logger);
+      }
+      rowSetLoader.save();
+    }
+    int nRows = rowSetLoader.rowCount();
+    // NOTE(review): with assertions disabled, a zero-row batch would still return true;
+    // presumed unreachable because any parse failure throws above — confirm.
+    assert nRows > 0; // This cannot be zero. If the parse failed we will have already thrown out of here.
+    return true;
+  }
+
+  /** Closes the data input stream; errors on close are ignored. */
+  @Override
+  public void close() {
+    AutoCloseables.closeSilently(dataInputStream);
+  }
+}
diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java
new file mode 100644
index 00000000000..df9aebf08f4
--- /dev/null
+++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.daffodil;
+
+import org.apache.daffodil.runtime1.api.ComplexElementMetadata;
+import org.apache.daffodil.runtime1.api.ElementMetadata;
+import org.apache.daffodil.runtime1.api.InfosetArray;
+import org.apache.daffodil.runtime1.api.InfosetComplexElement;
+import org.apache.daffodil.japi.infoset.InfosetOutputter;
+import org.apache.daffodil.runtime1.api.InfosetSimpleElement;
+import org.apache.daffodil.runtime1.api.PrimitiveType;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaVisitor;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ColumnWriter;
+import org.apache.drill.exec.vector.accessor.ObjectType;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Stack;
+
+/**
+ * Adapts Daffodil parser infoset event calls to Drill writer calls
+ * to fill in Drill data rows.
+ */
+public class DaffodilDrillInfosetOutputter
+ extends InfosetOutputter {
+
+ private boolean isOriginalRoot() {
+ boolean result = currentTupleWriter() == rowSetWriter;
+ if (result)
+ assert(tupleWriterStack.size() == 1);
+ return result;
+ }
+
+ /**
+ * True if the next startComplex call will be for the
+ * DFDL infoset root element whose children are the columns of
+ * the row set.
+ */
+ private boolean isRootElement = true;
+
+ /**
+ * Stack that is used only if we have sub-structures that are not
+ * simple-type fields of the row.
+ */
+ private final Stack tupleWriterStack = new Stack<>();
+
+ private final Stack arrayWriterStack = new Stack<>();
+
+ private TupleWriter currentTupleWriter() {
+ return tupleWriterStack.peek();
+ }
+
+ private ArrayWriter currentArrayWriter() {
+ return arrayWriterStack.peek();
+ }
+
+
+ private static final Logger logger = LoggerFactory.getLogger(DaffodilDrillInfosetOutputter.class);
+
+ private DaffodilDrillInfosetOutputter() {} // no default constructor
+
+ private RowSetLoader rowSetWriter;
+
+ public DaffodilDrillInfosetOutputter(RowSetLoader writer) {
+ this.rowSetWriter = writer;
+ this.tupleWriterStack.push(writer);
+ }
+
+ @Override
+ public void reset() {
+ tupleWriterStack.clear();
+ tupleWriterStack.push(rowSetWriter);
+ arrayWriterStack.clear();
+ this.isRootElement = true;
+ checkCleanState();
+ }
+
+ private void checkCleanState() {
+ assert(isOriginalRoot());
+ assert(arrayWriterStack.isEmpty());
+ assert(isRootElement);
+ }
+
+ @Override
+ public void startDocument() {
+ checkCleanState();
+ }
+
+ @Override
+ public void endDocument() {
+ checkCleanState();
+ }
+
+ private String colName(ElementMetadata md) {
+ return DrillDaffodilSchemaVisitor.makeColumnName(md);
+ }
+
+ @Override
+ public void startSimple(InfosetSimpleElement ise) {
+ assert (!isRootElement);
+ ElementMetadata md = ise.metadata();
+ String colName = colName(md);
+ ColumnWriter cw;
+ if (md.isArray()) {
+ // A simple type array
+ assert(!arrayWriterStack.isEmpty());
+ cw = currentArrayWriter().scalar();
+ } else {
+ // A simple element within a map
+ // Note the map itself might be an array
+ // but we don't care about that here.
+ cw = currentTupleWriter().column(colName);
+ }
+ ColumnMetadata cm = cw.schema();
+ assert(cm.isScalar());
+ if (md.isNillable() && ise.isNilled()) {
+ assert cm.isNullable();
+ cw.setNull();
+ } else {
+ convertDaffodilValueToDrillValue(ise, cm, cw);
+ }
+ }
+
+ @Override
+ public void endSimple(InfosetSimpleElement diSimple) {
+ assert (!isRootElement);
+ // do nothing
+ }
+
+ @Override
+ public void startComplex(InfosetComplexElement ce) {
+ ComplexElementMetadata md = ce.metadata();
+ String colName = colName(ce.metadata());
+ if (isRootElement) {
+ assert(isOriginalRoot());
+ // This complex element's corresponds to the root element of the
+ // DFDL schema. We don't treat this as a column of the row set.
+ // Rather, it's children are the columns of the row set.
+ //
+ // If we do nothing at all here, then we'll start getting
+ // even calls for the children.
+ isRootElement = false;
+ return;
+ }
+ if (md.isArray()) {
+ assert(!arrayWriterStack.isEmpty());
+ // FIXME: is this the way to add a complex array child item (i.e., each array item is a map)
+ tupleWriterStack.push(currentArrayWriter().tuple());
+ } else {
+ tupleWriterStack.push(currentTupleWriter().tuple(colName));
+ }
+ }
+
+ @Override
+ public void endComplex(InfosetComplexElement ce) {
+ ComplexElementMetadata md = ce.metadata();
+ if (isOriginalRoot()) {
+ isRootElement = true;
+ // do nothing else. The row gets closed-out in the DaffodilBatchReader.next() method.
+ } else {
+ // it's a map.
+ // We seem to not need to do anything to end the map. No action taken here works.
+ if (md.isArray()) {
+ assert (!arrayWriterStack.isEmpty());
+ currentArrayWriter().save(); // required for map array entries.
+ }
+ tupleWriterStack.pop();
+ }
+ }
+
+ @Override
+ public void startArray(InfosetArray diArray) {
+ ElementMetadata md = diArray.metadata();
+ assert (md.isArray());
+ // DFDL has no notion of an array directly within another array. A named field (map) is necessary
+ // before you can have another array.
+ assert (currentTupleWriter().type() == ObjectType.TUPLE); // parent is a map, or the top level row.
+ String colName = colName(md);
+ TupleWriter enclosingParentTupleWriter = currentTupleWriter();
+ ArrayWriter aw = enclosingParentTupleWriter.array(colName);
+ arrayWriterStack.push(aw);
+ }
+
+ @Override
+ public void endArray(InfosetArray ia) {
+ ElementMetadata md = ia.metadata();
+ assert (md.isArray());
+ assert (!arrayWriterStack.empty());
+ // FIXME: How do we end/close-out an array?
+ // note that each array instance, when the instance is a map, must have
+ // save called after it is written to the array but that happens
+ // in endComplex events since it must be called not once per array, but
+ // once per array item.
+ arrayWriterStack.pop();
+ }
+
+ private void convertDaffodilValueToDrillValue(InfosetSimpleElement ise, ColumnMetadata cm, ColumnWriter cw) {
+ PrimitiveType dafType = ise.metadata().primitiveType();
+ TypeProtos.MinorType drillType = DrillDaffodilSchemaUtils.getDrillDataType(dafType);
+ assert(drillType == cm.type());
+ switch (drillType) {
+ case INT: {
+ //
+ // FIXME: Javadoc for setObject says "primarily for testing"
+ // So how are we supposed to assign the column value then?
+ // Is there a way to get from a ColumnWriter to a typed scalar writer (downcast perhaps?)
+ cw.setObject(ise.getInt());
+ break;
+ }
+ case BIGINT: {
+ cw.setObject(ise.getLong());
+ break;
+ }
+ case SMALLINT: {
+ cw.setObject(ise.getShort());
+ break;
+ }
+ case TINYINT: {
+ cw.setObject(ise.getByte());
+ break;
+ }
+// .put("UNSIGNEDLONG", TypeProtos.MinorType.UINT8)
+// .put("UNSIGNEDINT", TypeProtos.MinorType.UINT4)
+// .put("UNSIGNEDSHORT", TypeProtos.MinorType.UINT2)
+// .put("UNSIGNEDBYTE", TypeProtos.MinorType.UINT1)
+// .put("INTEGER", TypeProtos.MinorType.BIGINT)
+// .put("NONNEGATIVEINTEGER", TypeProtos.MinorType.BIGINT)
+ case BIT: {
+ cw.setObject(ise.getBoolean());
+ break;
+ }
+// .put("DATE", TypeProtos.MinorType.DATE) // requires conversion
+// .put("DATETIME", TypeProtos.MinorType.TIMESTAMP) // requires conversion
+// .put("DECIMAL", TypeProtos.MinorType.VARDECIMAL) // requires conversion (maybe)
+ case FLOAT8: {
+ cw.setObject(ise.getDouble());
+ break;
+ }
+ case FLOAT4: {
+ cw.setObject(ise.getFloat());
+ break;
+ }
+ case VARBINARY: {
+ cw.setObject(ise.getHexBinary());
+ break;
+ }
+ case VARCHAR: {
+ //
+ // FIXME: VARCHAR is defined in drill as utf8 string.
+ // Is Drill expecting something other than a Java string in this setObject call?
+ // Should we be mapping Daffodil strings to Drill VAR16CHAR type?
+ //
+ String s = ise.getString();
+ cw.setObject(s);
+ break;
+ }
+// .put("TIME", TypeProtos.MinorType.TIME) // requires conversion
+
+ }
+ }
+
+ private void DFDLParseError(String s) {
+ throw new RuntimeException(s);
+ }
+
+ private static void nyi() {
+ throw new RuntimeException("not yet implemented.");
+ }
+
+ private static void fatalError(String s) {
+ throw new RuntimeException(s);
+ }
+}
+
diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilFormatConfig.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilFormatConfig.java
new file mode 100644
index 00000000000..80e2ba5110d
--- /dev/null
+++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilFormatConfig.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.collect.ImmutableList;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.exec.store.daffodil.DaffodilBatchReader.DaffodilReaderConfig;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+@JsonTypeName(DaffodilFormatPlugin.DEFAULT_NAME)
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
+public class DaffodilFormatConfig implements FormatPluginConfig {
+
+  // Type parameter restored (List<String>); the raw List forced unchecked use.
+  public final List<String> extensions;
+  public final String schemaURI;
+  public final boolean validationMode;
+  public final String rootName;
+  public final String rootNamespace;
+
+  /**
+   * In the constructor for a format config, you should not use
+   * boxed versions of primitive types. It creates problems with
+   * defaulting them (they default to null) which cannot be unboxed.
+   */
+  @JsonCreator
+  public DaffodilFormatConfig(@JsonProperty("extensions") List<String> extensions,
+                              @JsonProperty("schemaURI") String schemaURI,
+                              @JsonProperty("rootName") String rootName,
+                              @JsonProperty("rootNamespace") String rootNamespace,
+                              @JsonProperty("validationMode") boolean validationMode) {
+
+    this.extensions = extensions == null ? Collections.singletonList("dat") : ImmutableList.copyOf(extensions);
+    this.rootName = rootName;
+    this.rootNamespace = rootNamespace;
+    this.schemaURI = schemaURI;
+    // no default. Users must pick.
+    this.validationMode = validationMode;
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+  public List<String> getExtensions() {
+    return extensions;
+  }
+
+  public String getSchemaURI() {
+    return schemaURI;
+  }
+
+  public String getRootName() {
+    return rootName;
+  }
+
+  public String getRootNamespace() {
+    return rootNamespace;
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+  public boolean getValidationMode() {
+    return validationMode;
+  }
+
+  /** Wraps the plugin into the per-reader configuration object. */
+  public DaffodilReaderConfig getReaderConfig(DaffodilFormatPlugin plugin) {
+    DaffodilReaderConfig readerConfig = new DaffodilReaderConfig(plugin);
+    return readerConfig;
+  }
+
+  // NOTE(review): extensions is deliberately excluded from hashCode/equals
+  // (it is defaulted) — confirm that is intended.
+  @Override
+  public int hashCode() {
+    return Objects.hash(schemaURI, validationMode, rootName, rootNamespace);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    DaffodilFormatConfig other = (DaffodilFormatConfig) obj;
+    return Objects.equals(schemaURI, other.schemaURI)
+        && Objects.equals(rootName, other.rootName)
+        && Objects.equals(rootNamespace, other.rootNamespace)
+        // Primitive comparison; Objects.equals would autobox both operands.
+        && validationMode == other.validationMode;
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+        .field("schemaURI", schemaURI)
+        .field("rootName", rootName)
+        .field("rootNamespace", rootNamespace)
+        .field("validationMode", validationMode)
+        .toString();
+  }
+}
diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilFormatPlugin.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilFormatPlugin.java
new file mode 100644
index 00000000000..95b5e82aaac
--- /dev/null
+++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilFormatPlugin.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil;
+
+
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.hadoop.conf.Configuration;
+
+
+/**
+ * "Easy" format plugin registering the Daffodil DFDL reader with Drill.
+ * Type parameter restored: the raw EasyFormatPlugin left the inherited
+ * formatConfig field untyped, breaking the getReaderConfig call below.
+ */
+public class DaffodilFormatPlugin extends EasyFormatPlugin<DaffodilFormatConfig> {
+
+  public static final String DEFAULT_NAME = "daffodil";
+  public static final String OPERATOR_TYPE = "DAFFODIL_SUB_SCAN";
+
+  /** Creates one DaffodilBatchReader per file split. */
+  public static class DaffodilReaderFactory extends FileReaderFactory {
+    private final DaffodilBatchReader.DaffodilReaderConfig readerConfig;
+
+    private final EasySubScan scan;
+
+    public DaffodilReaderFactory(DaffodilBatchReader.DaffodilReaderConfig config, EasySubScan scan) {
+      this.readerConfig = config;
+      this.scan = scan;
+    }
+
+    @Override
+    public ManagedReader newReader(FileSchemaNegotiator negotiator) {
+      return new DaffodilBatchReader(readerConfig, scan, negotiator);
+    }
+  }
+
+  public DaffodilFormatPlugin(String name, DrillbitContext context,
+      Configuration fsConf, StoragePluginConfig storageConfig,
+      DaffodilFormatConfig formatConfig) {
+    super(name, easyConfig(fsConf, formatConfig), context, storageConfig, formatConfig);
+  }
+
+  /** Declares this plugin's capabilities to the "easy" format framework. */
+  private static EasyFormatConfig easyConfig(Configuration fsConf, DaffodilFormatConfig pluginConfig) {
+    return EasyFormatConfig.builder()
+        .readable(true)
+        .writable(false)
+        .blockSplittable(false)
+        .compressible(true)
+        .extensions(pluginConfig.getExtensions())
+        .fsConf(fsConf)
+        .readerOperatorType(OPERATOR_TYPE)
+        .scanVersion(ScanFrameworkVersion.EVF_V2)
+        .supportsLimitPushdown(true)
+        .supportsProjectPushdown(true)
+        .defaultName(DaffodilFormatPlugin.DEFAULT_NAME)
+        .build();
+  }
+
+  @Override
+  protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
+    // Unprojected columns materialize as nullable VARCHAR.
+    builder.nullType(Types.optional(MinorType.VARCHAR));
+    builder.readerFactory(new DaffodilReaderFactory(formatConfig.getReaderConfig(this), scan));
+  }
+}
diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilMessageParser.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilMessageParser.java
new file mode 100644
index 00000000000..67b74d2571a
--- /dev/null
+++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilMessageParser.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.daffodil;
+
+
+import org.apache.daffodil.japi.DataProcessor;
+import org.apache.daffodil.japi.Diagnostic;
+import org.apache.daffodil.japi.ParseResult;
+import org.apache.daffodil.japi.infoset.InfosetOutputter;
+import org.apache.daffodil.japi.io.InputSourceDataInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * DFDL Daffodil streaming message parser.
+ * <p>
+ * Construct this with a {@link DataProcessor} obtained from the
+ * DaffodilDataProcessorFactory. The DataProcessor contains the compiled DFDL
+ * schema, ready to use, as well as whether validation while parsing has been
+ * requested.
+ * <p>
+ * The DataProcessor object may be shared/reused by multiple threads, each of
+ * which has its own copy of this class. This object is, however, stateful and
+ * must not be shared by multiple threads.
+ * <p>
+ * You must call setInputStream and setInfosetOutputter before you call
+ * parse(). The input stream and the InfosetOutputter objects are also private
+ * to one thread, are stateful, and are owned by this object. Once you have
+ * called setInputStream, you should view the input stream as the private
+ * property of this object.
+ * <p>
+ * The parse() call invokes the InfosetOutputter's methods to deliver parsed
+ * data, and it may optionally create diagnostics (obtained via
+ * getDiagnostics), indicating which kind they are via isProcessingError and
+ * isValidationError.
+ * <p>
+ * Note that the InfosetOutputter may be called many times before a processing
+ * error is detected, as Daffodil delivers result data incrementally.
+ * Validation errors do not affect the InfosetOutputter output calls, but
+ * indicate that data was detected that is invalid.
+ * <p>
+ * When parse() returns, the parse has ended and one can check for
+ * errors/diagnostics. One can call parse() again if there is still data to
+ * consume, which is checked via the isEOF() method.
+ * <p>
+ * There are no guarantees about where the input stream is positioned between
+ * parse() calls. In particular, it may not be positioned at the start of the
+ * next message, as Daffodil may have pre-fetched additional bytes from the
+ * input stream which it found are not part of the current infoset, but the
+ * next one. The position may in fact be in the middle of a byte, as Daffodil
+ * does not require messages to have lengths that are whole-byte units.
+ * Hence, once you give the input stream to this object via setInputStream,
+ * that input stream is owned privately by this class from then on.
+ */
+public class DaffodilMessageParser {
+
+  private static final Logger logger = LoggerFactory.getLogger(DaffodilMessageParser.class);
+
+  // Diagnostics from the most recent parse() call; null until parse() runs.
+  private List<Diagnostic> diagnostics;
+  private boolean isProcessingError;
+  private boolean isValidationError;
+
+  private InputSourceDataInputStream dis;
+  private InfosetOutputter outputter;
+  private final DataProcessor dp;
+
+  /**
+   * Constructs the parser using a DataProcessor obtained from a
+   * DaffodilDataProcessorFactory.
+   *
+   * @param dp the compiled-schema data processor used for all parse calls
+   */
+  DaffodilMessageParser(DataProcessor dp) {
+    this.dp = dp;
+  }
+
+  /**
+   * Provide the input stream from which data is to be parsed.
+   * <p>
+   * This input stream is then owned by this object and becomes part of its
+   * state. It is, however, the responsibility of the caller to close this
+   * input stream after the completion of all parse calls. In particular, if a
+   * parse error is considered fatal, then the caller should close the input
+   * stream. There are advanced error-recovery techniques that may attempt to
+   * find data that can be parsed later in the data stream. In those cases the
+   * input stream would not be closed after a processing error, but such usage
+   * is beyond the scope of this javadoc.
+   *
+   * @param inputStream the stream containing the DFDL-described data
+   */
+  public void setInputStream(InputStream inputStream) {
+    dis = new InputSourceDataInputStream(inputStream);
+  }
+
+  /**
+   * Provides the InfosetOutputter which will be called to deliver the infoset
+   * via calls to its methods.
+   *
+   * @param outputter receiver of the incrementally delivered parse result
+   */
+  public void setInfosetOutputter(InfosetOutputter outputter) {
+    this.outputter = outputter;
+  }
+
+  /**
+   * Called to pull messages from the data stream. The message 'infoset' is
+   * delivered by way of calls to the InfosetOutputter's methods.
+   * <p>
+   * After calling this, one may call isProcessingError, isValidationError,
+   * isEOF, and getDiagnostics.
+   *
+   * @throws IllegalStateException if setInputStream or setInfosetOutputter
+   *         was not called first
+   */
+  public void parse() {
+    if (dis == null) {
+      throw new IllegalStateException("Input stream must be provided by setInputStream() call.");
+    }
+    if (outputter == null) {
+      throw new IllegalStateException("InfosetOutputter must be provided by setInfosetOutputter() call.");
+    }
+
+    reset();
+    ParseResult res = dp.parse(dis, outputter);
+    isProcessingError = res.isProcessingError();
+    isValidationError = res.isValidationError();
+    diagnostics = res.getDiagnostics();
+  }
+
+  /**
+   * True if the input stream is at EOF, i.e., known to contain no more data.
+   * It is an error to call parse() after isEOF has returned true.
+   * False if more data is (or may become) available.
+   * <p>
+   * If the input stream is a true stream, not a file, then temporary
+   * unavailability of data may cause this call to block until the stream is
+   * closed from the other end, or data becomes available.
+   *
+   * @return true when no further data can be obtained from the stream
+   */
+  public boolean isEOF() {
+    return !dis.hasData();
+  }
+
+  /**
+   * True if the parse() call failed with a processing error. This indicates
+   * that the data was not well-formed and could not be parsed successfully.
+   * <p>
+   * It is possible for isProcessingError and isValidationError to both be true.
+   *
+   * @return true when the last parse encountered a processing error
+   */
+  public boolean isProcessingError() {
+    return isProcessingError;
+  }
+
+  /**
+   * True if a validation error occurred during parsing. Subsequent to a
+   * validation error being detected, parsing may succeed or fail.
+   *
+   * @return true when the last parse encountered a validation error
+   */
+  public boolean isValidationError() {
+    return isValidationError;
+  }
+
+  /**
+   * After a parse() call this returns null or a list of 1 or more diagnostics.
+   * <p>
+   * If isProcessingError or isValidationError are true, then this will contain
+   * at least 1 diagnostic. If both are true this will contain at least 2
+   * diagnostics.
+   *
+   * @return the diagnostics from the last parse, or null if parse() has not
+   *         yet been called
+   */
+  public List<Diagnostic> getDiagnostics() {
+    return diagnostics;
+  }
+
+  /**
+   * Renders the diagnostics of the last parse as a single newline-separated
+   * string.
+   *
+   * @return the joined diagnostic messages; empty string if there are none
+   *         or parse() has not yet been called
+   */
+  public String getDiagnosticsAsString() {
+    if (diagnostics == null || diagnostics.isEmpty()) {
+      return "";
+    }
+    return diagnostics.stream()
+        .map(Diagnostic::getMessage)
+        .collect(Collectors.joining("\n"));
+  }
+
+  // Clears per-parse state so results of a prior parse() do not leak into
+  // the next one.
+  private void reset() {
+    outputter.reset();
+    isProcessingError = false;
+    isValidationError = false;
+    diagnostics = null;
+  }
+}
diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DaffodilDataProcessorFactory.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DaffodilDataProcessorFactory.java
new file mode 100644
index 00000000000..149e0b2ceec
--- /dev/null
+++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DaffodilDataProcessorFactory.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.daffodil.schema;
+
+
+import org.apache.daffodil.japi.Compiler;
+import org.apache.daffodil.japi.Daffodil;
+import org.apache.daffodil.japi.DataProcessor;
+import org.apache.daffodil.japi.Diagnostic;
+import org.apache.daffodil.japi.InvalidParserException;
+import org.apache.daffodil.japi.InvalidUsageException;
+import org.apache.daffodil.japi.ProcessorFactory;
+import org.apache.daffodil.japi.ValidationMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.channels.Channels;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Compiles a DFDL schema (mostly for tests) or loads a pre-compiled DFDL schema so
+ * that one can obtain a DataProcessor for use with DaffodilMessageParser.
+ *
+ * TODO: Needs to use a cache to avoid reloading/recompiling every time.
+ */
+public class DaffodilDataProcessorFactory {
+ // Default constructor is used.
+
+ private static final Logger logger = LoggerFactory.getLogger(DaffodilDataProcessorFactory.class);
+
+ private DataProcessor dp;
+
+ /**
+ * Thrown if schema compilation fails.
+ *
+ * Contains diagnostic objects which give the cause(s) of the failure.
+ */
+ public static class CompileFailure extends Exception {
+ List diags;
+
+ CompileFailure(List diagnostics) {
+ super("DFDL Schema Compile Failure");
+ diags = diagnostics;
+ }
+ }
+
+ /**
+ * Gets a Daffodil DataProcessor given the necessary arguments to compile or reload it.
+ * @param schemaFileURI pre-compiled dfdl schema (.bin extension) or DFDL schema source (.xsd extension)
+ * @param validationMode Use true to request Daffodil built-in 'limited' validation.
+ * Use false for no validation.
+ * @param rootName Local name of root element of the message. Can be null to use the first element
+ * declaration of the primary schema file. Ignored if reloading a pre-compiled schema.
+ * @param rootNS Namespace URI as a string. Can be null to use the target namespace
+ * of the primary schema file or if it is unambiguous what element is the rootName.
+ * Ignored if reloading a pre-compiled schema.
+ * @return the DataProcessor
+ * @throws CompileFailure - if schema compilation fails
+ * @throws IOException - if the schemaFileURI cannot be opened or is not found.
+ * @throws URISyntaxException - if the schemaFileURI is not legal syntax.
+ * @throws InvalidParserException - if the reloading of the parser from pre-compiled binary fails.
+ */
+ public DataProcessor getDataProcessor(
+ URI schemaFileURI,
+ boolean validationMode,
+ String rootName,
+ String rootNS)
+ throws CompileFailure, IOException, URISyntaxException, InvalidParserException {
+
+ DaffodilDataProcessorFactory dmp = new DaffodilDataProcessorFactory();
+ boolean isPrecompiled = schemaFileURI.toString().endsWith(".bin");
+ if (isPrecompiled) {
+ if (Objects.nonNull(rootName) && !rootName.isEmpty()) {
+ // A usage error. You shouldn't supply the name and optionally namespace if loading
+ // precompiled schema because those are built into it. Should be null or "".
+ logger.warn("Root element name '{}' is ignored when used with precompiled DFDL schema.", rootName);
+ }
+ dmp.loadSchema(schemaFileURI);
+ dmp.setupDP(validationMode, null);
+ } else {
+ List pfDiags = dmp.compileSchema(schemaFileURI, rootName, rootNS);
+ dmp.setupDP(validationMode, pfDiags);
+ }
+ return dmp.dp;
+ }
+
+ private void loadSchema(URI schemaFileURI)
+ throws IOException, InvalidParserException {
+ Compiler c = Daffodil.compiler();
+ dp = c.reload(Channels.newChannel(schemaFileURI.toURL().openStream()));
+ }
+
+ @SuppressWarnings("ReassignedVariable")
+ private List compileSchema(URI schemaFileURI, String rootName, String rootNS)
+ throws URISyntaxException, IOException, CompileFailure {
+ Compiler c = Daffodil.compiler();
+ ProcessorFactory pf = c.compileSource(schemaFileURI, rootName, rootNS);
+ List pfDiags = pf.getDiagnostics();
+ if (pf.isError()) {
+ pfDiags.forEach(diag ->
+ logger.error(diag.getSomeMessage())
+ );
+ throw new CompileFailure(pfDiags);
+ }
+ dp = pf.onPath("/");
+ return pfDiags; // must be just warnings. If it was errors we would have thrown.
+ }
+
+ /**
+ * Common setup steps used whether or not we reloaded or compiled a DFDL schema.
+ */
+ private void setupDP(boolean validationMode, List pfDiags) throws CompileFailure {
+ Objects.requireNonNull(dp); // true because failure to produce a dp throws CompileFailure.
+ if (validationMode) {
+ try {
+ dp = dp.withValidationMode(ValidationMode.Limited);
+ } catch (InvalidUsageException e) {
+ // impossible
+ throw new Error(e);
+ }
+ }
+ List dpDiags = dp.getDiagnostics();
+ if (dp.isError()) {
+ throw new CompileFailure(dpDiags);
+ }
+ // well this part is only if we compiled, and provided the pfDiags arg as non-null.
+ List compilationWarnings;
+ if (pfDiags != null && !pfDiags.isEmpty()) {
+ compilationWarnings = pfDiags;
+ compilationWarnings.addAll(dpDiags); // dpDiags might be empty. That's ok.
+ } else {
+ compilationWarnings = dpDiags; // dpDiags might be empty. That's ok.
+ }
+ }
+}
diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DrillDaffodilSchemaUtils.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DrillDaffodilSchemaUtils.java
new file mode 100644
index 00000000000..7a1c6314578
--- /dev/null
+++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DrillDaffodilSchemaUtils.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil.schema;
+
+import org.apache.daffodil.japi.InvalidParserException;
+import org.apache.daffodil.japi.DataProcessor;
+import org.apache.daffodil.runtime1.api.PrimitiveType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+
+public class DrillDaffodilSchemaUtils {
+  private static final MinorType DEFAULT_TYPE = MinorType.VARCHAR;
+  private static final Logger logger = LoggerFactory.getLogger(DrillDaffodilSchemaUtils.class);
+
+  /**
+   * Maps the data types defined by the DFDL definition (upper-cased local
+   * names) to Drill data types.
+   */
+  public static final ImmutableMap<String, MinorType> DFDL_TYPE_MAPPINGS =
+      ImmutableMap.<String, MinorType>builder()
+          .put("LONG", MinorType.BIGINT)
+          .put("INT", MinorType.INT)
+          .put("SHORT", MinorType.SMALLINT)
+          .put("BYTE", MinorType.TINYINT)
+          .put("UNSIGNEDLONG", MinorType.UINT8)
+          .put("UNSIGNEDINT", MinorType.UINT4)
+          .put("UNSIGNEDSHORT", MinorType.UINT2)
+          .put("UNSIGNEDBYTE", MinorType.UINT1)
+          .put("INTEGER", MinorType.BIGINT)
+          .put("NONNEGATIVEINTEGER", MinorType.BIGINT)
+          .put("BOOLEAN", MinorType.BIT)
+          .put("DATE", MinorType.DATE) // requires conversion
+          .put("DATETIME", MinorType.TIMESTAMP) // requires conversion
+          .put("DECIMAL", MinorType.VARDECIMAL) // requires conversion (maybe)
+          .put("DOUBLE", MinorType.FLOAT8)
+          .put("FLOAT", MinorType.FLOAT4)
+          .put("HEXBINARY", MinorType.VARBINARY)
+          .put("STRING", MinorType.VARCHAR)
+          .put("TIME", MinorType.TIME) // requires conversion
+          .build();
+
+  /**
+   * Compiles (or reloads) the DFDL schema at the given URI and converts its
+   * metadata into a Drill schema. Daffodil validation is enabled on the
+   * resulting DataProcessor.
+   *
+   * @param dfdlSchemaURI URI of the DFDL schema (.xsd) or pre-compiled (.bin)
+   * @param rootName local name of the root element, or null
+   * @param namespace namespace URI of the root element, or null
+   * @return the Drill schema corresponding to the DFDL schema
+   */
+  @VisibleForTesting
+  public static TupleMetadata processSchema(URI dfdlSchemaURI, String rootName, String namespace)
+      throws IOException, DaffodilDataProcessorFactory.CompileFailure,
+      URISyntaxException, InvalidParserException {
+    DaffodilDataProcessorFactory dpf = new DaffodilDataProcessorFactory();
+    DataProcessor dp = dpf.getDataProcessor(dfdlSchemaURI, true, rootName, namespace);
+    return daffodilDataProcessorToDrillSchema(dp);
+  }
+
+  /**
+   * Walks the compiled schema's metadata and builds the equivalent Drill
+   * TupleMetadata.
+   */
+  public static TupleMetadata daffodilDataProcessorToDrillSchema(DataProcessor dp) {
+    DrillDaffodilSchemaVisitor schemaVisitor = new DrillDaffodilSchemaVisitor();
+    dp.walkMetadata(schemaVisitor);
+    return schemaVisitor.getDrillSchema();
+  }
+
+  /**
+   * Returns a {@link MinorType} of the corresponding DFDL Data Type. Defaults
+   * to VARCHAR if the type is unknown or null.
+   *
+   * @param dfdlType the DFDL Data Type (local name only, i.e., no "xs:" prefix)
+   * @return A {@link MinorType} of the Drill data type.
+   */
+  public static MinorType getDrillDataType(PrimitiveType dfdlType) {
+    // Explicit null handling instead of catching NullPointerException as
+    // control flow.
+    if (dfdlType == null) {
+      logger.warn("Unknown data type found in XSD reader: {}. Returning VARCHAR.", dfdlType);
+      return DEFAULT_TYPE;
+    }
+    MinorType type = DFDL_TYPE_MAPPINGS.get(dfdlType.name().toUpperCase());
+    if (type == null) {
+      logger.warn("Unknown data type found in XSD reader: {}. Returning VARCHAR.", dfdlType);
+      return DEFAULT_TYPE;
+    }
+    return type;
+  }
+}
diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DrillDaffodilSchemaVisitor.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DrillDaffodilSchemaVisitor.java
new file mode 100644
index 00000000000..c199a9635b0
--- /dev/null
+++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DrillDaffodilSchemaVisitor.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil.schema;
+
+import org.apache.daffodil.runtime1.api.ChoiceMetadata;
+import org.apache.daffodil.runtime1.api.ComplexElementMetadata;
+import org.apache.daffodil.runtime1.api.ElementMetadata;
+import org.apache.daffodil.runtime1.api.InfosetSimpleElement;
+import org.apache.daffodil.runtime1.api.MetadataHandler;
+import org.apache.daffodil.runtime1.api.SequenceMetadata;
+import org.apache.daffodil.runtime1.api.SimpleElementMetadata;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.metadata.MapBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Stack;
+
+/**
+ * This class transforms a DFDL/Daffodil schema into a Drill Schema.
+ */
+public class DrillDaffodilSchemaVisitor
+ extends MetadataHandler {
+ private static final Logger logger = LoggerFactory.getLogger(DrillDaffodilSchemaVisitor.class);
+
+ /**
+ * Changes to false after the very first invocation for the root element.
+ */
+ private boolean isOriginalRoot = true;
+
+ /**
+ * Unfortunately, SchemaBuilder and MapBuilder, while similar, do not share a base class
+ * so we have a stack of MapBuilders, and when empty we use the SchemaBuilder
+ */
+ private final SchemaBuilder builder = new SchemaBuilder();
+
+ private final Stack mapBuilderStack = new Stack<>();
+
+ private MapBuilder mapBuilder() {
+ return mapBuilderStack.peek();
+ }
+
+ /**
+ * Returns a {@link TupleMetadata} representation of the DFDL schema.
+ * Should only be called after the walk of the DFDL schema with this visitor has been called.
+ *
+ * @return A {@link TupleMetadata} representation of the DFDL schema.
+ */
+ public TupleMetadata getDrillSchema() {
+ return builder.build();
+ }
+
+ public static String makeColumnName(ElementMetadata md) {
+ return md.toQName().replace(":", "_");
+ }
+
+ @Override
+ public void simpleElementMetadata(SimpleElementMetadata md) {
+ assert (!isOriginalRoot);
+ if (mapBuilderStack.isEmpty()) {
+ simpleElementAsRowSetColumnMetadata(md);
+ } else {
+ simpleElementWithinComplexElementMetadata(md);
+ }
+ }
+
+ private void simpleElementAsRowSetColumnMetadata(SimpleElementMetadata md) {
+ assert (!isOriginalRoot);
+ assert (mapBuilderStack.isEmpty());
+ String colName = makeColumnName(md);
+ MinorType drillType = DrillDaffodilSchemaUtils.getDrillDataType(md.primitiveType());
+ //
+ // below code adds to the schema builder directly, not a map builder
+ //
+ if (md.isArray())
+ builder.addArray(colName, drillType);
+ else if (md.isOptional() || md.isNillable())
+ builder.addNullable(colName, drillType);
+ else
+ builder.add(colName, drillType);
+ }
+
+ private void simpleElementWithinComplexElementMetadata(SimpleElementMetadata md) {
+ assert (!isOriginalRoot);
+ assert (!mapBuilderStack.isEmpty());
+ String colName = makeColumnName(md);
+ MinorType drillType = DrillDaffodilSchemaUtils.getDrillDataType(md.primitiveType());
+ //
+ // The code below adds to a map builder from the stack
+ //
+ if (md.isArray()) {
+ mapBuilder().addArray(colName, drillType);
+ } else if (md.isOptional() || md.isNillable()) {
+ mapBuilder().addNullable(colName, drillType);
+ } else {
+ mapBuilder().add(colName, drillType);
+ }
+ }
+
+ @Override
+ public void startComplexElementMetadata(ComplexElementMetadata md) {
+ if (isOriginalRoot) {
+ startComplexOriginalRootMetadata(md);
+ } else if (mapBuilderStack.isEmpty()) {
+ startComplexElementAsRowSetColumnMetadata(md);
+ } else {
+ startComplexChildElementMetadata(md);
+ }
+ }
+
+ /**
+ * The original root given to Drill needs to be a schema element corresponding
+ * to one row of data.
+ *
+ * Drill will call daffodil parse() to parse one such element. The
+ * children elements of this element will become the column contents of the
+ * row.
+ *
+ * So the metadata for this row, to drill, is ONLY the columns of this
+ * top level element type.
+ * @param md
+ */
+ private void startComplexOriginalRootMetadata(ComplexElementMetadata md) {
+ assert (isOriginalRoot);
+ assert (mapBuilderStack.isEmpty());
+ isOriginalRoot = false;
+ if (md instanceof InfosetSimpleElement)
+ DFDLSchemaError("Row as a simple type element is not supported.");
+ if (md.isArray())
+ DFDLSchemaError("Row element must not be an array.");
+ //
+ // We do nothing else here. Essentially the name of this top level element
+ // is not relevant to drill metadata. Think of it as a table name, or a name for the
+ // entire final set of rows that are the query result.
+ }
+
+ /**
+ * This complex type element becomes a column of the row set which is itself a map.
+ * @param md
+ */
+ private void startComplexElementAsRowSetColumnMetadata(ComplexElementMetadata md) {
+ assert(!isOriginalRoot);
+ assert(mapBuilderStack.isEmpty());
+ String colName = makeColumnName(md);
+ //
+ // This directly adds to the builder, as this complex element itself is a column
+ // of the top level row
+ //
+ // Then it creates a map builder for recursively adding the contents.
+ //
+ if (md.isArray())
+ mapBuilderStack.push(builder.addMapArray(colName));
+ else
+ mapBuilderStack.push(builder.addMap(colName)); // also handles optional complex elements
+ }
+
+ private void startComplexChildElementMetadata(ComplexElementMetadata md) {
+ assert (!mapBuilderStack.isEmpty());
+ assert (!isOriginalRoot);
+ String colName = makeColumnName(md);
+ //
+ // This is for non-top-level columns, but adding a field within a map.
+ // That map represents a complex type element that does NOT correspond to
+ // the top-level rows.
+ //
+ if (md.isArray()) {
+ // there is no notion of optional/nullable in drill for arrays.
+ // Note that our arrays are always maps. We don't have pure-value
+ // simple-type arrays.
+ mapBuilderStack.push(mapBuilder().addMapArray(colName));
+ } else {
+ // there is no concept of optional/nullable in drill for maps.
+ mapBuilderStack.push(mapBuilder().addMap(colName));
+ }
+ }
+
+ @Override
+ public void endComplexElementMetadata(ComplexElementMetadata md) {
+ // the mapBuilderStack can be empty if we had the most basic
+ // kind of schema with a repeating row-set containing only
+ // simple type children.
+ if (!mapBuilderStack.isEmpty()) {
+ if (mapBuilderStack.size() == 1) {
+ mapBuilder().resumeSchema();
+ } else {
+ mapBuilder().resumeMap();
+ }
+ mapBuilderStack.pop(); // only pop if there was something on the stack.
+ }
+ }
+
+ @Override
+ public void startSequenceMetadata(SequenceMetadata m) {}
+
+ @Override
+ public void endSequenceMetadata(SequenceMetadata m) {}
+
+ @Override
+ public void startChoiceMetadata(ChoiceMetadata m) {}
+
+ @Override
+ public void endChoiceMetadata(ChoiceMetadata m) {}
+
+ private void DFDLSchemaError(String s) {
+ throw new RuntimeException(s);
+ }
+}
diff --git a/contrib/format-daffodil/src/main/resources/bootstrap-format-plugins.json b/contrib/format-daffodil/src/main/resources/bootstrap-format-plugins.json
new file mode 100644
index 00000000000..966d9ba1b5b
--- /dev/null
+++ b/contrib/format-daffodil/src/main/resources/bootstrap-format-plugins.json
@@ -0,0 +1,37 @@
+{
+ "storage":{
+ "dfs": {
+ "type": "file",
+ "formats": {
+ "daffodil": {
+ "type": "daffodil",
+ "extensions": [
+ "dat"
+ ]
+ }
+ }
+ },
+ "cp": {
+ "type": "file",
+ "formats": {
+ "daffodil": {
+ "type": "daffodil",
+ "extensions": [
+ "dat"
+ ]
+ }
+ }
+ },
+ "s3": {
+ "type": "file",
+ "formats": {
+ "daffodil": {
+ "type": "daffodil",
+ "extensions": [
+ "dat"
+ ]
+ }
+ }
+ }
+ }
+}
diff --git a/contrib/format-daffodil/src/main/resources/drill-module.conf b/contrib/format-daffodil/src/main/resources/drill-module.conf
new file mode 100644
index 00000000000..52a902572e3
--- /dev/null
+++ b/contrib/format-daffodil/src/main/resources/drill-module.conf
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This file tells Drill to consider this module when class path scanning.
+# This file can also include any supplementary configuration information.
+# This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+
+drill.classpath.scanning: {
+ packages += "org.apache.drill.exec.store.daffodil"
+}
diff --git a/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/TestDaffodilReader.java b/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/TestDaffodilReader.java
new file mode 100644
index 00000000000..7df13619324
--- /dev/null
+++ b/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/TestDaffodilReader.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil;
+
+import org.apache.drill.categories.RowSetTest;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetReader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.nio.file.Paths;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+@Category(RowSetTest.class)
+public class TestDaffodilReader extends ClusterTest {
+
+  // Root URI for the DFDL test schemas. Derived from the module working
+  // directory (maven-surefire runs with the module root as CWD) rather than
+  // a hard-coded /opt/drill absolute path, so the tests run on any
+  // machine and checkout location.
+  private static final String SCHEMA_URI_ROOT = resourceRootUri();
+
+  private static String resourceRootUri() {
+    String uri = Paths.get("src/test/resources").toAbsolutePath().toUri().toString();
+    // Path.toUri() appends the trailing '/' only when the directory exists;
+    // guarantee it so schema paths can be appended directly.
+    return uri.endsWith("/") ? uri : uri + "/";
+  }
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    // Boilerplate call to start the test rig.
+    ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
+
+    // Register the Daffodil format plugin with empty defaults; each query
+    // supplies schemaURI/rootName/etc. via the table() function parameters.
+    DaffodilFormatConfig formatConfig =
+        new DaffodilFormatConfig(null,
+          "",
+          "",
+          "",
+          false);
+
+    cluster.defineFormat("dfs", "daffodil", formatConfig);
+
+    // Copies data and schemas from src/test/resources to the dfs root.
+    // Copying the data files also lets us test against compressed files.
+    dirTestWatcher.copyResourceToRoot(Paths.get("data/"));
+    dirTestWatcher.copyResourceToRoot(Paths.get("schema/"));
+  }
+
+  /**
+   * Builds a SELECT * query over the given data file, parameterizing the
+   * Daffodil format plugin with the named DFDL schema, root element 'row'
+   * (no namespace), and validation enabled.
+   */
+  private String selectRow(String schema, String file) {
+    return "SELECT * FROM table(dfs.`data/" + file + "` " +
+        " (type => 'daffodil'," +
+        " validationMode => 'true', " +
+        " schemaURI => '" + SCHEMA_URI_ROOT + "schema/" + schema + ".dfdl.xsd'," +
+        " rootName => 'row'," +
+        " rootNamespace => null " +
+        "))";
+  }
+
+  /**
+   * This unit test tests a simple data file: a single row holding one
+   * big-endian binary int.
+   *
+   * @throws Exception Throw exception if anything goes wrong
+   */
+  @Test
+  public void testSimpleQuery1() throws Exception {
+
+    QueryBuilder qb = client.queryBuilder();
+    QueryBuilder query = qb.sql(selectRow("simple", "data01Int.dat.gz"));
+    RowSet results = query.rowSet();
+    results.print();
+    assertEquals(1, results.rowCount());
+
+    // create the expected metadata and data for this test
+    // metadata first
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("col", MinorType.INT)
+        .buildSchema();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+        .addRow(0x00000101) // aka 257
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  /**
+   * Same schema as testSimpleQuery1, but six ints in the file, so the
+   * reader must deliver six rows.
+   */
+  @Test
+  public void testSimpleQuery2() throws Exception {
+
+    QueryBuilder qb = client.queryBuilder();
+    QueryBuilder query = qb.sql(selectRow("simple", "data06Int.dat"));
+    RowSet results = query.rowSet();
+    results.print();
+    assertEquals(6, results.rowCount());
+
+    // create the expected metadata and data for this test
+    // metadata first
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("col", MinorType.INT)
+        .buildSchema();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+        .addRow(0x00000101)
+        .addRow(0x00000102)
+        .addRow(0x00000103)
+        .addRow(0x00000104)
+        .addRow(0x00000105)
+        .addRow(0x00000106)
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  /** Tests a row that is a map of two ints: "{257, 258}". */
+  @Test
+  public void testComplexQuery1() throws Exception {
+
+    QueryBuilder qb = client.queryBuilder();
+    QueryBuilder query = qb.sql(selectRow("complex1", "data02Int.dat"));
+    RowSet results = query.rowSet();
+    results.print();
+    assertEquals(1, results.rowCount());
+
+    RowSetReader rdr = results.reader();
+    rdr.next();
+    String col = rdr.getAsString();
+    assertEquals("{257, 258}", col);
+    assertFalse(rdr.next());
+    results.clear();
+  }
+
+  /** Same complex row type as testComplexQuery1, but three rows of data. */
+  @Test
+  public void testComplexQuery2() throws Exception {
+
+    QueryBuilder qb = client.queryBuilder();
+    QueryBuilder query = qb.sql(selectRow("complex1", "data06Int.dat"));
+    RowSet results = query.rowSet();
+    results.print();
+    assertEquals(3, results.rowCount());
+
+    RowSetReader rdr = results.reader();
+    rdr.next();
+    String map = rdr.getAsString();
+    assertEquals("{257, 258}", map);
+    rdr.next();
+    map = rdr.getAsString();
+    assertEquals("{259, 260}", map);
+    rdr.next();
+    map = rdr.getAsString();
+    assertEquals("{261, 262}", map);
+    assertFalse(rdr.next());
+    results.clear();
+  }
+
+  /**
+   * Tests data which is rows of two ints and an array containing a
+   * map containing two ints.
+   * Each row can be visualized like this: "{257, 258, [{259, 260},...]}"
+   * @throws Exception Throw exception if anything goes wrong
+   */
+  @Test
+  public void testComplexArrayQuery1() throws Exception {
+
+    QueryBuilder qb = client.queryBuilder();
+    QueryBuilder query = qb.sql(selectRow("complexArray1", "data12Int.dat"));
+    RowSet results = query.rowSet();
+    results.print();
+    assertEquals(1, results.rowCount());
+
+    RowSetReader rdr = results.reader();
+    rdr.next();
+    String map = rdr.getAsString();
+    assertEquals("{257, 258, [{259, 260}, {261, 262}, {257, 258}, {259, 260}, {261, 262}]}", map);
+    assertFalse(rdr.next());
+    results.clear();
+  }
+
+  /**
+   * Tests data which is an array of ints in one column of the row set
+   * @throws Exception Throw exception if anything goes wrong
+   */
+  @Test
+  public void testSimpleArrayQuery1() throws Exception {
+
+    QueryBuilder qb = client.queryBuilder();
+    QueryBuilder query = qb.sql(selectRow("simpleArrayField1", "data12Int.dat"));
+    RowSet results = query.rowSet();
+    results.print();
+    assertEquals(1, results.rowCount());
+
+    RowSetReader rdr = results.reader();
+    rdr.next();
+    String map = rdr.getAsString();
+    assertEquals("{[257, 258, 259, 260, 261, 262, 257, 258, 259, 260, 261, 262]}", map);
+    assertFalse(rdr.next());
+    results.clear();
+  }
+
+  /**
+   * Tests data which is rows of two ints and an array containing a
+   * map containing an int and a vector of ints.
+   * Each row can be visualized like this: "{257, 258, [{259, [260, 261, 262]},...]}"
+   * @throws Exception Throw exception if anything goes wrong
+   */
+  @Test
+  public void testComplexArrayQuery2() throws Exception {
+
+    QueryBuilder qb = client.queryBuilder();
+    QueryBuilder query = qb.sql(selectRow("complexArray2", "data12Int.dat"));
+    RowSet results = query.rowSet();
+    results.print();
+    assertEquals(1, results.rowCount());
+
+    RowSetReader rdr = results.reader();
+    rdr.next();
+    String map = rdr.getAsString();
+    assertEquals("{257, 258, [{259, [260, 261, 262]}, {257, [258, 259, 260]}, {261, [262]}]}", map);
+    assertFalse(rdr.next());
+    results.clear();
+  }
+
+  /**
+   * Tests the remaining simple types (long, short, byte, boolean, double,
+   * float, hexBinary, string) across two rows of text-form data.
+   */
+  @Test
+  public void testMoreTypes1() throws Exception {
+
+    QueryBuilder qb = client.queryBuilder();
+    QueryBuilder query = qb.sql(selectRow("moreTypes1", "moreTypes1.txt.dat"));
+    RowSet results = query.rowSet();
+    results.print();
+    assertEquals(2, results.rowCount());
+
+    RowSetReader rdr = results.reader();
+    rdr.next();
+    String map = rdr.getAsString();
+    assertEquals("{2147483647, 9223372036854775807, 32767, 127, true, " +
+        "1.7976931348623157E308, 3.4028235E38, [31, 32, 33, 34, 35, 36, 37, 38], \"daffodil\"}", map);
+    rdr.next();
+    map = rdr.getAsString();
+    assertEquals("{-2147483648, -9223372036854775808, -32768, -128, false, " +
+        "-1.7976931348623157E308, -3.4028235E38, [38, 37, 36, 35, 34, 33, 32, 31], \"drill\"}", map);
+    assertFalse(rdr.next());
+    results.clear();
+  }
+}
diff --git a/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/schema/TestDaffodilToDrillMetadataConversion.java b/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/schema/TestDaffodilToDrillMetadataConversion.java
new file mode 100644
index 00000000000..8076fe5e8cf
--- /dev/null
+++ b/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/schema/TestDaffodilToDrillMetadataConversion.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil.schema;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.junit.Test;
+
+import java.net.URI;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestDaffodilToDrillMetadataConversion {
+
+  /**
+   * Loads the DFDL schema at the given classpath resource and converts it
+   * to Drill metadata using root element 'row' with no namespace.
+   */
+  private TupleMetadata schemaFor(String resourcePath) throws Exception {
+    URI schemaURI = getClass().getResource(resourcePath).toURI();
+    return DrillDaffodilSchemaUtils.processSchema(schemaURI, "row", null);
+  }
+
+  @Test
+  public void testSimple() throws Exception {
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("col", MinorType.INT)
+        .buildSchema();
+    // Use JUnit 4's Assert (fully qualified), consistent with the JUnit 4
+    // org.junit.Test runner used throughout this module, instead of the
+    // JUnit 5 (jupiter) assertion that was statically imported.
+    org.junit.Assert.assertTrue(
+        expectedSchema.isEquivalent(schemaFor("/schema/simple.dfdl.xsd")));
+  }
+
+  @Test
+  public void testComplex1() throws Exception {
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("a1", MinorType.INT)
+        .add("a2", MinorType.INT)
+        .buildSchema();
+    org.junit.Assert.assertTrue(
+        expectedSchema.isEquivalent(schemaFor("/schema/complex1.dfdl.xsd")));
+  }
+
+  @Test
+  public void testComplex2() throws Exception {
+    // Nested map column 'b' holding two ints, after two top-level ints.
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("a1", MinorType.INT)
+        .add("a2", MinorType.INT)
+        .addMap("b")
+          .add("b1", MinorType.INT)
+          .add("b2", MinorType.INT)
+        .resumeSchema()
+        .buildSchema();
+    org.junit.Assert.assertTrue(
+        expectedSchema.isEquivalent(schemaFor("/schema/complex2.dfdl.xsd")));
+  }
+}
diff --git a/contrib/format-daffodil/src/test/resources/data/data01Int.dat b/contrib/format-daffodil/src/test/resources/data/data01Int.dat
new file mode 100644
index 00000000000..dee9c4c8ada
Binary files /dev/null and b/contrib/format-daffodil/src/test/resources/data/data01Int.dat differ
diff --git a/contrib/format-daffodil/src/test/resources/data/data01Int.dat.gz b/contrib/format-daffodil/src/test/resources/data/data01Int.dat.gz
new file mode 100644
index 00000000000..5e4b3b37acf
Binary files /dev/null and b/contrib/format-daffodil/src/test/resources/data/data01Int.dat.gz differ
diff --git a/contrib/format-daffodil/src/test/resources/data/data02Int.dat b/contrib/format-daffodil/src/test/resources/data/data02Int.dat
new file mode 100644
index 00000000000..8577a259181
Binary files /dev/null and b/contrib/format-daffodil/src/test/resources/data/data02Int.dat differ
diff --git a/contrib/format-daffodil/src/test/resources/data/data06Int.dat b/contrib/format-daffodil/src/test/resources/data/data06Int.dat
new file mode 100644
index 00000000000..8c29db2ec51
Binary files /dev/null and b/contrib/format-daffodil/src/test/resources/data/data06Int.dat differ
diff --git a/contrib/format-daffodil/src/test/resources/data/data12Int.dat b/contrib/format-daffodil/src/test/resources/data/data12Int.dat
new file mode 100644
index 00000000000..39fa5271b4f
Binary files /dev/null and b/contrib/format-daffodil/src/test/resources/data/data12Int.dat differ
diff --git a/contrib/format-daffodil/src/test/resources/data/moreTypes1.txt.dat b/contrib/format-daffodil/src/test/resources/data/moreTypes1.txt.dat
new file mode 100644
index 00000000000..cd0b5c1591a
--- /dev/null
+++ b/contrib/format-daffodil/src/test/resources/data/moreTypes1.txt.dat
@@ -0,0 +1,2 @@
+2147483647 9223372036854775807 32767 127 T 1.7976931348623157E308 3.4028235E38 12345678 'daffodil'
+-2147483648 -9223372036854775808 -32768 -128 F -1.7976931348623157E308 -3.4028235E38 87654321 'drill'
diff --git a/contrib/format-daffodil/src/test/resources/schema/complex1.dfdl.xsd b/contrib/format-daffodil/src/test/resources/schema/complex1.dfdl.xsd
new file mode 100644
index 00000000000..1f4ab8954ef
--- /dev/null
+++ b/contrib/format-daffodil/src/test/resources/schema/complex1.dfdl.xsd
@@ -0,0 +1,54 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/contrib/format-daffodil/src/test/resources/schema/complex2.dfdl.xsd b/contrib/format-daffodil/src/test/resources/schema/complex2.dfdl.xsd
new file mode 100644
index 00000000000..d4d74aefe1f
--- /dev/null
+++ b/contrib/format-daffodil/src/test/resources/schema/complex2.dfdl.xsd
@@ -0,0 +1,65 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/contrib/format-daffodil/src/test/resources/schema/complexArray1.dfdl.xsd b/contrib/format-daffodil/src/test/resources/schema/complexArray1.dfdl.xsd
new file mode 100644
index 00000000000..59d285bd31e
--- /dev/null
+++ b/contrib/format-daffodil/src/test/resources/schema/complexArray1.dfdl.xsd
@@ -0,0 +1,65 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/contrib/format-daffodil/src/test/resources/schema/complexArray2.dfdl.xsd b/contrib/format-daffodil/src/test/resources/schema/complexArray2.dfdl.xsd
new file mode 100644
index 00000000000..89d74fa2de9
--- /dev/null
+++ b/contrib/format-daffodil/src/test/resources/schema/complexArray2.dfdl.xsd
@@ -0,0 +1,65 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/contrib/format-daffodil/src/test/resources/schema/moreTypes1.dfdl.xsd b/contrib/format-daffodil/src/test/resources/schema/moreTypes1.dfdl.xsd
new file mode 100644
index 00000000000..b1a8085a9bd
--- /dev/null
+++ b/contrib/format-daffodil/src/test/resources/schema/moreTypes1.dfdl.xsd
@@ -0,0 +1,70 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/contrib/format-daffodil/src/test/resources/schema/simple.dfdl.xsd b/contrib/format-daffodil/src/test/resources/schema/simple.dfdl.xsd
new file mode 100644
index 00000000000..eea582b9a56
--- /dev/null
+++ b/contrib/format-daffodil/src/test/resources/schema/simple.dfdl.xsd
@@ -0,0 +1,71 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/contrib/format-daffodil/src/test/resources/schema/simpleArrayField1.dfdl.xsd b/contrib/format-daffodil/src/test/resources/schema/simpleArrayField1.dfdl.xsd
new file mode 100644
index 00000000000..6c72c375159
--- /dev/null
+++ b/contrib/format-daffodil/src/test/resources/schema/simpleArrayField1.dfdl.xsd
@@ -0,0 +1,71 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 59747eba87f..7379fd37325 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -42,6 +42,7 @@
data
format-access
+ format-daffodil
format-deltalake
format-esri
format-excel
diff --git a/distribution/pom.xml b/distribution/pom.xml
index c5c1d678099..560bfaa3780 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -468,6 +468,11 @@
drill-format-log
${project.version}
+
+ org.apache.drill.contrib
+ drill-format-daffodil
+ ${project.version}
+
org.apache.drill.contrib
drill-druid-storage
diff --git a/distribution/src/assemble/component.xml b/distribution/src/assemble/component.xml
index 48a6360d8a9..06b71fee826 100644
--- a/distribution/src/assemble/component.xml
+++ b/distribution/src/assemble/component.xml
@@ -29,6 +29,7 @@
org.apache.drill.contrib.data:tpch-sample-data:jar
org.apache.drill.contrib:drill-deltalake-format:jar
org.apache.drill.contrib:drill-druid-storage:jar
+ org.apache.drill.contrib:drill-format-daffodil:jar
org.apache.drill.contrib:drill-format-esri:jar
org.apache.drill.contrib:drill-format-excel:jar
org.apache.drill.contrib:drill-format-hdf5:jar
diff --git a/pom.xml b/pom.xml
index 9ce88959ba4..c97b64e5cb6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -132,6 +132,7 @@
4.3.3
2.1.12
+ 3.7.0-SNAPSHOT
@@ -316,6 +317,11 @@
logback-core
${logback.version}
+
+ org.apache.daffodil
+ daffodil-japi_2.12
+ ${daffodil.version}
+