Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Flight SQL server support #6023

Open
wants to merge 91 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 52 commits
Commits
Show all changes
91 commits
Select commit Hold shift + click to select a range
185fd4f
Initial proj setup and minimal impl
jmao-denver Jul 17, 2024
04288bf
Implement some meta calls
jmao-denver Aug 5, 2024
e79c493
Fix/disable tests
jmao-denver Aug 6, 2024
2002936
Add JDBC test
jmao-denver Aug 14, 2024
84c737b
Plumb rudamentary DoAction / GetFlightInfo command support
devinrsmith Aug 15, 2024
f7c9df1
Merge remote-tracking branch 'upstream/main' into 5339-flight-sql
devinrsmith Aug 29, 2024
3c4ed60
Ensure ticket tables go through auth transform
devinrsmith Aug 29, 2024
d045ac4
Merge remote-tracking branch 'upstream/main' into 5339-flight-sql
devinrsmith Sep 5, 2024
770956d
Cleanup FlightSQL parts
devinrsmith Sep 5, 2024
51e039d
cleanup auth cookie handling
devinrsmith Sep 5, 2024
a04f8ab
Disable gRPC trace logging
devinrsmith Sep 5, 2024
9caef38
Fix downstream dagger compile
devinrsmith Sep 5, 2024
329bdbf
More cleanup
devinrsmith Sep 5, 2024
7d00fe6
remove unused class
devinrsmith Sep 5, 2024
7b79e70
more unused
devinrsmith Sep 5, 2024
90bb328
Merge remote-tracking branch 'upstream/main' into 5339-flight-sql
devinrsmith Sep 23, 2024
9649204
Add ActionResolver
devinrsmith Sep 23, 2024
7e96815
f
devinrsmith Sep 24, 2024
19701ba
Simplify auth cookie logic. Cleanup testing. Create jdbcTest source set.
devinrsmith Oct 1, 2024
af3d62c
f
devinrsmith Oct 2, 2024
c0e260e
In-process FlightSQL testing
devinrsmith Oct 2, 2024
158dfcb
f
devinrsmith Oct 3, 2024
995aaa2
more tests
devinrsmith Oct 3, 2024
3550037
refactor actions
devinrsmith Oct 3, 2024
8051c36
cleanup
devinrsmith Oct 3, 2024
77961f9
f
devinrsmith Oct 4, 2024
141403a
Fix table schema
devinrsmith Oct 4, 2024
3e130a0
f
devinrsmith Oct 4, 2024
088503c
f
devinrsmith Oct 7, 2024
c4af2b9
Tests work
devinrsmith Oct 11, 2024
f593aa8
Cleanup
devinrsmith Oct 11, 2024
1aaabd8
More tests
devinrsmith Oct 11, 2024
8ec940e
more tests
devinrsmith Oct 15, 2024
9f5dffc
Merge remote-tracking branch 'upstream/main' into 5339-flight-sql-com…
devinrsmith Oct 15, 2024
b57b972
More tests
devinrsmith Oct 16, 2024
b316621
Add JDBC tests
devinrsmith Oct 16, 2024
3b221f0
f
devinrsmith Oct 17, 2024
54e1ad2
f
devinrsmith Oct 18, 2024
b69738f
f
devinrsmith Oct 18, 2024
0ddf2eb
f
devinrsmith Oct 22, 2024
78e8899
f
devinrsmith Oct 22, 2024
11313f0
f
devinrsmith Oct 22, 2024
7f426bf
f
devinrsmith Oct 22, 2024
c2d3ab1
Comments
devinrsmith Oct 23, 2024
1c564f4
f
devinrsmith Oct 23, 2024
2ca027b
f
devinrsmith Oct 23, 2024
1a9d0fa
More tests
devinrsmith Oct 23, 2024
96af11b
Manage
devinrsmith Oct 23, 2024
accfda8
Use randomized handles instead of incrementing ids
devinrsmith Oct 23, 2024
742ad24
Merge remote-tracking branch 'upstream/main' into 5339-flight-sql
devinrsmith Oct 23, 2024
65fe87c
Undid wrapping, we are not calling onError for unauthentication sessions
devinrsmith Oct 24, 2024
5092476
oops, remove idea folder
devinrsmith Oct 24, 2024
6df7379
Introduce PathResolver
devinrsmith Oct 24, 2024
7ef6ade
Update ActionResolver to ensure it supports more complicated (off-thr…
devinrsmith Oct 24, 2024
8e84a4c
Add Schema support
devinrsmith Oct 24, 2024
a5e689c
Plumb Schema support
devinrsmith Oct 24, 2024
6e3395c
feat: Add arrow Schema as supported type
devinrsmith Oct 24, 2024
ab17945
Merge remote-tracking branch 'upstream/main' into 5339-flight-sql
devinrsmith Oct 24, 2024
8614ff1
rename
devinrsmith Oct 24, 2024
4e48079
Refactor to fix stringify come first. Add explicit comment that strin…
devinrsmith Oct 24, 2024
cccfadd
Merge branch 'add-schema-object-support' into 5339-flight-sql
devinrsmith Oct 24, 2024
bd5fb1e
move serializeToByteString
devinrsmith Oct 24, 2024
5bc3182
remove VARBINARY change for byte[]
devinrsmith Oct 25, 2024
38d8c0f
Add reference to relevant ticket
devinrsmith Oct 25, 2024
f1e587d
review response 1
devinrsmith Oct 25, 2024
8650c2e
Merge remote-tracking branch 'upstream/main' into 5339-flight-sql
devinrsmith Oct 25, 2024
cfe3109
Review response part 2
devinrsmith Oct 25, 2024
2b4db46
Nate comments
devinrsmith Oct 29, 2024
6bcf92e
f
devinrsmith Oct 29, 2024
f90463c
Colin review points
devinrsmith Oct 29, 2024
a040826
Merge remote-tracking branch 'upstream/main' into 5339-flight-sql
devinrsmith Oct 29, 2024
ca8e0b3
Migrate flightsql to server-jetty-app
devinrsmith Oct 29, 2024
89f0d70
Migrate to extensions/flight-sql
devinrsmith Oct 29, 2024
7d1f57b
fix EmbeddedServer
devinrsmith Oct 29, 2024
938c05a
add configuration to disable resolvers
devinrsmith Oct 29, 2024
094401f
Merge remote-tracking branch 'upstream/main' into 5339-flight-sql
devinrsmith Nov 4, 2024
49a0195
Fixup util-thread wrt downstream dagger
devinrsmith Nov 4, 2024
9830ce0
Ensure FlightSQL tests work on Java 17+
devinrsmith Nov 4, 2024
4ecbe2a
Only retain table if it is refreshing.
devinrsmith Nov 5, 2024
20c18d3
Small comments / error message changes
devinrsmith Nov 5, 2024
260c679
Add typed visitors for Commands and Tickets
devinrsmith Nov 5, 2024
00cd69e
Create FlightSqlActionHelper
devinrsmith Nov 6, 2024
171e5b1
Refactor constants
devinrsmith Nov 6, 2024
fd4043a
cleanup
devinrsmith Nov 6, 2024
bb77291
Create ActionVisitorBase
devinrsmith Nov 6, 2024
a902aef
Create CommandVisitorBase
devinrsmith Nov 6, 2024
6ae2ea9
Review response
devinrsmith Nov 12, 2024
306dc26
Divorce jdbcTest source set from test source set
devinrsmith Nov 12, 2024
6a954f5
review points
devinrsmith Nov 13, 2024
758c42c
More review responses
devinrsmith Nov 13, 2024
d9d4a8b
statementNeverExecuted and one more isDeniedAccess optimization
devinrsmith Nov 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions engine/sql/src/main/java/io/deephaven/engine/sql/Sql.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
import io.deephaven.sql.ScopeStaticImpl;
import io.deephaven.sql.SqlAdapter;
import io.deephaven.sql.TableInformation;
import io.deephaven.util.annotations.InternalUseOnly;
import io.deephaven.util.annotations.ScriptApi;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Function;

/**
* Experimental SQL execution. Subject to change.
Expand All @@ -47,35 +49,38 @@ public static TableSpec dryRun(String sql) {
return dryRun(sql, currentScriptSessionNamedTables());
}

@InternalUseOnly
public static TableSpec parseSql(String sql, Map<String, Table> scope, Function<String, TicketTable> ticketFunction,
Map<TicketTable, Table> out) {
return SqlAdapter.parseSql(sql, scope(scope, out, ticketFunction));
}

private static Table evaluate(String sql, Map<String, Table> scope) {
final Map<TicketTable, Table> map = new HashMap<>(scope.size());
final TableSpec tableSpec = parseSql(sql, scope, map);
final TableSpec tableSpec = parseSql(sql, scope, Sql::sqlref, map);
log.debug().append("Executing. Graphviz representation:").nl().append(ToGraphvizDot.INSTANCE, tableSpec).endl();
return tableSpec.logic().create(new TableCreatorTicketInterceptor(TableCreatorImpl.INSTANCE, map));
}

private static TableSpec dryRun(String sql, Map<String, Table> scope) {
final TableSpec tableSpec = parseSql(sql, scope, null);
final TableSpec tableSpec = parseSql(sql, scope, Sql::sqlref, null);
log.info().append("Dry run. Graphviz representation:").nl().append(ToGraphvizDot.INSTANCE, tableSpec).endl();
return tableSpec;
}

private static TableSpec parseSql(String sql, Map<String, Table> scope, Map<TicketTable, Table> out) {
return SqlAdapter.parseSql(sql, scope(scope, out));
}

private static TicketTable sqlref(String tableName) {
// The TicketTable can technically be anything unique (incrementing number, random, ...), but for
// visualization purposes it makes sense to use the (already unique) table name.
return TicketTable.of(("sqlref/" + tableName).getBytes(StandardCharsets.UTF_8));
}

private static Scope scope(Map<String, Table> scope, Map<TicketTable, Table> out) {
private static Scope scope(Map<String, Table> scope, Map<TicketTable, Table> out,
Function<String, TicketTable> ticketFunction) {
final ScopeStaticImpl.Builder builder = ScopeStaticImpl.builder();
for (Entry<String, Table> e : scope.entrySet()) {
final String tableName = e.getKey();
final Table table = e.getValue();
// The TicketTable can technically be anything unique (incrementing number, random, ...), but for
// visualization purposes it makes sense to use the (already unique) table name.
final TicketTable spec = sqlref(tableName);
final TicketTable spec = ticketFunction.apply(tableName);
final List<String> qualifiedName = List.of(tableName);
final TableHeader header = adapt(table.getDefinition());
builder.addTables(TableInformation.of(qualifiedName, header, spec));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,17 @@

import io.deephaven.engine.table.Table;
import io.deephaven.qst.TableCreator;
import io.deephaven.qst.table.EmptyTable;
import io.deephaven.qst.table.InputTable;
import io.deephaven.qst.table.MultiJoinInput;
import io.deephaven.qst.table.NewTable;
import io.deephaven.qst.TableCreatorDelegate;
import io.deephaven.qst.table.TicketTable;
import io.deephaven.qst.table.TimeTable;

import java.util.List;
import java.util.Map;
import java.util.Objects;

class TableCreatorTicketInterceptor implements TableCreator<Table> {
private final TableCreator<Table> delegate;
class TableCreatorTicketInterceptor extends TableCreatorDelegate<Table> {
private final Map<TicketTable, Table> map;

public TableCreatorTicketInterceptor(TableCreator<Table> delegate, Map<TicketTable, Table> map) {
this.delegate = Objects.requireNonNull(delegate);
super(delegate);
this.map = Objects.requireNonNull(map);
}

Expand All @@ -31,36 +25,6 @@ public Table of(TicketTable ticketTable) {
if (table != null) {
return table;
}
return delegate.of(ticketTable);
}

@Override
public Table of(NewTable newTable) {
return delegate.of(newTable);
}

@Override
public Table of(EmptyTable emptyTable) {
return delegate.of(emptyTable);
}

@Override
public Table of(TimeTable timeTable) {
return delegate.of(timeTable);
}

@Override
public Table of(InputTable inputTable) {
return delegate.of(inputTable);
}

@Override
public Table multiJoin(List<MultiJoinInput<Table>> multiJoinInputs) {
return delegate.multiJoin(multiJoinInputs);
}

@Override
public Table merge(Iterable<Table> tables) {
return delegate.merge(tables);
return super.of(ticketTable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.deephaven.util.annotations.ScriptApi;
import io.deephaven.util.type.ArrayTypeUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.io.*;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -67,13 +68,13 @@ private static <T> BinaryOperator<T> throwingMerger() {
};
}

private static <T, K, U> Collector<T, ?, Map<K, U>> toLinkedMap(
private static <T, K, U> Collector<T, ?, LinkedHashMap<K, U>> toLinkedMap(
Function<? super T, ? extends K> keyMapper,
Function<? super T, ? extends U> valueMapper) {
return Collectors.toMap(keyMapper, valueMapper, throwingMerger(), LinkedHashMap::new);
}

private static final Collector<ColumnHolder<?>, ?, Map<String, ColumnSource<?>>> COLUMN_HOLDER_LINKEDMAP_COLLECTOR =
private static final Collector<ColumnHolder<?>, ?, LinkedHashMap<String, ColumnSource<?>>> COLUMN_HOLDER_LINKEDMAP_COLLECTOR =
toLinkedMap(ColumnHolder::getName, ColumnHolder::getColumnSource);

/////////// Utilities To Display Tables /////////////////
Expand Down Expand Up @@ -760,10 +761,16 @@ public static Table newTable(ColumnHolder<?>... columnHolders) {
}

public static Table newTable(TableDefinition definition, ColumnHolder<?>... columnHolders) {
return newTable(definition, null, columnHolders);
}

public static Table newTable(TableDefinition definition, @Nullable Map<String, Object> attributes,
ColumnHolder<?>... columnHolders) {
checkSizes(columnHolders);
WritableRowSet rowSet = getRowSet(columnHolders);
Map<String, ColumnSource<?>> columns = Arrays.stream(columnHolders).collect(COLUMN_HOLDER_LINKEDMAP_COLLECTOR);
return new QueryTable(definition, rowSet.toTracking(), columns) {
final WritableRowSet rowSet = getRowSet(columnHolders);
final LinkedHashMap<String, ColumnSource<?>> columns =
Arrays.stream(columnHolders).collect(COLUMN_HOLDER_LINKEDMAP_COLLECTOR);
return new QueryTable(definition, rowSet.toTracking(), columns, null, attributes) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this isn't a new change, but consider a named subtype here? Or would it make sense to just chain .flatten() on the end, as the table is already non-refreshing with a flat rowset?

Or, just add a local so we don't need to use the instance initializer?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated usages. I don't think subtype is desired, but I don't see any reason why it needs to be set in instance initializer.

{
setFlat();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.arrow.util.Collections2;
import org.apache.arrow.vector.types.TimeUnit;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
Expand Down Expand Up @@ -736,6 +737,12 @@ private static ArrowType arrowTypeFor(Class<?> type) {
return Types.MinorType.FLOAT8.getType();
case Object:
if (type.isArray()) {
if (type == byte[].class) {
return Types.MinorType.VARBINARY.getType();
}
// if (type == char[].class) {
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
// return Types.MinorType.VARCHAR.getType();
// }
return Types.MinorType.LIST.getType();
}
if (type == LocalDate.class) {
Expand Down
14 changes: 14 additions & 0 deletions flightsql/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# FlightSQL

## Client
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved

## JDBC

The default FlightSQL JDBC driver uses cookie authorization; by default, this is not enabled on the Deephaven server.
To enable this, the request header "x-deephaven-auth-cookie-request" must be set to "true".

Example JDBC connection string to self-signed TLS:

```
jdbc:arrow-flight-sql://localhost:8443/?Authorization=Anonymous&useEncryption=1&disableCertificateVerification=1&x-deephaven-auth-cookie-request=true
```
67 changes: 67 additions & 0 deletions flightsql/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
plugins {
id 'java-library'
id 'io.deephaven.project.register'
}

description = 'The Deephaven flight SQL library'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
description = 'The Deephaven flight SQL library'
description = 'The Deephaven Flight SQL library'

or possibly FlightSQL? not sure how they like to do that

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flight SQL is how I see they refer to it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated lots of FlightSQL strings to Flight SQL.


sourceSets {
jdbcTest {
compileClasspath += sourceSets.test.output
runtimeClasspath += sourceSets.test.output
}
}

configurations {
jdbcTestImplementation.extendsFrom testImplementation
jdbcTestRuntimeOnly.extendsFrom testRuntimeOnly
jdbcTestAnnotationProcessor.extendsFrom testAnnotationProcessor
}

dependencies {
api project(':server')
implementation project(':sql')
implementation project(':engine-sql')
// :sql does not expose calcite as a dependency (maybe it should?); in the meantime, we want to make sure we can
// provide reasonable error messages to the client
implementation libs.calcite.core

implementation libs.dagger
implementation libs.arrow.flight.sql

// testImplementation project(':extensions-csv')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why commented out? worth a comment here discussing, or removing?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Meant to remove; was originally part of Jianfeng's testing, but no longer part of the new tests.

testImplementation project(':authorization')
testImplementation project(':server-test-utils')

testAnnotationProcessor libs.dagger.compiler
testImplementation libs.assertj
testImplementation platform(libs.junit.bom)
testImplementation libs.junit.jupiter
testRuntimeOnly libs.junit.platform.launcher
testRuntimeOnly libs.junit.vintage.engine
testRuntimeOnly project(':log-to-slf4j')
testRuntimeOnly libs.slf4j.simple

jdbcTestImplementation project(':server-jetty')
// Isolating to its own sourceSet / classpath because it breaks logging until we can upgrade to a newer version
// See https://github.com/apache/arrow/pull/40908
// See https://github.com/deephaven/deephaven-core/issues/5947
jdbcTestRuntimeOnly libs.arrow.flight.sql.jdbc
}

test {
useJUnitPlatform()
}

def jdbcTest = tasks.register('jdbcTest', Test) {
description = 'Runs JDBC tests.'
group = 'verification'

testClassesDirs = sourceSets.jdbcTest.output.classesDirs
classpath = sourceSets.jdbcTest.runtimeClasspath
shouldRunAfter test

useJUnitPlatform()
}

check.dependsOn jdbcTest
1 change: 1 addition & 0 deletions flightsql/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.deephaven.project.ProjectType=JAVA_PUBLIC
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.server;

import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.io.logger.LogBuffer;
import io.deephaven.io.logger.LogBufferGlobal;
import io.deephaven.server.runner.GrpcServer;
import io.deephaven.server.runner.MainHelper;
import io.deephaven.util.SafeCloseable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Timeout;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

@Timeout(30)
public abstract class DeephavenServerTestBase {

public interface TestComponent {

GrpcServer server();

ExecutionContext executionContext();
}

protected TestComponent component;

private LogBuffer logBuffer;
private SafeCloseable executionContext;
private GrpcServer server;
protected int localPort;

protected abstract TestComponent component();

@BeforeAll
public static void setupOnce() throws IOException {
MainHelper.bootstrapProjectDirectories();
}

@BeforeEach
public void setup() throws IOException {
logBuffer = new LogBuffer(128);
LogBufferGlobal.setInstance(logBuffer);
component = component();
executionContext = component.executionContext().open();
server = component.server();
server.start();
localPort = server.getPort();
}

@AfterEach
void tearDown() throws InterruptedException {
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
server.stopWithTimeout(10, TimeUnit.SECONDS);
server.join();
executionContext.close();
LogBufferGlobal.clear(logBuffer);
}
}
Loading
Loading