Skip to content

Commit

Permalink
[FLINK-37222] Do not reuse views across TableEnvironments in SQL client
Browse files Browse the repository at this point in the history
  • Loading branch information
dawidwys committed Feb 3, 2025
1 parent 02e37a6 commit def3091
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package org.apache.flink.table.gateway.service.context;

import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.QueryOperationCatalogView;
import org.apache.flink.table.catalog.ResolvedCatalogView;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;

import java.util.Optional;

/**
* An in-memory catalog that can be reused across different {@link TableEnvironment}. The SQL client
* works against {@link TableEnvironment} design and reuses some of the components (e.g.
* CatalogManager), but not all (e.g. Planner) which causes e.g. views registered in an in-memory
* catalog to fail. This class is a workaround not to keep Planner bound parts of a view reused
* across different {@link TableEnvironment}.
*/
public class EnvironmentReusableInMemoryCatalog extends GenericInMemoryCatalog {
public EnvironmentReusableInMemoryCatalog(String name, String defaultDatabase) {
super(name, defaultDatabase);
}

@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException {
CatalogBaseTable tableToRegister =
extractView(table)
.flatMap(QueryOperationCatalogView::getOriginalView)
.map(v -> (CatalogBaseTable) v)
.orElse(table);
super.createTable(tablePath, tableToRegister, ignoreIfExists);
}

private Optional<QueryOperationCatalogView> extractView(CatalogBaseTable table) {
if (table instanceof ResolvedCatalogView) {
final CatalogView origin = ((ResolvedCatalogView) table).getOrigin();
if (origin instanceof QueryOperationCatalogView) {
return Optional.of((QueryOperationCatalogView) origin);
}
return Optional.empty();
} else if (table instanceof QueryOperationCatalogView) {
return Optional.of((QueryOperationCatalogView) table);
}
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.CatalogStoreHolder;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.factories.CatalogStoreFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.TableFactoryUtil;
Expand Down Expand Up @@ -446,7 +445,7 @@ private static CatalogManager buildCatalogManager(
catalogStore.config(),
catalogStore.classLoader()))
.orElse(
new GenericInMemoryCatalog(
new EnvironmentReusableInMemoryCatalog(
defaultCatalogName, settings.getBuiltInDatabaseName()));
}
defaultCatalog.open();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,9 @@ public String getExpandedQuery() {
public boolean supportsShowCreateView() {
return originalView != null;
}

@Internal
public Optional<CatalogView> getOriginalView() {
return Optional.ofNullable(originalView);
}
}

0 comments on commit def3091

Please sign in to comment.