Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev 1.1.14 webank streamis fink load yaml #311

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ object FlinkEnvConfiguration {
FLINK_HOME.getValue + s"/lib/flink-dist_2.11-${FLINK_VERSION.getValue}.jar"
)

val FLINK_CONF_YAML = CommonVars("flink.conf.yaml.dir", "flink-conf.yaml")

val FLINK_PROVIDED_LIB_PATH = CommonVars("flink.lib.path", "")

val FLINK_PROVIDED_USER_LIB_PATH =
Expand All @@ -61,6 +63,7 @@ object FlinkEnvConfiguration {
"The local lib path of each user in Flink EngineConn."
)

val FLINK_YAML_MERGE_ENABLE = CommonVars("flink.yaml.merge.enable", true)
val FLINK_SHIP_DIRECTORIES = CommonVars("flink.yarn.ship-directories", "")
val FLINK_SHIP_REMOTE_DIRECTORIES = CommonVars("flink.yarn.remote.ship-directories", "")

Expand Down Expand Up @@ -137,4 +140,7 @@ object FlinkEnvConfiguration {
val FLINK_HANDSHAKE_WAIT_TIME_MILLS =
CommonVars("linkis.flink.handshake.wait.time.mills", 60 * 1000)

val FLINK_ENV_JAVA_OPTS =
CommonVars("flink.env.java.opts", "env.java.opts")

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

package org.apache.linkis.engineconnplugin.flink.factory

import org.apache.linkis.common.conf.CommonVars
import org.apache.linkis.common.utils.{ClassUtils, Logging}
import org.apache.linkis.common.utils.{ClassUtils, Logging, Utils}
import org.apache.linkis.engineconn.acessible.executor.conf.AccessibleExecutorConfiguration
import org.apache.linkis.engineconn.common.creation.EngineCreationContext
import org.apache.linkis.engineconn.launch.EngineConnServer
Expand All @@ -32,35 +31,32 @@ import org.apache.linkis.engineconnplugin.flink.context.{EnvironmentContext, Fli
import org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary._
import org.apache.linkis.engineconnplugin.flink.exception.FlinkInitFailedException
import org.apache.linkis.engineconnplugin.flink.setting.Settings
import org.apache.linkis.engineconnplugin.flink.util.{ClassUtil, ManagerUtil}
import org.apache.linkis.engineconnplugin.flink.util.{ClassUtil, FlinkValueFormatUtil, ManagerUtil}
import org.apache.linkis.governance.common.conf.GovernanceCommonConf
import org.apache.linkis.manager.engineplugin.common.conf.EnvConfiguration
import org.apache.linkis.manager.engineplugin.common.creation.{
ExecutorFactory,
MultiExecutorEngineConnFactory
}
import org.apache.linkis.manager.engineplugin.common.creation.{ExecutorFactory, MultiExecutorEngineConnFactory}
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine._
import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType
import org.apache.linkis.protocol.utils.TaskUtils

import org.apache.commons.lang3.StringUtils
import org.apache.flink.configuration._
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.table.api.Expressions.e
import org.apache.flink.table.api.e
import org.apache.flink.yarn.configuration.{YarnConfigOptions, YarnDeploymentTarget}

import java.io.File
import java.io.{File, FileNotFoundException}
import java.net.URL
import java.text.MessageFormat
import java.time.Duration
import java.util
import java.util.{Collections, Locale}

import scala.collection.JavaConverters._

import scala.io.Source
import com.google.common.collect.{Lists, Sets}
import org.yaml.snakeyaml.Yaml

class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging {

Expand Down Expand Up @@ -174,7 +170,15 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numberOfTaskSlots)
// set extra configs
options.asScala.filter { case (key, _) => key.startsWith(FLINK_CONFIG_PREFIX) }.foreach {
case (key, value) => flinkConfig.setString(key.substring(FLINK_CONFIG_PREFIX.length), value)
case (key, value) =>
var flinkConfigValue = value
if (
FlinkEnvConfiguration.FLINK_YAML_MERGE_ENABLE.getValue && key
.equals(FLINK_CONFIG_PREFIX + FLINK_ENV_JAVA_OPTS.getValue)
) {
flinkConfigValue = getExtractJavaOpts(value)
}
flinkConfig.setString(key.substring(FLINK_CONFIG_PREFIX.length), flinkConfigValue)
}
// set kerberos config
if (FLINK_KERBEROS_ENABLE.getValue(options)) {
Expand Down Expand Up @@ -232,6 +236,46 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
context
}

private def getExtractJavaOpts(envJavaOpts: String): String = {
var defaultJavaOpts = ""
val yamlFilePath = FLINK_CONF_DIR.getValue
val yamlFile = yamlFilePath + "/" + FLINK_CONF_YAML.getHotValue()
if (new File(yamlFile).exists()) {
val source = Source.fromFile(yamlFile)
try {
val yamlContent = source.mkString
val yaml = new Yaml()
val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]])
if (configMap.containsKey(FLINK_ENV_JAVA_OPTS.getValue)) {
defaultJavaOpts = configMap.get(FLINK_ENV_JAVA_OPTS.getValue).toString
}
} finally {
source.close()
}
} else {
val inputStream = getClass.getResourceAsStream(yamlFile)
if (inputStream != null) {
val source = Source.fromInputStream(inputStream)
try {
val yamlContent = source.mkString
val yaml = new Yaml()
val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]])
if (configMap.containsKey(FLINK_ENV_JAVA_OPTS.getValue)) {
defaultJavaOpts = configMap.get(FLINK_ENV_JAVA_OPTS.getValue).toString
}
} finally {
source.close()
}
} else {
throw new FileNotFoundException("YAML file not found in both file system and classpath.")
}
}
val merged = FlinkValueFormatUtil.mergeAndDeduplicate(defaultJavaOpts, envJavaOpts)
merged
}



protected def isOnceEngineConn(labels: util.List[Label[_]]): Boolean = {
val engineConnModeLabel = getEngineConnModeLabel(labels)
engineConnModeLabel != null && (EngineConnMode.toEngineConnMode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,13 @@ import org.apache.linkis.manager.engineplugin.common.launch.process.LaunchConsta
}
import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel

import java.io.{File, FileInputStream}
import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.yaml.snakeyaml.Yaml

class FlinkEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,55 @@ object FlinkValueFormatUtil {
case _ => null
}

def mergeAndDeduplicate(defaultJavaOpts: String, envJavaOpts: String): String = {
val patternX = """-XX:([^\s]+)=([^\s]+)""".r
val keyValueMapX = patternX.findAllMatchIn(envJavaOpts).map { matchResult =>
val key = matchResult.group(1)
val value = matchResult.group(2)
(key, value)
}.toMap

val patternD = """-D([^\s]+)=([^\s]+)""".r
val keyValueMapD = patternD.findAllMatchIn(envJavaOpts).map { matchResult =>
val key = matchResult.group(1)
val value = matchResult.group(2)
(key, value)
}.toMap
val xloggcPattern = """-Xloggc:[^\s]+""".r
val xloggcValueStr1 = xloggcPattern.findFirstMatchIn(defaultJavaOpts).getOrElse("").toString
val xloggcValueStr2 = xloggcPattern.findFirstMatchIn(envJavaOpts).getOrElse("").toString
var escapedXloggcValue = ""
var replaceStr1 = ""
var replaceStr2 = ""
if (xloggcValueStr1.nonEmpty && xloggcValueStr2.nonEmpty) {
escapedXloggcValue = xloggcValueStr2.replace("<", "\\<").replace(">", "\\>")
replaceStr1 = defaultJavaOpts.replace(xloggcValueStr1, escapedXloggcValue)
replaceStr2 = envJavaOpts.replace(xloggcValueStr2, "")
}
if (xloggcValueStr1.nonEmpty && xloggcValueStr2.isEmpty) {
escapedXloggcValue = xloggcValueStr1.replace("<", "\\<").replace(">", "\\>")
replaceStr1 = defaultJavaOpts.replace(xloggcValueStr1, escapedXloggcValue)
replaceStr2 = envJavaOpts
}
if (xloggcValueStr1.isEmpty && xloggcValueStr2.isEmpty) {
replaceStr1 = defaultJavaOpts
replaceStr2 = envJavaOpts
}
val MergedStringX = keyValueMapX.foldLeft(replaceStr1) { (result, entry) =>
val (key, value) = entry
val oldValue = s"$key=[^\\s]+"
val newValue = key + "=" + value
result.replaceAll(oldValue, newValue)
}

val MergedStringD = keyValueMapD.foldLeft(MergedStringX) { (result, entry) =>
val (key, value) = entry
val oldValue = s"$key=[^\\s]+"
val newValue = key + "=" + value
result.replaceAll(oldValue, newValue)
}
val javaOpts = (MergedStringD.split("\\s+") ++ replaceStr2.split("\\s+")).distinct.mkString(" ")
javaOpts
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.engineconnplugin.flink.factory

import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration.{FLINK_CONF_DIR, FLINK_CONF_YAML, FLINK_ENV_JAVA_OPTS}
import org.apache.linkis.engineconnplugin.flink.util.FlinkValueFormatUtil
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.yaml.snakeyaml.Yaml

import java.io.{File, FileNotFoundException}
import java.util
import scala.io.Source

class TestFlinkEngineConnFactory {

@Test
private def getExtractJavaOpts(envJavaOpts: String): String = {
var defaultJavaOpts = ""
val yamlFilePath = FLINK_CONF_DIR.getValue
val yamlFile = yamlFilePath + "/" + FLINK_CONF_YAML.getHotValue()
if (new File(yamlFile).exists()) {
val source = Source.fromFile(yamlFile)
try {
val yamlContent = source.mkString
val yaml = new Yaml()
val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]])
if (configMap.containsKey(FLINK_ENV_JAVA_OPTS.getValue)) {
defaultJavaOpts = configMap.get(FLINK_ENV_JAVA_OPTS.getValue).toString
}
} finally {
source.close()
}
} else {
val inputStream = getClass.getResourceAsStream(yamlFile)
if (inputStream != null) {
val source = Source.fromInputStream(inputStream)
try {
val yamlContent = source.mkString
val yaml = new Yaml()
val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]])
if (configMap.containsKey(FLINK_ENV_JAVA_OPTS.getValue)) {
defaultJavaOpts = configMap.get(FLINK_ENV_JAVA_OPTS.getValue).toString
}
} finally {
source.close()
}
} else {
throw new FileNotFoundException("YAML file not found in both file system and classpath.")
}
}
val merged = FlinkValueFormatUtil.mergeAndDeduplicate(defaultJavaOpts, envJavaOpts)
merged
}

@Test
def testMergeAndDeduplicate: Unit = {
val defaultJavaOpts = "-Da=3 -Db=4 -XXc=5 -Dk=a1=b";
val envJavaOpts = "-DjobName=0607_1 -Dlog4j.configuration=./log4j.properties -Da=1 -Dk=a1=c";
val merged = FlinkValueFormatUtil.mergeAndDeduplicate(defaultJavaOpts, envJavaOpts)
assertEquals("-Da=1 -Db=4 -XXc=5 -Dk=a1=c -DjobName=0607_1 -Dlog4j.configuration=./log4j.properties", merged)
}
}