Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Flight SQL server support #6023

Open
wants to merge 91 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
91 commits
Select commit Hold shift + click to select a range
185fd4f
Initial proj setup and minimal impl
jmao-denver Jul 17, 2024
04288bf
Implement some meta calls
jmao-denver Aug 5, 2024
e79c493
Fix/disable tests
jmao-denver Aug 6, 2024
2002936
Add JDBC test
jmao-denver Aug 14, 2024
84c737b
Plumb rudamentary DoAction / GetFlightInfo command support
devinrsmith Aug 15, 2024
f7c9df1
Merge remote-tracking branch 'upstream/main' into 5339-flight-sql
devinrsmith Aug 29, 2024
3c4ed60
Ensure ticket tables go through auth transform
devinrsmith Aug 29, 2024
d045ac4
Merge remote-tracking branch 'upstream/main' into 5339-flight-sql
devinrsmith Sep 5, 2024
770956d
Cleanup FlightSQL parts
devinrsmith Sep 5, 2024
51e039d
cleanup auth cookie handling
devinrsmith Sep 5, 2024
a04f8ab
Disable gRPC trace logging
devinrsmith Sep 5, 2024
9caef38
Fix downstream dagger compile
devinrsmith Sep 5, 2024
329bdbf
More cleanup
devinrsmith Sep 5, 2024
7d00fe6
remove unused class
devinrsmith Sep 5, 2024
7b79e70
more unused
devinrsmith Sep 5, 2024
90bb328
Merge remote-tracking branch 'upstream/main' into 5339-flight-sql
devinrsmith Sep 23, 2024
9649204
Add ActionResolver
devinrsmith Sep 23, 2024
7e96815
f
devinrsmith Sep 24, 2024
19701ba
Simplify auth cookie logic. Cleanup testing. Create jdbcTest source set.
devinrsmith Oct 1, 2024
af3d62c
f
devinrsmith Oct 2, 2024
c0e260e
In-process FlightSQL testing
devinrsmith Oct 2, 2024
158dfcb
f
devinrsmith Oct 3, 2024
995aaa2
more tests
devinrsmith Oct 3, 2024
3550037
refactor actions
devinrsmith Oct 3, 2024
8051c36
cleanup
devinrsmith Oct 3, 2024
77961f9
f
devinrsmith Oct 4, 2024
141403a
Fix table schema
devinrsmith Oct 4, 2024
3e130a0
f
devinrsmith Oct 4, 2024
088503c
f
devinrsmith Oct 7, 2024
c4af2b9
Tests work
devinrsmith Oct 11, 2024
f593aa8
Cleanup
devinrsmith Oct 11, 2024
1aaabd8
More tests
devinrsmith Oct 11, 2024
8ec940e
more tests
devinrsmith Oct 15, 2024
9f5dffc
Merge remote-tracking branch 'upstream/main' into 5339-flight-sql-com…
devinrsmith Oct 15, 2024
b57b972
More tests
devinrsmith Oct 16, 2024
b316621
Add JDBC tests
devinrsmith Oct 16, 2024
3b221f0
f
devinrsmith Oct 17, 2024
54e1ad2
f
devinrsmith Oct 18, 2024
b69738f
f
devinrsmith Oct 18, 2024
0ddf2eb
f
devinrsmith Oct 22, 2024
78e8899
f
devinrsmith Oct 22, 2024
11313f0
f
devinrsmith Oct 22, 2024
7f426bf
f
devinrsmith Oct 22, 2024
c2d3ab1
Comments
devinrsmith Oct 23, 2024
1c564f4
f
devinrsmith Oct 23, 2024
2ca027b
f
devinrsmith Oct 23, 2024
1a9d0fa
More tests
devinrsmith Oct 23, 2024
96af11b
Manage
devinrsmith Oct 23, 2024
accfda8
Use randomized handles instead of incrementing ids
devinrsmith Oct 23, 2024
742ad24
Merge remote-tracking branch 'upstream/main' into 5339-flight-sql
devinrsmith Oct 23, 2024
65fe87c
Undid wrapping, we are not calling onError for unauthentication sessions
devinrsmith Oct 24, 2024
5092476
oops, remove idea folder
devinrsmith Oct 24, 2024
6df7379
Introduce PathResolver
devinrsmith Oct 24, 2024
7ef6ade
Update ActionResolver to ensure it supports more complicated (off-thr…
devinrsmith Oct 24, 2024
8e84a4c
Add Schema support
devinrsmith Oct 24, 2024
a5e689c
Plumb Schema support
devinrsmith Oct 24, 2024
6e3395c
feat: Add arrow Schema as supported type
devinrsmith Oct 24, 2024
ab17945
Merge remote-tracking branch 'upstream/main' into 5339-flight-sql
devinrsmith Oct 24, 2024
8614ff1
rename
devinrsmith Oct 24, 2024
4e48079
Refactor to fix stringify come first. Add explicit comment that strin…
devinrsmith Oct 24, 2024
cccfadd
Merge branch 'add-schema-object-support' into 5339-flight-sql
devinrsmith Oct 24, 2024
bd5fb1e
move serializeToByteString
devinrsmith Oct 24, 2024
5bc3182
remove VARBINARY change for byte[]
devinrsmith Oct 25, 2024
38d8c0f
Add reference to relevant ticket
devinrsmith Oct 25, 2024
f1e587d
review response 1
devinrsmith Oct 25, 2024
8650c2e
Merge remote-tracking branch 'upstream/main' into 5339-flight-sql
devinrsmith Oct 25, 2024
cfe3109
Review response part 2
devinrsmith Oct 25, 2024
2b4db46
Nate comments
devinrsmith Oct 29, 2024
6bcf92e
f
devinrsmith Oct 29, 2024
f90463c
Colin review points
devinrsmith Oct 29, 2024
a040826
Merge remote-tracking branch 'upstream/main' into 5339-flight-sql
devinrsmith Oct 29, 2024
ca8e0b3
Migrate flightsql to server-jetty-app
devinrsmith Oct 29, 2024
89f0d70
Migrate to extensions/flight-sql
devinrsmith Oct 29, 2024
7d1f57b
fix EmbeddedServer
devinrsmith Oct 29, 2024
938c05a
add configuration to disable resolvers
devinrsmith Oct 29, 2024
094401f
Merge remote-tracking branch 'upstream/main' into 5339-flight-sql
devinrsmith Nov 4, 2024
49a0195
Fixup util-thread wrt downstream dagger
devinrsmith Nov 4, 2024
9830ce0
Ensure FlightSQL tests work on Java 17+
devinrsmith Nov 4, 2024
4ecbe2a
Only retain table if it is refreshing.
devinrsmith Nov 5, 2024
20c18d3
Small comments / error message changes
devinrsmith Nov 5, 2024
260c679
Add typed visitors for Commands and Tickets
devinrsmith Nov 5, 2024
00cd69e
Create FlightSqlActionHelper
devinrsmith Nov 6, 2024
171e5b1
Refactor constants
devinrsmith Nov 6, 2024
fd4043a
cleanup
devinrsmith Nov 6, 2024
bb77291
Create ActionVisitorBase
devinrsmith Nov 6, 2024
a902aef
Create CommandVisitorBase
devinrsmith Nov 6, 2024
6ae2ea9
Review response
devinrsmith Nov 12, 2024
306dc26
Divorce jdbcTest source set from test source set
devinrsmith Nov 12, 2024
6a954f5
review points
devinrsmith Nov 13, 2024
758c42c
More review responses
devinrsmith Nov 13, 2024
d9d4a8b
statementNeverExecuted and one more isDeniedAccess optimization
devinrsmith Nov 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 17 additions & 15 deletions engine/sql/src/main/java/io/deephaven/engine/sql/Sql.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,20 @@
import io.deephaven.qst.table.TableHeader.Builder;
import io.deephaven.qst.table.TableSpec;
import io.deephaven.qst.table.TicketTable;
import io.deephaven.qst.type.Type;
import io.deephaven.sql.Scope;
import io.deephaven.sql.ScopeStaticImpl;
import io.deephaven.sql.SqlAdapter;
import io.deephaven.sql.TableInformation;
import io.deephaven.util.annotations.InternalUseOnly;
import io.deephaven.util.annotations.ScriptApi;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Function;

/**
* Experimental SQL execution. Subject to change.
Expand All @@ -47,35 +50,38 @@ public static TableSpec dryRun(String sql) {
return dryRun(sql, currentScriptSessionNamedTables());
}

@InternalUseOnly
public static TableSpec parseSql(String sql, Map<String, Table> scope, Function<String, TicketTable> ticketFunction,
Map<TicketTable, Table> out) {
return SqlAdapter.parseSql(sql, scope(scope, out, ticketFunction));
}

private static Table evaluate(String sql, Map<String, Table> scope) {
final Map<TicketTable, Table> map = new HashMap<>(scope.size());
final TableSpec tableSpec = parseSql(sql, scope, map);
final TableSpec tableSpec = parseSql(sql, scope, Sql::sqlref, map);
log.debug().append("Executing. Graphviz representation:").nl().append(ToGraphvizDot.INSTANCE, tableSpec).endl();
return tableSpec.logic().create(new TableCreatorTicketInterceptor(TableCreatorImpl.INSTANCE, map));
}

private static TableSpec dryRun(String sql, Map<String, Table> scope) {
final TableSpec tableSpec = parseSql(sql, scope, null);
final TableSpec tableSpec = parseSql(sql, scope, Sql::sqlref, null);
log.info().append("Dry run. Graphviz representation:").nl().append(ToGraphvizDot.INSTANCE, tableSpec).endl();
return tableSpec;
}

private static TableSpec parseSql(String sql, Map<String, Table> scope, Map<TicketTable, Table> out) {
return SqlAdapter.parseSql(sql, scope(scope, out));
}

private static TicketTable sqlref(String tableName) {
// The TicketTable can technically be anything unique (incrementing number, random, ...), but for
// visualization purposes it makes sense to use the (already unique) table name.
return TicketTable.of(("sqlref/" + tableName).getBytes(StandardCharsets.UTF_8));
}

private static Scope scope(Map<String, Table> scope, Map<TicketTable, Table> out) {
private static Scope scope(Map<String, Table> scope, Map<TicketTable, Table> out,
Function<String, TicketTable> ticketFunction) {
final ScopeStaticImpl.Builder builder = ScopeStaticImpl.builder();
for (Entry<String, Table> e : scope.entrySet()) {
final String tableName = e.getKey();
final Table table = e.getValue();
// The TicketTable can technically be anything unique (incrementing number, random, ...), but for
// visualization purposes it makes sense to use the (already unique) table name.
final TicketTable spec = sqlref(tableName);
final TicketTable spec = ticketFunction.apply(tableName);
final List<String> qualifiedName = List.of(tableName);
final TableHeader header = adapt(table.getDefinition());
builder.addTables(TableInformation.of(qualifiedName, header, spec));
Expand Down Expand Up @@ -103,11 +109,7 @@ private static TableHeader adapt(TableDefinition tableDef) {
}

private static ColumnHeader<?> adapt(ColumnDefinition<?> columnDef) {
if (columnDef.getComponentType() == null) {
return ColumnHeader.of(columnDef.getName(), columnDef.getDataType());
}
// SQLTODO(array-type)
throw new UnsupportedOperationException("SQLTODO(array-type)");
return ColumnHeader.of(columnDef.getName(), Type.find(columnDef.getDataType()));
}

private enum ToGraphvizDot implements ObjFormatter<TableSpec> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,17 @@

import io.deephaven.engine.table.Table;
import io.deephaven.qst.TableCreator;
import io.deephaven.qst.table.EmptyTable;
import io.deephaven.qst.table.InputTable;
import io.deephaven.qst.table.MultiJoinInput;
import io.deephaven.qst.table.NewTable;
import io.deephaven.qst.TableCreatorDelegate;
import io.deephaven.qst.table.TicketTable;
import io.deephaven.qst.table.TimeTable;

import java.util.List;
import java.util.Map;
import java.util.Objects;

class TableCreatorTicketInterceptor implements TableCreator<Table> {
private final TableCreator<Table> delegate;
class TableCreatorTicketInterceptor extends TableCreatorDelegate<Table> {
private final Map<TicketTable, Table> map;

public TableCreatorTicketInterceptor(TableCreator<Table> delegate, Map<TicketTable, Table> map) {
this.delegate = Objects.requireNonNull(delegate);
super(delegate);
this.map = Objects.requireNonNull(map);
}

Expand All @@ -31,36 +25,6 @@ public Table of(TicketTable ticketTable) {
if (table != null) {
return table;
}
return delegate.of(ticketTable);
}

@Override
public Table of(NewTable newTable) {
return delegate.of(newTable);
}

@Override
public Table of(EmptyTable emptyTable) {
return delegate.of(emptyTable);
}

@Override
public Table of(TimeTable timeTable) {
return delegate.of(timeTable);
}

@Override
public Table of(InputTable inputTable) {
return delegate.of(inputTable);
}

@Override
public Table multiJoin(List<MultiJoinInput<Table>> multiJoinInputs) {
return delegate.multiJoin(multiJoinInputs);
}

@Override
public Table merge(Iterable<Table> tables) {
return delegate.merge(tables);
return super.of(ticketTable);
}
}
31 changes: 17 additions & 14 deletions engine/table/src/main/java/io/deephaven/engine/util/TableTools.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.deephaven.util.annotations.ScriptApi;
import io.deephaven.util.type.ArrayTypeUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.io.*;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -67,13 +68,13 @@ private static <T> BinaryOperator<T> throwingMerger() {
};
}

private static <T, K, U> Collector<T, ?, Map<K, U>> toLinkedMap(
private static <T, K, U> Collector<T, ?, LinkedHashMap<K, U>> toLinkedMap(
Function<? super T, ? extends K> keyMapper,
Function<? super T, ? extends U> valueMapper) {
return Collectors.toMap(keyMapper, valueMapper, throwingMerger(), LinkedHashMap::new);
}

private static final Collector<ColumnHolder<?>, ?, Map<String, ColumnSource<?>>> COLUMN_HOLDER_LINKEDMAP_COLLECTOR =
private static final Collector<ColumnHolder<?>, ?, LinkedHashMap<String, ColumnSource<?>>> COLUMN_HOLDER_LINKEDMAP_COLLECTOR =
toLinkedMap(ColumnHolder::getName, ColumnHolder::getColumnSource);

/////////// Utilities To Display Tables /////////////////
Expand Down Expand Up @@ -752,22 +753,24 @@ public static Table newTable(ColumnHolder<?>... columnHolders) {
checkSizes(columnHolders);
WritableRowSet rowSet = getRowSet(columnHolders);
Map<String, ColumnSource<?>> columns = Arrays.stream(columnHolders).collect(COLUMN_HOLDER_LINKEDMAP_COLLECTOR);
return new QueryTable(rowSet.toTracking(), columns) {
{
setFlat();
}
};
QueryTable queryTable = new QueryTable(rowSet.toTracking(), columns);
queryTable.setFlat();
return queryTable;
}

public static Table newTable(TableDefinition definition, ColumnHolder<?>... columnHolders) {
return newTable(definition, null, columnHolders);
}

public static Table newTable(TableDefinition definition, @Nullable Map<String, Object> attributes,
ColumnHolder<?>... columnHolders) {
checkSizes(columnHolders);
WritableRowSet rowSet = getRowSet(columnHolders);
Map<String, ColumnSource<?>> columns = Arrays.stream(columnHolders).collect(COLUMN_HOLDER_LINKEDMAP_COLLECTOR);
return new QueryTable(definition, rowSet.toTracking(), columns) {
{
setFlat();
}
};
final WritableRowSet rowSet = getRowSet(columnHolders);
final LinkedHashMap<String, ColumnSource<?>> columns =
Arrays.stream(columnHolders).collect(COLUMN_HOLDER_LINKEDMAP_COLLECTOR);
final QueryTable queryTable = new QueryTable(definition, rowSet.toTracking(), columns, null, attributes);
queryTable.setFlat();
return queryTable;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ private Liveness() {}
/**
* <p>
* Determine whether a cached object should be reused, w.r.t. liveness. Null inputs are never safe for reuse. If the
* object is a {@link LivenessReferent} and not a non-refreshing {@link DynamicNode}, this method will return the
* result of trying to manage object with the top of the current thread's {@link LivenessScopeStack}.
* object is a {@link LivenessReferent} and is a refreshing {@link DynamicNode}, this method will return the result
* of trying to manage object with the top of the current thread's {@link LivenessScopeStack}.
*
* @param object The object
* @return True if the object did not need management, or if it was successfully managed, false otherwise
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
import com.google.protobuf.ByteStringAccess;
import com.google.rpc.Code;
import io.deephaven.UncheckedDeephavenException;
import io.deephaven.api.util.NameValidator;
import io.deephaven.barrage.flatbuf.BarrageMessageWrapper;
import io.deephaven.base.ArrayUtil;
import io.deephaven.base.ClassUtil;
import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.ChunkType;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.rowset.RowSet;
Expand All @@ -27,6 +29,8 @@
import io.deephaven.engine.table.impl.sources.ReinterpretUtils;
import io.deephaven.engine.table.impl.util.BarrageMessage;
import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph;
import io.deephaven.engine.util.ColumnFormatting;
import io.deephaven.engine.util.input.InputTableUpdater;
import io.deephaven.extensions.barrage.BarragePerformanceLog;
import io.deephaven.extensions.barrage.BarrageSnapshotOptions;
import io.deephaven.extensions.barrage.BarrageStreamGenerator;
Expand All @@ -35,15 +39,10 @@
import io.deephaven.extensions.barrage.chunk.vector.VectorExpansionKernel;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.proto.backplane.grpc.ExportedTableCreationResponse;
import io.deephaven.proto.flight.util.MessageHelper;
import io.deephaven.proto.flight.util.SchemaHelper;
import io.deephaven.proto.util.Exceptions;
import io.deephaven.api.util.NameValidator;
import io.deephaven.engine.util.ColumnFormatting;
import io.deephaven.engine.util.input.InputTableUpdater;
import io.deephaven.chunk.ChunkType;
import io.deephaven.proto.backplane.grpc.ExportedTableCreationResponse;
import io.deephaven.qst.column.Column;
import io.deephaven.util.type.TypeUtils;
import io.deephaven.vector.Vector;
import io.grpc.stub.StreamObserver;
Expand All @@ -69,8 +68,21 @@
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZonedDateTime;
import java.util.*;
import java.util.function.*;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.ToIntFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -207,6 +219,14 @@ public static ByteString schemaBytesFromTableDefinition(
fbb, DEFAULT_SNAPSHOT_DESER_OPTIONS, tableDefinition, attributes, isFlat));
}

public static Schema schemaFromTable(@NotNull final Table table) {
return makeSchema(DEFAULT_SNAPSHOT_DESER_OPTIONS, table.getDefinition(), table.getAttributes(), table.isFlat());
}

public static Schema toSchema(final TableDefinition definition, Map<String, Object> attributes, boolean isFlat) {
return makeSchema(DEFAULT_SNAPSHOT_DESER_OPTIONS, definition, attributes, isFlat);
}

public static ByteString schemaBytes(@NotNull final ToIntFunction<FlatBufferBuilder> schemaPayloadWriter) {

// note that flight expects the Schema to be wrapped in a Message prefixed by a 4-byte identifier
Expand All @@ -226,17 +246,23 @@ public static int makeTableSchemaPayload(
@NotNull final TableDefinition tableDefinition,
@NotNull final Map<String, Object> attributes,
final boolean isFlat) {
final Map<String, String> schemaMetadata = attributesToMetadata(attributes, isFlat);
return makeSchema(options, tableDefinition, attributes, isFlat).getSchema(builder);
}

public static Schema makeSchema(
@NotNull final StreamReaderOptions options,
@NotNull final TableDefinition tableDefinition,
@NotNull final Map<String, Object> attributes,
final boolean isFlat) {
final Map<String, String> schemaMetadata = attributesToMetadata(attributes, isFlat);
final Map<String, String> descriptions = GridAttributes.getColumnDescriptions(attributes);
final InputTableUpdater inputTableUpdater = (InputTableUpdater) attributes.get(Table.INPUT_TABLE_ATTRIBUTE);
final List<Field> fields = columnDefinitionsToFields(
descriptions, inputTableUpdater, tableDefinition, tableDefinition.getColumns(),
ignored -> new HashMap<>(),
attributes, options.columnsAsList())
.collect(Collectors.toList());

return new Schema(fields, schemaMetadata).getSchema(builder);
return new Schema(fields, schemaMetadata);
}

@NotNull
Expand Down
24 changes: 24 additions & 0 deletions extensions/flight-sql/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Flight SQL

See [FlightSqlResolver](src/main/java/io/deephaven/server/flightsql/FlightSqlResolver.java) for documentation on
Deephaven's Flight SQL service.

## Client

The Flight SQL client is simple constructed based on the underlying Flight client.

```java
FlightClient flightClient = ...;
FlightSqlClient flightSqlClient = new FlightSqlClient(flightClient);
```

## JDBC

The default Flight SQL JDBC driver uses cookie authorization; by default, this is not enabled on the Deephaven server.
To enable this, the request header "x-deephaven-auth-cookie-request" must be set to "true".

Example JDBC connection string to self-signed TLS:

```
jdbc:arrow-flight-sql://localhost:8443/?Authorization=Anonymous&useEncryption=1&disableCertificateVerification=1&x-deephaven-auth-cookie-request=true
```
Loading