From c8e645734b0cee93627741c24d3e9d8ff16a4e11 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Tue, 14 May 2024 09:54:57 +0800 Subject: [PATCH] [KYUUBI #6344] FlinkProcessBuilder prioritizes user configurations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # :mag: Description ## Issue References ๐Ÿ”— This pull request fixes #6344 `FlinkProcessBuilder` specifies `yarn.ship-files`, `yarn.application.name` and `yarn.tags` configurations of kyuubi platform. Sometimes we also need to customize these configurations, so we should prioritize these user configurations. ## Describe Your Solution ๐Ÿ”ง FlinkProcessBuilder prioritizes user configurations. ## Types of changes :bookmark: - [x] Bugfix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan ๐Ÿงช #### Behavior Without This Pull Request :coffin: #### Behavior With This Pull Request :tada: #### Related Unit Tests added new unit test --- # Checklist ๐Ÿ“ - [X] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) **Be nice. Be informative.** Closes #6342 from wForget/hotfix2. Closes #6344 feca972ca [wforget] address comment 17df0844d [wforget] fix test and add flink constant ece91cc0c [wforget] FlinkProcessBuilder prioritizes user configurations Authored-by: wforget <643348094@qq.com> Signed-off-by: wforget <643348094@qq.com> --- .../engine/KyuubiApplicationManager.scala | 5 ++- .../engine/flink/FlinkProcessBuilder.scala | 17 ++++++++-- .../flink/FlinkProcessBuilderSuite.scala | 31 +++++++++++++++++++ 3 files changed, 49 insertions(+), 4 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala index 0e914987c36..f2887b3e976 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala @@ -115,7 +115,10 @@ object KyuubiApplicationManager { } private def setupFlinkYarnTag(tag: String, conf: KyuubiConf): Unit = { - val originalTag = conf.getOption(FlinkProcessBuilder.YARN_TAG_KEY).map(_ + ",").getOrElse("") + val originalTag = conf + .getOption(s"${FlinkProcessBuilder.FLINK_CONF_PREFIX}.${FlinkProcessBuilder.YARN_TAG_KEY}") + .orElse(conf.getOption(FlinkProcessBuilder.YARN_TAG_KEY)) + .map(_ + ",").getOrElse("") val newTag = s"${originalTag}KYUUBI" + Some(tag).filterNot(_.isEmpty).map("," + _).getOrElse("") conf.set(FlinkProcessBuilder.YARN_TAG_KEY, newTag) } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala index 4ae714deefa..241b7ec7872 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala @@ -115,10 +115,15 @@ class FlinkProcessBuilder( flinkExtraJars += s"$hiveConfFile" } + val customFlinkConf = conf.getAllWithPrefix(FLINK_CONF_PREFIX, "") + // add custom yarn.ship-files + flinkExtraJars ++= customFlinkConf.get(YARN_SHIP_FILES_KEY) + val yarnAppName = customFlinkConf.get(YARN_APPLICATION_NAME_KEY) + .orElse(conf.getOption(APP_KEY)) buffer += "-t" buffer += "yarn-application" buffer += s"-Dyarn.ship-files=${flinkExtraJars.mkString(";")}" - buffer += s"-Dyarn.application.name=${conf.getOption(APP_KEY).get}" + buffer += s"-Dyarn.application.name=${yarnAppName.get}" buffer += s"-Dyarn.tags=${conf.getOption(YARN_TAG_KEY).get}" buffer += "-Dcontainerized.master.env.FLINK_CONF_DIR=." @@ -126,8 +131,10 @@ class FlinkProcessBuilder( buffer += "-Dcontainerized.master.env.HIVE_CONF_DIR=." } - val customFlinkConf = conf.getAllWithPrefix("flink", "") - customFlinkConf.filter(_._1 != "app.name").foreach { case (k, v) => + customFlinkConf.filter { case (k, _) => + !Seq("app.name", YARN_SHIP_FILES_KEY, YARN_APPLICATION_NAME_KEY, YARN_TAG_KEY) + .contains(k) + }.foreach { case (k, v) => buffer += s"-D$k=$v" } @@ -213,8 +220,12 @@ class FlinkProcessBuilder( object FlinkProcessBuilder { final val FLINK_EXEC_FILE = "flink" + final val FLINK_CONF_PREFIX = "flink" final val APP_KEY = "flink.app.name" final val YARN_TAG_KEY = "yarn.tags" + final val YARN_SHIP_FILES_KEY = "yarn.ship-files" + final val YARN_APPLICATION_NAME_KEY = "yarn.application.name" + final val FLINK_HADOOP_CLASSPATH_KEY = "FLINK_HADOOP_CLASSPATH" final val FLINK_PROXY_USER_KEY = "HADOOP_PROXY_USER" } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala index 39fee116399..952f71c087b 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala @@ -167,4 +167,35 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite { } matchActualAndExpectedApplicationMode(builder) } + + test("user configuration takes priority") { + val customShipFiles = "testFile1.jar;testFile2.jar" + val customAppName = "testAppName" + val customYarnTags = "testTag1,testTag2" + val builderConf = applicationModeConf + builderConf.set("flink.yarn.ship-files", customShipFiles) + builderConf.set("flink.yarn.application.name", customAppName) + builderConf.set("flink.yarn.tags", customYarnTags) + val builder = new FlinkProcessBuilder("test", true, builderConf) { + override def env: Map[String, String] = envWithAllHadoop + } + val actualCommands = builder.toString + // scalastyle:off line.size.limit + val expectedCommands = + escapePaths( + s"""${builder.flinkExecutable} run-application \\\\ + |\\t-t yarn-application \\\\ + |\\t-Dyarn.ship-files=.*flink-sql-client.*jar;.*flink-sql-gateway.*jar;$tempUdfJar;.*hive-site.xml;$customShipFiles \\\\ + |\\t-Dyarn.application.name=$customAppName \\\\ + |\\t-Dyarn.tags=$customYarnTags,KYUUBI \\\\ + |\\t-Dcontainerized.master.env.FLINK_CONF_DIR=. \\\\ + |\\t-Dcontainerized.master.env.HIVE_CONF_DIR=. \\\\ + |\\t-Dexecution.target=yarn-application \\\\ + |\\t-c org.apache.kyuubi.engine.flink.FlinkSQLEngine .*kyuubi-flink-sql-engine_.*jar""".stripMargin + + "(?: \\\\\\n\\t--conf \\S+=\\S+)+") + // scalastyle:on line.size.limit + val regex = new Regex(expectedCommands) + val matcher = regex.pattern.matcher(actualCommands) + assert(matcher.matches()) + } }