Skip to content

Commit

Permalink
WIP converting to EVF v2. Pushing to repo for troubleshooting purposes.
Browse files Browse the repository at this point in the history
  • Loading branch information
Megan Foss committed Mar 10, 2022
1 parent 1a91592 commit 4b221b5
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,11 @@ public class FixedwidthBatchReader implements ManagedReader<FileSchemaNegotiator
private CustomErrorContext errorContext;
private InputStream fsStream;
private ResultSetLoader loader;
private RowSetLoader writer;
private BufferedReader reader;
private int lineNum;

public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
this.config = config;
this.config = config; //reader-specific schema and projection manager
this.maxRecords = maxRecords;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package org.apache.drill.exec.store.fixedwidth;

import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.v3.SchemaNegotiator;
import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycle;
import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
import org.apache.drill.exec.physical.impl.scan.v3.lifecycle.ReaderLifecycle;
import org.apache.drill.exec.physical.impl.scan.v3.lifecycle.SchemaNegotiatorImpl;
import org.apache.drill.exec.physical.impl.scan.v3.schema.ProjectedColumn;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
import org.apache.hadoop.mapred.FileSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

public class FixedwidthBatchReaderImpl implements ManagedReader {

private final int maxRecords;
private final FixedwidthFormatConfig config;
private InputStream fsStream;
private ResultSetLoader loader;
private FileSplit split;
private CustomErrorContext errorContext;
private static final Logger logger = LoggerFactory.getLogger(FixedwidthBatchReader.class);

public FixedwidthBatchReaderImpl (SchemaNegotiator negotiator, FixedwidthFormatConfig config, int maxRecords) {
this.loader = open(negotiator);
this.config = config;
this.maxRecords = maxRecords;
}

@Override
public boolean next() {

}

@Override
public void close() {
if (fsStream != null){
AutoCloseables.closeSilently(fsStream);
fsStream = null;
}
}

private ResultSetLoader open(SchemaNegotiator negotiator) {
split = (FileSplit) negotiator.split();
errorContext = negotiator.parentErrorContext();
openFile(negotiator);

try {
negotiator.tableSchema(buildSchema(), true);
loader = negotiator.build();
} catch (Exception e) {
throw UserException
.dataReadError(e)
.message("Failed to open input file: {}", split.getPath().toString())
.addContext(errorContext)
.addContext(e.getMessage())
.build(logger);
}
reader = new BufferedReader(new InputStreamReader(fsStream, Charsets.UTF_8));
return loader;
}

private void openFile(FileSchemaNegotiator negotiator) {
try {
fsStream = negotiator.file().fileSystem().openPossiblyCompressedStream(split.getPath());
sasFileReader = new SasFileReaderImpl(fsStream);
firstRow = sasFileReader.readNext();
} catch (IOException e) {
throw UserException
.dataReadError(e)
.message("Unable to open Fixed Width File %s", split.getPath())
.addContext(e.getMessage())
.addContext(errorContext)
.build(logger);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ public void setFieldTypes(int i) {
}
}

@JsonIgnore
public void validateFieldInput(){
Set<String> uniqueNames = new HashSet<>();
List<Integer> fieldIndices = this.getFieldIndices();
Expand Down Expand Up @@ -192,7 +191,7 @@ public void validateFieldInput(){
if (!Pattern.matches("[a-zA-Z]\\w*", name)) {
throw UserException
.validationError()
.message("Invalid input: " + name)
.message("Column Name '" + name + "' is not valid. Must contain letters, numbers, and underscores only.")
.addContext("Plugin", FixedwidthFormatPlugin.DEFAULT_NAME)
.build(logger);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,4 @@ protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan sc
builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
return builder;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ public static void setup() throws Exception {
FixedwidthFormatConfig formatConfig = new FixedwidthFormatConfig(Lists.newArrayList("fwf"),
Lists.newArrayList(
new FixedwidthFieldConfig("Number", 1, 5, TypeProtos.MinorType.VARDECIMAL),
new FixedwidthFieldConfig("Address",12, 3,TypeProtos.MinorType.INT, ""),
new FixedwidthFieldConfig("Letter", 7,4, TypeProtos.MinorType.VARCHAR, ""),
new FixedwidthFieldConfig("Date",16, 10,TypeProtos.MinorType.DATE, "MM-dd-yyyy"),
new FixedwidthFieldConfig( "Time", 27, 8,TypeProtos.MinorType.TIME,"HH:mm:ss" ),
new FixedwidthFieldConfig("DateTime", 36, 23,TypeProtos.MinorType.TIMESTAMP, "MM-dd-yyyy'T'HH:mm:ss.SSX" )
new FixedwidthFieldConfig("Address", 12, 3, TypeProtos.MinorType.INT),
new FixedwidthFieldConfig("Letter", 7, 4, TypeProtos.MinorType.VARCHAR),
new FixedwidthFieldConfig("Date", 16, 10, TypeProtos.MinorType.DATE, "MM-dd-yyyy"),
new FixedwidthFieldConfig("Time", 27, 8, TypeProtos.MinorType.TIME,"HH:mm:ss"),
new FixedwidthFieldConfig("DateTime", 36, 23, TypeProtos.MinorType.TIMESTAMP, "MM-dd-yyyy'T'HH:mm:ss.SSX")
));
cluster.defineFormat("dfs", "fwf", formatConfig);
cluster.defineFormat("cp", "fwf", formatConfig);
Expand Down Expand Up @@ -218,4 +218,3 @@ private RowSet setupTestData(){
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -224,4 +224,6 @@ public interface SchemaNegotiator {
* schema order
*/
ResultSetLoader build();

Object split();
}

0 comments on commit 4b221b5

Please sign in to comment.