Spark finished after the first stage and returned an empty dataframe #10

Open
i-Hun opened this issue Jan 19, 2023 · 1 comment
i-Hun commented Jan 19, 2023

Thanks for your work, but I wasn't able to run this plugin successfully. The query finished after the first stage and returned an empty dataframe without any error.

Code to reproduce:

  1. I installed Spark in a Docker image based on python:3.8-bullseye with openjdk_version="17", like this:
ARG scala_version="2.12"

ENV APACHE_SPARK_VERSION="3.3.0" \
    HADOOP_VERSION="3" \
    SPARK_HOME=/usr/local/spark \
    SPARK_OPTS="--driver-java-options=-Xms1024M --driver-java-options=-Xmx4096M --driver-java-options=-Dlog4j.logLevel=info" \
    PATH="${PATH}:${SPARK_HOME}/bin"

WORKDIR /tmp
RUN wget -q "https://archive.apache.org/dist/spark/spark-${APACHE_SPARK_VERSION}/spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz" && \
    tar xzf "spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz" -C /usr/local --owner root --group root --no-same-owner && \
    rm "spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz" && \
    ln -s "/usr/local/spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}" $SPARK_HOME

WORKDIR /usr/local

# to read s3a
RUN wget -P "${SPARK_HOME}/jars" https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.2/hadoop-aws-3.2.2.jar && \
    wget -P "${SPARK_HOME}/jars" https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.375/aws-java-sdk-bundle-1.11.375.jar

RUN wget -P "${SPARK_HOME}/jars" https://github.com/IBM/spark-s3-shuffle/releases/download/v0.5/spark-s3-shuffle_${scala_version}-${APACHE_SPARK_VERSION}_0.5.jar

# Add a link in the before_notebook hook in order to source automatically PYTHONPATH
RUN mkdir -p /usr/local/bin/before-notebook.d && \
    ln -s "${SPARK_HOME}/sbin/spark-config.sh" /usr/local/bin/before-notebook.d/spark-config.sh

# Fix Spark installation for Java 11 and Apache Arrow library
# see: https://github.com/apache/spark/pull/27356, https://spark.apache.org/docs/latest/#downloading
RUN cp -p "${SPARK_HOME}/conf/spark-defaults.conf.template" "${SPARK_HOME}/conf/spark-defaults.conf" && \
    echo $'\n\
spark.driver.extraJavaOptions -Dio.netty.tryReflectionSetAccessible=true\n\
spark.executor.extraJavaOptions -Dio.netty.tryReflectionSetAccessible=true\n\
spark.driver.memory 200g\n\
spark.kryoserializer.buffer.max 2047\n\
spark.sql.shuffle.partitions 300\n\
spark.sql.execution.arrow.pyspark.fallback.enabled true\n\
spark.driver.maxResultSize 120g' >> "${SPARK_HOME}/conf/spark-defaults.conf"

RUN pip install pyspark
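
A quick way to confirm that the shuffle plugin and the S3A jars actually ended up in ${SPARK_HOME}/jars (a minimal sketch, assuming the paths from the Dockerfile above; pyspark picks up the same SPARK_HOME from the environment variable):

# Sanity check (sketch): confirm the downloaded jars landed in ${SPARK_HOME}/jars.
import os
import glob

spark_home = os.environ["SPARK_HOME"]  # /usr/local/spark in this image
jars = [os.path.basename(p) for p in glob.glob(os.path.join(spark_home, "jars", "*.jar"))]

for needle in ("spark-s3-shuffle", "hadoop-aws", "aws-java-sdk-bundle"):
    matches = [j for j in jars if needle in j]
    print(needle, "->", matches if matches else "MISSING")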

  2. I configured Spark in Python:

from os import getenv

# set env vars in jupyter notebook
%env AWS_ACCESS_KEY_ID={getenv('S3_SMM_TEST_ACCESS_ID')}
%env AWS_SECRET_ACCESS_KEY={getenv('S3_SMM_TEST_ACCESS_KEY')}
%env S3_ENDPOINT_URL={getenv('S3_ADVANCED_ENDPOINT')}
%env S3_SHUFFLE_ROOT=s3a://smm-test/personal/s3-shuffle

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("name").master('local[*]').config(
    "spark.sql.execution.arrow.pyspark.enabled", "false"
).config(
    "spark.shuffle.manager", "org.apache.spark.shuffle.S3ShuffleManager"
).config(
    "spark.shuffle.sort.io.plugin.class", "org.apache.spark.shuffle.S3ShuffleDataIO"
).config(
    "spark.shuffle.s3.rootDir", "s3a://smm-test/personal/s3-shuffle"
).config(
    "spark.dynamicAllocation.enabled", "true"
).config(
    "spark.dynamicAllocation.shuffleTracking.enabled", "true"
).config(
    "spark.fs.s3a.path.style.access", "true"
).config(
    "spark.fs.s3a.fast.upload", "true"
).config(
    "spark.driver.extraClassPath", f'{getenv("SPARK_HOME")}/jars/aws-java-sdk-bundle-1.11.375.jar,/opt/spark/jars/hadoop-aws-3.2.2.jar'
).config(
    "spark.executor.extraClassPath", f'{getenv("SPARK_HOME")}/jars/aws-java-sdk-bundle-1.11.375.jar,/opt/spark/jars/hadoop-aws-3.2.2.jar'
).config(
    "spark.shuffle.s3.sort.cloneRecords", "true"
).config(
    "spark.shuffle.s3.useBlockManager", "true"
).config(
    "spark.shuffle.s3.forceBatchFetch", "true"
).config(
    "spark.shuffle.s3.supportsUnbuffer", "true"
).config(
    'spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider'
).getOrCreate()
config_spark_s3 = {
    'access_id': getenv("S3_SMM_TEST_ACCESS_ID"),
    'access_key': getenv("S3_SMM_TEST_ACCESS_KEY"),
    'impl': 'org.apache.hadoop.fs.s3a.S3AFileSystem',
    'endpoint': getenv("S3_ADVANCED_ENDPOINT")
}
spark._jsc.hadoopConfiguration().set("fs.s3a.impl", config_spark_s3["impl"])
spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", config_spark_s3["endpoint"])
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", config_spark_s3["access_id"])
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", config_spark_s3["access_key"])

Then I tried to execute a heavy query.

It finished in a couple of minutes and the result looked like this:
[screenshot of the empty result]

despite the fact that without spark-s3-shuffle the same query runs through many stages in about an hour and returns a massive dataframe. The spark.shuffle.s3.rootDir was filled with a couple of GBs of data, but I would have expected much more.
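
If it helps to narrow things down, even a synthetic shuffle-heavy job along these lines (a minimal sketch, not my actual query) should exercise the shuffle path and make the empty-result behaviour easy to check:

from pyspark.sql import functions as F

# Synthetic shuffle-heavy job (sketch): a wide groupBy forces shuffle data
# through the configured shuffle manager.
df = spark.range(0, 50_000_000).withColumn("key", F.col("id") % 1000)
agg = df.groupBy("key").agg(F.count("*").alias("cnt"), F.sum("id").alias("total"))
print(agg.count())  # expected: 1000 groups, not an empty result
agg.show(5)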

Do you have any thoughts on what I can do to make it work?
Thanks in advance!

@pspoerri (Contributor) commented:

Hi

Can you retry with spark-s3-shuffle version 0.7? A minimal config sketch for the retry follows the list below. The plugin has received a couple of improvements since version 0.5:

  • Improvements to how shuffle files are managed: the plugin now relies on S3ShuffleDataIO and no longer tries to serialize the shuffle data on its own.
  • Prefixing of shuffle files to improve performance on S3.
  • Prefetching of multiple shuffle files.
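
With the 0.7 jar on the classpath (assuming the release follows the same naming scheme as the v0.5 download above), a minimal configuration along these lines should be enough to exercise the new S3ShuffleDataIO path; the S3A and credential settings from the original snippet stay as they are:

from pyspark.sql import SparkSession

# Minimal retry configuration (sketch): only the shuffle-related settings,
# using the same class names as in the original report.
spark = (
    SparkSession.builder.appName("s3-shuffle-retry").master("local[*]")
    .config("spark.shuffle.manager", "org.apache.spark.shuffle.S3ShuffleManager")
    .config("spark.shuffle.sort.io.plugin.class", "org.apache.spark.shuffle.S3ShuffleDataIO")
    .config("spark.shuffle.s3.rootDir", "s3a://smm-test/personal/s3-shuffle")
    .getOrCreate()
)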

Thank you
Pascal
