Skip to content

Commit

Permalink
[FLINK-14579][sql cli] enable SQL CLI to configure modules via yaml config
Browse files Browse the repository at this point in the history

enable SQL CLI to configure modules via yaml config.

This closes apache#10093.
  • Loading branch information
bowenli86 committed Nov 8, 2019
1 parent 2125ea2 commit 501f640
Show file tree
Hide file tree
Showing 9 changed files with 272 additions and 0 deletions.
9 changes: 9 additions & 0 deletions flink-table/flink-sql-client/conf/sql-client-defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,15 @@ catalogs: [] # empty list
# hive-conf-dir: /opt/hive_conf/
# default-database: ...

#==============================================================================
# Modules
#==============================================================================

# Define modules here.

#modules: # note: modules will be loaded in the order in which they are specified here
# - name: core
# type: core

#==============================================================================
# Execution properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.table.client.config.entries.DeploymentEntry;
import org.apache.flink.table.client.config.entries.ExecutionEntry;
import org.apache.flink.table.client.config.entries.FunctionEntry;
import org.apache.flink.table.client.config.entries.ModuleEntry;
import org.apache.flink.table.client.config.entries.TableEntry;
import org.apache.flink.table.client.config.entries.ViewEntry;

Expand Down Expand Up @@ -53,6 +54,8 @@ public class Environment {

public static final String DEPLOYMENT_ENTRY = "deployment";

private Map<String, ModuleEntry> modules;

private Map<String, CatalogEntry> catalogs;

private Map<String, TableEntry> tables;
Expand All @@ -66,6 +69,7 @@ public class Environment {
private DeploymentEntry deployment;

public Environment() {
this.modules = Collections.emptyMap();
this.catalogs = Collections.emptyMap();
this.tables = Collections.emptyMap();
this.functions = Collections.emptyMap();
Expand All @@ -74,6 +78,24 @@ public Environment() {
this.deployment = DeploymentEntry.DEFAULT_INSTANCE;
}

/**
 * Returns the configured module entries, keyed by module name.
 *
 * <p>NOTE: this returns the live internal map, not a copy — callers must not mutate it.
 */
public Map<String, ModuleEntry> getModules() {
return modules;
}

/**
 * Parses and registers the module configurations from the environment file.
 *
 * <p>Uses a {@link LinkedHashMap} so that iteration order matches declaration order:
 * modules are later loaded into the table environment in the order in which they are
 * specified in the YAML config, and a plain {@code HashMap} would silently reorder them.
 *
 * @param modules list of raw module property maps from the YAML {@code modules} section
 * @throws SqlClientException if two entries declare the same module name
 */
public void setModules(List<Map<String, Object>> modules) {
	this.modules = new LinkedHashMap<>(modules.size());

	modules.forEach(config -> {
		final ModuleEntry entry = ModuleEntry.create(config);
		if (this.modules.containsKey(entry.getName())) {
			throw new SqlClientException(
				String.format("Cannot register module '%s' because a module with this name is already registered.",
					entry.getName()));
		}
		this.modules.put(entry.getName(), entry);
	});
}

/**
 * Returns the configured catalog entries, keyed by catalog name.
 *
 * <p>NOTE: this returns the live internal map, not a copy — callers must not mutate it.
 */
public Map<String, CatalogEntry> getCatalogs() {
return catalogs;
}
Expand Down Expand Up @@ -153,6 +175,11 @@ public DeploymentEntry getDeployment() {
@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
sb.append("===================== Modules =====================\n");
modules.forEach((name, module) -> {
sb.append("- ").append(ModuleEntry.MODULE_NAME).append(": ").append(name).append("\n");
module.asMap().forEach((k, v) -> sb.append(" ").append(k).append(": ").append(v).append('\n'));
});
sb.append("===================== Catalogs =====================\n");
catalogs.forEach((name, catalog) -> {
sb.append("- ").append(CatalogEntry.CATALOG_NAME).append(": ").append(name).append("\n");
Expand Down Expand Up @@ -207,6 +234,11 @@ public static Environment parse(String content) throws IOException {
public static Environment merge(Environment env1, Environment env2) {
final Environment mergedEnv = new Environment();

// merge modules
final Map<String, ModuleEntry> modules = new HashMap<>(env1.getModules());
modules.putAll(env2.getModules());
mergedEnv.modules = modules;

// merge catalogs
final Map<String, CatalogEntry> catalogs = new HashMap<>(env1.getCatalogs());
catalogs.putAll(env2.getCatalogs());
Expand Down Expand Up @@ -243,6 +275,8 @@ public static Environment enrich(
Map<String, ViewEntry> views) {
final Environment enrichedEnv = new Environment();

enrichedEnv.modules = new LinkedHashMap<>(env.getModules());

// merge catalogs
enrichedEnv.catalogs = new LinkedHashMap<>(env.getCatalogs());

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.client.config.entries;

import org.apache.flink.table.client.config.ConfigUtil;
import org.apache.flink.table.descriptors.DescriptorProperties;

import java.util.Collections;
import java.util.Map;

import static org.apache.flink.table.descriptors.ModuleDescriptorValidator.MODULE_TYPE;

/**
 * Configuration entry that describes a single table module, as declared in the
 * {@code modules} section of an environment YAML file. An entry consists of a
 * unique name plus arbitrary properties that are later handed to the discovered
 * module factory.
 */
public class ModuleEntry extends ConfigEntry {

	public static final String MODULE_NAME = "name";

	private final String name;

	protected ModuleEntry(String name, DescriptorProperties properties) {
		super(properties);
		this.name = name;
	}

	/** Returns the unique name under which this module was declared. */
	public String getName() {
		return name;
	}

	@Override
	protected void validate(DescriptorProperties properties) {
		// only the module type is checked here; all remaining properties are
		// validated by the factory discovered for this entry
		properties.validateString(MODULE_TYPE, false, 1);
	}

	/** Creates a module entry from a raw YAML property map. */
	public static ModuleEntry create(Map<String, Object> config) {
		return create(ConfigUtil.normalizeYaml(config));
	}

	private static ModuleEntry create(DescriptorProperties properties) {
		properties.validateString(MODULE_NAME, false, 1);

		final String moduleName = properties.getString(MODULE_NAME);

		// strip the name key so that only factory-relevant properties remain
		final DescriptorProperties factoryProperties =
			properties.withoutKeys(Collections.singletonList(MODULE_NAME));

		return new ModuleEntry(moduleName, factoryProperties);
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,12 @@
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.descriptors.CoreModuleDescriptorValidator;
import org.apache.flink.table.factories.BatchTableSinkFactory;
import org.apache.flink.table.factories.BatchTableSourceFactory;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.factories.ModuleFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.factories.TableSinkFactory;
import org.apache.flink.table.factories.TableSourceFactory;
Expand All @@ -81,6 +83,7 @@
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.module.Module;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.planner.delegation.ExecutorBase;
import org.apache.flink.table.sinks.TableSink;
Expand Down Expand Up @@ -113,6 +116,7 @@ public class ExecutionContext<ClusterID> {
private final Environment mergedEnv;
private final List<URL> dependencies;
private final ClassLoader classLoader;
private final Map<String, Module> modules;
private final Map<String, Catalog> catalogs;
private final Map<String, TableSource<?>> tableSources;
private final Map<String, TableSink<?>> tableSinks;
Expand Down Expand Up @@ -141,6 +145,12 @@ public ExecutionContext(Environment defaultEnvironment, SessionContext sessionCo
dependencies.toArray(new URL[dependencies.size()]),
this.getClass().getClassLoader());

// create modules
modules = new LinkedHashMap<>();
mergedEnv.getModules().forEach((name, entry) ->
modules.put(name, createModule(entry.asMap(), classLoader))
);

// create catalogs
catalogs = new LinkedHashMap<>();
mergedEnv.getCatalogs().forEach((name, entry) ->
Expand Down Expand Up @@ -262,6 +272,12 @@ private static ExecutionConfigAccessor createExecutionParameterProvider(CommandL
}
}

/**
 * Instantiates a module by discovering a matching {@link ModuleFactory} on the
 * given (user-code) classloader and letting it build the module from the properties.
 */
private Module createModule(Map<String, String> moduleProperties, ClassLoader classLoader) {
	return TableFactoryService
		.find(ModuleFactory.class, moduleProperties, classLoader)
		.createModule(moduleProperties);
}

private Catalog createCatalog(String name, Map<String, String> catalogProperties, ClassLoader classLoader) {
final CatalogFactory factory =
TableFactoryService.find(CatalogFactory.class, catalogProperties, classLoader);
Expand Down Expand Up @@ -382,6 +398,13 @@ private EnvironmentInstance() {
mergedEnv.getConfiguration().asMap().forEach((k, v) ->
tableEnv.getConfig().getConfiguration().setString(k, v));

// load modules
if (!modules.isEmpty()) {
// unload core module first to respect whatever users configure
tableEnv.unloadModule(CoreModuleDescriptorValidator.MODULE_TYPE_CORE);
modules.forEach(tableEnv::loadModule);
}

// register catalogs
catalogs.forEach(tableEnv::registerCatalog);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.apache.flink.table.client.gateway.utils.TestTableSourceFactoryBase;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.ModuleFactory;
import org.apache.flink.table.module.Module;
import org.apache.flink.table.types.DataType;

import org.junit.Test;
Expand All @@ -61,6 +63,7 @@

import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE;
import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE;
import static org.apache.flink.table.descriptors.ModuleDescriptorValidator.MODULE_TYPE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

Expand All @@ -73,6 +76,7 @@ public class DependencyTest {
public static final String TEST_PROPERTY = "test-property";

public static final String CATALOG_TYPE_TEST = "DependencyTest";
public static final String MODULE_TYPE_TEST = "ModuleDependencyTest";

private static final String FACTORY_ENVIRONMENT_FILE = "test-sql-client-factory.yaml";
private static final String TABLE_FACTORY_JAR_FILE = "table-factories-test-jar.jar";
Expand Down Expand Up @@ -129,6 +133,38 @@ public TestTableSinkFactory() {
}
}

/**
 * Module factory that can only be discovered if user-jar classloading works correctly.
 */
public static class TestModuleFactory implements ModuleFactory {

	@Override
	public Module createModule(Map<String, String> properties) {
		return new TestModule();
	}

	@Override
	public Map<String, String> requiredContext() {
		// match only entries declaring our dedicated test module type
		final Map<String, String> requiredContext = new HashMap<>(1);
		requiredContext.put(MODULE_TYPE, MODULE_TYPE_TEST);
		return requiredContext;
	}

	@Override
	public List<String> supportedProperties() {
		final List<String> supported = new ArrayList<>(1);
		supported.add("test");
		return supported;
	}
}

/**
 * Test module.
 */
public static class TestModule implements Module {

	// intentionally empty: successful discovery via TestModuleFactory is the test
}

/**
* Catalog that can be discovered if classloading is correct.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import java.util.Set;

import static org.apache.flink.table.client.config.entries.CatalogEntry.CATALOG_NAME;
import static org.apache.flink.table.client.config.entries.ModuleEntry.MODULE_NAME;
import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE;
import static org.apache.flink.table.descriptors.ModuleDescriptorValidator.MODULE_TYPE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -97,6 +99,16 @@ public void testDuplicateCatalog() {
createCatalog("catalog2", "test")));
}

@Test
public void testDuplicateModules() {
	exception.expect(SqlClientException.class);

	final Environment env = new Environment();
	// two entries under the name "module2" — registration must be rejected
	env.setModules(Arrays.asList(
		createModule("module1", "test"),
		createModule("module2", "test"),
		createModule("module2", "test")));
}

private static Map<String, Object> createCatalog(String name, String type) {
Map<String, Object> prop = new HashMap<>();

Expand All @@ -105,4 +117,13 @@ private static Map<String, Object> createCatalog(String name, String type) {

return prop;
}

/** Builds a raw YAML-style property map describing a module with the given name and type. */
private static Map<String, Object> createModule(String name, String type) {
	final Map<String, Object> config = new HashMap<>(2);
	config.put(MODULE_TYPE, type);
	config.put(MODULE_NAME, name);
	return config;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
public class ExecutionContextTest {

private static final String DEFAULTS_ENVIRONMENT_FILE = "test-sql-client-defaults.yaml";
private static final String MODULES_ENVIRONMENT_FILE = "test-sql-client-modules.yaml";
private static final String CATALOGS_ENVIRONMENT_FILE = "test-sql-client-catalogs.yaml";
private static final String STREAMING_ENVIRONMENT_FILE = "test-sql-client-streaming.yaml";
private static final String CONFIGURATION_ENVIRONMENT_FILE = "test-sql-client-configuration.yaml";
Expand All @@ -80,6 +81,23 @@ public void testExecutionConfig() throws Exception {
assertEquals(1_000, failureRateStrategy.getDelayBetweenAttemptsInterval().toMilliseconds());
}

@Test
public void testModules() throws Exception {
	final ExecutionContext<?> context = createModuleExecutionContext();
	final TableEnvironment tableEnv = context.createEnvironmentInstance().getTableEnvironment();

	// both the built-in core module and the configured one should be present
	final Set<String> actualModules = new HashSet<>(Arrays.asList(tableEnv.listModules()));

	assertEquals(2, actualModules.size());
	assertEquals(new HashSet<>(Arrays.asList("core", "mymodule")), actualModules);
}

@Test
public void testCatalogs() throws Exception {
final String inmemoryCatalog = "inmemorycatalog";
Expand Down Expand Up @@ -294,6 +312,16 @@ private <T> ExecutionContext<T> createDefaultExecutionContext() throws Exception
return createExecutionContext(DEFAULTS_ENVIRONMENT_FILE, replaceVars);
}

/** Creates an execution context from the modules test environment file. */
private <T> ExecutionContext<T> createModuleExecutionContext() throws Exception {
	final Map<String, String> vars = new HashMap<>();
	vars.put("$VAR_EXECUTION_TYPE", "streaming");
	vars.put("$VAR_UPDATE_MODE", "update-mode: append");
	vars.put("$VAR_PLANNER", "old");
	vars.put("$VAR_RESULT_MODE", "changelog");
	vars.put("$VAR_MAX_ROWS", "100");
	return createExecutionContext(MODULES_ENVIRONMENT_FILE, vars);
}

private <T> ExecutionContext<T> createCatalogExecutionContext() throws Exception {
final Map<String, String> replaceVars = new HashMap<>();
replaceVars.put("$VAR_PLANNER", "old");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ org.apache.flink.table.client.gateway.utils.DummyTableSourceFactory
org.apache.flink.table.client.gateway.utils.SimpleCatalogFactory
org.apache.flink.table.client.gateway.local.DependencyTest$TestCatalogFactory
org.apache.flink.table.client.gateway.local.DependencyTest$TestHiveCatalogFactory
org.apache.flink.table.client.gateway.local.DependencyTest$TestModuleFactory
Loading

0 comments on commit 501f640

Please sign in to comment.