Add block sizes to benchmarks and use an array-backed buffer for uploading to S3. #80

Merged
merged 2 commits on Sep 20, 2023
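In brief, per the diff below: examples/run_benchmarks.sh now sweeps a BLOCK_SIZES list (32 and 128 MiB) alongside the existing repeat/workload loops and runs the SQL queries before TeraSort; both run scripts derive fs.s3a.block.size from the exported BLOCK_SIZE and switch the S3A fast upload to an array-backed buffer; Spark event logging is enabled and written to an NFS-backed /spark-logs volume added to all four pod templates; and the fallback-fetch settings are commented out while a new USE_TRANSFER_TO toggle is introduced.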
120 changes: 51 additions & 69 deletions examples/run_benchmarks.sh
@@ -9,57 +9,19 @@ set -euo pipefail
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )"
cd "${SCRIPT_DIR}/"

./terasort/build.sh
./sql/build.sh
./terasort/build.sh

REPEAT=${REPEAT:-20}
export CHECKSUM_ENABLED=${CHECKSUM_ENABLED:-false}
export USE_FALLBACK_FETCH=false
export USE_S3_SHUFFLE=0
export USE_NFS_SHUFFLE=0

# TeraSort experiments
export INSTANCES=4
TERASORT_SIZES=(
1g
10g
100g
BLOCK_SIZES=(
32
128
)
for size in "${TERASORT_SIZES[@]}";
do
for ((i = 0 ; i < ${REPEAT} ; i++));
do
export SIZE=$size
export USE_FALLBACK_FETCH=false

export USE_S3_SHUFFLE=0
export USE_NFS_SHUFFLE=0
./terasort/run.sh || true
mc rm -r --force zac/zrlio-tmp

export USE_S3_SHUFFLE=0
export USE_NFS_SHUFFLE=1
./terasort/run.sh || true
mc rm -r --force zac/zrlio-tmp

export USE_S3_SHUFFLE=1
export USE_NFS_SHUFFLE=0
./terasort/run.sh || true
mc rm -r --force zac/zrlio-tmp

# # Enable fallback fetch
# export USE_FALLBACK_FETCH=true

# export USE_S3_SHUFFLE=0
# export USE_NFS_SHUFFLE=1
# ./terasort/run.sh || true
# mc rm -r --force zac/zrlio-tmp

# export USE_S3_SHUFFLE=1
# export USE_NFS_SHUFFLE=0
# ./terasort/run.sh || true
# mc rm -r --force zac/zrlio-tmp
done
done

# SQL experiments
export INSTANCES=12
@@ -71,32 +33,52 @@ SQL_QUERIES=(
q67 # 66 GB shuffle data
)

for ((i = 0 ; i < ${REPEAT} ; i++));
do
for query in "${SQL_QUERIES[@]}"; do
export USE_FALLBACK_FETCH=false

export USE_S3_SHUFFLE=0
export USE_NFS_SHUFFLE=0
./sql/run_single_query.sh $query || true

export USE_S3_SHUFFLE=0
export USE_NFS_SHUFFLE=1
./sql/run_single_query.sh $query || true

export USE_S3_SHUFFLE=1
export USE_NFS_SHUFFLE=0
./sql/run_single_query.sh $query || true

# # Enable fallback fetch.
# export USE_FALLBACK_FETCH=true
for ((i = 0 ; i < ${REPEAT} ; i++)); do
for blockSize in "${BLOCK_SIZES[@]}"; do
export BLOCK_SIZE=${blockSize}
for query in "${SQL_QUERIES[@]}"; do
export USE_S3_SHUFFLE=0
export USE_NFS_SHUFFLE=0
./sql/run_single_query.sh $query || true

export USE_S3_SHUFFLE=0
export USE_NFS_SHUFFLE=1
./sql/run_single_query.sh $query || true

export USE_S3_SHUFFLE=1
export USE_NFS_SHUFFLE=0
./sql/run_single_query.sh $query || true
done
done
done

# export USE_S3_SHUFFLE=0
# export USE_NFS_SHUFFLE=1
# ./sql/run_single_query.sh $query || true
# TeraSort experiments
export INSTANCES=4
TERASORT_SIZES=(
1g
10g
100g
)

# export USE_S3_SHUFFLE=1
# export USE_NFS_SHUFFLE=0
# ./sql/run_single_query.sh $query || true
for size in "${TERASORT_SIZES[@]}"; do
for ((i = 0 ; i < ${REPEAT} ; i++)); do
for blockSize in "${BLOCK_SIZES[@]}"; do
export BLOCK_SIZE=${blockSize}
export SIZE=$size
export USE_FALLBACK_FETCH=false
export USE_TRANSFER_TO=1

export USE_S3_SHUFFLE=0
export USE_NFS_SHUFFLE=0
./terasort/run.sh || true

export USE_S3_SHUFFLE=0
export USE_NFS_SHUFFLE=1
./terasort/run.sh || true

export USE_S3_SHUFFLE=1
export USE_NFS_SHUFFLE=0
./terasort/run.sh || true
done
done
done
done
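The rewritten driver nests three sweeps — repeat count, block size, and workload — and exports BLOCK_SIZE (in MiB) for the per-run scripts to pick up. A condensed sketch of the new loop structure (bodies elided; ./run.sh stands in for either workload script):

REPEAT=${REPEAT:-20}
BLOCK_SIZES=(32 128)
for ((i = 0; i < REPEAT; i++)); do
  for blockSize in "${BLOCK_SIZES[@]}"; do
    export BLOCK_SIZE=${blockSize}  # MiB; consumed by sql/run_benchmark.sh and terasort/run.sh
    ./run.sh || true                # '|| true' keeps the sweep alive after a failed run
  done
done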
33 changes: 24 additions & 9 deletions examples/sql/run_benchmark.sh
@@ -35,6 +35,12 @@ export SPARK_EXECUTOR_CORES=$EXECUTOR_CPU
export SPARK_DRIVER_MEMORY=$DRIVER_MEM
export SPARK_EXECUTOR_MEMORY=$EXECUTOR_MEM

BLOCK_SIZE=${BLOCK_SIZE:-128}
LOGGING=(
--conf spark.eventLog.enabled=true
--conf spark.eventLog.dir=file:///spark-logs
)

SPARK_HADOOP_S3A_CONFIG=(
# Required
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
@@ -44,19 +50,17 @@ SPARK_HADOOP_S3A_CONFIG=(
--conf spark.hadoop.fs.s3a.endpoint=${S3A_ENDPOINT}
--conf spark.hadoop.fs.s3a.path.style.access=true
--conf spark.hadoop.fs.s3a.fast.upload=true
--conf spark.hadoop.fs.s3a.block.size=$((128*1024*1024))
--conf spark.hadoop.fs.s3a.block.size=$(($BLOCK_SIZE * 1024 * 1024))
--conf spark.hadoop.fs.s3a.fast.upload.buffer=array
)

SPARK_S3_SHUFFLE_CONFIG=(
--conf spark.hadoop.fs.s3a.access.key=${S3A_ACCESS_KEY}
--conf spark.hadoop.fs.s3a.secret.key=${S3A_SECRET_KEY}
--conf spark.hadoop.fs.s3a.endpoint=${S3A_ENDPOINT}
--conf spark.shuffle.manager="org.apache.spark.shuffle.sort.S3ShuffleManager"
--conf spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.S3ShuffleDataIO
--conf spark.shuffle.checksum.enabled=${CHECKSUM_ENABLED}
--conf spark.shuffle.s3.rootDir=${SHUFFLE_DESTINATION}
--conf spark.shuffle.s3.useSparkShuffleFetch=${USE_FALLBACK_FETCH}
--conf spark.storage.decommission.fallbackStorage.path=${SHUFFLE_DESTINATION}
# --conf spark.shuffle.s3.useSparkShuffleFetch=${USE_FALLBACK_FETCH}
# --conf spark.storage.decommission.fallbackStorage.path=${SHUFFLE_DESTINATION}
)

if (( "$USE_S3_SHUFFLE" == 0 )); then
@@ -83,8 +87,8 @@ if (( "$USE_NFS_SHUFFLE" == 1 )); then
--conf spark.kubernetes.executor.podTemplateFile=${SCRIPT_DIR}/../templates/executor_nfs.yml
--conf spark.kubernetes.driver.podTemplateFile=${SCRIPT_DIR}/../templates/driver_nfs.yml
--conf spark.hadoop.fs.file.block.size=$((128*1024*1024))
--conf spark.shuffle.s3.useSparkShuffleFetch=${USE_FALLBACK_FETCH}
--conf spark.storage.decommission.fallbackStorage.path=file:///nfs/
# --conf spark.shuffle.s3.useSparkShuffleFetch=${USE_FALLBACK_FETCH}
# --conf spark.storage.decommission.fallbackStorage.path=file:///nfs/
)

SPARK_KUBERNETES_TEMPLATES=(
@@ -97,6 +101,15 @@ if [[ "${USE_FALLBACK_FETCH}" == "true" ]]; then
PROCESS_TAG="${PROCESS_TAG}-fallback"
fi

EXTRA_OPTIONS=(
)
if (( "${USE_TRANSFER_TO:-1}" == 0 )); then
PROCESS_TAG="${PROCESS_TAG}-transferTo_false"
EXTRA_OPTIONS=(
--conf spark.file.transferTo=false \
)
fi

USE_PROFILER=${USE_PROFILER:-0}
if (( "${USE_PROFILER}" == 1 )); then
PROFILER_CONFIG="reporter=com.uber.profiling.reporters.InfluxDBOutputReporter,configProvider=com.uber.profiling.YamlConfigProvider,configFile=/profiler_config.yml,metricInterval=5000,sampleInterval=5000,ioProfiling=true"
@@ -117,10 +130,12 @@ ${SPARK_HOME}/bin/spark-submit \
--conf "spark.driver.extraJavaOptions=${DRIVER_JAVA_OPTIONS}" \
--conf "spark.executor.extraJavaOptions=${EXECUTOR_JAVA_OPTIONS}" \
\
--name ce-${PROCESS_TAG}-${INSTANCES}x${EXECUTOR_CPU}--${EXECUTOR_MEM} \
--name ce-${PROCESS_TAG}-bs${BLOCK_SIZE}MiB-${INSTANCES}x${EXECUTOR_CPU}--${EXECUTOR_MEM} \
--conf spark.serializer="org.apache.spark.serializer.KryoSerializer" \
--conf spark.kryoserializer.buffer=128mb \
--conf spark.executor.instances=$INSTANCES \
"${EXTRA_OPTIONS[@]}" \
"${LOGGING[@]}" \
"${SPARK_HADOOP_S3A_CONFIG[@]}" \
"${SPARK_S3_SHUFFLE_CONFIG[@]}" \
"${SPARK_KUBERNETES_TEMPLATES[@]}" \
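Two S3A changes in this script do the heavy lifting: fs.s3a.block.size is now derived from the exported BLOCK_SIZE (MiB) instead of being hard-coded to 128 MiB, and fs.s3a.fast.upload.buffer=array stages multipart uploads in on-heap byte arrays rather than on local disk (Hadoop's other supported values are disk and bytebuffer). A quick standalone check of the arithmetic, using the script's own default:

BLOCK_SIZE=${BLOCK_SIZE:-128}        # MiB
echo $((BLOCK_SIZE * 1024 * 1024))   # bytes handed to fs.s3a.block.size; 134217728 for 128 MiB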
12 changes: 10 additions & 2 deletions examples/templates/driver.yml
@@ -14,6 +14,14 @@ spec:
image: will-be-overwritten
resources:
limits:
ephemeral-storage: 16G
ephemeral-storage: 32G
requests:
ephemeral-storage: 16G
ephemeral-storage: 32G
volumeMounts:
- name: spark-logs
mountPath: /spark-logs
volumes:
- name: spark-logs
nfs:
server: 10.40.1.32
path: /spark-logs
6 changes: 6 additions & 0 deletions examples/templates/driver_nfs.yml
@@ -20,8 +20,14 @@ spec:
volumeMounts:
- name: nfs
mountPath: /nfs
- name: spark-logs
mountPath: /spark-logs
volumes:
- name: nfs
nfs:
server: 10.40.1.32
path: /shuffle
- name: spark-logs
nfs:
server: 10.40.1.32
path: /spark-logs
12 changes: 10 additions & 2 deletions examples/templates/executor.yml
@@ -14,6 +14,14 @@ spec:
image: will-be-overwritten
resources:
limits:
ephemeral-storage: 16G
ephemeral-storage: 32G
requests:
ephemeral-storage: 16G
ephemeral-storage: 32G
volumeMounts:
- name: spark-logs
mountPath: /spark-logs
volumes:
- name: spark-logs
nfs:
server: 10.40.1.32
path: /spark-logs
6 changes: 6 additions & 0 deletions examples/templates/executor_nfs.yml
@@ -20,8 +20,14 @@ spec:
volumeMounts:
- name: nfs
mountPath: /nfs
- name: spark-logs
mountPath: /spark-logs
volumes:
- name: nfs
nfs:
server: 10.40.1.32
path: /shuffle
- name: spark-logs
nfs:
server: 10.40.1.32
path: /spark-logs
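All four templates gain the same NFS-backed spark-logs volume, which is what lets the new LOGGING array in the run scripts (spark.eventLog.enabled=true, spark.eventLog.dir=file:///spark-logs) land on shared storage; the plain driver and executor templates also double their ephemeral-storage requests from 16G to 32G. One way to browse the collected event logs afterwards — a sketch, assuming a host that mounts the same export at /spark-logs:

SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=file:///spark-logs" \
  "${SPARK_HOME}/sbin/start-history-server.sh"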
38 changes: 26 additions & 12 deletions examples/terasort/run.sh
@@ -35,6 +35,12 @@ export SPARK_EXECUTOR_CORES=$EXECUTOR_CPU
export SPARK_DRIVER_MEMORY=$DRIVER_MEM
export SPARK_EXECUTOR_MEMORY=$EXECUTOR_MEM

LOGGING=(
--conf spark.eventLog.enabled=true
--conf spark.eventLog.dir=file:///spark-logs
)

BLOCK_SIZE=${BLOCK_SIZE:-128}

SPARK_HADOOP_S3A_CONFIG=(
# Required
@@ -45,20 +51,18 @@ SPARK_HADOOP_S3A_CONFIG=(
--conf spark.hadoop.fs.s3a.endpoint=${S3A_ENDPOINT}
--conf spark.hadoop.fs.s3a.path.style.access=true
--conf spark.hadoop.fs.s3a.fast.upload=true
--conf spark.hadoop.fs.s3a.block.size=$((128*1024*1024))
--conf spark.hadoop.fs.s3a.block.size=$(($BLOCK_SIZE * 1024 * 1024))
--conf spark.hadoop.fs.s3a.fast.upload.buffer=array
)


SPARK_S3_SHUFFLE_CONFIG=(
--conf spark.hadoop.fs.s3a.access.key=${S3A_ACCESS_KEY}
--conf spark.hadoop.fs.s3a.secret.key=${S3A_SECRET_KEY}
--conf spark.hadoop.fs.s3a.endpoint=${S3A_ENDPOINT}
--conf spark.shuffle.manager="org.apache.spark.shuffle.sort.S3ShuffleManager"
--conf spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.S3ShuffleDataIO
--conf spark.shuffle.checksum.enabled=${CHECKSUM_ENABLED}
--conf spark.shuffle.s3.rootDir=${SHUFFLE_DESTINATION}
--conf spark.shuffle.s3.useSparkShuffleFetch=${USE_FALLBACK_FETCH}
--conf spark.storage.decommission.fallbackStorage.path=${SHUFFLE_DESTINATION}
# --conf spark.shuffle.s3.useSparkShuffleFetch=${USE_FALLBACK_FETCH}
# --conf spark.storage.decommission.fallbackStorage.path=${SHUFFLE_DESTINATION}
)

if (( "$USE_S3_SHUFFLE" == 0 )); then
@@ -82,9 +86,10 @@ if (( "$USE_NFS_SHUFFLE" == 1 )); then
--conf spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.S3ShuffleDataIO
--conf spark.shuffle.checksum.enabled=${CHECKSUM_ENABLED}
--conf spark.shuffle.s3.rootDir=file:///nfs/
--conf spark.hadoop.fs.file.block.size=$((128*1024*1024))
--conf spark.shuffle.s3.useSparkShuffleFetch=${USE_FALLBACK_FETCH}
--conf spark.storage.decommission.fallbackStorage.path=file:///nfs/
--conf spark.hadoop.fs.file.block.size=$(($BLOCK_SIZE*1024*1024))
# --conf spark.shuffle.s3.useSparkShuffleFetch=${USE_FALLBACK_FETCH}
# --conf spark.storage.decommission.fallbackStorage.path=file:///nfs/
# --conf spark.shuffle.s3.maxConcurrencyTask=5
)

SPARK_KUBERNETES_TEMPLATES=(
@@ -97,6 +102,15 @@ if [[ "${USE_FALLBACK_FETCH}" == "true" ]]; then
PROCESS_TAG="${PROCESS_TAG}-fallback"
fi

EXTRA_OPTIONS=(
)
if (( "${USE_TRANSFER_TO:-1}" == 0 )); then
PROCESS_TAG="${PROCESS_TAG}-transferTo_false"
EXTRA_OPTIONS=(
--conf spark.file.transferTo=false \
)
fi

USE_PROFILER=${USE_PROFILER:-0}
if (( "${USE_PROFILER}" == 1 )); then
PROFILER_CONFIG="reporter=com.uber.profiling.reporters.InfluxDBOutputReporter,configProvider=com.uber.profiling.YamlConfigProvider,configFile=/profiler_config.yml,metricInterval=5000,sampleInterval=5000,ioProfiling=true"
@@ -117,10 +131,11 @@ ${SPARK_HOME}/bin/spark-submit \
--conf "spark.driver.extraJavaOptions=${DRIVER_JAVA_OPTIONS}" \
--conf "spark.executor.extraJavaOptions=${EXECUTOR_JAVA_OPTIONS}" \
\
--name ce-terasort-${SIZE}${PROCESS_TAG}-${INSTANCES}x${EXECUTOR_CPU}--${EXECUTOR_MEM} \
--name ce-terasort-${SIZE}${PROCESS_TAG}-bs${BLOCK_SIZE}MiB-${INSTANCES}x${EXECUTOR_CPU}--${EXECUTOR_MEM} \
--conf spark.serializer="org.apache.spark.serializer.KryoSerializer" \
--conf spark.kryoserializer.buffer=128mb \
--conf spark.executor.instances=$INSTANCES \
"${EXTRA_OPTIONS[@]}" \
"${LOGGING[@]}" \
"${SPARK_HADOOP_S3A_CONFIG[@]}" \
"${SPARK_S3_SHUFFLE_CONFIG[@]}" \
"${SPARK_KUBERNETES_TEMPLATES[@]}" \
@@ -154,7 +169,6 @@ ${SPARK_HOME}/bin/spark-submit \
--conf spark.executor.instances=$INSTANCES \
--conf spark.jars.ivy=/tmp/.ivy \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.kryoserializer.buffer=128mb \
"${SPARK_HADOOP_S3A_CONFIG[@]}" \
--conf spark.ui.prometheus.enabled=true \
--conf spark.network.timeout=10000 \
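The USE_TRANSFER_TO toggle is the other new knob: when set to 0, both run scripts add spark.file.transferTo=false — which makes Spark copy shuffle data through an explicit user-space buffer instead of NIO transferTo, a difference that can matter on NFS-backed storage — and suffix the run name with -transferTo_false. A sketch of exercising it by hand (all variables come from the scripts above; values are illustrative, and the S3A credentials and endpoint are assumed to already be in the environment):

CHECKSUM_ENABLED=false USE_FALLBACK_FETCH=false INSTANCES=4 \
USE_TRANSFER_TO=0 BLOCK_SIZE=32 SIZE=1g \
USE_S3_SHUFFLE=0 USE_NFS_SHUFFLE=1 \
./terasort/run.sh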