Skip to content

Commit

Permalink
[Improve] submit flink job default file.encoding parameter improvement (
Browse files Browse the repository at this point in the history
apache#3838)

* [Improve]  flink on yarn-per-job mode bug fixed apache#3761
* [Improve] FlinkClientTrait improvement
* [Improve] SubmitRequest minor improvement
* [Improve] submit flink job file.encoding parameter improvement
  • Loading branch information
wolfboys authored Jul 6, 2024
1 parent ad0a7aa commit 7a511d5
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public enum FlinkDevelopmentMode {

/** Py flink Mode */
PYFLINK("Python Flink", 3);

private final String name;

private final Integer mode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ case class SubmitRequest(

def hasProp(key: String): Boolean = properties.containsKey(key)

def getProp(key: String): Any = properties.get(key)

private[this] def getParameterMap(prefix: String = ""): Map[String, String] = {
if (this.appConf == null) {
return Map.empty[String, String]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,6 @@ trait FlinkClientTrait extends Logger {
// prepare flink config
val flinkConfig = prepareConfig(submitRequest)

// set JVMOptions..
setJvmOptions(submitRequest, flinkConfig)

setConfig(submitRequest, flinkConfig)

Try(doSubmit(submitRequest, flinkConfig)) match {
Expand Down Expand Up @@ -135,7 +132,7 @@ trait FlinkClientTrait extends Logger {
}
}

// set common parameter
// 1) set common parameter
flinkConfig
.safeSet(PipelineOptions.NAME, submitRequest.effectiveAppName)
.safeSet(DeploymentOptions.TARGET, submitRequest.executionMode.getName)
Expand All @@ -152,7 +149,7 @@ trait FlinkClientTrait extends Logger {
flinkConfig.safeSet(retainedOption, flinkDefaultConfiguration.get(retainedOption))
}

// set savepoint parameter
// 2) set savepoint parameter
if (StringUtils.isNotBlank(submitRequest.savePoint)) {
flinkConfig.safeSet(SavepointConfigOptions.SAVEPOINT_PATH, submitRequest.savePoint)
flinkConfig.setBoolean(
Expand All @@ -166,30 +163,28 @@ trait FlinkClientTrait extends Logger {
}
}

flinkConfig
}

private[this] def setJvmOptions(
submitRequest: SubmitRequest,
flinkConfig: Configuration): Unit = {
// 4) set env.xx.opts parameter
if (MapUtils.isNotEmpty(submitRequest.properties)) {
submitRequest.properties.foreach(
x => {
val k = x._1.trim
val v = x._2.toString
if (k == CoreOptions.FLINK_JVM_OPTIONS.key()) {
flinkConfig.set(CoreOptions.FLINK_JVM_OPTIONS, v)
} else if (k == CoreOptions.FLINK_JM_JVM_OPTIONS.key()) {
flinkConfig.set(CoreOptions.FLINK_JM_JVM_OPTIONS, v)
} else if (k == CoreOptions.FLINK_HS_JVM_OPTIONS.key()) {
flinkConfig.set(CoreOptions.FLINK_HS_JVM_OPTIONS, v)
} else if (k == CoreOptions.FLINK_TM_JVM_OPTIONS.key()) {
flinkConfig.set(CoreOptions.FLINK_TM_JVM_OPTIONS, v)
} else if (k == CoreOptions.FLINK_CLI_JVM_OPTIONS.key()) {
flinkConfig.set(CoreOptions.FLINK_CLI_JVM_OPTIONS, v)
}
})
// file.encoding...
if (submitRequest.hasProp(CoreOptions.FLINK_JVM_OPTIONS.key())) {
val jvmOpt = submitRequest.getProp(CoreOptions.FLINK_JVM_OPTIONS.key()).toString
if (!jvmOpt.contains("-Dfile.encoding=")) {
// set default file.encoding
val opt = s"-Dfile.encoding=UTF-8 $jvmOpt"
submitRequest.properties.put(CoreOptions.FLINK_JVM_OPTIONS.key(), opt)
}
}

submitRequest.properties
.filter(_._1.startsWith("env."))
.foreach(
x => {
logInfo(s"env opts: ${x._1}: ${x._2}")
flinkConfig.setString(x._1, x._2.toString)
})
}

flinkConfig
}

def setConfig(submitRequest: SubmitRequest, flinkConf: Configuration): Unit
Expand Down Expand Up @@ -358,7 +353,7 @@ trait FlinkClientTrait extends Logger {

private[client] def getParallelism(submitRequest: SubmitRequest): Integer = {
if (submitRequest.hasProp(KEY_FLINK_PARALLELISM())) {
Integer.valueOf(submitRequest.properties.get(KEY_FLINK_PARALLELISM()).toString)
Integer.valueOf(submitRequest.getProp(KEY_FLINK_PARALLELISM()).toString)
} else {
getFlinkDefaultConfiguration(submitRequest.flinkVersion.flinkHome)
.getInteger(CoreOptions.DEFAULT_PARALLELISM, CoreOptions.DEFAULT_PARALLELISM.defaultValue())
Expand Down Expand Up @@ -415,8 +410,8 @@ trait FlinkClientTrait extends Logger {
if (MapUtils.isNotEmpty(submitRequest.properties)) {
submitRequest.properties.foreach {
key =>
if (!key._1.startsWith(CoreOptions.FLINK_JVM_OPTIONS.key())) {
logInfo(s"submit application dynamicProperties: ${key._1} :${key._2}")
if (!key._1.startsWith("env.")) {
logInfo(s"application dynamicProperties: ${key._1} :${key._2}")
array += s"-D${key._1}=${key._2}"
}
}
Expand Down

0 comments on commit 7a511d5

Please sign in to comment.