
Welcome to the spark-perf wiki. This page lists several useful scripts, helper functions, and analysis tools for running spark-perf tests and analyzing their results.

Running tests

Automatically testing against multiple Spark versions

In config.py, read the Spark commit to test against from an environment variable:

import os
SPARK_COMMIT_ID = os.environ["SPARK_COMMIT_ID"]

To test against multiple commits, use a shell script that calls bin/run repeatedly, setting a different SPARK_COMMIT_ID each time:

#!/usr/bin/env bash

FWDIR="$(cd "`dirname "$0"`"; pwd)"

if [ "$#" -ne 4 ]; then
  echo "Test every Nth commit between two commits"
  echo "Usage: oldest_commit newest_commit N config_file"
  exit 1
fi

oldestCommit="$1"
newestCommit="$2"
NR="$3"
configFile="$4"

# Find the SHAs of every NRth commit between two git commits:
cd "$FWDIR/spark-build-cache/master"
git fetch
versions=($(git log --oneline "$oldestCommit..$newestCommit" | awk "NR % $NR == 0" | cut -d ' ' -f1))
versions+=("$oldestCommit")
versions+=("$newestCommit")
cd "$FWDIR"

echo "Going to test against ${#versions[@]} commits using config file $configFile"
echo "Commits (every ${NR}th commit between $newestCommit and $oldestCommit):  ${versions[@]}"
read -p "Confirm? " -n 1 -r
echo
if [[ ! $REPLY =~ ^[Yy]$ ]]
then
   echo "Exiting!"
   exit 1
fi

for version in "${versions[@]}"
do
  echo "Testing against commit $version"
  export SPARK_COMMIT_ID="$version"
  ./bin/run --config "$configFile"
done

Analyzing results

Uploading logs from spark-ec2 clusters to S3

Upgrade to a newer version of the AWS CLI and configure your AWS credentials:

sudo easy_install --upgrade awscli
aws configure
  # AWS Access Key ID [None]: AKIAIOSFODNN7EXAMPLE
  # AWS Secret Access Key [None]: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
  # Default region name [None]:
  # Default output format [None]:

Copy the results directory to an S3 bucket:

aws s3 cp --recursive /local/path/to/results/directory s3://bucket-name/resultsdir/

Analyzing JSON-formatted test results using Spark SQL

Many test suites report their results as JSON objects that contain information on the test environment and configuration, such as the Spark version / commit SHA, the contents of SparkConf, environment variables, etc. Using Spark SQL, we can create a SchemaRDD from these JSON objects in order to analyze test results.

Running spark-perf produces a results directory that contains one subdirectory per invocation of bin/run. Each subdirectory contains several log files; we're interested in the *.out files, which contain the test results. First, list the paths of those files:

  • In Databricks Cloud:

    Note: this step is only relevant to Databricks employees; the rest of this tutorial should work in any environment.

    import dbutils.fs
    val resultsDir = "/mount/path/in/dbfs"
    fs.mount("s3n://AWSKEY:AWSSECRETKEY@bucket-name/bucket-filder/", resultsDir)
    val logFiles: Seq[String] = for (
      logDir: String <- fs.ls(resultsDir).filter(_.name.endsWith("_logs/")).map(d => s"$resultsDir/${d.name}");
      logFiles: Seq[String] = fs.ls(logDir).map(f => s"$logDir/${f.name}").filter(_.endsWith(".out"));
      logFile: String <- logFiles
    ) yield logFile
  • On a local filesystem:

    import java.io.File
    
    val resultsDir = "/local/path/to/results/directory"
    val logFiles: Seq[String] = for (
      logDir: String <- new File(resultsDir).list.filter(_.endsWith("_logs")).map(d => s"$resultsDir/${d}");
      logFiles: Seq[String] = new File(logDir).list.map(f => s"$logDir/${f}").filter(_.endsWith(".out"));
      logFile: String <- logFiles
    ) yield logFile
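
Whichever method you use, it can be worth checking that some *.out files were actually found before moving on. An optional sanity check from the same shell:

// Optional sanity check: confirm that some result files were discovered
println(s"Found ${logFiles.size} result files")
logFiles.take(3).foreach(println)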

Next, create an RDD that loads these files, extracts the result lines (which start with "results: "), and grabs the JSON objects (which are written on a single line):

import org.apache.spark.rdd.RDD
val allLogLines: RDD[String] = sc.union(logFiles.map(f => sc.textFile(f)))
val allResultJsons = allLogLines.filter(_.startsWith("results:")).map(_.stripPrefix("results: "))

Using Spark SQL, create a SchemaRDD from these logs. In the JSON output that spark-perf writes, some properties have names that contain dots (such as Java system properties like spark.shuffle.manager). Spark SQL does not currently support column names with dots (see SPARK-2775), so we'll need to post-process the inferred schema to convert the dots into underscores:

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
import sqlContext.applySchema
import org.apache.spark.sql._

/** Replaces . in column names with _ (underscore) */
def cleanSchema(dataType: DataType): DataType = dataType match {
  case StructType(fields) =>
    StructType(
      fields.map(f =>
        f.copy(name = f.name.replaceAllLiterally(".", "_"), dataType = cleanSchema(f.dataType))))
  case ArrayType(elemType, nullable) => ArrayType(cleanSchema(elemType), nullable)
  case other => other
}

/** Replaces . in column names with _ */
def cleanSchema(schemaRDD: SchemaRDD): SchemaRDD = {
  applySchema(schemaRDD, cleanSchema(schemaRDD.schema).asInstanceOf[StructType])
}

val resultsRdd = cleanSchema(sqlContext.jsonRDD(allResultJsons))
resultsRdd.registerTempTable("results")
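
To see what the cleaned schema looks like at this point, you can print it from the shell:

// Print the inferred (and cleaned) schema as a tree; field names should now use underscores
resultsRdd.printSchema()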

Now, we have a SQL table with a schema that looks something like this:

root
 |-- options: struct (nullable = true)
 |    |-- closure-size: string (nullable = true)
 |    |-- inter-trial-wait: string (nullable = true)
 |    |-- key-length: string (nullable = true)
 |    |-- num-jobs: string (nullable = true)
 |    |-- num-partitions: string (nullable = true)
 |    |-- num-records: string (nullable = true)
      [...]
 |-- results: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- time: double (nullable = true)
 |-- sparkConf: struct (nullable = true)
 |    |-- spark_app_id: string (nullable = true)
 |    |-- spark_app_name: string (nullable = true)
 |    |-- spark_driver_host: string (nullable = true)
 |    |-- spark_driver_memory: string (nullable = true)
 |    |-- spark_driver_port: string (nullable = true)
      [...]
 |-- sparkVersion: string (nullable = true)
 |-- systemProperties: struct (nullable = true)
 |    |-- SPARK_SUBMIT: string (nullable = true)
 |    |-- awt_toolkit: string (nullable = true)
 |    |-- file_encoding: string (nullable = true)
      [...]
 |-- testName: string (nullable = true)

In this schema, results is an array of per-test-run times (or other outcome metrics), while the rest of the schema describes the configuration used by those runs. To analyze these results using SQL, we can use Hive's LATERAL VIEW EXPLODE to un-nest this data and produce one row per test run. For example:

SELECT result.time, results.systemProperties.spark_shuffle_blockTransferService as bts, testName, options, sparkVersion, results.systemProperties.sparkperf_commitSHA as commit
FROM results LATERAL VIEW EXPLODE(results) r as result
WHERE testName = "count";
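
To run such a query from the Scala shell, pass it to sqlContext.sql (without the trailing semicolon) and collect the rows. The snippet below is a trimmed-down sketch of the query above that selects only a few of the columns:

// A trimmed-down version of the query above, executed through the HiveContext
val countTimes = sqlContext.sql("""
  SELECT result.time AS time, testName, sparkVersion
  FROM results LATERAL VIEW EXPLODE(results) r AS result
  WHERE testName = "count"
""")
countTimes.collect().foreach(println)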