diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java index ff0349452314c..e443bf186c757 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java @@ -53,7 +53,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -427,41 +426,32 @@ public static Module createModule( MODULE_TYPE.key(), options.get(MODULE_TYPE.key()))); } + final DefaultModuleContext discoveryContext = + new DefaultModuleContext(options, configuration, classLoader); try { - final Map optionsWithType = new HashMap<>(options); - optionsWithType.put(MODULE_TYPE.key(), moduleName); - - final ModuleFactory legacyFactory = - TableFactoryService.find(ModuleFactory.class, optionsWithType, classLoader); - return legacyFactory.createModule(optionsWithType); - } catch (NoMatchingTableFactoryException e) { - final DefaultModuleContext discoveryContext = + final ModuleFactory factory = + discoverFactory( + ((ModuleFactory.Context) discoveryContext).getClassLoader(), + ModuleFactory.class, + moduleName); + + final DefaultModuleContext context = new DefaultModuleContext(options, configuration, classLoader); - try { - final ModuleFactory factory = - discoverFactory( - ((ModuleFactory.Context) discoveryContext).getClassLoader(), - ModuleFactory.class, - moduleName); - - final DefaultModuleContext context = - new DefaultModuleContext(options, configuration, classLoader); - return factory.createModule(context); - } catch (Throwable t) { - throw new ValidationException( - String.format( - "Unable to create module '%s'.%n%nModule options are:%n%s", - moduleName, - options.entrySet().stream() - .map( - optionEntry -> - stringifyOption( - optionEntry.getKey(), - optionEntry.getValue())) - .sorted() - .collect(Collectors.joining("\n"))), - t); - } + return factory.createModule(context); + } catch (Throwable t) { + throw new ValidationException( + String.format( + "Unable to create module '%s'.%n%nModule options are:%n%s", + moduleName, + options.entrySet().stream() + .map( + optionEntry -> + stringifyOption( + optionEntry.getKey(), + optionEntry.getValue())) + .sorted() + .collect(Collectors.joining("\n"))), + t); } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ModuleFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ModuleFactory.java index 6a668bd70413a..796003c28391f 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ModuleFactory.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ModuleFactory.java @@ -25,7 +25,6 @@ import org.apache.flink.table.module.Module; import org.apache.flink.table.module.ModuleException; -import java.util.List; import java.util.Map; import java.util.Set; @@ -38,20 +37,7 @@ * instead. */ @PublicEvolving -public interface ModuleFactory extends TableFactory, Factory { - - /** - * Creates and configures a {@link Module} using the given properties. - * - * @param properties normalized properties describing a module. - * @return the configured module. - * @deprecated Use {@link #createModule(Context)} instead and implement {@link Factory} instead - * of {@link TableFactory}. - */ - @Deprecated - default Module createModule(Map properties) { - throw new ModuleException("Module factories must implement createModule()."); - } +public interface ModuleFactory extends Factory { /** Creates and configures a {@link Module}. */ default Module createModule(Context context) { @@ -79,49 +65,9 @@ interface Context { ClassLoader getClassLoader(); } - default String factoryIdentifier() { - if (requiredContext() == null || supportedProperties() == null) { - throw new ModuleException("Module factories must implement factoryIdentifier()"); - } - - return null; - } - - default Set> requiredOptions() { - if (requiredContext() == null || supportedProperties() == null) { - throw new ModuleException("Module factories must implement requiredOptions()"); - } - - return null; - } + String factoryIdentifier(); - default Set> optionalOptions() { - if (requiredContext() == null || supportedProperties() == null) { - throw new ModuleException("Module factories must implement optionalOptions()"); - } + Set> requiredOptions(); - return null; - } - - // -------------------------------------------------------------------------------------------- - // Default implementations for legacy {@link TableFactory} stack. - // -------------------------------------------------------------------------------------------- - - /** - * @deprecated Implement the {@link Factory} based stack instead. - */ - @Deprecated - default Map requiredContext() { - // Default implementation for modules implementing the new {@link Factory} stack instead. - return null; - } - - /** - * @deprecated Implement the {@link Factory} based stack instead. - */ - @Deprecated - default List supportedProperties() { - // Default implementation for modules implementing the new {@link Factory} stack instead. - return null; - } + Set> optionalOptions(); } diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/module/LegacyDummyModuleFactory.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/module/LegacyDummyModuleFactory.java deleted file mode 100644 index 26664f5566cab..0000000000000 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/module/LegacyDummyModuleFactory.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.factories.module; - -import org.apache.flink.table.factories.ModuleFactory; -import org.apache.flink.table.module.CommonModuleOptions; -import org.apache.flink.table.module.Module; - -import java.util.Collections; -import java.util.List; -import java.util.Map; - -/** Test implementation of a {@link ModuleFactory} using the legacy stack. */ -@Deprecated -public class LegacyDummyModuleFactory implements ModuleFactory { - - public static final String IDENTIFIER = "LegacyModule"; - - @Override - public Map requiredContext() { - return Collections.singletonMap(CommonModuleOptions.MODULE_TYPE.key(), IDENTIFIER); - } - - @Override - public List supportedProperties() { - return Collections.emptyList(); - } - - @Override - public Module createModule(Map options) { - return new Module() {}; - } -} diff --git a/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.legacy.factories.TableFactory b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.legacy.factories.TableFactory index 8664f58ca68f4..109471382f5c2 100644 --- a/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.legacy.factories.TableFactory +++ b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.legacy.factories.TableFactory @@ -14,4 +14,3 @@ # limitations under the License. org.apache.flink.table.factories.TestTableSinkFactory -org.apache.flink.table.factories.module.LegacyDummyModuleFactory diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala index 2700188b53219..225af8444a871 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala @@ -1631,12 +1631,6 @@ class TableEnvironmentTest { validateShowModules(("core", false)) } - @Test - def testLegacyModule(): Unit = { - tableEnv.executeSql("LOAD MODULE LegacyModule") - validateShowModules(("core", true), ("LegacyModule", true)) - } - @Test def testExecuteSqlWithCreateDropView(): Unit = { createTableForTests()