Skip to content

Commit

Permalink
[Feature][Zeta] Configuration files support user variable replacement (
Browse files Browse the repository at this point in the history
  • Loading branch information
FuYouJ authored and wunan1210 committed Jul 5, 2023
1 parent 36784d4 commit 7305585
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 0 deletions.
1 change: 1 addition & 0 deletions release-note.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
- [Zeta] Fix task `notifyTaskStatusToMaster` failed when job not running or failed before run (#4847)
- [Zeta] Fix cpu load problem (#4828)
- [zeta] Fix the deadlock issue with JDBC driver loading (#4878)
- [zeta] dynamically replace the value of the variable at runtime (#4950)

### E2E

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

@EqualsAndHashCode(callSuper = true)
@Data
Expand Down Expand Up @@ -102,6 +103,7 @@ public class ClientCommandArgs extends AbstractCommandArgs {
@Override
public Command<?> buildCommand() {
Common.setDeployMode(getDeployMode());
userParamsToSysEnv();
if (checkConfig) {
return new SeaTunnelConfValidateCommand(this);
}
Expand All @@ -114,6 +116,16 @@ public Command<?> buildCommand() {
return new ClientExecuteCommand(this);
}

private void userParamsToSysEnv() {
if (!this.variables.isEmpty()) {
variables.stream()
.filter(Objects::nonNull)
.map(variable -> variable.split("=", 2))
.filter(pair -> pair.length == 2)
.forEach(pair -> System.setProperty(pair[0], pair[1]));
}
}

public DeployMode getDeployMode() {
return DeployMode.CLIENT;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.core.starter.seatunnel.args;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigObject;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;

import org.apache.seatunnel.core.starter.utils.CommandLineUtils;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
import java.util.List;

public class ClientCommandArgsTest {
@Test
public void testUserDefinedParamsCommand() throws URISyntaxException {
int fakeParallelism = 16;
String username = "seatunnel=2.3.1";
String password = "dsjr42=4wfskahdsd=w1chh";
String fakeSourceTable = "fake";
String fakeSinkTable = "sink";
String[] args = {
"-c",
"/args/user_defined_params.conf",
"-e",
"local",
"-i",
"fake_source_table=" + fakeSourceTable,
"-i",
"fake_parallelism=" + fakeParallelism,
"-i",
"fake_sink_table=" + fakeSinkTable,
"-i",
"password=" + password,
"-i",
"username=" + username
};
ClientCommandArgs clientCommandArgs =
CommandLineUtils.parse(args, new ClientCommandArgs(), "seatunnel-zeta", true);
clientCommandArgs.buildCommand();
URL resource = ClientCommandArgsTest.class.getResource("/args/user_defined_params.conf");

Config config =
ConfigFactory.parseFile(Paths.get(resource.toURI()).toFile())
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
.resolveWith(
ConfigFactory.systemProperties(),
ConfigResolveOptions.defaults().setAllowUnresolved(true));
List<? extends ConfigObject> sourceConfigs = config.getObjectList("source");
for (ConfigObject configObject : sourceConfigs) {
Config sourceConfig = configObject.toConfig();

String tableName = sourceConfig.getString("result_table_name");
Assertions.assertEquals(tableName, fakeSourceTable);

int parallelism = Integer.parseInt(sourceConfig.getString("parallelism"));
Assertions.assertEquals(fakeParallelism, parallelism);

Assertions.assertEquals(sourceConfig.getString("username"), username);
Assertions.assertEquals(sourceConfig.getString("password"), password);
}
List<? extends ConfigObject> sinkConfigs = config.getObjectList("sink");
for (ConfigObject sinkObject : sinkConfigs) {
Config sinkConfig = sinkObject.toConfig();
String tableName = sinkConfig.getString("result_table_name");
Assertions.assertEquals(tableName, fakeSinkTable);

Assertions.assertEquals(sinkConfig.getString("username"), username);
Assertions.assertEquals(sinkConfig.getString("password"), password);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
# You can set engine configuration here
execution.parallelism = 1
job.mode = "BATCH"
checkpoint.interval = 5000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
result_table_name = ${fake_source_table}
parallelism = ${fake_parallelism}
username = ${username}
password = ${password}
schema = {
fields {
name = "string"
age = "int"
}
}
}
}

sink {
console {
result_table_name = ${fake_sink_table}
username = ${username}
password = ${password}
}
}

0 comments on commit 7305585

Please sign in to comment.