Skip to content

Commit

Permalink
[flink] Support options filter in FlinkGenericCatalogFactory
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangyuf committed Jul 12, 2024
1 parent d94137c commit 82f6701
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.options.OptionsUtils;

import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.factories.CatalogFactory;
Expand All @@ -32,6 +33,7 @@
import java.util.Set;

import static org.apache.paimon.flink.FlinkCatalogOptions.DEFAULT_DATABASE;
import static org.apache.paimon.hive.HiveCatalog.HIVE_PREFIX;

/** Factory for {@link FlinkGenericCatalog}. */
public class FlinkGenericCatalogFactory implements CatalogFactory {
Expand All @@ -56,14 +58,24 @@ public Set<org.apache.flink.configuration.ConfigOption<?>> optionalOptions() {
@Override
public FlinkGenericCatalog createCatalog(Context context) {
CatalogFactory hiveFactory = createHiveCatalogFactory(context.getClassLoader());
Catalog catalog = hiveFactory.createCatalog(context);
Context flinkContext = createFlinkContext(context);
Catalog flinkCatalog = hiveFactory.createCatalog(flinkContext);
return createCatalog(
context.getClassLoader(), context.getOptions(), context.getName(), catalog);
context.getClassLoader(), context.getOptions(), context.getName(), flinkCatalog);
}

@VisibleForTesting
public Context createFlinkContext(Context context) {
return new FactoryUtil.DefaultCatalogContext(
context.getName(),
OptionsUtils.convertToPropertiesPrefixKey(context.getOptions(), HIVE_PREFIX),
context.getConfiguration(),
context.getClassLoader());
}

@VisibleForTesting
public static FlinkGenericCatalog createCatalog(
ClassLoader cl, Map<String, String> optionMap, String name, Catalog flinkCatalog) {
ClassLoader cl, Map<String, String> optionMap, String name, Catalog catalog) {
Options options = Options.fromMap(optionMap);
options.set(CatalogOptions.METASTORE, "hive");
FlinkCatalog paimon =
Expand All @@ -75,7 +87,7 @@ public static FlinkGenericCatalog createCatalog(
cl,
options);

return new FlinkGenericCatalog(paimon, flinkCatalog);
return new FlinkGenericCatalog(paimon, catalog);
}

private static CatalogFactory createHiveCatalogFactory(ClassLoader cl) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink;

import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

/** Test for {@link FlinkGenericCatalogFactory}. */
public class FlinkGenericCatalogFactoryTest {

private final FlinkGenericCatalogFactory genericCatalogFactory =
new FlinkGenericCatalogFactory();

@TempDir public static java.nio.file.Path temporaryFolder;

@Test
public void testHiveCatalogOptionsFilter() {
String path1 = new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();
String path2 = new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();

String catalogName = "test-catalog";
Map<String, String> options = new HashMap<>();
options.put("warehouse", path1);
options.put("hive.hive-conf-dir", path2);
CatalogFactory.Context context =
new FactoryUtil.DefaultCatalogContext(
catalogName,
options,
null,
FlinkGenericCatalogFactoryTest.class.getClassLoader());

CatalogFactory.Context flinkContext = genericCatalogFactory.createFlinkContext(context);
Map<String, String> flinkOptions = flinkContext.getOptions();
assertThat(flinkOptions.get("hive-conf-dir")).isEqualTo(options.get("hive.hive-conf-dir"));
assertThat(flinkOptions.get("warehouse")).isNull();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public class HiveCatalog extends AbstractCatalog {
private static final String SERDE_CLASS_NAME = "org.apache.paimon.hive.PaimonSerDe";
private static final String STORAGE_HANDLER_CLASS_NAME =
"org.apache.paimon.hive.PaimonStorageHandler";
private static final String HIVE_PREFIX = "hive.";
public static final String HIVE_PREFIX = "hive.";
public static final String HIVE_SITE_FILE = "hive-site.xml";

private final HiveConf hiveConf;
Expand Down

0 comments on commit 82f6701

Please sign in to comment.