Merge pull request #220 from daniel-thom/apache-spark-clusters
Add scripts to configure Apache Spark clusters
yandthj authored Dec 14, 2022
2 parents d128e0f + ca73fbc commit fbaac09
Showing 31 changed files with 1,598 additions and 0 deletions.
247 changes: 247 additions & 0 deletions applications/spark/README.md
@@ -0,0 +1,247 @@
# Running Apache Spark Clusters on an HPC

The scripts in this directory create ephemeral Apache Spark clusters on HPC compute nodes.

## Prerequisites
The scripts require the Spark software to be installed in a Singularity container. The
`docker` directory includes Dockerfiles that build images derived from the base Apache
Spark Python and R images. The files have instructions on how to convert the Docker images to
Singularity.
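
For reference, a typical conversion workflow builds the Docker image locally and then converts it
with `singularity build`. The image name and Dockerfile path below are illustrative; the
instructions in the Dockerfiles themselves are authoritative.
```
# Build the Docker image from one of the provided Dockerfiles (names are illustrative).
$ docker build -t spark_py39 -f docker/Dockerfile .
# Convert the image from the local Docker daemon into a Singularity image file.
$ singularity build spark_py39.sif docker-daemon://spark_py39:latest
```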

Existing Singularity containers on Eagle:
- A container with Spark 3.3.1 and Python 3.9 is at `/datasets/images/apache_spark/spark_py39.sif`.
This image includes the packages `ipython`, `jupyter`, `numpy`, `pandas`, and `pyarrow`.
- A container with Spark 3.3.1 and R 4.0.4 is at `/datasets/images/apache_spark/spark_r.sif`.
This image includes the packages `tidyverse`, `sparklyr`, `data.table`, `here`, `janitor`, and
`skimr`.

## Setup

1. Clone the repository:
```
$ git clone https://github.com/NREL/HPC.git
```
2. Change to a directory in `/scratch/$USER`.
3. Add the `spark_scripts` directory to your path.
```
$ export PATH=$PATH:<your-repo-path>/HPC/applications/spark/spark_scripts
```
4. Copy the `config` file and `conf` directory with the command below. Specify an alternate
destination directory with `-d <directory>`. This can be done on the login node or compute node.
```
$ create_config.sh -c <path-to-spark-container>
```

5. Edit the `config` file if necessary. Note that the rest of this page relies on the setting
`container_instance_name = spark`.
6. Consider what type of compute nodes to acquire. If you will be performing large shuffles,
then you must get nodes with fast local storage. `bigmem` and `gpu` nodes have local SSDs that
can read/write at 2 GB/s. The standard nodes have spinning disks that can only read/write at
~130 MB/s; shuffle-heavy jobs will likely fail on those nodes. You can also consider specifying a
RAM disk (`/dev/shm`) as Spark local storage, but be sure you have enough space.
7. Decide how and when you want to configure your Spark application parameters.

- Manually specify global settings in `conf`. Note that worker settings in `conf/spark-env.sh`
must be set before starting the cluster.
- Auto-configure global settings with `configure_spark.sh`. You can run this script after
acquiring compute nodes and it will apply settings based on the hardware resources (memory/CPU)
of those nodes. Run `configure_spark.sh --help` to see available options.
- At runtime when you run `spark-submit` or `pyspark`. Refer to the CLI help and the example below.
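
For example, individual settings can be overridden at submission time; `my_job.py` below is a
hypothetical job script.
```
$ spark-submit --conf spark.executor.cores=5 --conf spark.executor.memory=11G my_job.py
```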

Here are some parameters in the `conf` files to consider editing:

**log4j2.properties**:
- `rootLogger.level`: Spark is verbose when the log level is `info`. Change the level to
`warn` if desired.

**spark-env.sh**:
- `SPARK_LOG_DIR`: The Spark processes will log to this directory.
- `SPARK_LOCAL_DIRS`: Spark will write temporary files here. It must be fast. Set it to `/dev/shm`
if you want to use a RAM disk; note that `/dev/shm` on Eagle nodes is limited to half of system
memory, so adjust other memory parameters accordingly. A sample `spark-env.sh` fragment appears
after these parameter lists.
- `SPARK_WORKER_DIR`: The Spark worker processes will log to this directory
and use it for scratch space. It is configured to go to `/tmp/scratch` by default. Change it
or copy the files before relinquishing the nodes if you want to preserve the files. They can
be useful for debugging errors.

**spark-defaults.conf**:
- `spark.executor.cores`: Online recommendations say that there is minimal parallelization benefit
if the value is greater than 5. It should be configured in tandem with `spark.executor.memory`
so that you maximize the number of executors on each worker node. 7 executors work well on Eagle
nodes (35 out of 36 available cores).
- `spark.executor.memory`: Adjust as necessary depending on the type of nodes you acquire. Make
it big enough for 7 executors after adjusting for overhead for OS and management processes.
- `spark.driver.memory`: Adjust as necessary depending on how much data you will pull from Spark
into your application.
- `spark.eventLog.dir` and `spark.history.fs.logDirectory`: These directories must exist and
will be used to store Spark history. If event logging is enabled, you can start a Spark history
server after your jobs finish and review all jobs in the Spark UI. Disable these and
`spark.eventLog.enabled` if you don't want to preserve the history.
- `spark.sql.execution.arrow.pyspark.enabled`: Set it to `true` if you will use Python and
convert Spark DataFrames to Pandas DataFrames.
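
As a reference, a minimal `conf/spark-env.sh` fragment covering the worker settings above might
look like the following sketch; the log and worker paths are illustrative, so substitute locations
that suit your runs.
```
# Illustrative spark-env.sh settings; adjust the paths for your environment.
export SPARK_LOG_DIR=/scratch/$USER/spark/logs   # where the Spark processes write their logs
export SPARK_LOCAL_DIRS=/dev/shm                 # RAM disk for shuffle and temporary files
export SPARK_WORKER_DIR=/tmp/scratch/worker      # worker logs and scratch space
```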


## Usage
These instructions assume that you are running in a directory that contains the configuration
files and directories, and that you've added the `spark_scripts` directory to your `PATH`
environment variable.

The start script takes one or more SLURM job IDs as input. It will detect the allocated nodes and
start the container on each one.

### Manual mode

1. Allocate nodes however you'd like (`salloc`, `sbatch`, `srun`).

**Note**: The best way to test this functionality is with an interactive session on bigmem nodes
in the debug partition.

Example command (2 nodes):
```
$ salloc -t 01:00:00 -N2 --account=<your-account> --partition=debug --mem=730G
```

2. Log in to the first compute node if you are not already there.
3. Optional: Run `configure_spark.sh` to apply settings based on actual compute node resources.
The script will check for the environment variable `SLURM_JOB_ID`, which is set by `SLURM` when it
allocates a node to you. If you ssh'd into the compute node then that variable won't be set.
Choose the option below that is appropriate for your environment.
```
$ configure_spark.sh
```
```
$ configure_spark.sh <SLURM_JOB_ID>
```
```
$ configure_spark.sh <SLURM_JOB_ID1> <SLURM_JOB_ID2>
```
4. Start the Spark cluster. Similar to `configure_spark.sh`, this script will check for the
environment variable `SLURM_JOB_ID`.

Choose the option below that is appropriate for your environment.
```
$ start_spark_cluster.sh
```
```
$ start_spark_cluster.sh <SLURM_JOB_ID1>
```
```
$ start_spark_cluster.sh <SLURM_JOB_ID1> <SLURM_JOB_ID2>
```

5. Load the Singularity environment if you want to run with its software. You can also run in your
own environment as long as you have the same versions of Spark and Python or R.
```
$ module load singularity-container
```

6. If you run in your own environment and want to use the configuration settings created by the
scripts, set the environment variable `SPARK_CONF_DIR`.
```
$ export SPARK_CONF_DIR=$(pwd)/conf
```

7. Start a Spark process.

Refer to Python instructions [here](python.md).

Refer to R instructions [here](r.md).
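
As a rough illustration only (the linked pages are authoritative), a PySpark shell can be launched
inside the running container instance; the master URL below assumes Spark's default standalone
port of 7077 on the first allocated node.
```
$ singularity exec instance://spark pyspark --master spark://$(hostname):7077
```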

### Batched execution
This directory includes sbatch script examples for each of the above execution types.

Refer to the scripts in the `slurm_scripts` directory.
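
For orientation, a stripped-down `sbatch` script might combine the manual-mode steps roughly as
follows. Treat it as a sketch: the account, repo path, and job script are placeholders, it assumes
`create_config.sh` was already run in the submission directory, and the maintained examples in
`slurm_scripts` should be your starting point.
```
#!/bin/bash
#SBATCH --account=<your-account>
#SBATCH --time=01:00:00
#SBATCH --nodes=2

# Sketch only; assumes the config file and conf directory already exist in this directory.
export PATH=$PATH:<your-repo-path>/HPC/applications/spark/spark_scripts
module load singularity-container
configure_spark.sh        # apply settings based on this allocation's hardware
start_spark_cluster.sh    # start the Spark master and workers on the allocated nodes
spark-submit --master spark://$(hostname):7077 my_job.py   # my_job.py is a placeholder
stop_spark_cluster.sh     # shut down cleanly so the event history is preserved
```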

### Mounts
The configuration scripts mount the following directories inside the container, and so you should
be able to load data files in any of them:
- `/lustre`
- `/projects`
- `/scratch`
- `/datasets`


## Debugging problems
Open the Spark web UI to observe what's happening with your jobs. You will have to forward ports
8080 and 4040 of the master node (first node in your SLURM allocation) through an ssh tunnel.
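
A tunnel of roughly this form works, where `<master-node>` is the first node of your allocation
and `<login-node>` is the login host you normally ssh to (both are placeholders).
```
$ ssh -L 4040:<master-node>:4040 -L 8080:<master-node>:8080 <username>@<login-node>
```
With the tunnel up, http://localhost:8080 shows the cluster (master) UI.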

Open your browser to http://localhost:4040 after configuring the tunnel to access the application UI.

Before inspecting job details you may first want to confirm that the correct Spark configuration
settings are in effect by looking at the `Environment` tab.

If you enable the history server then you can open this UI after you relinquish the nodes. Here
is an example of how to start it:

```
$ singularity exec \
--env SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=./events" \
instance://spark \
start-history-server.sh
```

**Note**: Be sure to cleanly shut down the cluster with `stop_spark_cluster.sh` if you intend
to look at the history.

## Performance monitoring
Tuning a Spark application and cluster can be difficult. It is advisable to monitor hardware
resource utilization on your compute nodes. You need to ensure that you are using all available
CPUs and are not bottlenecked by the storage.

Here is an example of how to run `htop` on multiple nodes simultaneously with `tmux`.

Download this script: https://raw.githubusercontent.com/johnko/ssh-multi/master/bin/ssh-multi

Run it like this:
```
$ ./ssh-multi node1 node2 nodeN
```
It will start `tmux` with one pane for each node and pane synchronization enabled, so typing in
one pane types in all of them.
```
$ htop
```
Ensure that you are using all CPUs.

You can monitor memory usage with
```
$ watch free -g
```

You can monitor disk usage with
```
$ iostat -t 1 -xm
```

Note that this directory includes a helper script to convert SLURM job IDs to node names.
Here is a shortcut for the above if you are on a node acquired with `salloc`:
```
$ ./ssh-multi $(./scripts/get-node-names $SLURM_JOB_ID)
```

### Automated performance monitoring
You may want to run performance monitoring for the duration of your job and inspect the results
later. This section describes one way to do that with a separate tool.

1. Configure a Python virtual environment.
2. Install `jade`.
```
$ pip install NREL-jade
```
This package includes a tool that collects resource utilization data. You can run it like this:
```
$ jade stats collect --interval=1 --output=my-stats
```
The tool generates a Parquet file for each resource type as well as HTML plots.

3. Configure your `sbatch` script to run this tool on each node, as sketched below. Refer to the
scripts in `slurm_scripts_with_resource_monitoring`. The output directories will contain HTML
plots for each compute node.
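
One way to launch the collector on every node of the allocation from inside an `sbatch` script is
with `srun`, as in the sketch below. The per-node output naming is an assumption; the maintained
scripts handle this for you.
```
$ srun --nodes=$SLURM_JOB_NUM_NODES --ntasks-per-node=1 \
    bash -c 'jade stats collect --interval=1 --output=stats-$(hostname)' &
```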


## Resources
- Spark cluster overview: https://spark.apache.org/docs/latest/cluster-overview.html
- Spark Python APIs: https://spark.apache.org/docs/latest/api/python/getting_started/index.html
- Spark tuning guide from Apache: https://spark.apache.org/docs/latest/tuning.html
- Spark tuning guide from Amazon: https://aws.amazon.com/blogs/big-data/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/
- Performance recommendations: https://www.youtube.com/watch?v=daXEp4HmS-E&t=4251s
70 changes: 70 additions & 0 deletions applications/spark/conf/log4j2.properties
@@ -0,0 +1,70 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Set everything to be logged to the console
rootLogger.level = info
rootLogger.appenderRef.stdout.ref = console

# In the pattern layout configuration below, we specify an explicit `%ex` conversion
# pattern for logging Throwables. If this was omitted, then (by default) Log4J would
# implicitly add an `%xEx` conversion pattern which logs stacktraces with additional
# class packaging information. That extra information can sometimes add a substantial
# performance overhead, so we disable it in our default logging config.
# For more information, see SPARK-39361.
appender.console.type = Console
appender.console.name = console
appender.console.target = SYSTEM_ERR
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex

# Set the default spark-shell/spark-sql log level to WARN. When running the
# spark-shell/spark-sql, the log level for these classes is used to overwrite
# the root logger's log level, so that the user can have different defaults
# for the shell and regular Spark apps.
logger.repl.name = org.apache.spark.repl.Main
logger.repl.level = warn

logger.thriftserver.name = org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
logger.thriftserver.level = warn

# Settings to quiet third party logs that are too verbose
logger.jetty1.name = org.sparkproject.jetty
logger.jetty1.level = warn
logger.jetty2.name = org.sparkproject.jetty.util.component.AbstractLifeCycle
logger.jetty2.level = error
logger.replexprTyper.name = org.apache.spark.repl.SparkIMain$exprTyper
logger.replexprTyper.level = warn
logger.replSparkILoopInterpreter.name = org.apache.spark.repl.SparkILoop$SparkILoopInterpreter
logger.replSparkILoopInterpreter.level = warn
logger.parquet1.name = org.apache.parquet
logger.parquet1.level = error
logger.parquet2.name = parquet
logger.parquet2.level = error

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
logger.RetryingHMSHandler.name = org.apache.hadoop.hive.metastore.RetryingHMSHandler
logger.RetryingHMSHandler.level = fatal
logger.FunctionRegistry.name = org.apache.hadoop.hive.ql.exec.FunctionRegistry
logger.FunctionRegistry.level = error

# For deploying Spark ThriftServer
# SPARK-34128: Suppress undesirable TTransportException warnings involved in THRIFT-4805
appender.console.filter.1.type = RegexFilter
appender.console.filter.1.regex = .*Thrift error occurred during processing of message.*
appender.console.filter.1.onMatch = deny
appender.console.filter.1.onMismatch = neutral

54 changes: 54 additions & 0 deletions applications/spark/conf/spark-defaults.conf
@@ -0,0 +1,54 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.

# Example:
# spark.master spark://master:7077
# spark.eventLog.enabled true
# spark.eventLog.dir hdfs://namenode:8021/directory
# spark.serializer org.apache.spark.serializer.KryoSerializer
# spark.driver.memory 5g
# spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"

# This needs to be as large as how much data you will pull from Spark into your application,
# such as if you convert a Spark DataFrame to pandas DataFrame.
spark.driver.memory 2g
# This should be <= spark.driver.memory
spark.driver.maxResultSize 2g
spark.sql.execution.arrow.pyspark.enabled = false

# Many online sources say to
# 1. Use no more than 5 cores for one executor.
# 2. Eagle nodes have 36 cores and can fit 7 executors.
# The total memory needs to be >= num_executors * spark.executor.memory
spark.executor.cores 5
# Based on standard Eagle nodes with 92G memory.
spark.executor.memory 11G
# spark.sql.shuffle.partitions 200

# These settings can help even in standalone mode in some circumstances.
# If you observe huge numbers of tasks getting created or more executors
# starting than are helpful, try them out.
# Refer to Spark documentation.
#spark.dynamicAllocation.enabled true
#spark.dynamicAllocation.shuffleTracking.enabled true
#spark.shuffle.service.enabled true
#spark.shuffle.service.db.enabled = true
# spark.worker.cleanup.enabled = true
