Skip to content

Commit

Permalink
4
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Dec 31, 2024
1 parent 482e4bf commit 275e0f4
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -478,22 +478,15 @@ public FlightInfo getFlightInfoTables(final CommandGetTables request, final Call
public void getStreamTables(final CommandGetTables command, final CallContext context,
final ServerStreamListener listener) {
try {
// TODO
// command.getDbSchemaFilterPattern();
// command.getTableNameFilterPattern();
// command.getTableTypesList();
// context.isCancelled();

ConnectContext connectContext = flightSessionsManager.getConnectContext(context.peerIdentity());
FlightSqlSchemaHelper flightSqlSchemaHelper = new FlightSqlSchemaHelper(connectContext);
final boolean includeSchema = command.getIncludeSchema();
final Schema schema = includeSchema ? Schemas.GET_TABLES_SCHEMA
FlightSqlSchemaHelper flightSqlSchemaHelper = new FlightSqlSchemaHelper(connectContext, command);
final Schema schema = command.getIncludeSchema() ? Schemas.GET_TABLES_SCHEMA
: Schemas.GET_TABLES_SCHEMA_NO_SCHEMA;

try (VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, rootAllocator)) {
listener.start(vectorSchemaRoot);
vectorSchemaRoot.allocateNew();
flightSqlSchemaHelper.getTablesSchema(vectorSchemaRoot, includeSchema);
flightSqlSchemaHelper.getTablesSchema(vectorSchemaRoot);
listener.putNext();
listener.completed();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@
import org.apache.doris.thrift.TListTableStatusResult;
import org.apache.doris.thrift.TTableStatus;

import com.google.common.base.Preconditions;
import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
import org.apache.arrow.flight.sql.FlightSqlColumnMetadata;
import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetTables;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
Expand Down Expand Up @@ -67,11 +69,41 @@ public class FlightSqlSchemaHelper {
private static final Logger LOG = LogManager.getLogger(FlightSqlSchemaHelper.class);
private final ConnectContext ctx;
private final FrontendServiceImpl impl;
private final boolean includeSchema;
private String dbSchemaFilterPattern;
private String tableNameFilterPattern;
private List<String> tableTypesList;
private static final byte[] EMPTY_SERIALIZED_SCHEMA = getSerializedSchema(Collections.emptyList());

public FlightSqlSchemaHelper(ConnectContext context) {
public FlightSqlSchemaHelper(ConnectContext context, CommandGetTables command) {
ctx = context;
impl = new FrontendServiceImpl(ExecuteEnv.getInstance());
includeSchema = command.getIncludeSchema();
setParameterForGetTables(command);
}

private void setParameterForGetTables(CommandGetTables command) {
if (command.hasDbSchemaFilterPattern()) {
dbSchemaFilterPattern = command.getDbSchemaFilterPattern();
} else {
dbSchemaFilterPattern = "";
}
if (command.hasTableNameFilterPattern()) {
if (command.getTableNameFilterPattern().contains(".")) {
Preconditions.checkState(command.getTableNameFilterPattern().split("\\.", -1).length == 2);
dbSchemaFilterPattern = command.getTableNameFilterPattern().split("\\.", -1)[0];
tableNameFilterPattern = command.getTableNameFilterPattern().split("\\.", -1)[1];
} else {
tableNameFilterPattern = command.getTableNameFilterPattern();
}
} else {
tableNameFilterPattern = "";
}
if (!command.getTableTypesList().isEmpty()) {
tableTypesList = command.getTableTypesList();
} else {
tableTypesList = null;
}
}

/**
Expand All @@ -83,6 +115,9 @@ private TGetDbsResult getDbNames() throws TException {
// Otherwise, if the configured ExternalCatalog cannot be connected,
// `catalog.getAllDbs()` will be stuck and wait until the timeout period ends.
getDbsParams.setCatalog("internal");
if (!dbSchemaFilterPattern.isEmpty()) {
getDbsParams.setPattern(dbSchemaFilterPattern);
}
getDbsParams.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift());
return impl.getDbNames(getDbsParams);
}
Expand All @@ -96,6 +131,9 @@ private TListTableStatusResult listTableStatus(String dbName, String catalogName
if (!catalogName.isEmpty()) {
getTablesParams.setCatalog(catalogName);
}
if (!tableNameFilterPattern.isEmpty()) {
getTablesParams.setPattern(tableNameFilterPattern);
}
getTablesParams.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift());
return impl.listTableStatus(getTablesParams);
}
Expand Down Expand Up @@ -271,7 +309,7 @@ protected static byte[] getSerializedSchema(List<Field> fields) {
}

// for FlightSqlProducer Schemas.GET_TABLES_SCHEMA_NO_SCHEMA and Schemas.GET_TABLES_SCHEMA
public void getTablesSchema(VectorSchemaRoot vectorSchemaRoot, boolean includeSchema) throws TException {
public void getTablesSchema(VectorSchemaRoot vectorSchemaRoot) throws TException {
VarCharVector catalogNameVector = (VarCharVector) vectorSchemaRoot.getVector("catalog_name");
VarCharVector schemaNameVector = (VarCharVector) vectorSchemaRoot.getVector("db_schema_name");
VarCharVector tableNameVector = (VarCharVector) vectorSchemaRoot.getVector("table_name");
Expand Down Expand Up @@ -310,7 +348,7 @@ public void getTablesSchema(VectorSchemaRoot vectorSchemaRoot, boolean includeSc
// the metadata will show two layers of `Databases - Tables`.
//
// TODO, show two layers of original data `Databases - Tables` in DBeaver.
tableNameVector.setSafe(tablesCount, new Text(dbName + ":" + tableStatus.getName()));
tableNameVector.setSafe(tablesCount, new Text(dbName + "." + tableStatus.getName()));
tableTypeVector.setSafe(tablesCount, new Text(tableStatus.getType()));
if (includeSchema) {
List<Field> fields = tableToFields.get(tableStatus.getName());
Expand Down

0 comments on commit 275e0f4

Please sign in to comment.