diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java index c22e00fa122..06107b86248 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java @@ -24,11 +24,13 @@ import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.factories.CatalogFactory; import org.apache.flink.table.factories.Factory; import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.flink.connector.PartitionConverter; import org.apache.gravitino.flink.connector.PropertiesConverter; import org.apache.gravitino.flink.connector.catalog.BaseCatalog; +import org.apache.paimon.flink.FlinkCatalogFactory; import org.apache.paimon.flink.FlinkTableFactory; /** @@ -40,12 +42,13 @@ public class GravitinoPaimonCatalog extends BaseCatalog { private final AbstractCatalog paimonCatalog; protected GravitinoPaimonCatalog( - String catalogName, - AbstractCatalog paimonCatalog, + CatalogFactory.Context context, + String defaultDatabase, PropertiesConverter propertiesConverter, PartitionConverter partitionConverter) { - super(catalogName, paimonCatalog.getDefaultDatabase(), propertiesConverter, partitionConverter); - this.paimonCatalog = paimonCatalog; + super(context.getName(), defaultDatabase, propertiesConverter, partitionConverter); + FlinkCatalogFactory flinkCatalogFactory = new FlinkCatalogFactory(); + this.paimonCatalog = flinkCatalogFactory.createCatalog(context); } @Override diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java index 52489fc667f..8732ade23ed 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java @@ -23,12 +23,12 @@ import java.util.Set; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.factories.FactoryUtil; import org.apache.gravitino.flink.connector.DefaultPartitionConverter; import org.apache.gravitino.flink.connector.PartitionConverter; import org.apache.gravitino.flink.connector.PropertiesConverter; import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory; -import org.apache.paimon.flink.FlinkCatalog; -import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.gravitino.flink.connector.utils.FactoryUtils; /** * Factory for creating instances of {@link GravitinoPaimonCatalog}. It will be created by SPI @@ -38,9 +38,12 @@ public class GravitinoPaimonCatalogFactory implements BaseCatalogFactory { @Override public Catalog createCatalog(Context context) { - FlinkCatalog catalog = new FlinkCatalogFactory().createCatalog(context); + final FactoryUtil.CatalogFactoryHelper helper = + FactoryUtils.createCatalogFactoryHelper(this, context); + String defaultDatabase = + helper.getOptions().get(GravitinoPaimonCatalogFactoryOptions.DEFAULT_DATABASE); return new GravitinoPaimonCatalog( - context.getName(), catalog, propertiesConverter(), partitionConverter()); + context, defaultDatabase, propertiesConverter(), partitionConverter()); } @Override diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactoryOptions.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactoryOptions.java index dd78f96d24b..a4180b9eb40 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactoryOptions.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactoryOptions.java @@ -19,8 +19,17 @@ package org.apache.gravitino.flink.connector.paimon; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.paimon.flink.FlinkCatalogOptions; + public class GravitinoPaimonCatalogFactoryOptions { /** Identifier for the {@link GravitinoPaimonCatalog}. */ public static final String IDENTIFIER = "gravitino-paimon"; + + public static final ConfigOption DEFAULT_DATABASE = + ConfigOptions.key(FlinkCatalogOptions.DEFAULT_DATABASE.key()) + .stringType() + .defaultValue(FlinkCatalogOptions.DEFAULT_DATABASE.defaultValue()); }