Skip to content

Commit

Permalink
clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
vkorukanti committed Aug 9, 2023
1 parent a986f45 commit 0472f80
Show file tree
Hide file tree
Showing 422 changed files with 508 additions and 562 deletions.
4 changes: 2 additions & 2 deletions kernel/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Notice that there two sets of public APIs to build connectors.
# Project setup with Delta Kernel
The Delta Kernel project provides the following two Maven artifacts:
- `delta-kernel-api`: This is a must-have dependency and contains all the public `Table` and `TableClient` APIs discussed earlier.
- `delta-kernel-default`: This is an optional dependency that contains *default* implementations of the `TableClient` interfaces using Hadoop libraries. Developers can optionally use these default implementations to speed up the development of their Delta connector.
- `delta-kernel-defaults`: This is an optional dependency that contains *default* implementations of the `TableClient` interfaces using Hadoop libraries. Developers can optionally use these default implementations to speed up the development of their Delta connector.
```xml
<!-- Must have dependency -->
<dependency>
Expand All @@ -38,7 +38,7 @@ The Delta Kernel project provides the following two Maven artifacts:
<!-- Optional dependency -->
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-kernel-default</artifactId>
<artifactId>delta-kernel-defaults</artifactId>
<version>VERSION</version>
</dependency>
```
Expand Down
4 changes: 2 additions & 2 deletions kernel/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ val scalaTestVersion = "3.2.15"
val deltaSparkVersion = deltaStorageVersion
val sparkVersion = "3.3.2"

lazy val kernelDefault = (project in file("kernel-default"))
lazy val kernelDefaults = (project in file("kernel-defaults"))
.dependsOn(kernelApi)
.settings(
name := "delta-kernel-default",
name := "delta-kernel-defaults",
commonSettings,
scalaStyleSettings,
releaseSettings,
Expand Down
13 changes: 12 additions & 1 deletion kernel/dev/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -197,11 +197,15 @@
<blank line>
import io.delta.kernel.*
import io.delta.kernel.internal.*
<blank line>
import io.delta.kernel.defaults.*
import io.delta.kernel.defaults.internal.*
-->
<module name="ImportOrder">
<property name="separated" value="true"/>
<property name="ordered" value="true"/>
<property name="groups" value="java,javax,scala,*,io.delta.kernel,io.delta.kernel.internal"/>
<property name="groups"
value="java,javax,scala,*,io.delta.kernel,io.delta.kernel.internal,io.delta.kernel.defaults,io.delta.kernel.defaults.internal"/>
<property name="option" value="under"/>
</module>

Expand All @@ -216,6 +220,13 @@
<property name="checks" value="ImportOrder"/>
<property name="message" value="^'io.delta.kernel.internal\..*'.*"/>
</module>

<!-- io.delta.kernel.defaults and io.delta.kernel.defaults.internal -->
<module name="SuppressionXpathSingleFilter">
<property name="checks" value="ImportOrder"/>
<property name="message" value="^'io.delta.kernel.defaults.internal\..*'.*"/>
</module>

<!-- java and javax -->
<module name="SuppressionXpathSingleFilter">
<property name="checks" value="ImportOrder"/>
Expand Down
2 changes: 1 addition & 1 deletion kernel/examples/table-reader/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ limitations under the License.-->

<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-kernel-default</artifactId>
<artifactId>delta-kernel-defaults</artifactId>
<version>${delta-kernel.version}</version>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
Expand All @@ -31,38 +34,34 @@
import org.apache.hadoop.conf.Configuration;

import io.delta.kernel.TableNotFoundException;
import io.delta.kernel.client.DefaultTableClient;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.DataReadResult;
import io.delta.kernel.data.vector.VectorUtils;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;

import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import io.delta.kernel.defaults.client.DefaultTableClient;
import io.delta.kernel.defaults.internal.data.vector.VectorUtils;

/**
* Base class for reading Delta Lake tables using the Delta Kernel APIs.
*/
public abstract class BaseTableReader
{
public abstract class BaseTableReader {
public static final int DEFAULT_LIMIT = 20;

protected final String tablePath;
protected final TableClient tableClient;

public BaseTableReader(String tablePath)
{
public BaseTableReader(String tablePath) {
this.tablePath = requireNonNull(tablePath);
this.tableClient = DefaultTableClient.create(new Configuration());
}

/**
* Show the given {@code limit} rows containing the given columns from the table.
*
* @param limit Max number of rows to show.
* @param limit Max number of rows to show.
* @param columnsOpt If null, show all columns in the table.
* @throws TableNotFoundException
* @throws IOException
Expand All @@ -74,8 +73,7 @@ public abstract void show(int limit, Optional<List<String>> columnsOpt)
* Utility method to return a pruned schema that contains the given {@code columns} from
* {@code baseSchema}
*/
protected static StructType pruneSchema(StructType baseSchema, Optional<List<String>> columns)
{
protected static StructType pruneSchema(StructType baseSchema, Optional<List<String>> columns) {
if (!columns.isPresent()) {
return baseSchema;
}
Expand All @@ -90,8 +88,7 @@ protected static StructType pruneSchema(StructType baseSchema, Optional<List<Str
return new StructType(selectedFields);
}

protected static int printData(DataReadResult dataReadResult, int maxRowsToPrint)
{
protected static int printData(DataReadResult dataReadResult, int maxRowsToPrint) {
int printedRowCount = 0;
ColumnarBatch data = dataReadResult.getData();
Optional<ColumnVector> selectionVector = dataReadResult.getSelectionVector();
Expand All @@ -107,13 +104,11 @@ protected static int printData(DataReadResult dataReadResult, int maxRowsToPrint
return printedRowCount;
}

protected static void printSchema(StructType schema)
{
protected static void printSchema(StructType schema) {
System.out.printf(formatter(schema.length()), schema.fieldNames().toArray(new String[0]));
}

protected static void printRow(ColumnarBatch batch, int rowId)
{
protected static void printRow(ColumnarBatch batch, int rowId) {
int numCols = batch.getSchema().length();
Object[] rowValues = IntStream.range(0, numCols).mapToObj(colOrdinal -> {
ColumnVector columnVector = batch.getColumnVector(colOrdinal);
Expand All @@ -126,18 +121,10 @@ protected static void printRow(ColumnarBatch batch, int rowId)
System.out.printf(formatter(numCols), rowValues);
}

private static String formatter(int length)
{
return IntStream.range(0, length)
.mapToObj(i -> "%20s")
.collect(Collectors.joining("|")) + "\n";
}

/**
* Minimum command line options for any implementation of this reader.
*/
protected static Options baseOptions()
{
protected static Options baseOptions() {
return new Options()
.addRequiredOption("t", "table", true, "Fully qualified table path")
.addOption("c", "columns", true,
Expand All @@ -157,14 +144,12 @@ protected static Options baseOptions()
/**
* Helper method to parse the command line arguments.
*/
protected static CommandLine parseArgs(Options options, String[] args)
{
protected static CommandLine parseArgs(Options options, String[] args) {
CommandLineParser cliParser = new DefaultParser();

try {
return cliParser.parse(options, args);
}
catch (ParseException parseException) {
} catch (ParseException parseException) {
new HelpFormatter().printHelp(
"java " + SingleThreadedTableReader.class.getCanonicalName(),
options,
Expand All @@ -175,19 +160,23 @@ protected static CommandLine parseArgs(Options options, String[] args)
return null;
}

protected static Optional<List<String>> parseColumnList(CommandLine cli, String optionName)
{
protected static Optional<List<String>> parseColumnList(CommandLine cli, String optionName) {
return Optional.ofNullable(cli.getOptionValue(optionName))
.map(colString -> Arrays.asList(colString.split(",[ ]*")));
}

protected static int parseInt(CommandLine cli, String optionName, int defaultValue)
throws ParseException
{
throws ParseException {
return Optional.ofNullable(cli.getParsedOptionValue(optionName))
.map(Number.class::cast)
.map(Number::intValue)
.orElse(defaultValue);
}

private static String formatter(int length) {
return IntStream.range(0, length)
.mapToObj(i -> "%20s")
.collect(Collectors.joining("|")) + "\n";
}
}

Loading

0 comments on commit 0472f80

Please sign in to comment.