From 67756f49dd019b7be2d0ba494209ad0484db0dce Mon Sep 17 00:00:00 2001 From: v_nikeyang <1013195908@qq.com> Date: Thu, 14 Sep 2023 17:02:29 +0800 Subject: [PATCH] fix code review --- .../flink/config/FlinkEnvConfiguration.scala | 7 +- .../factory/FlinkEngineConnFactory.scala | 42 +++++++--- .../factory/TestFlinkEngineConnFactory.scala | 82 +++++++++++++++++++ 3 files changed, 116 insertions(+), 15 deletions(-) create mode 100644 linkis-engineconn-plugins/flink/src/main/test/scala/org.apache.linkis.engineconnplugin.flink/factory/TestFlinkEngineConnFactory.scala diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala index 78db5c6ce2..aef2f7ffc8 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala @@ -44,6 +44,8 @@ object FlinkEnvConfiguration { FLINK_HOME.getValue + s"/lib/flink-dist_2.11-${FLINK_VERSION.getValue}.jar" ) + val FLINK_CONF_YAML = CommonVars("flink.conf.yaml.dir", "flink-conf.yaml") + val FLINK_PROVIDED_LIB_PATH = CommonVars("flink.lib.path", "") val FLINK_PROVIDED_USER_LIB_PATH = @@ -60,6 +62,7 @@ object FlinkEnvConfiguration { "/appcom/Install/flink/lib", "The local lib path of each user in Flink EngineConn." ) + val FLINK_YAML_MERGE_ENABLE = CommonVars("flink.yaml.merge.enable", true) val FLINK_SHIP_DIRECTORIES = CommonVars("flink.yarn.ship-directories", "") val FLINK_SHIP_REMOTE_DIRECTORIES = CommonVars("flink.yarn.remote.ship-directories", "") @@ -137,9 +140,5 @@ object FlinkEnvConfiguration { val FLINK_HANDSHAKE_WAIT_TIME_MILLS = CommonVars("linkis.flink.handshake.wait.time.mills", 60 * 1000) - val FLINK_ENGINE_CONN_DEFAULT_JAVA_OPTS = CommonVars( - "wds.linkis.engineConn.javaOpts.default", - "-Xloggc:/gc.log -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=20 -XX:GCLogFileSize=512M -XX:+PrintPromotionFailure -XX:+PrintGCCause" - ) } diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala index d38f69f7a9..3cf1ec6708 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala @@ -243,24 +243,44 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging } protected def getExtractJavaOpts(envJavaOpts: String): String = { - // var defaultJavaOpts = FlinkEnvConfiguration.FLINK_ENGINE_CONN_DEFAULT_JAVA_OPTS.getValue var defaultJavaOpts = "" - val yamlFilePath = "/appcom/config/flink-config/flink-conf.yaml" - val source = Source.fromFile(yamlFilePath) - try { - val yamlContent = source.mkString - val yaml = new Yaml() - val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]]) - if (configMap.containsKey("env.java.opts")) { - defaultJavaOpts = configMap.get("env.java.opts").toString + val yamlFilePath = FLINK_CONF_DIR.getValue + val yamlFile = yamlFilePath + FLINK_CONF_YAML.getHotValue() + if (new File(yamlFile).exists()) { + val source = Source.fromFile(yamlFile) + try { + val yamlContent = source.mkString + val yaml = new Yaml() + val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]]) + if (configMap.containsKey("env.java.opts")) { + defaultJavaOpts = configMap.get("env.java.opts").toString + } + } finally { + source.close() + } + } else { + val inputStream = getClass.getResourceAsStream(yamlFile) + if (inputStream != null) { + val source = Source.fromInputStream(inputStream) + try { + val yamlContent = source.mkString + val yaml = new Yaml() + val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]]) + if (configMap.containsKey("env.java.opts")) { + defaultJavaOpts = configMap.get("env.java.opts").toString + } + } finally { + source.close() + } + } else { + throw new FileNotFoundException("YAML file not found in both file system and classpath.") } - } finally { - source.close() } val merged = mergeAndDeduplicate(defaultJavaOpts, envJavaOpts) merged } + protected def mergeAndDeduplicate(str1: String, str2: String): String = { val patternX = """-XX:([^\s]+)=([^\s]+)""".r val keyValueMapX = patternX.findAllMatchIn(str2).map { matchResult => diff --git a/linkis-engineconn-plugins/flink/src/main/test/scala/org.apache.linkis.engineconnplugin.flink/factory/TestFlinkEngineConnFactory.scala b/linkis-engineconn-plugins/flink/src/main/test/scala/org.apache.linkis.engineconnplugin.flink/factory/TestFlinkEngineConnFactory.scala new file mode 100644 index 0000000000..6feaaafe30 --- /dev/null +++ b/linkis-engineconn-plugins/flink/src/main/test/scala/org.apache.linkis.engineconnplugin.flink/factory/TestFlinkEngineConnFactory.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.factory + +import org.junit.jupiter.api.Test + +class TestFlinkEngineConnFactory { + + @Test + def testMergeAndDeduplicate: Unit = { + var defaultJavaOpts = "-Da=3 -Db=4 -XXc=5 -Dk=a1=b"; + var envJavaOpts = "-DjobName=0607_1 -Dlog4j.configuration=./log4j.properties -Da=1 -Dk=a1=c"; + val merged = mergeAndDeduplicate(defaultJavaOpts, envJavaOpts) + } + + protected def mergeAndDeduplicate(str1: String, str2: String): String = { + val patternX = """-XX:([^\s]+)=([^\s]+)""".r + val keyValueMapX = patternX.findAllMatchIn(str2).map { matchResult => + val key = matchResult.group(1) + val value = matchResult.group(2) + (key, value) + }.toMap + + val patternD = """-D([^\s]+)=([^\s]+)""".r + val keyValueMapD = patternD.findAllMatchIn(str2).map { matchResult => + val key = matchResult.group(1) + val value = matchResult.group(2) + (key, value) + }.toMap + val xloggcPattern = """-Xloggc:[^\s]+""".r + val xloggcValueStr1 = xloggcPattern.findFirstMatchIn(str1).getOrElse("").toString + val xloggcValueStr2 = xloggcPattern.findFirstMatchIn(str2).getOrElse("").toString + var escapedXloggcValue = "" + var replaceStr1 = "" + var replaceStr2 = "" + if (xloggcValueStr1.nonEmpty && xloggcValueStr2.nonEmpty) { + escapedXloggcValue = xloggcValueStr2.replace("<", "\\<").replace(">", "\\>") + replaceStr1 = str1.replace(xloggcValueStr1, escapedXloggcValue) + replaceStr2 = str2.replace(xloggcValueStr2, "") + } + if (xloggcValueStr1.nonEmpty && xloggcValueStr2.isEmpty) { + escapedXloggcValue = xloggcValueStr1.replace("<", "\\<").replace(">", "\\>") + replaceStr1 = str1.replace(xloggcValueStr1, escapedXloggcValue) + replaceStr2 = str2 + } + if (xloggcValueStr1.isEmpty && xloggcValueStr2.isEmpty) { + replaceStr1 = str1 + replaceStr2 = str2 + } + val MergedStringX = keyValueMapX.foldLeft(replaceStr1) { (result, entry) => + val (key, value) = entry + val oldValue = s"$key=[^\\s]+" + val newValue = key + "=" + value + result.replaceAll(oldValue, newValue) + } + + val MergedStringD = keyValueMapD.foldLeft(MergedStringX) { (result, entry) => + val (key, value) = entry + val oldValue = s"$key=[^\\s]+" + val newValue = key + "=" + value + result.replaceAll(oldValue, newValue) + } + val javaOpts = (MergedStringD.split("\\s+") ++ replaceStr2.split("\\s+")).distinct.mkString(" ") + javaOpts + } + +}