diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java index d07779297545..83d85e748a35 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java @@ -23,11 +23,14 @@ import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; +import org.apache.flink.configuration.ConfigOption; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.factories.CatalogFactory; import org.apache.flink.table.factories.FactoryUtil; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -56,14 +59,34 @@ public Set> optionalOptions() { @Override public FlinkGenericCatalog createCatalog(Context context) { CatalogFactory hiveFactory = createHiveCatalogFactory(context.getClassLoader()); - Catalog catalog = hiveFactory.createCatalog(context); + Context flinkContext = createFlinkContext(context, hiveFactory); + Catalog catalog = hiveFactory.createCatalog(flinkContext); return createCatalog( context.getClassLoader(), context.getOptions(), context.getName(), catalog); } + @VisibleForTesting + public Context createFlinkContext(Context context, CatalogFactory catalogFactory) { + Set> catalogOptions = new HashSet<>(catalogFactory.requiredOptions()); + catalogOptions.addAll(catalogFactory.optionalOptions()); + Map contextOptions = context.getOptions(); + Map flinkCatalogOptions = new HashMap<>(); + catalogOptions.forEach( + option -> { + if (contextOptions.containsKey(option.key())) { + flinkCatalogOptions.put(option.key(), contextOptions.get(option.key())); + } + }); + return new FactoryUtil.DefaultCatalogContext( + context.getName(), + flinkCatalogOptions, + context.getConfiguration(), + context.getClassLoader()); + } + @VisibleForTesting public static FlinkGenericCatalog createCatalog( - ClassLoader cl, Map optionMap, String name, Catalog flinkCatalog) { + ClassLoader cl, Map optionMap, String name, Catalog catalog) { Options options = Options.fromMap(optionMap); options.set(CatalogOptions.METASTORE, "hive"); FlinkCatalog paimon = @@ -75,7 +98,7 @@ public static FlinkGenericCatalog createCatalog( cl, options); - return new FlinkGenericCatalog(paimon, flinkCatalog); + return new FlinkGenericCatalog(paimon, catalog); } private static CatalogFactory createHiveCatalogFactory(ClassLoader cl) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkGenericCatalogFactoryTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkGenericCatalogFactoryTest.java new file mode 100644 index 000000000000..75d3a62f40dc --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkGenericCatalogFactoryTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink; + +import org.apache.flink.table.factories.CatalogFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.TestCatalogFactory; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link FlinkGenericCatalogFactory}. */ +public class FlinkGenericCatalogFactoryTest { + + private final FlinkGenericCatalogFactory genericCatalogFactory = + new FlinkGenericCatalogFactory(); + + @TempDir public static java.nio.file.Path temporaryFolder; + + @Test + public void testGenericCatalogOptionsFilter() { + String path1 = new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString(); + String path2 = new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString(); + + TestCatalogFactory testCatalogFactory = new TestCatalogFactory(); + String catalogName = "test-catalog"; + Map options = new HashMap<>(); + options.put("warehouse", path1); + options.put(TestCatalogFactory.DEFAULT_DATABASE.key(), path2); + CatalogFactory.Context context = + new FactoryUtil.DefaultCatalogContext( + catalogName, + options, + null, + FlinkGenericCatalogFactoryTest.class.getClassLoader()); + + CatalogFactory.Context flinkContext = + genericCatalogFactory.createFlinkContext(context, testCatalogFactory); + + Map flinkOptions = flinkContext.getOptions(); + assertThat(flinkOptions.get(TestCatalogFactory.DEFAULT_DATABASE.key())) + .isEqualTo(options.get(TestCatalogFactory.DEFAULT_DATABASE.key())); + assertThat(flinkOptions.get("warehouse")).isNull(); + } +}