## Upstream SPARK-XXXXX ticket and PR link (if not applicable, explain)

Not filed upstream; this change touches conda-specific code.

## What changes were proposed in this pull request?

rLibDir contains a sequence of candidate paths for the SparkR package on the executor and is passed to the R daemon via the SPARKR_RLIBDIR environment variable. This PR filters rLibDir down to the paths that actually exist before setting SPARKR_RLIBDIR, preserving the existing behavior of preferring a YARN or local SparkR installation over the conda one when both are present.
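For illustration, here is a minimal Scala sketch of the existence filter (the paths are hypothetical examples, and the comma-joined output follows how RRunner sets SPARKR_RLIBDIR):

```scala
import java.io.File

// Candidate SparkR library directories in preference order: a YARN/local
// install first, the conda environment's library last. Paths below are
// hypothetical examples, not values Spark hardcodes.
val candidates: Seq[String] = Seq(
  "/opt/spark/R/lib",                      // hypothetical YARN/local install
  "/opt/conda/envs/example/lib/R/library"  // hypothetical conda env library
)

// Keep only directories that exist on this machine. filter preserves
// order, so a YARN/local install still wins over conda when both exist.
val existing: Seq[String] = candidates.filter(dir => new File(dir).exists)

// The surviving paths become SPARKR_RLIBDIR for daemon.R to search.
println(existing.mkString(","))
```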

See daemon.R: https://github.com/palantir/spark/blob/master/R/pkg/inst/worker/daemon.R#L23

Fixes apache-spark-on-k8s#456 

## How was this patch tested?

Manually tested by cherry-picking the change onto an older version.

jeremyjliu authored and Robert Kruszewski committed Jan 6, 2019
1 parent 0d3f5cb commit 15345cb
Showing 2 changed files with 14 additions and 8 deletions.
core/src/main/scala/org/apache/spark/api/r/RRunner.scala (8 additions, 4 deletions)
@@ -360,10 +360,14 @@ private[r] object RRunner {
     val rConnectionTimeout = sparkConf.getInt(
       "spark.r.backendConnectionTimeout", SparkRDefaults.DEFAULT_CONNECTION_TIMEOUT)
     val rOptions = "--vanilla"
-    val rLibDir = condaEnv.map { conda =>
-      RUtils.sparkRPackagePath(isDriver = false) :+ (conda.condaEnvDir + "/lib/R/library")
-    }.getOrElse(RUtils.sparkRPackagePath(isDriver = false))
-    val rExecScript = RUtils.sparkRInstallLocation(rLibDir, "/SparkR/worker/" + script)
+    val rLibDir = condaEnv.map(conda =>
+      RUtils.sparkRPackagePath(isDriver = false) :+ (conda.condaEnvDir + "/lib/R/library"))
+      .getOrElse(RUtils.sparkRPackagePath(isDriver = false))
+      .filter(dir => new File(dir).exists)
+    if (rLibDir.isEmpty) {
+      throw new SparkException("SparkR package is not installed on executor.")
+    }
+    val rExecScript = RUtils.getSparkRScript(rLibDir, "/SparkR/worker/" + script)
     val pb = new ProcessBuilder(Arrays.asList(rCommand, rOptions, rExecScript))
     // Activate the conda environment by setting the right env variables if applicable.
     condaEnv.map(_.activatedEnvironment()).map(_.asJava).foreach(pb.environment().putAll)
core/src/main/scala/org/apache/spark/api/r/RUtils.scala (6 additions, 4 deletions)
@@ -97,10 +97,12 @@ private[spark] object RUtils {
     }
   }
 
-  /** Finds the rLibDir with SparkR installed on it. */
-  def sparkRInstallLocation(rLibDir: Seq[String], scriptPath: String): String = {
-    rLibDir.find(dir => new File(dir + scriptPath).exists)
-      .getOrElse(throw new SparkException("SparkR package not installed on executor.")) + scriptPath
+  /** Finds a script in a sequence of possible SparkR installation directories. */
+  def getSparkRScript(rLibDir: Seq[String], scriptPath: String): String = {
+    rLibDir.find(dir => new File(dir + scriptPath).exists).getOrElse(
+      throw new SparkException(
+        s"Script $scriptPath not found in any SparkR installation directory.")
+    ) + scriptPath
   }
 
   /** Check if R is installed before running tests that use R commands. */
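For illustration, a hypothetical call to the renamed helper (the paths are made up, and RUtils is private[spark], so this sketch only shows the behavior rather than a supported external API):

```scala
// rLibDir has already been filtered to existing directories by RRunner;
// find returns the first directory that contains the script, preserving
// the YARN/local-over-conda preference. Paths are hypothetical.
val rLibDir = Seq("/opt/spark/R/lib", "/opt/conda/envs/example/lib/R/library")
val daemonScript = RUtils.getSparkRScript(rLibDir, "/SparkR/worker/daemon.R")
// => "/opt/spark/R/lib/SparkR/worker/daemon.R" if that file exists there;
//    throws SparkException when the script is in none of the directories.
```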
